"""Shadow 재활 시뮬레이션 v3 - 실제 전략 필터 포함. 전략 조건: 1. 추세: 현재가 vs 2h 전 가격 >= TREND_MIN_GAIN_PCT% 2. 모멘텀: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 * VOL_MULT 3. 15분 워치리스트: 첫 신호 후 15분 재확인 """ import os, time from datetime import datetime, timedelta from collections import defaultdict from dotenv import load_dotenv load_dotenv() import oracledb import pyupbit # ── 파라미터 ────────────────────────────────────────── STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100 TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8")) TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100 TREND_HOURS = 2 TREND_MIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # Neutral 기준 MA_PERIOD = 20 LOCAL_VOL_HOURS = 5 VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) CONFIRM_MINUTES = 15 FEE = 0.0005 REHABILITATE_WINS = 2 # ── DB 연결 ─────────────────────────────────────────── def get_conn(): return oracledb.connect( user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'), dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET') ) def load_price_history(cur, ticker, from_dt, to_dt): """price_history 전체 로드.""" cur.execute(""" SELECT price, recorded_at FROM price_history WHERE ticker = :t AND recorded_at BETWEEN :f AND :e ORDER BY recorded_at """, t=ticker, f=from_dt, e=to_dt) return cur.fetchall() # [(price, dt), ...] # ── pyupbit 과거 데이터 캐시 ─────────────────────────── _daily_cache = {} # (ticker, date_str) → df _hourly_cache = {} # (ticker, hour_str) → df def get_ma20(ticker, as_of_dt): """as_of_dt 기준 MA20 (일봉 종가).""" date_str = as_of_dt.strftime("%Y-%m-%d") key = (ticker, date_str) if key not in _daily_cache: try: df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD + 2, to=as_of_dt.strftime("%Y-%m-%d 09:00:00")) _daily_cache[key] = df time.sleep(0.1) except Exception as e: _daily_cache[key] = None df = _daily_cache[key] if df is None or len(df) < MA_PERIOD: return None return df["close"].iloc[-MA_PERIOD:].mean() def get_volume_ratio(ticker, as_of_dt): """최근 1h 거래량 / 로컬 5h 평균. (직전 완성봉 기준)""" # 시간봉은 해당 시각의 이전 7h 데이터 필요 hour_str = as_of_dt.strftime("%Y-%m-%d %H:00:00") key = (ticker, hour_str) if key not in _hourly_cache: try: df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS + 3, to=as_of_dt.strftime("%Y-%m-%d %H:%M:%S")) _hourly_cache[key] = df time.sleep(0.1) except Exception as e: _hourly_cache[key] = None df = _hourly_cache[key] if df is None or len(df) < LOCAL_VOL_HOURS + 1: return 0.0 recent_vol = df["volume"].iloc[-2] local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() if local_avg <= 0: return 0.0 return recent_vol / local_avg # ── 전략 조건 체크 ──────────────────────────────────── def check_trend(prices, idx): """현재 idx 기준 2h 전(12틱 전) 대비 +TREND_MIN_PCT% 이상.""" lookback = TREND_HOURS * 6 # 10분봉 기준 2h = 12틱 if idx < lookback: return False current = prices[idx][0] past = prices[idx - lookback][0] if past <= 0: return False gain = (current - past) / past * 100 return gain >= TREND_MIN_PCT def check_momentum(ticker, current_price, as_of_dt): """현재가 > MA20 AND 거래량 비율 >= VOL_MULT.""" ma20 = get_ma20(ticker, as_of_dt) if ma20 is None: return False if current_price <= ma20: return False vol_ratio = get_volume_ratio(ticker, as_of_dt) return vol_ratio >= VOL_MULT # ── 단일 포지션 시뮬레이션 ──────────────────────────── def simulate_position(prices, buy_idx, buy_price): """buy_idx 이후 가격으로 포지션 시뮬레이션.""" buy_dt = prices[buy_idx][1] peak = buy_price for price, ts in prices[buy_idx + 1:]: if price > peak: peak = price elapsed_h = (ts - buy_dt).total_seconds() / 3600 pnl = (price - buy_price) / buy_price drop_from_peak = (peak - price) / peak if drop_from_peak >= STOP_LOSS_PCT: net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return (net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net) if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT: net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return (net > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", net) last_price, last_ts = prices[-1] net = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return (net > 0, last_price, last_ts, "데이터종료", net) # ── Shadow → 재활 → 실제 진입 시뮬레이션 ───────────── def run_full_sim(cur, ticker, wf_trigger_dt, end_dt): """ 1. WF차단 시점부터 shadow 포지션 (전략 필터 적용) 2. REHABILITATE_WINS 연승 → WF 해제 3. 해제 이후 실제 진입 성과 """ prices = load_price_history(cur, ticker, wf_trigger_dt, end_dt) if not prices: print(f" 가격 데이터 없음") return print(f" 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})") # ── Phase 1: Shadow (WF 해제까지) ──────────────── shadow_wins = 0 rehab_dt = None watchlist_dt = None # 신호 첫 발생 시각 in_position = False pos_buy_idx = None pos_buy_price = None shadow_trade_no = 0 idx = 0 while idx < len(prices) and rehab_dt is None: current_price, current_dt = prices[idx] if in_position: # 포지션 청산 체크 is_win, sell_price, sell_dt, reason, pnl = simulate_position(prices, pos_buy_idx, pos_buy_price) # 청산 시각에 해당하는 idx로 점프 sell_idx = next((i for i, (_, ts) in enumerate(prices) if ts >= sell_dt), len(prices)-1) shadow_trade_no += 1 if is_win: shadow_wins += 1 else: shadow_wins = 0 mark = "✅" if is_win else "❌" print(f" [Shadow#{shadow_trade_no}] {pos_buy_price:.4f}→{sell_price:.4f}원 " f"| {mark} {pnl:+.2f}% | {reason} | 연속승={shadow_wins}/{REHABILITATE_WINS}" f" ({sell_dt.strftime('%m-%d %H:%M')})") if shadow_wins >= REHABILITATE_WINS: rehab_dt = sell_dt idx = sell_idx break in_position = False watchlist_dt = None idx = sell_idx continue # 전략 조건 체크 trend_ok = check_trend(prices, idx) if trend_ok: mom_ok = check_momentum(ticker, current_price, current_dt) else: mom_ok = False if trend_ok and mom_ok: if watchlist_dt is None: watchlist_dt = current_dt # 첫 신호 elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60: # 15분 재확인 → shadow 진입 in_position = True pos_buy_idx = idx pos_buy_price = current_price watchlist_dt = None # 청산은 다음 루프에서 else: watchlist_dt = None # 조건 깨지면 초기화 idx += 1 if rehab_dt is None: print(f"\n ⛔ 전략 필터 적용 시 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})") return print(f"\n 🎉 WF 해제: {rehab_dt.strftime('%m-%d %H:%M')} ({shadow_trade_no}번 shadow 거래)") # ── Phase 2: 실제 진입 (재활 이후) ─────────────── print(f"\n ── 재활 후 실제 진입 ──") post_prices = [(p, dt) for p, dt in prices if dt >= rehab_dt] if not post_prices: print(" 재활 이후 데이터 없음") return real_wins = 0 real_total = 0 real_pnl_sum = 0.0 watchlist_dt = None in_position = False pos_buy_idx_g = None # global idx in post_prices pos_buy_price = None idx2 = 0 while idx2 < len(post_prices): current_price, current_dt = post_prices[idx2] if in_position: is_win, sell_price, sell_dt, reason, pnl = simulate_position(post_prices, pos_buy_idx_g, pos_buy_price) sell_idx2 = next((i for i, (_, ts) in enumerate(post_prices) if ts >= sell_dt), len(post_prices)-1) real_total += 1 if is_win: real_wins += 1 real_pnl_sum += pnl mark = "✅" if is_win else "❌" print(f" 실제#{real_total}: {pos_buy_price:.4f}→{sell_price:.4f}원 " f"| {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})") in_position = False watchlist_dt = None idx2 = sell_idx2 continue trend_ok = check_trend(post_prices, idx2) if trend_ok: mom_ok = check_momentum(ticker, current_price, current_dt) else: mom_ok = False if trend_ok and mom_ok: if watchlist_dt is None: watchlist_dt = current_dt elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60: in_position = True pos_buy_idx_g = idx2 pos_buy_price = current_price watchlist_dt = None else: watchlist_dt = None idx2 += 1 if real_total == 0: print(" 재활 후 전략 조건 충족 진입 없음") else: wr = real_wins / real_total * 100 print(f"\n 📊 재활 후: {real_total}건 | 승률={wr:.0f}% | 누적={real_pnl_sum:+.2f}%") return {'trades': real_total, 'wins': real_wins, 'wr': wr, 'pnl': real_pnl_sum} def main(): conn = get_conn() cur = conn.cursor() # price_history 최대 시각 cur.execute("SELECT MAX(recorded_at) FROM price_history") end_dt = cur.fetchone()[0] summary = {} for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']: cur.execute(""" SELECT traded_at FROM trade_results WHERE ticker = :t ORDER BY traded_at """, t=ticker) rows = cur.fetchall() wf_dt = rows[4][0] print(f"\n{'='*62}") print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')}") print(f"{'='*62}") r = run_full_sim(cur, ticker, wf_dt, end_dt) if r: summary[ticker] = r print(f"\n{'='*62}") print("전체 요약 (전략 필터 적용)") print(f"{'='*62}") for ticker, r in summary.items(): print(f"{ticker}: {r['trades']}건 | 승률={r['wr']:.0f}% | 누적={r['pnl']:+.2f}%") conn.close() if __name__ == "__main__": main()