"""레짐 REGIME_N 스윕 — BULL 진입 기준 봉수 최적화. REGIME_N (pct_change 봉수) 를 1~8봉(40분~320분) 으로 변화시키며 BULL 진입만 / BEAR 차단 / 필터없음 비교. 데이터: data/sim1y_cache.pkl (10분봉 1년치) """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) import json import pickle from pathlib import Path import pandas as pd from dotenv import load_dotenv load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" TOP_N = 20 BUDGET = 15_000_000 MIN_BUDGET = BUDGET * 3 // 10 MAX_POS = 3 FEE = 0.0005 TIME_STOP_MIN_PCT = 3.0 ATR_MULT = 1.5 ATR_MIN = 0.010 ATR_MAX = 0.020 VOL_MULT_DEFAULT = 2.0 VOL_MULT_BULL = 1.5 QUIET_PCT = 2.0 THRESH = 4.8 LOCAL_VOL_N = 7 QUIET_N = 3 SIGNAL_TO_N = 12 ATR_N = 7 TS_N = 12 BEAR_THRESHOLD = -0.5 BULL_THRESHOLD = 1.5 WF_WINDOW = 4 WF_MIN_WIN_RATE = 0.01 WF_SHADOW_WINS = 2 REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, "KRW-SOL": 0.15, "KRW-XRP": 0.15} def resample_40m(df): return (df.resample("40min") .agg({"open":"first","high":"max","low":"min", "close":"last","volume":"sum"}) .dropna(subset=["close"])) def build_regime_series(dfs40, regime_n): weighted = None for ticker, w in REGIME_WEIGHTS.items(): if ticker not in dfs40: continue pct = dfs40[ticker]["close"].pct_change(regime_n) * 100 weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) return weighted if weighted is not None else pd.Series(dtype=float) def calc_atr(df, buy_idx): sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] if len(sub) < 3: return ATR_MIN try: avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) except Exception: return ATR_MIN def simulate_pos(df, buy_idx, buy_price, stop_pct): peak = buy_price for i in range(buy_idx + 1, len(df)): row = df.iloc[i] if row["high"] > peak: peak = row["high"] if row["low"] <= peak * (1 - stop_pct): sp = peak * (1 - stop_pct) pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl pnl_now = (row["close"] - buy_price) / buy_price * 100 if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[i], pnl last = df.iloc[-1]["close"] pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 return pnl > 0, df.index[-1], pnl def run_strategy(df, ticker, regime_series, mode): trades = [] sig_i = sig_p = None in_pos = False buy_idx = buy_price = stop_pct = None i = max(LOCAL_VOL_N + 2, QUIET_N + 1) while i < len(df): ts = df.index[i] row = df.iloc[i] cur = row["close"] if in_pos: is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) in_pos = False; sig_i = sig_p = None; i = next_i continue score = 0.0 if not regime_series.empty and ts in regime_series.index: v = regime_series.loc[ts] score = float(v) if not pd.isna(v) else 0.0 if mode == "bear_off": if score < BEAR_THRESHOLD: sig_i = sig_p = None; i += 1; continue vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT elif mode == "bull_only": if score < BULL_THRESHOLD: sig_i = sig_p = None; i += 1; continue vol_mult = VOL_MULT_BULL else: vol_mult = VOL_MULT_DEFAULT if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: sig_i = sig_p = None if sig_i is not None: move_pct = (cur - sig_p) / sig_p * 100 if cur < sig_p: sig_i = sig_p = None elif move_pct >= THRESH: in_pos = True; buy_idx = i; buy_price = cur stop_pct = calc_atr(df, i); sig_i = sig_p = None i += 1; continue vol_p = df.iloc[i-1]["volume"] vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean() vol_r = vol_p / vol_avg if vol_avg > 0 else 0 close_qh = df.iloc[i-QUIET_N]["close"] chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 if chg_qh < QUIET_PCT and vol_r >= vol_mult: if sig_i is None: sig_i = i; sig_p = cur else: if sig_i is not None and cur < sig_p: sig_i = sig_p = None i += 1 return trades def apply_wf(trades): history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 for t in trades: is_win = int(t[0]) if not blocked: accepted.append(t); history.append(is_win) if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: blocked = True; shadow = 0 else: cnt += 1 if is_win: shadow += 1 if shadow >= WF_SHADOW_WINS: blocked = False; history = []; shadow = 0 else: shadow = 0 return accepted, cnt def apply_max_pos(trades): open_exits = []; accepted = []; skipped = [] for t in trades: buy_dt, sell_dt = t[2], t[3] open_exits = [s for s in open_exits if s > buy_dt] if len(open_exits) < MAX_POS: open_exits.append(sell_dt); accepted.append(t) else: skipped.append(t) return accepted, skipped def run_compound(accepted): portfolio = float(BUDGET); total_krw = 0.0; monthly = {} for is_win, pnl, buy_dt, sell_dt, ticker in accepted: pos_size = max(portfolio, MIN_BUDGET) / MAX_POS krw_profit = pos_size * pnl / 100 portfolio = max(portfolio + krw_profit, MIN_BUDGET) total_krw += krw_profit ym = buy_dt.strftime("%Y-%m") if ym not in monthly: monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} monthly[ym]["trades"] += 1 monthly[ym]["wins"] += int(is_win) monthly[ym]["pnl_krw"] += krw_profit wins = sum(1 for t in accepted if t[0]) peak = BUDGET; max_dd = 0.0 pf = float(BUDGET) for is_win, pnl, buy_dt, sell_dt, ticker in accepted: pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET) peak = max(peak, pf); max_dd = max(max_dd, (peak-pf)/peak*100) return { "portfolio": portfolio, "total_krw": total_krw, "roi_pct": (portfolio-BUDGET)/BUDGET*100, "total": len(accepted), "wins": wins, "wr": wins/len(accepted)*100 if accepted else 0, "monthly": monthly, "max_dd": max_dd, } def sim_one(dfs40, regime_n, mode): rs = build_regime_series(dfs40, regime_n) all_trades = []; wf_total = 0 for ticker, df40 in dfs40.items(): raw = run_strategy(df40, ticker, rs, mode) filtered, blocked = apply_wf(raw) wf_total += blocked all_trades.extend(filtered) all_trades.sort(key=lambda x: x[2]) accepted, skipped = apply_max_pos(all_trades) result = run_compound(accepted) # BULL 비율 if not rs.empty: valid = rs.dropna() bull_pct = (valid >= BULL_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0 bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0 else: bull_pct = bear_pct = 0 return result, bull_pct, bear_pct, wf_total, len(skipped) def main(): print("캐시 로드 중...") cache = pickle.load(open(CACHE_FILE, "rb")) tickers = [t for t in list(cache["10m"].keys())[:TOP_N] if len(cache["10m"][t]) > 500] print(f" 종목: {len(tickers)}개\n") dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers} sample = next(iter(dfs40.values())) start_dt = sample.index[0].strftime("%Y-%m-%d") end_dt = sample.index[-1].strftime("%Y-%m-%d") SWEEP_N = [1, 2, 3, 4, 5, 6, 8, 10] # 40분 ~ 400분 (6.7h) # ── BULL 진입만 스윕 ────────────────────────────────── print(f"{'='*72}") print(f" REGIME_N 스윕 (40분봉 × N봉 변화율 기준 | BULL≥{BULL_THRESHOLD}%)") print(f" 기간: {start_dt} ~ {end_dt} / {len(tickers)}종목") print(f"{'='*72}") print(f" {'N봉':>4} {'시간':>5} │ {'BULL%':>6} {'BEAR%':>6} │ " f"{'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}") print(f" {'─'*68}") bull_results = {} for n in SWEEP_N: r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bull_only") bull_results[n] = r mins = n * 40 h = mins // 60 m = mins % 60 time_label = f"{h}h{m:02d}m" if m else f"{h}h" if r["total"] == 0: print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " f"{'진입없음':>34}") else: print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " f"{r['total']:>5}건 {r['wr']:>4.1f}% │ " f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%") # ── BEAR 차단 스윕 ──────────────────────────────────── print(f"\n{'='*72}") print(f" REGIME_N 스윕 (BEAR 차단 모드 | BEAR<{BEAR_THRESHOLD}%)") print(f"{'='*72}") print(f" {'N봉':>4} {'시간':>5} │ {'BULL%':>6} {'BEAR%':>6} │ " f"{'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}") print(f" {'─'*68}") bear_results = {} for n in SWEEP_N: r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bear_off") bear_results[n] = r mins = n * 40 h = mins // 60; m = mins % 60 time_label = f"{h}h{m:02d}m" if m else f"{h}h" print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " f"{r['total']:>5}건 {r['wr']:>4.1f}% │ " f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%") # ── 베이스라인 (필터없음) ───────────────────────────── r_none, _, _, _, _ = sim_one(dfs40, 1, "none") print(f"\n 베이스라인 (필터없음): {r_none['total']}건 {r_none['wr']:.1f}% " f"{r_none['roi_pct']:+.2f}% {r_none['total_krw']:+,.0f}원 -{r_none['max_dd']:.1f}%") # ── 최적 BULL 구간 ──────────────────────────────────── valid_bull = {n: r for n, r in bull_results.items() if r["total"] >= 5} if valid_bull: best_n = max(valid_bull, key=lambda n: valid_bull[n]["roi_pct"]) best_r = valid_bull[best_n] print(f"\n ★ BULL 진입 최적 N: {best_n}봉({best_n*40}분) " f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}건 " f"승률 {best_r['wr']:.1f}%") valid_bear = {n: r for n, r in bear_results.items() if r["total"] >= 5} if valid_bear: best_n = max(valid_bear, key=lambda n: valid_bear[n]["roi_pct"]) best_r = valid_bear[best_n] print(f" ★ BEAR 차단 최적 N: {best_n}봉({best_n*40}분) " f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}건 " f"승률 {best_r['wr']:.1f}%") print(f"{'='*72}") if __name__ == "__main__": main()