"""현재 전략 기준 45일 복리 시뮬레이션 — 40분봉. sim_45m40.py의 검증된 코어 로직을 기반으로 현재 전략 추가사항만 반영: + F&G 필터 (FNG_MIN_ENTRY=41) + 시장 레짐 필터 (BEAR < -0.5% → 차단, BULL ≥ 1.5% → vol_mult 완화) + 신호 강도별 진입 임계값 티어 (5x→1%, 3.5x→2%, 2.5x→3%, 기본→5%) + 속도 기반 조기 진입 (0.10%/분) """ import json import os import pickle import sys import urllib.request import datetime from pathlib import Path import pandas as pd from dotenv import load_dotenv load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") sys.path.insert(0, str(Path(__file__).parent.parent)) # ── 파라미터 ───────────────────────────────────────────── CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl" TOP_N = 20 BUDGET = 15_000_000 MIN_BUDGET = BUDGET * 3 // 10 MAX_POS = 3 FEE = 0.0005 TIME_STOP_MIN_PCT= 3.0 ATR_MULT = 1.5 ATR_MIN = 0.010 ATR_MAX = 0.020 VOL_MULT_NEUTRAL = 2.0 # NEUTRAL 레짐 VOL_MULT_BULL = 1.5 # BULL 레짐 QUIET_PCT = 2.0 THRESH_BASE = 5.0 # 기본 진입 임계값 (TREND_AFTER_VOL) # 신호 강도별 임계값 티어 ENTRY_TIERS = [(5.0, 1.0), (3.5, 2.0), (2.5, 3.0)] # 속도 진입 VELOCITY_THRESHOLD = 0.10 # %/분 VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분 # F&G FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41")) # 레짐 BEAR_THRESHOLD = -0.5 BULL_THRESHOLD = 1.5 REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, "KRW-SOL": 0.15, "KRW-XRP": 0.15} # 40분봉 봉수 환산 LOCAL_VOL_N = 7 # 5h QUIET_N = 3 # 2h SIGNAL_TO_N = 12 # 8h ATR_N = 7 TS_N = 12 # 8h (타임스탑) REGIME_N = 3 # 2h (레짐 추세) # ── F&G 히스토리 ───────────────────────────────────────── def load_fng_history() -> dict: try: url = "https://api.alternative.me/fng/?limit=90&format=json" with urllib.request.urlopen(url, timeout=10) as r: data = json.loads(r.read()) result = {} for e in data["data"]: dt = datetime.datetime.fromtimestamp(int(e["timestamp"])) result[dt.strftime("%Y-%m-%d")] = int(e["value"]) return result except Exception as ex: print(f" [경고] F&G 로드 실패: {ex} → 필터 비활성화") return {} # ── 리샘플링 ───────────────────────────────────────────── def resample_40m(df: pd.DataFrame) -> pd.DataFrame: return ( df.resample("40min") .agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}) .dropna(subset=["close"]) ) # ── 레짐 시리즈 ────────────────────────────────────────── def build_regime_series(dfs: dict) -> pd.Series: weighted = None for ticker, w in REGIME_WEIGHTS.items(): if ticker not in dfs: continue pct = dfs[ticker]["close"].pct_change(REGIME_N) * 100 weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) return weighted if weighted is not None else pd.Series(dtype=float) # ── 임계값 ─────────────────────────────────────────────── def calc_entry_threshold(vol_ratio: float) -> float: for min_r, thr in ENTRY_TIERS: if vol_ratio >= min_r: return thr return THRESH_BASE # ── ATR ────────────────────────────────────────────────── def calc_atr(df: pd.DataFrame, buy_idx: int) -> float: sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] if len(sub) < 3: return ATR_MIN try: avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) except Exception: return ATR_MIN # ── 포지션 시뮬 (기존 sim_45m40.py와 동일) ─────────────── def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float, stop_pct: float): peak = buy_price for i in range(buy_idx + 1, len(df)): row = df.iloc[i] ts = df.index[i] if row["high"] > peak: peak = row["high"] if row["low"] <= peak * (1 - stop_pct): sp = peak * (1 - stop_pct) pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, ts, pnl, "trailing_stop" pnl_now = (row["close"] - buy_price) / buy_price * 100 if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, ts, pnl, "time_stop" last = df.iloc[-1]["close"] pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, df.index[-1], pnl, "end_of_data" # ── vol-lead 전략 (현재 전략 파라미터 전체 반영) ────────── def run_vol_lead(df: pd.DataFrame, ticker: str, fng_map: dict, regime_series: pd.Series) -> list: trades = [] sig_i = None # 신호 봉 인덱스 sig_p = None # 신호가 sig_vr = 0.0 # 신호 vol_ratio in_pos = False buy_idx = buy_price = stop_pct = None i = max(LOCAL_VOL_N + 2, QUIET_N + 1) while i < len(df): ts = df.index[i] row = df.iloc[i] cur = row["close"] # ── 포지션 보유 중: 청산 체크 ───────────────────── if in_pos: is_win, sdt, pnl, reason = simulate_pos(df, buy_idx, buy_price, stop_pct) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, reason)) in_pos = False sig_i = sig_p = None i = next_i continue # ── F&G 필터 ────────────────────────────────────── date_str = ts.strftime("%Y-%m-%d") if fng_map: fv = fng_map.get(date_str, 50) if fv < FNG_MIN_ENTRY: sig_i = sig_p = None # 신호 초기화 i += 1 continue # ── 레짐 필터 ───────────────────────────────────── score = 0.0 if not regime_series.empty and ts in regime_series.index: v = regime_series.loc[ts] score = float(v) if not pd.isna(v) else 0.0 if score < BEAR_THRESHOLD: sig_i = sig_p = None i += 1 continue vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_NEUTRAL # ── 신호 타임아웃 ────────────────────────────────── if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: sig_i = sig_p = None # ── 신호 있음: 진입 체크 ────────────────────────── if sig_i is not None: move_pct = (cur - sig_p) / sig_p * 100 age_min = (i - sig_i) * 40 # 봉수 → 분 entry_thr = calc_entry_threshold(sig_vr) if cur < sig_p: # 신호가 이하 하락 → 초기화 sig_i = sig_p = None elif move_pct >= entry_thr: # 거리 기반 진입 in_pos = True buy_idx = i buy_price = cur stop_pct = calc_atr(df, i) sig_i = sig_p = None elif age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE: velocity = move_pct / age_min if velocity >= VELOCITY_THRESHOLD: # 속도 기반 조기 진입 in_pos = True buy_idx = i buy_price = cur stop_pct = calc_atr(df, i) sig_i = sig_p = None i += 1 continue # ── 신호 없음: 축적 조건 체크 ──────────────────── vol_p = df.iloc[i - 1]["volume"] vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean() vol_r = vol_p / vol_avg if vol_avg > 0 else 0 close_qh = df.iloc[i - QUIET_N]["close"] chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 quiet = chg_qh < QUIET_PCT spike = vol_r >= vol_mult if quiet and spike: if sig_i is None: sig_i = i sig_p = cur sig_vr = vol_r else: if sig_i is not None and cur < sig_p: sig_i = sig_p = None i += 1 return trades # ── WF 필터 (기존 동일) ────────────────────────────────── def apply_wf(trades: list) -> tuple: history = [] shadow_streak = 0 blocked = False accepted = [] blocked_cnt = 0 for trade in trades: is_win = int(trade[0]) if not blocked: accepted.append(trade) history.append(is_win) if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: blocked = True shadow_streak = 0 else: blocked_cnt += 1 if is_win: shadow_streak += 1 if shadow_streak >= WF_SHADOW_WINS: blocked = False history = [] shadow_streak = 0 else: shadow_streak = 0 return accepted, blocked_cnt WF_WINDOW = 4 WF_MIN_WIN_RATE = 0.01 WF_SHADOW_WINS = 2 # ── MAX_POSITIONS (기존 동일) ──────────────────────────── def apply_max_positions(all_trades: list) -> tuple: open_exits, accepted, skipped = [], [], [] for trade in all_trades: buy_dt, sell_dt = trade[2], trade[3] open_exits = [s for s in open_exits if s > buy_dt] if len(open_exits) < MAX_POS: open_exits.append(sell_dt) accepted.append(trade) else: skipped.append(trade) return accepted, skipped # ── 복리 시뮬 (기존 동일) ─────────────────────────────── def simulate(accepted: list) -> dict: portfolio = float(BUDGET) total_krw = 0.0 monthly = {} trade_log = [] reason_cnt = {} for trade in accepted: is_win, pnl, buy_dt, sell_dt, ticker, reason = trade pos_size = max(portfolio, MIN_BUDGET) / MAX_POS krw_profit = pos_size * pnl / 100 portfolio = max(portfolio + krw_profit, MIN_BUDGET) total_krw += krw_profit reason_cnt[reason] = reason_cnt.get(reason, 0) + 1 ym = buy_dt.strftime("%Y-%m") if ym not in monthly: monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} monthly[ym]["trades"] += 1 monthly[ym]["wins"] += int(is_win) monthly[ym]["pnl_krw"] += krw_profit trade_log.append({ "buy_dt": buy_dt, "sell_dt": sell_dt, "ticker": ticker, "is_win": is_win, "pnl_pct": pnl, "reason": reason, "pos_size": pos_size, "krw_profit": krw_profit, "portfolio": portfolio, }) wins = sum(1 for t in accepted if t[0]) return { "portfolio": portfolio, "total_krw": total_krw, "roi_pct": (portfolio - BUDGET) / BUDGET * 100, "total": len(accepted), "wins": wins, "wr": wins / len(accepted) * 100 if accepted else 0, "monthly": monthly, "trade_log": trade_log, "reason_cnt": reason_cnt, } # ── 메인 ───────────────────────────────────────────────── def main(): print("=" * 62) print("현재 전략 기준 시뮬 (F&G + 레짐 + 티어임계 + 속도진입)") print("=" * 62) print("F&G 히스토리 로드...") fng_map = load_fng_history() if fng_map: vals = sorted(fng_map.items()) print(f" {vals[0][0]} ~ {vals[-1][0]} ({len(fng_map)}일)") else: print(" F&G 데이터 없음 — 필터 비활성화") print("캐시 로드...") cache = pickle.load(open(CACHE_FILE, "rb")) tickers = [t for t in list(cache["10m"].keys())[:TOP_N] if len(cache["10m"][t]) > 200] print(f" 종목: {len(tickers)}개\n") dfs_40m = {t: resample_40m(cache["10m"][t]) for t in tickers} print("레짐 시리즈 계산...") regime_series = build_regime_series(dfs_40m) sample_df = next(iter(dfs_40m.values())) start_date = sample_df.index[0].strftime("%Y-%m-%d") end_date = sample_df.index[-1].strftime("%Y-%m-%d") print(f" 기간: {start_date} ~ {end_date}\n") # F&G 차단 일수 if fng_map: period_dates = [d for d in fng_map if start_date <= d <= end_date] fng_blocked = sum(1 for d in period_dates if fng_map.get(d, 50) < FNG_MIN_ENTRY) fng_allowed = len(period_dates) - fng_blocked else: fng_blocked = fng_allowed = 0 all_trades = [] wf_blocked = 0 for ticker in tickers: df40 = dfs_40m[ticker] raw = run_vol_lead(df40, ticker, fng_map, regime_series) filtered, blocked = apply_wf(raw) wf_blocked += blocked all_trades.extend(filtered) all_trades.sort(key=lambda x: x[2]) # buy_dt 기준 정렬 accepted, skipped = apply_max_positions(all_trades) result = simulate(accepted) # 최대 낙폭 peak = BUDGET max_dd = 0.0 for t in result["trade_log"]: peak = max(peak, t["portfolio"]) dd = (peak - t["portfolio"]) / peak * 100 max_dd = max(max_dd, dd) total = result["total"] wins = result["wins"] print(f"{'='*62}") print(f" 기간: {start_date} ~ {end_date} ({len(tickers)}종목 / 40분봉)") print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 FNG≥{FNG_MIN_ENTRY})") print(f"{'='*62}") print(f" 신호 발생: {len(all_trades)+wf_blocked:>4}건 (WF 차단: {wf_blocked}건)") print(f" 실제 진입: {total:>4}건 ({len(skipped)}건 MAX_POS 스킵)") print(f" 승 / 패: {wins}승 {total-wins}패 (승률 {result['wr']:.1f}%)" if total else " 진입 없음") print(f" {'─'*52}") print(f" 초기 예산: {BUDGET:>15,}원") print(f" 최종 자산: {result['portfolio']:>15,.0f}원") print(f" 순수익: {result['total_krw']:>+15,.0f}원") print(f" 수익률: {result['roi_pct']:>+14.2f}%") print(f" 최대 낙폭: {-max_dd:>+14.2f}%" f" ({-max_dd/100*BUDGET:>+,.0f}원)") monthly_krw = [m["pnl_krw"] for m in result["monthly"].values()] avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0 print(f" 월평균 수익: {avg_m:>+13,.0f}원") print(f"\n── 청산 사유 {'─'*44}") label_map = {"trailing_stop": "트레일링스탑", "time_stop": "타임스탑", "end_of_data": "데이터종료"} for r, cnt in sorted(result["reason_cnt"].items(), key=lambda x: -x[1]): print(f" {label_map.get(r, r):12}: {cnt:>3}건") print(f"\n── 월별 수익 {'─'*44}") print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │" f" {'월수익(KRW)':>13} {'누적수익(KRW)':>14}") cum = 0.0 for ym, m in sorted(result["monthly"].items()): wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0 cum += m["pnl_krw"] print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │" f" {m['pnl_krw']:>+13,.0f}원 {cum:>+14,.0f}원") print(f"\n── 파라미터 {'─'*46}") print(f" F&G≥{FNG_MIN_ENTRY} 레짐BEAR<{BEAR_THRESHOLD}% BULL≥{BULL_THRESHOLD}%") print(f" VOL: {VOL_MULT_NEUTRAL}x(중립)/{VOL_MULT_BULL}x(강세) 횡보<{QUIET_PCT}%") print(f" 임계: 5x→1% / 3.5x→2% / 2.5x→3% / 기본→{THRESH_BASE}%") print(f" 속도: ≥{VELOCITY_THRESHOLD}%/분 (≥{VELOCITY_MIN_MOVE}% / ≥{VELOCITY_MIN_AGE_M}분)") print(f" ATR: ×{ATR_MULT} ({ATR_MIN*100:.0f}~{ATR_MAX*100:.0f}%) 타임스탑: 8h/{TIME_STOP_MIN_PCT}%") print(f"{'='*62}") if __name__ == "__main__": main()