feat: add market regime filter and compound reinvestment

- Add market_regime.py: BTC/ETH/SOL/XRP weighted 2h trend score
  Bull(≥+1.5%) / Neutral / Bear(<-1%) regime detection with 10min cache
- strategy.py: dynamic TREND/VOL thresholds based on current regime
  Bull: 3%/1.5x, Neutral: 5%/2.0x, Bear: 8%/3.5x
- price_collector.py: always include leader coins in price history
- trader.py: compound reinvestment (profit added to budget, floor at initial)
- notify.py: regime info in hourly report, P&L icons (/, 💚/🔴)
- main.py: hourly status at top-of-hour, filter positions held 1h+
- backtest.py: timestop/combo comparison modes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-01 10:14:36 +09:00
parent 035b3e2f30
commit 83a229dd26
8 changed files with 423 additions and 46 deletions

View File

@@ -70,6 +70,8 @@ COMPARE_TOP50 = "--top50-cmp" in sys.argv # 종목수 + 거래대금
COMPARE_TREND = "--trend-cmp" in sys.argv # 추세 상한선 비교 모드
COMPARE_TICKER = "--ticker-cmp" in sys.argv # 종목 승률 필터 비교 모드
COMPARE_WF = "--walkforward-cmp" in sys.argv # walk-forward 필터 비교 모드
COMPARE_TIMESTOP = "--timestop-cmp" in sys.argv # 타임스탑 조건 비교 모드
COMPARE_COMBO = "--combo-cmp" in sys.argv # 추세+거래량 조합 비교 모드
if "--10m" in sys.argv:
DEFAULT_INTERVAL = "minute10"
@@ -574,6 +576,8 @@ def simulate(
volume_mult: float = VOLUME_MULT, # 거래대금 급증 기준 배수
wf_min_wr: float = 0.0, # walk-forward: 직전 N건 승률 임계값 (0=비활성)
wf_window: int = 5, # walk-forward: 승률 계산 윈도우 크기
time_stop_h: float = TIME_STOP_H, # 타임스탑 기준 시간 (0=비활성)
time_stop_gain: float = TIME_STOP_GAIN, # 타임스탑 최소 수익률
) -> list[dict]:
"""단일 종목 전략 시뮬레이션.
@@ -582,6 +586,7 @@ def simulate(
volume_mult : 진입 조건 — 전일 거래량 > N일 평균 × volume_mult
wf_min_wr : walk-forward 필터 — 직전 wf_window건 승률이 이 값 미만이면 진입 차단
윈도우가 채워지기 전(워밍업)에는 필터 미적용
time_stop_h : 타임스탑 기준 시간 (0 이하 = 비활성)
"""
_hard_stop = hard_stop if hard_stop is not None else stop_loss
trades: list[dict] = []
@@ -614,7 +619,7 @@ def simulate(
reason = "trailing_stop"
elif _hard_stop < 999 and drop_buy >= _hard_stop:
reason = "hard_stop"
elif elapsed >= TIME_STOP_H and pnl < TIME_STOP_GAIN:
elif time_stop_h > 0 and elapsed >= time_stop_h and pnl < time_stop_gain:
reason = "time_stop"
if reason:
@@ -851,6 +856,8 @@ def run_scenario(
daily_features: Optional[dict] = None, # 미리 계산된 daily_feat {ticker: df}
wf_min_wr: float = 0.0, # walk-forward 승률 임계값
wf_window: int = 5, # walk-forward 윈도우
time_stop_h: float = TIME_STOP_H, # 타임스탑 기준 시간 (0=비활성)
time_stop_gain: float = TIME_STOP_GAIN, # 타임스탑 최소 수익률
) -> list[dict]:
"""주어진 파라미터로 전체 종목 시뮬레이션 (데이터는 외부에서 주입).
@@ -868,6 +875,8 @@ def run_scenario(
"reentry": reentry_above_sell,
"wf_min_wr": round(wf_min_wr, 3),
"wf_window": wf_window,
"time_stop_h": time_stop_h,
"time_stop_gain": round(time_stop_gain, 4),
}
if use_db_cache:
@@ -894,6 +903,8 @@ def run_scenario(
volume_mult=volume_mult,
wf_min_wr=wf_min_wr,
wf_window=wf_window,
time_stop_h=time_stop_h,
time_stop_gain=time_stop_gain,
)
all_trades.extend(trades)
@@ -1531,8 +1542,142 @@ def main_ticker_filter_cmp(interval: str = DEFAULT_INTERVAL) -> None:
print(f"{row[0]:<12} 승률 {row[1]:.0f}%")
def main_timestop_cmp(interval: str = DEFAULT_INTERVAL) -> None:
"""타임스탑 조건 비교: 트레일링 1.5%+8h(현행) vs 10%+8h vs 10%+24h vs 10%+없음."""
cfg = INTERVAL_CONFIG[interval]
label = cfg["label"]
trend = cfg["trend"]
print(f"\n{'='*60}")
print(f" 타임스탑 조건 비교 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}")
print(f" (A) 트레일링 1.5% + 타임스탑 8h ← 현재 설정")
print(f" (B) 트레일링 10% + 타임스탑 8h")
print(f" (C) 트레일링 10% + 타임스탑 24h")
print(f" (D) 트레일링 10% + 타임스탑 없음")
print(f"{'='*60}")
print("\n▶ 종목 데이터 로드 중 (DB 캐시 우선)...")
tickers = get_top_tickers(TOP_N)
print(f"{tickers}")
data = fetch_all_data(tickers, interval)
print(f" 사용 종목: {list(data.keys())}")
daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()}
common = dict(
reentry_above_sell=True, interval_cd=interval,
use_db_cache=True, daily_features=daily_features,
trend_min_gain=trend,
)
print("\n▶ A: 트레일링 1.5% + 타임스탑 8h (현재 설정) ...")
trades_a = run_scenario(data, stop_loss=0.015, time_stop_h=8, **common)
print(f" 완료 ({len(trades_a)}건)")
print("\n▶ B: 트레일링 10% + 타임스탑 8h ...")
trades_b = run_scenario(data, stop_loss=0.10, time_stop_h=8, **common)
print(f" 완료 ({len(trades_b)}건)")
print("\n▶ C: 트레일링 10% + 타임스탑 24h ...")
trades_c = run_scenario(data, stop_loss=0.10, time_stop_h=24, **common)
print(f" 완료 ({len(trades_c)}건)")
print("\n▶ D: 트레일링 10% + 타임스탑 없음 ...")
trades_d = run_scenario(data, stop_loss=0.10, time_stop_h=0, **common)
print(f" 완료 ({len(trades_d)}건)")
compare_report([
("1.5%+8h(현행)", trades_a),
("10%+8h", trades_b),
("10%+24h", trades_c),
("10%+타임없음", trades_d),
], title=f"트레일링 + 타임스탑 조합 비교 ({label})")
report(trades_a, f"A: 트레일링 1.5% + 타임스탑 8h | {label}")
report(trades_b, f"B: 트레일링 10% + 타임스탑 8h | {label}")
report(trades_c, f"C: 트레일링 10% + 타임스탑 24h | {label}")
report(trades_d, f"D: 트레일링 10% + 타임스탑 없음 | {label}")
def main_combo_cmp(interval: str = DEFAULT_INTERVAL) -> None:
"""추세 임계값 + 거래량 배수 조합 비교.
목적: 거래 빈도를 줄여 수수료 부담을 낮추는 최적 조합 탐색.
A (현행): trend=1.0%, vol=2x
B: trend=1.5%, vol=2x (추세만 강화)
C: trend=1.0%, vol=3x (거래량만 강화)
D: trend=1.5%, vol=3x (둘 다 강화)
E: trend=2.0%, vol=3x (최강 강화)
"""
cfg = INTERVAL_CONFIG[interval]
label = cfg["label"]
base_trend = cfg["trend"] # 15분봉 기준 1.0%
combos = [
("A 현행 1.0%/2x", base_trend, 2.0),
("B 추세1.5%/2x", base_trend * 1.5, 2.0),
("C 추세1.0%/3x", base_trend, 3.0),
("D 추세1.5%/3x", base_trend * 1.5, 3.0),
("E 추세2.0%/3x", base_trend * 2.0, 3.0),
]
print(f"\n{'='*64}")
print(f" 추세+거래량 조합 비교 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}")
print(f" 목적: 거래 빈도↓ → 수수료 부담↓ → 순수익↑")
print(f" FEE = 0.05% × 2 = 0.1%/건 | 현행 ~85건/월 = 월 8.5% 수수료")
print(f"{'='*64}")
print("\n▶ 종목 데이터 로드 중 (DB 캐시 우선)...")
tickers = get_top_tickers(TOP_N)
data = fetch_all_data(tickers, interval)
print(f" 사용 종목: {list(data.keys())}")
daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()}
results = []
for combo_label, trend, vol in combos:
print(f"\n{combo_label} (trend={trend*100:.1f}%, vol={vol:.0f}x) ...")
trades = run_scenario(
data, stop_loss=0.015,
trend_min_gain=trend, volume_mult=vol,
reentry_above_sell=True,
interval_cd=interval, use_db_cache=True,
daily_features=daily_features,
)
n = len(trades)
fee_pct = n * 0.1 # 총 수수료 부담 (%)
print(f" 완료 ({n}건 | 예상 수수료 {fee_pct:.1f}%)")
results.append((combo_label, trades))
compare_report(results, title=f"추세+거래량 조합 비교 ({label})")
# 수수료 분석 추가 출력
print(" [수수료 분석]")
print(f" {'조합':<20} {'거래수':>6} {'월평균':>6} {'총수수료':>8} {'수수료 전':>10} {'수수료 후':>10}")
print(f" {'-'*64}")
for (clabel, trades) in results:
if not trades:
continue
df = pd.DataFrame(trades)
n = len(df)
span_days = (df["exit"].max() - df["entry"].min()).total_seconds() / 86400
per_month = n / (span_days / 30) if span_days > 0 else 0
fee_total = n * 0.1
net_pnl = df["pnl_pct"].sum()
gross_pnl = net_pnl + fee_total # 수수료 전 추정
print(f" {clabel:<20} {n:>6}{per_month:>5.0f}/월 {fee_total:>7.1f}% "
f"{gross_pnl:>+9.1f}% {net_pnl:>+9.1f}%")
print()
for clabel, trades in results:
report(trades, clabel)
def main() -> None:
if COMPARE_TOP50:
if COMPARE_COMBO:
main_combo_cmp(DEFAULT_INTERVAL)
elif COMPARE_TIMESTOP:
main_timestop_cmp(DEFAULT_INTERVAL)
elif COMPARE_TOP50:
main_top50_cmp(DEFAULT_INTERVAL)
elif COMPARE_HARDSTOP:
main_hard_stop_cmp(DEFAULT_INTERVAL)

110
core/market_regime.py Normal file
View File

@@ -0,0 +1,110 @@
"""시장 레짐(Bull/Neutral/Bear) 판단.
BTC·ETH·SOL·XRP 가중 평균 2h 추세로 레짐을 결정하고
매수 조건 파라미터(trend_pct, vol_mult)를 동적으로 반환한다.
계산된 현재가는 price_history DB에 저장해 재활용한다.
"""
from __future__ import annotations
import logging
import time
import pyupbit
from .price_db import get_price_n_hours_ago, insert_prices
logger = logging.getLogger(__name__)
# 대장 코인 가중치
LEADERS: dict[str, float] = {
"KRW-BTC": 0.40,
"KRW-ETH": 0.30,
"KRW-SOL": 0.15,
"KRW-XRP": 0.15,
}
TREND_HOURS = 2 # 2h 추세 기준
BULL_THRESHOLD = 1.5 # score ≥ 1.5% → Bull
BEAR_THRESHOLD = -1.0 # score < -1.0% → Bear
# 레짐별 매수 조건 파라미터
REGIME_PARAMS: dict[str, dict] = {
"bull": {"trend_pct": 3.0, "vol_mult": 1.5, "emoji": "🟢"},
"neutral": {"trend_pct": 5.0, "vol_mult": 2.0, "emoji": "🟡"},
"bear": {"trend_pct": 8.0, "vol_mult": 3.5, "emoji": "🔴"},
}
# 10분 캐시 (스캔 루프마다 API 호출 방지)
_cache: dict = {}
_cache_ts: float = 0.0
_CACHE_TTL = 600
def get_regime() -> dict:
"""현재 시장 레짐 반환.
Returns:
{
'name': 'bull' | 'neutral' | 'bear',
'score': float, # 가중 평균 2h 추세(%)
'trend_pct': float, # 매수 추세 임계값
'vol_mult': float, # 거래량 배수 임계값
'emoji': str,
}
"""
global _cache, _cache_ts
if _cache and (time.time() - _cache_ts) < _CACHE_TTL:
return _cache
score = 0.0
current_prices: dict[str, float] = {}
for ticker, weight in LEADERS.items():
try:
current = pyupbit.get_current_price(ticker)
if not current:
continue
current_prices[ticker] = current
# DB에서 2h 전 가격 조회 → 없으면 API 캔들로 대체
past = get_price_n_hours_ago(ticker, TREND_HOURS)
if past is None:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=4)
if df is not None and len(df) >= 3:
past = float(df["close"].iloc[-3])
if past:
trend = (current - past) / past * 100
score += trend * weight
logger.debug(f"[레짐] {ticker} {trend:+.2f}% (기여 {trend*weight:+.3f})")
except Exception as e:
logger.warning(f"[레짐] {ticker} 오류: {e}")
# 현재가 DB 저장 (다음 레짐 계산 및 추세 판단에 재활용)
if current_prices:
try:
insert_prices(current_prices)
except Exception as e:
logger.warning(f"[레짐] 가격 저장 오류: {e}")
# 레짐 결정
if score >= BULL_THRESHOLD:
name = "bull"
elif score < BEAR_THRESHOLD:
name = "bear"
else:
name = "neutral"
params = REGIME_PARAMS[name]
result = {"name": name, "score": round(score, 3), **params}
logger.info(
f"[레짐] score={score:+.3f}% → {params['emoji']} {name.upper()} "
f"(TREND≥{params['trend_pct']}% / VOL≥{params['vol_mult']}x)"
)
_cache = result
_cache_ts = time.time()
return result

View File

@@ -28,21 +28,36 @@ def _send(text: str) -> None:
logger.error(f"Telegram 알림 실패: {e}")
def notify_buy(ticker: str, price: float, amount: float, invested_krw: int) -> None:
def notify_buy(
ticker: str, price: float, amount: float, invested_krw: int,
max_budget: int = 0, per_position: int = 0,
) -> None:
budget_line = (
f"운용예산: {max_budget:,}원 (포지션당 {per_position:,}원)\n"
if max_budget else ""
)
_send(
f"📈 <b>[매수]</b> {ticker}\n"
f"가격: {price:,.0f}\n"
f"수량: {amount}\n"
f"투자금: {invested_krw:,}"
f"투자금: {invested_krw:,}\n"
f"{budget_line}"
)
def notify_sell(ticker: str, price: float, pnl_pct: float, reason: str) -> None:
emoji = "" if pnl_pct >= 0 else "🔴"
def notify_sell(
ticker: str, price: float, pnl_pct: float, reason: str,
krw_profit: float = 0.0, fee_krw: float = 0.0,
cum_profit: float = 0.0,
) -> None:
trade_emoji = "" if pnl_pct >= 0 else ""
cum_emoji = "💚" if cum_profit >= 0 else "🔴"
_send(
f"{emoji} <b>[매도]</b> {ticker}\n"
f"{trade_emoji} <b>[매도]</b> {ticker}\n"
f"가격: {price:,.0f}\n"
f"수익률: {pnl_pct:+.1f}%\n"
f"수익률: {pnl_pct:+.2f}%\n"
f"실손익: {krw_profit:+,.0f}원 (수수료 {fee_krw:,.0f}원)\n"
f"{cum_emoji} 누적손익: {cum_profit:+,.0f}\n"
f"사유: {reason}"
)
@@ -51,19 +66,50 @@ def notify_error(message: str) -> None:
_send(f"⚠️ <b>[오류]</b>\n{message}")
def notify_status(positions: dict) -> None:
"""1시간마다 포지션 현황 요약 전송."""
def notify_status(
positions: dict,
max_budget: int = 0,
per_position: int = 0,
cum_profit: float = 0.0,
) -> None:
"""정각마다 시장 레짐 + 1시간 이상 보유 포지션 현황 전송."""
from datetime import datetime
import pyupbit
from .market_regime import get_regime
now = datetime.now().strftime("%H:%M")
cum_sign = "+" if cum_profit >= 0 else ""
if not positions:
_send(f"📊 <b>[{now} 현황]</b>\n보유 포지션 없음 — 매수 신호 대기 중")
# 시장 레짐
regime = get_regime()
regime_line = (
f"{regime['emoji']} 시장: {regime['name'].upper()} "
f"(score {regime['score']:+.2f}%) "
f"| 조건 TREND≥{regime['trend_pct']}% / VOL≥{regime['vol_mult']}x\n"
)
# 1시간 이상 보유 포지션만 필터
long_positions = {
ticker: pos for ticker, pos in positions.items()
if (datetime.now() - pos["entry_time"]).total_seconds() >= 3600
}
cum_emoji = "💚" if cum_profit >= 0 else "🔴"
budget_info = (
f"💰 운용예산: {max_budget:,}원 | 포지션당: {per_position:,}\n"
f"{cum_emoji} 누적손익: {cum_sign}{cum_profit:,.0f}\n"
if max_budget else ""
)
# 포지션 없어도 레짐 정보는 전송
header = f"📊 <b>[{now} 현황]</b>\n{regime_line}{budget_info}"
if not long_positions:
_send(header + "1h+ 보유 포지션 없음")
return
lines = [f"📊 <b>[{now} 현황]</b>"]
for ticker, pos in positions.items():
lines = [header]
for ticker, pos in long_positions.items():
current = pyupbit.get_current_price(ticker)
if not current:
continue
@@ -73,9 +119,9 @@ def notify_status(positions: dict) -> None:
elapsed = (datetime.now() - pos["entry_time"]).total_seconds() / 3600
emoji = "📈" if pnl >= 0 else "📉"
lines.append(
f"\n{emoji} <b>{ticker}</b>\n"
f"{emoji} <b>{ticker}</b>\n"
f" 현재가: {current:,.0f}\n"
f" 수익률: {pnl:+.1f}%\n"
f" 수익률: {pnl:+.2f}%\n"
f" 최고가 대비: -{drop:.1f}%\n"
f" 보유: {elapsed:.1f}h"
)

View File

@@ -9,6 +9,7 @@ import pyupbit
import requests
from .market import get_top_tickers
from .market_regime import LEADERS
from .price_db import cleanup_old_prices, insert_prices, insert_prices_with_time
logger = logging.getLogger(__name__)
@@ -27,6 +28,10 @@ def backfill_prices(hours: int = 48) -> None:
if not tickers:
logger.warning("[백필] 종목 목록 없음, 스킵")
return
# 대장 코인 항상 포함
for leader in LEADERS:
if leader not in tickers:
tickers = tickers + [leader]
count = hours + 2 # 여유 있게 요청
total_rows = 0
@@ -59,6 +64,10 @@ def run_collector(interval: int = COLLECT_INTERVAL) -> None:
tickers = get_top_tickers()
if not tickers:
continue
# 대장 코인은 top20 밖이어도 항상 포함
for leader in LEADERS:
if leader not in tickers:
tickers = tickers + [leader]
resp = requests.get(
"https://api.upbit.com/v1/ticker",
params={"markets": ",".join(tickers)},

View File

@@ -206,6 +206,15 @@ def record_trade(
)
def get_cumulative_krw_profit() -> float:
"""전체 거래 누적 KRW 손익 반환 (수수료 차감 후). 데이터 없으면 0."""
with _conn() as conn:
cur = conn.cursor()
cur.execute("SELECT SUM(krw_profit) FROM trade_results WHERE krw_profit IS NOT NULL")
row = cur.fetchone()
return float(row[0]) if row and row[0] is not None else 0.0
def load_recent_wins(ticker: str, n: int = 5) -> list[bool]:
"""직전 N건 거래의 승/패 리스트 반환 (오래된 순). 없으면 빈 리스트."""
sql = """

View File

@@ -7,22 +7,23 @@ import os
import pyupbit
from .market import get_current_price, get_ohlcv
from .market_regime import get_regime
from .price_db import get_price_n_hours_ago
logger = logging.getLogger(__name__)
# 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중
TREND_HOURS = float(os.getenv("TREND_HOURS", "12"))
TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "3"))
TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # 레짐이 없을 때 기본값
# 모멘텀: MA 기간, 거래량 급증 배수
MA_PERIOD = 20
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "1.2")) # 로컬 5h 평균 대비
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) # 레짐이 없을 때 기본값
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
def check_trend(ticker: str) -> bool:
"""상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +M% 이상."""
def check_trend(ticker: str, min_gain_pct: float) -> bool:
"""상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +min_gain_pct% 이상."""
past_price = get_price_n_hours_ago(ticker, TREND_HOURS)
if past_price is None:
logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)")
@@ -33,22 +34,22 @@ def check_trend(ticker: str) -> bool:
return False
gain_pct = (current - past_price) / past_price * 100
result = gain_pct >= TREND_MIN_GAIN_PCT
result = gain_pct >= min_gain_pct
if result:
logger.info(
f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} "
f"현재={current:,.2f} (+{gain_pct:.1f}%)"
f"현재={current:,.2f} (+{gain_pct:.1f}%{min_gain_pct}%)"
)
else:
logger.debug(
f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={TREND_MIN_GAIN_PCT:+.0f}%)"
f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={min_gain_pct:+.0f}%)"
)
return result
def check_momentum(ticker: str) -> bool:
"""모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × 1.2.
def check_momentum(ticker: str, vol_mult: float) -> bool:
"""모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × vol_mult.
23h 평균은 낮 시간대 고거래량이 포함돼 새벽에 항상 미달하므로,
로컬 5h 평균(같은 시간대 컨텍스트)과 비교한다.
@@ -79,24 +80,28 @@ def check_momentum(ticker: str) -> bool:
recent_vol = df_hour["volume"].iloc[-2] # 직전 완성된 1h 봉
local_avg = df_hour["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 LOCAL_VOL_HOURS h 평균
vol_ok = local_avg > 0 and recent_vol >= local_avg * VOLUME_MULTIPLIER
vol_ok = local_avg > 0 and recent_vol >= local_avg * vol_mult
ratio = recent_vol / local_avg if local_avg > 0 else 0
if vol_ok:
logger.info(
f"[모멘텀↑] {ticker} 현재={current:,.0f} MA20={ma:,.0f} "
f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x)"
f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x{vol_mult}x)"
)
else:
logger.debug(
f"[모멘텀✗] {ticker} 1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} "
f"({ratio:.2f}x < {VOLUME_MULTIPLIER}x)"
f"({ratio:.2f}x < {vol_mult}x)"
)
return vol_ok
def should_buy(ticker: str) -> bool:
"""Strategy C: 실시간 상승 추세 AND 거래량 모멘텀 모두 충족 시 True."""
if not check_trend(ticker):
"""Strategy C + 시장 레짐: 레짐별 동적 임계값으로 추세 AND 모멘텀 판단."""
regime = get_regime()
trend_pct = regime["trend_pct"]
vol_mult = regime["vol_mult"]
if not check_trend(ticker, trend_pct):
return False
return check_momentum(ticker)
return check_momentum(ticker, vol_mult)

View File

@@ -17,15 +17,39 @@ from .price_db import (
delete_position, load_positions, upsert_position,
ensure_trade_results_table, record_trade, load_recent_wins,
ensure_sell_prices_table, upsert_sell_price, load_sell_prices,
get_cumulative_krw_profit,
)
load_dotenv()
logger = logging.getLogger(__name__)
MAX_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 총 운용 한도
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
PER_POSITION = MAX_BUDGET // MAX_POSITIONS # 종목당 투자금
INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정)
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
# 복리 적용 예산 (매도 후 재계산) — 수익 발생 시만 증가, 손실 시 원금 유지
MAX_BUDGET = INITIAL_BUDGET
PER_POSITION = INITIAL_BUDGET // MAX_POSITIONS
def _recalc_compound_budget() -> None:
"""누적 수익을 반영해 MAX_BUDGET / PER_POSITION 재계산.
수익이 발생한 만큼만 예산에 더함 (손실 시 원금 아래로 내려가지 않음).
매도 완료 후 호출.
"""
global MAX_BUDGET, PER_POSITION
try:
cum_profit = get_cumulative_krw_profit()
effective = INITIAL_BUDGET + max(int(cum_profit), 0)
MAX_BUDGET = effective
PER_POSITION = effective // MAX_POSITIONS
logger.info(
f"[복리] 누적수익={cum_profit:+,.0f}원 | "
f"운용예산={MAX_BUDGET:,}원 | 포지션당={PER_POSITION:,}"
)
except Exception as e:
logger.warning(f"[복리] 예산 재계산 실패 (이전 값 유지): {e}")
# Walk-forward 필터 설정
WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기
@@ -103,6 +127,15 @@ def get_positions() -> dict:
return _positions
def get_budget_info() -> dict:
"""현재 복리 예산 정보 반환 (main.py 등 외부에서 동적 조회용)."""
return {
"max_budget": MAX_BUDGET,
"per_position": PER_POSITION,
"initial": INITIAL_BUDGET,
}
def restore_positions() -> None:
"""시작 시 Oracle DB + Upbit 잔고를 교차 확인하여 포지션 복원.
trade_results 테이블도 이 시점에 생성 (없으면).
@@ -115,6 +148,9 @@ def restore_positions() -> None:
except Exception as e:
logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}")
# 시작 시 복리 예산 복원 (이전 세션 수익 반영)
_recalc_compound_budget()
try:
ensure_sell_prices_table()
except Exception as e:
@@ -278,7 +314,8 @@ def buy(ticker: str) -> bool:
f"[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
)
notify_buy(ticker, actual_price, amount, order_krw)
notify_buy(ticker, actual_price, amount, order_krw,
max_budget=MAX_BUDGET, per_position=PER_POSITION)
return True
except Exception as e:
logger.error(f"매수 예외 {ticker}: {e}")
@@ -387,7 +424,12 @@ def sell(ticker: str, reason: str = "") -> bool:
f"[매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}"
)
notify_sell(ticker, actual_sell_price, pnl, reason)
try:
cum = get_cumulative_krw_profit() + krw_profit
except Exception:
cum = 0.0
notify_sell(ticker, actual_sell_price, pnl, reason,
krw_profit=krw_profit, fee_krw=fee, cum_profit=cum)
_last_sell_prices[ticker] = actual_sell_price
try:
upsert_sell_price(ticker, actual_sell_price)
@@ -406,6 +448,8 @@ def sell(ticker: str, reason: str = "") -> bool:
delete_position(ticker)
except Exception as e:
logger.error(f"포지션 DB 삭제 실패 {ticker}: {e}")
# 복리 예산 재계산: 수익 발생분만 다음 투자에 반영
_recalc_compound_budget()
return True
except Exception as e:
logger.error(f"매도 예외 {ticker}: {e}")

29
main.py
View File

@@ -20,23 +20,32 @@ logging.basicConfig(
from core.monitor import run_monitor
from core.notify import notify_error, notify_status
from core.price_collector import backfill_prices, run_collector
from core.trader import get_positions, restore_positions
from core.price_db import get_cumulative_krw_profit
from core.trader import get_positions, get_budget_info, restore_positions
from daemon.runner import run_scanner
STATUS_INTERVAL = 3600 # 1시간마다 요약 전송
def run_status_reporter(interval: int = STATUS_INTERVAL) -> None:
"""주기적으로 포지션 현황을 Telegram으로 전송."""
def run_status_reporter() -> None:
"""매 정각마다 1시간 이상 보유 포지션 현황 전송."""
import datetime as _dt
logger = logging.getLogger("status")
logger.info(f"상태 리포터 시작 (주기={interval//60})")
time.sleep(interval) # 첫 전송은 1시간 후
logger.info("상태 리포터 시작 (매 정각 트리거)")
while True:
now = _dt.datetime.now()
# 다음 정각까지 대기
secs_to_next_hour = (60 - now.minute) * 60 - now.second
time.sleep(secs_to_next_hour)
try:
notify_status(dict(get_positions()))
budget = get_budget_info()
cum = get_cumulative_krw_profit()
notify_status(
dict(get_positions()),
max_budget=budget["max_budget"],
per_position=budget["per_position"],
cum_profit=cum,
)
except Exception as e:
logger.error(f"상태 리포트 오류: {e}")
time.sleep(interval)
def main() -> None:
@@ -55,7 +64,7 @@ def main() -> None:
)
monitor_thread.start()
# 1시간 주기 상태 리포트 스레드
# 매 정각 상태 리포트 스레드 (1시간 이상 보유 포지션만)
status_thread = threading.Thread(
target=run_status_reporter, daemon=True, name="status"
)