"""볼륨 강도 기반 레짐+F&G 오버라이드 시뮬 — 1년치. 우선순위 로직: 1순위: vol_ratio ≥ VOL_OVERRIDE_THRESH → 레짐/F&G 무관 즉시 진입 허용 2순위: F&G < FNG_MIN_ENTRY → 차단 3순위: 레짐 BEAR → 차단 4순위: 일반 vol-lead 로직 비교 구성: 1. 필터 없음 2. F&G≥41 + BEAR차단N5 (현재 전략 레짐 적용) 3. F&G≥41 + BEAR차단N5 + vol≥5x 오버라이드 (레짐+F&G 동시 오버라이드) 4. F&G≥41 + BEAR차단N5 + vol≥4x 오버라이드 5. F&G≥41 + BEAR차단N5 + vol≥3x 오버라이드 데이터: data/sim1y_cache.pkl / data/fng_1y.json """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) import json import pickle from pathlib import Path import pandas as pd from dotenv import load_dotenv load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" TOP_N = 20 BUDGET = 15_000_000 MIN_BUDGET = BUDGET * 3 // 10 MAX_POS = 3 FEE = 0.0005 TIME_STOP_MIN_PCT = 3.0 ATR_MULT = 1.5 ATR_MIN = 0.010 ATR_MAX = 0.020 VOL_MULT_DEFAULT = 2.0 VOL_MULT_BULL = 1.5 QUIET_PCT = 2.0 THRESH = 4.8 LOCAL_VOL_N = 7 QUIET_N = 3 SIGNAL_TO_N = 12 ATR_N = 7 TS_N = 12 BEAR_THRESHOLD = -0.5 BULL_THRESHOLD = 1.5 REGIME_N = 5 FNG_MIN_ENTRY = 41 WF_WINDOW = 4 WF_MIN_WIN_RATE = 0.01 WF_SHADOW_WINS = 2 REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, "KRW-SOL": 0.15, "KRW-XRP": 0.15} def resample_40m(df): return (df.resample("40min") .agg({"open":"first","high":"max","low":"min", "close":"last","volume":"sum"}) .dropna(subset=["close"])) def build_regime_series(dfs40): weighted = None for ticker, w in REGIME_WEIGHTS.items(): if ticker not in dfs40: continue pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100 weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) return weighted if weighted is not None else pd.Series(dtype=float) def calc_atr(df, buy_idx): sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] if len(sub) < 3: return ATR_MIN try: avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) except Exception: return ATR_MIN def simulate_pos(df, buy_idx, buy_price, stop_pct): peak = buy_price for i in range(buy_idx + 1, len(df)): row = df.iloc[i] if row["high"] > peak: peak = row["high"] if row["low"] <= peak * (1 - stop_pct): sp = peak * (1 - stop_pct) pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl pnl_now = (row["close"] - buy_price) / buy_price * 100 if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl last = df.iloc[-1]["close"] pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[-1], pnl def run_strategy(df, ticker, regime_series, fng_map, use_fng, use_regime, vol_override_thresh): """ 우선순위 구조: ① 포지션 청산 체크 ② 볼륨 스파이크 감지 → 신호 기록 (F&G/레짐 무관, 항상 실행) ③ 진입 시점에서: vol_strong(sig_vr≥thresh) → F&G+레짐 필터 전부 건너뜀 아니면 → F&G≥41 AND 레짐 BEAR 아닐 때만 진입 허용 """ trades = [] sig_i = sig_p = sig_vr = None in_pos = False buy_idx = buy_price = stop_pct = None i = max(LOCAL_VOL_N + 2, QUIET_N + 1) while i < len(df): ts = df.index[i] row = df.iloc[i] cur = row["close"] # ── ① 포지션 청산 ──────────────────────────────── if in_pos: is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i continue # 신호 타임아웃 if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: sig_i = sig_p = sig_vr = None # ── ② 신호 없을 때: 축적 감지 (필터 무관, 항상) ── # F&G=14 극공포여도 vol 스파이크면 신호 기록 → ③에서 override 결정 if sig_i is None: vol_p = df.iloc[i-1]["volume"] vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean() vol_r = vol_p / vol_avg if vol_avg > 0 else 0 close_qh = df.iloc[i-QUIET_N]["close"] chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 if chg_qh < QUIET_PCT and vol_r >= VOL_MULT_DEFAULT: sig_i = i; sig_p = cur; sig_vr = vol_r i += 1 continue # 신호가 이하 하락 → 초기화 if cur < sig_p: sig_i = sig_p = sig_vr = None i += 1 continue # ── ③ 진입 체크 — vol_strong이면 필터 전부 스킵 ── vol_strong = (vol_override_thresh > 0 and sig_vr is not None and sig_vr >= vol_override_thresh) if not vol_strong: # F&G 필터 (신호 유지, 진입만 보류) if use_fng and fng_map: fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50) if fv < FNG_MIN_ENTRY: i += 1; continue # 레짐 필터 (신호 유지, 진입만 보류) if use_regime and not regime_series.empty and ts in regime_series.index: v = regime_series.loc[ts] score = float(v) if not pd.isna(v) else 0.0 if score < BEAR_THRESHOLD: i += 1; continue move_pct = (cur - sig_p) / sig_p * 100 if move_pct >= THRESH: in_pos = True; buy_idx = i; buy_price = cur stop_pct = calc_atr(df, i); sig_i = sig_p = sig_vr = None i += 1 return trades def apply_wf(trades): history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 for t in trades: is_win = int(t[0]) if not blocked: accepted.append(t); history.append(is_win) if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: blocked = True; shadow = 0 else: cnt += 1 if is_win: shadow += 1 if shadow >= WF_SHADOW_WINS: blocked = False; history = []; shadow = 0 else: shadow = 0 return accepted, cnt def apply_max_pos(trades): open_exits = []; accepted = []; skipped = [] for t in trades: buy_dt, sell_dt = t[2], t[3] open_exits = [s for s in open_exits if s > buy_dt] if len(open_exits) < MAX_POS: open_exits.append(sell_dt); accepted.append(t) else: skipped.append(t) return accepted, skipped def run_compound(accepted): portfolio = float(BUDGET); total_krw = 0.0; monthly = {}; trade_log = [] for is_win, pnl, buy_dt, sell_dt, ticker in accepted: pos_size = max(portfolio, MIN_BUDGET) / MAX_POS krw_profit = pos_size * pnl / 100 portfolio = max(portfolio + krw_profit, MIN_BUDGET) total_krw += krw_profit ym = buy_dt.strftime("%Y-%m") if ym not in monthly: monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} monthly[ym]["trades"] += 1; monthly[ym]["wins"] += int(is_win) monthly[ym]["pnl_krw"] += krw_profit trade_log.append({"portfolio": portfolio}) wins = sum(1 for t in accepted if t[0]) peak = BUDGET; max_dd = 0.0 for t in trade_log: peak = max(peak, t["portfolio"]) max_dd = max(max_dd, (peak - t["portfolio"]) / peak * 100) return {"portfolio": portfolio, "total_krw": total_krw, "roi_pct": (portfolio-BUDGET)/BUDGET*100, "total": len(accepted), "wins": wins, "wr": wins/len(accepted)*100 if accepted else 0, "monthly": monthly, "max_dd": max_dd} def sim_one(dfs40, regime_series, fng_map, use_fng, use_regime, vol_override): all_trades = []; wf_total = 0 for ticker, df40 in dfs40.items(): raw = run_strategy(df40, ticker, regime_series, fng_map, use_fng, use_regime, vol_override) filtered, blocked = apply_wf(raw) wf_total += blocked; all_trades.extend(filtered) all_trades.sort(key=lambda x: x[2]) accepted, skipped = apply_max_pos(all_trades) return run_compound(accepted), wf_total, len(skipped) def print_monthly(result, label): print(f"\n ── 월별 상세: {label}") print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │ {'월수익(KRW)':>13} {'누적(KRW)':>14}") cum = 0.0 for ym, m in sorted(result["monthly"].items()): wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0 cum += m["pnl_krw"] flag = " ✓" if m["pnl_krw"] > 0 else "" print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │" f" {m['pnl_krw']:>+13,.0f}원 {cum:>+13,.0f}원{flag}") def main(): print("캐시 로드 중...") cache = pickle.load(open(CACHE_FILE, "rb")) tickers = [t for t in list(cache["10m"].keys())[:TOP_N] if len(cache["10m"][t]) > 500] print(f" 종목: {len(tickers)}개") fng_map = {} if FNG_FILE.exists(): fng_map = json.load(open(FNG_FILE)) dates = sorted(fng_map.keys()) print(f" F&G: {dates[0]} ~ {dates[-1]} ({len(fng_map)}일)") dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers} regime_series = build_regime_series(dfs40) sample = next(iter(dfs40.values())) start_dt = sample.index[0].strftime("%Y-%m-%d") end_dt = sample.index[-1].strftime("%Y-%m-%d") # 필터 적용 일수 통계 if fng_map: period_fng = {k: v for k, v in fng_map.items() if start_dt <= k <= end_dt} fng_blocked = sum(1 for v in period_fng.values() if v < FNG_MIN_ENTRY) fng_allowed = len(period_fng) - fng_blocked print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 ≥{FNG_MIN_ENTRY})") valid = regime_series.dropna() bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100 print(f" 레짐 BEAR: {bear_pct:.1f}%봉 (REGIME_N={REGIME_N}봉={REGIME_N*40}분)\n") # ── 시뮬 구성 ───────────────────────────────────────── CONFIGS = [ # (use_fng, use_regime, vol_override, label) (False, False, 0, "① 필터 없음"), (True, True, 0, f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), (True, True, 5.0, f"③ [1순위:vol≥5x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), (True, True, 4.0, f"④ [1순위:vol≥4x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), (True, True, 3.0, f"⑤ [1순위:vol≥3x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), ] print(f"{'='*72}") print(f" vol 오버라이드 (레짐+F&G 동시) 시뮬 | 1년 | {len(tickers)}종목") print(f" 기간: {start_dt} ~ {end_dt}") print(f" 우선순위: vol≥Nx(오버라이드) > F&G필터 > 레짐필터 > vol-lead 로직") print(f"{'='*72}") print(f" {'구성':<48} {'진입':>5} {'승률':>5} {'수익률':>8} {'순수익':>12} {'낙폭':>6}") print(f" {'─'*70}") results = {} for use_fng, use_regime, vol_ov, label in CONFIGS: r, wf_b, skip = sim_one(dfs40, regime_series, fng_map, use_fng, use_regime, vol_ov) results[label] = r n = r["total"] print(f" {label:<48} {n:>5}건 {r['wr']:>4.1f}%" f" {r['roi_pct']:>+7.2f}% {r['total_krw']:>+11,.0f}원 -{r['max_dd']:.1f}%") # ── 월별 상세 ───────────────────────────────────────── print(f"\n{'='*72}") for use_fng, use_regime, vol_ov, label in CONFIGS: if label in results: print_monthly(results[label], label) # ── 비교 요약 ───────────────────────────────────────── print(f"\n{'='*72}") base_label = f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}" base_r = results.get(base_label) if base_r: print(f" 오버라이드 효과 (vs {base_label}):") for _, _, vol_ov, label in CONFIGS[2:]: r = results.get(label) if r and r["total"] > 0: d_roi = r["roi_pct"] - base_r["roi_pct"] d_n = r["total"] - base_r["total"] d_wr = r["wr"] - base_r["wr"] d_dd = r["max_dd"] - base_r["max_dd"] print(f" vol≥{vol_ov:.0f}x: 수익률 {d_roi:>+.2f}%p " f"진입 {d_n:>+d}건 승률 {d_wr:>+.1f}%p 낙폭 {d_dd:>+.1f}%p") best_label = max(results, key=lambda k: results[k]["roi_pct"]) best = results[best_label] print(f"\n ★ 최고 수익률: {best_label}") print(f" 수익률 {best['roi_pct']:+.2f}% / 순수익 {best['total_krw']:+,.0f}원 " f"/ 낙폭 -{best['max_dd']:.1f}%") print(f"{'='*72}") if __name__ == "__main__": main()