"""10분봉 vol 감지 vs 40분봉 vol 감지 비교 시뮬레이션. 40분봉 집계 시 10분봉 spike가 희석되는 문제를 해결하기 위해 신호 감지를 10분봉 기준으로 실행하고 40분봉 전략과 노이즈/수익 비교. 비교 모드 (각 필터 조합 × 2개 봉 단위): 10분봉 detection: A. 10m 필터없음 B. 10m F&G≥41 + BEAR차단N5 C. 10m vol≥5x 오버라이드 (F&G+레짐 무시) 40분봉 detection (기준선): D. 40m 필터없음 E. 40m F&G≥41 + BEAR차단N5 F. 40m vol≥5x 오버라이드 데이터: data/sim1y_cache.pkl (10분봉 1년치) """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) import json import pickle from pathlib import Path import pandas as pd from dotenv import load_dotenv load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" TOP_N = 20 BUDGET = 15_000_000 MIN_BUDGET = BUDGET * 3 // 10 MAX_POS = 3 FEE = 0.0005 ATR_MULT = 1.5 ATR_MIN = 0.010 ATR_MAX = 0.020 THRESH = 4.8 QUIET_PCT = 2.0 BEAR_THRESHOLD = -0.5 BULL_THRESHOLD = 1.5 FNG_MIN_ENTRY = 41 # ── 40분봉 파라미터 ──────────────────────────────── P40 = dict( local_vol_n = 7, quiet_n = 3, signal_to_n = 12, atr_n = 7, ts_n = 12, time_stop_pct = 3.0, vol_mult = 2.0, ) # ── 10분봉 파라미터 (벽시계 기준 동등) ─────────────── # LOCAL_VOL_N: 40m×7=280min → 10min×28 # QUIET_N: 40m×3=120min → 10min×12 # SIGNAL_TO_N: 40m×12=480min → 10min×48 # ATR_N: 40m×7=280min → 10min×28 # TS_N: 40m×12=480min → 10min×48 P10 = dict( local_vol_n = 28, quiet_n = 12, signal_to_n = 48, atr_n = 28, ts_n = 48, time_stop_pct = 3.0, vol_mult = 2.0, ) REGIME_N = 5 # 40분봉 기준 REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, "KRW-SOL": 0.15, "KRW-XRP": 0.15} WF_WINDOW = 4 WF_MIN_WIN_RATE = 0.01 WF_SHADOW_WINS = 2 VOL_OVERRIDE_THRESH = 5.0 # ───────────────────────────────────────────────────────────────────────────── def resample_40m(df): return (df.resample("40min") .agg({"open":"first","high":"max","low":"min", "close":"last","volume":"sum"}) .dropna(subset=["close"])) def build_regime_series(dfs40): weighted = None for ticker, w in REGIME_WEIGHTS.items(): if ticker not in dfs40: continue pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100 weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) return weighted if weighted is not None else pd.Series(dtype=float) def regime_to_10m(regime_40m: pd.Series, df_10m: pd.DataFrame) -> pd.Series: """40분봉 레짐 시리즈를 10분봉 인덱스에 ffill 매핑.""" combined = regime_40m.reindex( regime_40m.index.union(df_10m.index) ).ffill() return combined.reindex(df_10m.index) def calc_atr(df, buy_idx, atr_n): sub = df.iloc[max(0, buy_idx - atr_n - 1):buy_idx] if len(sub) < 3: return ATR_MIN try: avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-atr_n:].mean() return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) except Exception: return ATR_MIN def simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n, time_stop_pct): peak = buy_price for i in range(buy_idx + 1, len(df)): row = df.iloc[i] if row["high"] > peak: peak = row["high"] if row["low"] <= peak * (1 - stop_pct): sp = peak * (1 - stop_pct) pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl pnl_now = (row["close"] - buy_price) / buy_price * 100 if (i - buy_idx) >= ts_n and pnl_now < time_stop_pct: pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl last = df.iloc[-1]["close"] pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[-1], pnl def run_strategy(df, ticker, regime_series, fng_map, p, use_fng, use_regime, vol_override_thresh): """ 공통 전략 함수. df = 봉 단위 OHLCV (10분봉 또는 40분봉). regime_series: df 인덱스와 정렬된 레짐 시리즈. 우선순위: ① 포지션 청산 ② 축적 신호 감지 (필터 무관, 항상 실행) ③ 진입: vol_strong → 모든 필터 skip; 아니면 F&G+레짐 체크 """ local_vol_n = p["local_vol_n"] quiet_n = p["quiet_n"] signal_to_n = p["signal_to_n"] atr_n = p["atr_n"] ts_n = p["ts_n"] time_stop_pct = p["time_stop_pct"] vol_mult = p["vol_mult"] trades = [] sig_i = sig_p = sig_vr = None in_pos = False buy_idx = buy_price = stop_pct = None i = max(local_vol_n + 2, quiet_n + 1) while i < len(df): ts = df.index[i] row = df.iloc[i] cur = row["close"] # ── ① 포지션 청산 ───────────────────────────────── if in_pos: is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n, time_stop_pct) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i continue # 신호 만료 체크 if sig_i is not None and (i - sig_i) > signal_to_n: sig_i = sig_p = sig_vr = None # ── ② 축적 신호 감지 (항상 실행) ────────────────────── if sig_i is None: vol_p = df.iloc[i-1]["volume"] vol_avg = df.iloc[i-1-local_vol_n:i-1]["volume"].mean() vol_r = vol_p / vol_avg if vol_avg > 0 else 0 close_qh = df.iloc[i-quiet_n]["close"] chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 if chg_qh < QUIET_PCT and vol_r >= vol_mult: sig_i = i; sig_p = cur; sig_vr = vol_r i += 1 continue # 신호 이후 가격 하락 → 초기화 if cur < sig_p: sig_i = sig_p = sig_vr = None i += 1 continue # ── ③ 진입 체크 ───────────────────────────────────── vol_strong = (vol_override_thresh > 0 and sig_vr is not None and sig_vr >= vol_override_thresh) if not vol_strong: # F&G 필터 if use_fng and fng_map: fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50) if fv < FNG_MIN_ENTRY: i += 1 continue # 레짐 BEAR 차단 if use_regime and not regime_series.empty and ts in regime_series.index: v = regime_series.loc[ts] if not pd.isna(v) and float(v) < BEAR_THRESHOLD: i += 1 continue move_pct = (cur - sig_p) / sig_p * 100 if move_pct >= THRESH: in_pos = True; buy_idx = i; buy_price = cur stop_pct = calc_atr(df, i, atr_n) sig_i = sig_p = sig_vr = None i += 1 return trades def apply_wf(trades): history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 for t in trades: is_win = int(t[0]) if not blocked: accepted.append(t); history.append(is_win) if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: blocked = True; shadow = 0 else: cnt += 1 if is_win: shadow += 1 if shadow >= WF_SHADOW_WINS: blocked = False; history = []; shadow = 0 else: shadow = 0 return accepted, cnt def apply_max_pos(trades): open_exits = []; accepted = []; skipped = [] for t in trades: buy_dt, sell_dt = t[2], t[3] open_exits = [s for s in open_exits if s > buy_dt] if len(open_exits) < MAX_POS: open_exits.append(sell_dt); accepted.append(t) else: skipped.append(t) return accepted, skipped def run_compound(accepted): portfolio = float(BUDGET); total_krw = 0.0 wins = 0; peak = BUDGET; max_dd = 0.0; pf = float(BUDGET) for is_win, pnl, buy_dt, sell_dt, ticker in accepted: pos_size = max(portfolio, MIN_BUDGET) / MAX_POS krw_profit = pos_size * pnl / 100 portfolio = max(portfolio + krw_profit, MIN_BUDGET) total_krw += krw_profit wins += int(is_win) pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET) peak = max(peak, pf) max_dd = max(max_dd, (peak - pf) / peak * 100) return { "portfolio": portfolio, "total_krw": total_krw, "roi_pct": (portfolio - BUDGET) / BUDGET * 100, "total": len(accepted), "wins": wins, "wr": wins / len(accepted) * 100 if accepted else 0, "max_dd": max_dd, } def sim_mode(dfs_per_ticker, regime_map, fng_map, p, use_fng, use_regime, vol_override_thresh): """ dfs_per_ticker: {ticker: DataFrame (10m 또는 40m)} regime_map: {ticker: regime Series (같은 인덱스로 정렬됨)} """ all_trades = []; wf_total = 0 for ticker, df in dfs_per_ticker.items(): rs = regime_map.get(ticker, pd.Series(dtype=float)) raw = run_strategy(df, ticker, rs, fng_map, p, use_fng, use_regime, vol_override_thresh) filtered, blocked = apply_wf(raw) wf_total += blocked all_trades.extend(filtered) all_trades.sort(key=lambda x: x[2]) accepted, skipped = apply_max_pos(all_trades) return run_compound(accepted), wf_total, len(skipped) def fmt(r, wf, skip): if r["total"] == 0: return "진입없음" return (f"{r['total']:>5}건 {r['wr']:>4.1f}% " f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 " f"-{r['max_dd']:>4.1f}% wf:{wf} skip:{skip}") def main(): print("캐시 로드 중...") cache = pickle.load(open(CACHE_FILE, "rb")) fng_map = json.loads(FNG_FILE.read_text()) tickers = [t for t in list(cache["10m"].keys())[:TOP_N] if len(cache["10m"][t]) > 500] print(f" 종목: {len(tickers)}개") dfs10 = {t: cache["10m"][t] for t in tickers} dfs40 = {t: resample_40m(df) for t, df in dfs10.items()} # 레짐 (40분봉 기반) regime_40m = build_regime_series(dfs40) # 40분봉용 regime map regime_map_40 = {t: regime_40m for t in tickers} # 10분봉용 regime map (ffill 매핑) regime_map_10 = { t: regime_to_10m(regime_40m, dfs10[t]) for t in tickers } sample = next(iter(dfs10.values())) start_dt = sample.index[0].strftime("%Y-%m-%d") end_dt = sample.index[-1].strftime("%Y-%m-%d") print(f"\n{'='*80}") print(f" 10분봉 vs 40분봉 vol 감지 비교 | {start_dt} ~ {end_dt} | {len(tickers)}종목") print(f" vol override: ≥{VOL_OVERRIDE_THRESH}x | F&G≥{FNG_MIN_ENTRY} | BEAR차단N{REGIME_N}") print(f"{'='*80}") print(f" {'모드':<32} │ {'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>6}") print(f" {'─'*76}") # ── 10분봉 모드들 ───────────────────────────────────────────────────── print(f"\n [10분봉 vol 감지 — local_vol_n={P10['local_vol_n']}봉({P10['local_vol_n']*10}분) quiet_n={P10['quiet_n']}봉({P10['quiet_n']*10}분)]") modes_10 = [ ("A. 10m 필터없음", False, False, 0.0), ("B. 10m F&G+BEAR차단", True, True, 0.0), (f"C. 10m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH), ] for label, uf, ur, vt in modes_10: r, wf, skip = sim_mode(dfs10, regime_map_10, fng_map, P10, uf, ur, vt) print(f" {label:<32} │ {fmt(r, wf, skip)}") # ── 40분봉 모드들 (기준선) ────────────────────────────────────────── print(f"\n [40분봉 vol 감지 — local_vol_n={P40['local_vol_n']}봉({P40['local_vol_n']*40}분) quiet_n={P40['quiet_n']}봉({P40['quiet_n']*40}분)]") modes_40 = [ ("D. 40m 필터없음", False, False, 0.0), ("E. 40m F&G+BEAR차단", True, True, 0.0), (f"F. 40m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH), ] for label, uf, ur, vt in modes_40: r, wf, skip = sim_mode(dfs40, regime_map_40, fng_map, P40, uf, ur, vt) print(f" {label:<32} │ {fmt(r, wf, skip)}") print(f"\n{'='*80}") # ── vol≥5x 신호 품질 비교 (10m vs 40m) ───────────────────────────── print("\n [vol≥5x 오버라이드 신호 품질 비교 — 필터 없음 조건에서 override 효과]") print(f" {'모드':<32} │ {'진입':>5} {'승률':>5} │ {'수익률':>8} {'낙폭':>6}") for label, dfs, rmap, p in [ ("10m vol≥5x (filter none)", dfs10, regime_map_10, P10), ("40m vol≥5x (filter none)", dfs40, regime_map_40, P40), ]: r, wf, skip = sim_mode(dfs, rmap, fng_map, p, False, False, VOL_OVERRIDE_THRESH) if r["total"]: print(f" {label:<32} │ {r['total']:>5}건 {r['wr']:>4.1f}% " f"{r['roi_pct']:>+7.2f}% -{r['max_dd']:>4.1f}%") print(f"{'='*80}") if __name__ == "__main__": main()