"""모멘텀 필터 유/무 비교 시뮬레이션. A안: 추세(2h +5%) + 15분 워치리스트 (모멘텀 없음) B안: 추세(2h +5%) + 모멘텀 + 15분 워치리스트 (현행) """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) import os, time from dotenv import load_dotenv load_dotenv() import oracledb import pyupbit STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100 TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8")) TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100 TREND_MIN_PCT = 5.0 MA_PERIOD = 20 LOCAL_VOL_HOURS = 5 VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) CONFIRM_MINUTES = 15 FEE = 0.0005 _daily_cache = {} _hourly_cache = {} def get_conn(): return oracledb.connect( user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'), dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET')) def load_prices(cur, ticker, from_dt): cur.execute("""SELECT price, recorded_at FROM price_history WHERE ticker=:t AND recorded_at>=:f ORDER BY recorded_at""", t=ticker, f=from_dt) return cur.fetchall() def get_ma20(ticker, dt): key = (ticker, dt.strftime("%Y-%m-%d")) if key not in _daily_cache: try: df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD+2, to=dt.strftime("%Y-%m-%d 09:00:00")) _daily_cache[key] = df time.sleep(0.1) except: _daily_cache[key] = None df = _daily_cache[key] if df is None or len(df) < MA_PERIOD: return None return df["close"].iloc[-MA_PERIOD:].mean() def get_vol_ratio(ticker, dt): key = (ticker, dt.strftime("%Y-%m-%d %H")) if key not in _hourly_cache: try: df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS+3, to=dt.strftime("%Y-%m-%d %H:%M:%S")) _hourly_cache[key] = df time.sleep(0.1) except: _hourly_cache[key] = None df = _hourly_cache[key] if df is None or len(df) < LOCAL_VOL_HOURS+1: return 0.0 rv = df["volume"].iloc[-2] la = df["volume"].iloc[-(LOCAL_VOL_HOURS+1):-2].mean() return rv/la if la > 0 else 0.0 def check_trend(prices, idx): lb = 12 # 2h = 12 * 10min if idx < lb: return False curr, past = prices[idx][0], prices[idx-lb][0] return past > 0 and (curr-past)/past*100 >= TREND_MIN_PCT def check_momentum(ticker, price, dt): ma = get_ma20(ticker, dt) if ma is None or price <= ma: return False return get_vol_ratio(ticker, dt) >= VOL_MULT def simulate_pos(prices, buy_idx, buy_price): buy_dt = prices[buy_idx][1] peak = buy_price for price, ts in prices[buy_idx+1:]: if price > peak: peak = price elapsed_h = (ts - buy_dt).total_seconds() / 3600 pnl = (price - buy_price) / buy_price if (peak - price) / peak >= STOP_LOSS_PCT: net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT: net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return net > 0, price, ts, f"타임스탑", net lp, lt = prices[-1] net = (lp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return net > 0, lp, lt, "데이터종료", net def run_scenario(prices, ticker, use_momentum, label): wins = losses = 0 total_pnl = 0.0 watchlist_dt = None in_pos = False buy_idx = buy_price = None idx = 0 trades = [] while idx < len(prices): price, dt = prices[idx] if in_pos: is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price) next_idx = next((i for i,(_, ts) in enumerate(prices) if ts > sdt), len(prices)) if is_win: wins += 1 else: losses += 1 total_pnl += pnl trades.append((is_win, buy_price, sp, pnl, dt, sdt, reason)) in_pos = False watchlist_dt = None idx = next_idx continue trend_ok = check_trend(prices, idx) mom_ok = check_momentum(ticker, price, dt) if (use_momentum and trend_ok) else True if trend_ok and mom_ok: if watchlist_dt is None: watchlist_dt = dt elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60: in_pos = True buy_idx = idx buy_price = price watchlist_dt = None else: watchlist_dt = None idx += 1 total = wins + losses wr = wins/total*100 if total else 0 return {'label': label, 'total': total, 'wins': wins, 'losses': losses, 'wr': wr, 'pnl': total_pnl, 'trades': trades} def print_result(r): print(f"\n [{r['label']}]") print(f" 총 {r['total']}건 | 승률={r['wr']:.0f}% ({r['wins']}승 {r['losses']}패) | 누적={r['pnl']:+.2f}%") for i, (iw, bp, sp, pnl, bdt, sdt, reason) in enumerate(r['trades'], 1): mark = "✅" if iw else "❌" print(f" #{i}: {bp:.4f}→{sp:.4f}원 | {mark} {pnl:+.2f}% | {reason}" f" ({bdt.strftime('%m-%d %H:%M')}→{sdt.strftime('%m-%d %H:%M')})") def main(): conn = get_conn() cur = conn.cursor() cur.execute("SELECT MAX(recorded_at) FROM price_history") end_dt = cur.fetchone()[0] print("=" * 62) print("모멘텀 필터 유/무 비교 (WF차단 발동 이후 전 기간)") print("A안: 추세+워치리스트만 B안: 추세+모멘텀+워치리스트(현행)") print("=" * 62) summary = [] for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']: cur.execute("SELECT traded_at FROM trade_results WHERE ticker=:t ORDER BY traded_at", t=ticker) rows = cur.fetchall() wf_dt = rows[4][0] prices = load_prices(cur, ticker, wf_dt) print(f"\n{'─'*62}") print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')} 데이터: {len(prices)}틱") rA = run_scenario(prices, ticker, use_momentum=False, label="A: 추세+워치만") rB = run_scenario(prices, ticker, use_momentum=True, label="B: 추세+모멘텀+워치(현행)") print_result(rA) print_result(rB) summary.append((ticker, rA, rB)) print(f"\n{'='*62}") print(f"{'종목':<12} {'A안 거래':>6} {'A안 승률':>8} {'A안 PnL':>10} │ {'B안 거래':>6} {'B안 승률':>8} {'B안 PnL':>10}") print(f"{'─'*62}") for ticker, rA, rB in summary: print(f"{ticker:<12} {rA['total']:>6}건 {rA['wr']:>6.0f}% {rA['pnl']:>+9.2f}% │" f" {rB['total']:>6}건 {rB['wr']:>6.0f}% {rB['pnl']:>+9.2f}%") conn.close() if __name__ == "__main__": main()