"""velocity_backtest.py — 속도 진입 효과 비교 백테스트. 전략 A: 기존 거리 기반 (signal_price 대비 +THRESH% 도달 시 진입) 전략 B: 거리 + 속도 기반 (velocity >= 레짐별 VELOCITY_THRESHOLD 시 조기 진입) BULL → vel_thresh = 0.10 (공격적) NEUTRAL → vel_thresh = 0.15 (보수적) BEAR → vel_thresh = 0.20 (더 높음) 레짐 판단: KRW-BTC 1h 변동률 (캐시 데이터 활용) 10분봉 캐시(sim10m_cache.pkl)를 사용. 신호 감지: 40분봉 vol spike + 2h 횡보 (10분봉 합산/슬라이스로 계산) 진입/청산: 10분봉 단위로 체크 (실제 시스템의 15초 폴링 근사) """ import pickle from pathlib import Path import pandas as pd # ── 파라미터 ────────────────────────────────────────────────────────────────── CACHE_FILE = Path("sim10m_cache.pkl") TOP_FILE = Path("top30_tickers.pkl") TOP_N = 20 BUDGET = 15_000_000 MIN_BUDGET = BUDGET * 3 // 10 MAX_POS = 3 FEE = 0.0005 # 전략 파라미터 VOL_MULT = 2.0 # 거래량 배수 기준 QUIET_PCT = 2.0 # 2h 횡보 기준 (%) THRESH = 4.8 # 거리 기반 진입 임계값 (%) # 10분봉 기준 캔들 수 QUIET_C = 12 # 2h = 12 × 10분 VOL40_C = 4 # 40분봉 1개 = 4 × 10분봉 LOCAL_C = 7 # 로컬 평균 40분봉 7개 = 28 × 10분봉 TIMEOUT_C = 48 # 신호 타임아웃 8h = 48 × 10분봉 TS_C = 48 # 타임스탑 8h = 48 × 10분봉 ATR_C = 28 # ATR 5h = 7 × 40분 = 28 × 10분봉 ATR_MULT = 1.5 ATR_MIN = 0.010 ATR_MAX = 0.020 TS_MIN_PCT = 3.0 MIN_I = LOCAL_C * VOL40_C + VOL40_C + QUIET_C + 2 # = 42 # 속도 기반 진입 파라미터 (레짐별) VEL_THRESH_BULL = 0.10 # BULL: 0.10%/분 (공격적) VEL_THRESH_NEUTRAL = 0.15 # NEUTRAL: 0.15%/분 (보수적) VEL_THRESH_BEAR = 0.20 # BEAR: 0.20%/분 (더 높음) VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % (잡음 제거) VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분 # 레짐 판단 기준 (BTC 1h 변동률) REGIME_BULL_CHANGE = 5.0 # +5% 이상 → BULL REGIME_BEAR_CHANGE = -5.0 # -5% 이하 → BEAR WF_WINDOW = 4 WF_MIN_WIN_RATE = 0.01 WF_SHADOW_WINS = 2 # ── 낙폭 제어 파라미터 ──────────────────────────────────────────────────────── HARD_STOP_PCT = 0.015 # 진입가 대비 -1.5% 즉시 청산 (하드 손절) STREAK_TIGHT_N = 2 # 연속 N회 손절 시 타임스탑 강화 TS_C_TIGHT = 24 # 강화 타임스탑 보유 시간 (4h = 24 × 10분) TS_MIN_PCT_TIGHT = 0.0 # 강화 타임스탑 최소 수익률 (0%) # ── 레짐 헬퍼 ───────────────────────────────────────────────────────────────── def build_regime_series(btc_df: pd.DataFrame) -> pd.Series: """BTC 10분봉으로 1h 변동률 계산 → 레짐 시리즈 반환.""" close = btc_df["close"] change = close.pct_change(6) * 100 # 6 × 10분 = 1h regime = pd.Series("neutral", index=close.index, dtype=object) regime[change > REGIME_BULL_CHANGE] = "bull" regime[change < REGIME_BEAR_CHANGE] = "bear" return regime def _vel_thresh_for(regime_s: pd.Series, ts) -> float: """타임스탬프 기준 레짐별 velocity 임계값 반환.""" if regime_s is None: return VEL_THRESH_NEUTRAL idx = regime_s.index.searchsorted(ts) if idx >= len(regime_s): return VEL_THRESH_NEUTRAL r = regime_s.iloc[idx] if r == "bull": return VEL_THRESH_BULL elif r == "bear": return VEL_THRESH_BEAR return VEL_THRESH_NEUTRAL # ── 헬퍼 ────────────────────────────────────────────────────────────────────── def calc_atr(df: pd.DataFrame, i: int) -> float: sub = df.iloc[max(0, i - ATR_C):i] if len(sub) < 3: return ATR_MIN try: avg = ((sub["high"] - sub["low"]) / sub["low"]).mean() return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) except Exception: return ATR_MIN def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float, stop_pct: float, hard_stop: bool = False, tight_ts: bool = False): """매수 이후 청산 시점·손익 계산. hard_stop : True → 진입가 대비 -HARD_STOP_PCT% 즉시 청산 tight_ts : True → 강화된 타임스탑 (4h / 0%) 적용 """ peak = buy_price hard_stop_px = buy_price * (1 - HARD_STOP_PCT) if hard_stop else None ts_c_use = TS_C_TIGHT if tight_ts else TS_C ts_min_use = TS_MIN_PCT_TIGHT if tight_ts else TS_MIN_PCT for i in range(buy_idx + 1, len(df)): row = df.iloc[i] if row["high"] > peak: peak = row["high"] # 1. 하드 손절 (진입가 대비 고정 %) if hard_stop_px is not None and row["low"] <= hard_stop_px: pnl = (hard_stop_px * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, df.index[i], pnl # 2. 트레일링 스탑 (최고가 대비) if row["low"] <= peak * (1 - stop_pct): sp = peak * (1 - stop_pct) pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, df.index[i], pnl # 3. 타임스탑 pnl_now = (row["close"] - buy_price) / buy_price * 100 if (i - buy_idx) >= ts_c_use and pnl_now < ts_min_use: pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, df.index[i], pnl last = df.iloc[-1]["close"] pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, df.index[-1], pnl def _prev_40m_vol(df: pd.DataFrame, i: int) -> float: """직전 완성 40분봉 거래량 (10분봉 4개 합산).""" return df.iloc[max(0, i - VOL40_C):i]["volume"].sum() def _local_vol_avg(df: pd.DataFrame, i: int) -> float: """로컬 5h 평균 (직전 7개 40분봉 각각의 합산 평균).""" vols = [] for k in range(1, LOCAL_C + 1): end = i - VOL40_C * (k - 1) start = end - VOL40_C if start < 0: break vols.append(df.iloc[start:end]["volume"].sum()) return sum(vols) / len(vols) if vols else 0 # ── 핵심 전략 루프 ───────────────────────────────────────────────────────────── def run_strategy(df: pd.DataFrame, ticker: str, use_velocity: bool = False, regime_s: pd.Series = None, dd_control: bool = False) -> list: """ Returns list of (is_win, pnl, buy_dt, sell_dt, ticker, entry_type) entry_type: 'dist' | 'vel' dd_control: True → 연속 손절 추적하여 hard_stop + tight_ts 적용 """ trades = [] sig_i = sig_p = None in_pos = False buy_idx = buy_price = stop_pct = None entry_type = "dist" consec_losses = 0 # 연속 손절 횟수 i = MIN_I while i < len(df): # ── 포지션 중 → 청산 계산 후 다음 진입 탐색 ────────────────── if in_pos: use_hard = dd_control and consec_losses >= STREAK_TIGHT_N use_tight = dd_control and consec_losses >= STREAK_TIGHT_N is_win, sdt, pnl = simulate_pos( df, buy_idx, buy_price, stop_pct, hard_stop=use_hard, tight_ts=use_tight, ) next_i = next( (j for j in range(i, len(df)) if df.index[j] > sdt), len(df), ) trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, entry_type)) # 연속 손절 카운터 업데이트 if is_win: consec_losses = 0 else: consec_losses += 1 in_pos = False sig_i = sig_p = None i = next_i continue close = df.iloc[i]["close"] # ── 신호 감지 ───────────────────────────────────────────────── prev_vol = _prev_40m_vol(df, i) local_avg = _local_vol_avg(df, i) vol_r = prev_vol / local_avg if local_avg > 0 else 0 close_2h = df.iloc[i - QUIET_C]["close"] quiet = abs(close - close_2h) / close_2h * 100 < QUIET_PCT spike = vol_r >= VOL_MULT if quiet and spike: if sig_i is None: sig_i, sig_p = i, close else: if sig_i is not None and close < sig_p: sig_i = sig_p = None # 타임아웃 if sig_i is not None and (i - sig_i) > TIMEOUT_C: sig_i = sig_p = None # ── 진입 판단 ───────────────────────────────────────────────── if sig_i is not None: move_pct = (close - sig_p) / sig_p * 100 age_min = (i - sig_i) * 10 # 10분봉 × 10분 # A. 거리 기반 if move_pct >= THRESH: in_pos = True buy_idx = i buy_price = close stop_pct = calc_atr(df, i) entry_type = "dist" sig_i = sig_p = None i += 1 continue # B. 속도 기반 (use_velocity=True 일 때만) if (use_velocity and age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE): velocity = move_pct / age_min vel_thresh = _vel_thresh_for(regime_s, df.index[i]) if velocity >= vel_thresh: in_pos = True buy_idx = i buy_price = close stop_pct = calc_atr(df, i) entry_type = "vel" sig_i = sig_p = None i += 1 continue i += 1 return trades # ── WF 필터 ─────────────────────────────────────────────────────────────────── def apply_wf(trades: list) -> tuple: history, shadow_streak, blocked = [], 0, False accepted, blocked_cnt = [], 0 for trade in trades: is_win = int(trade[0]) if not blocked: accepted.append(trade) history.append(is_win) if len(history) >= WF_WINDOW: if sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: blocked = True shadow_streak = 0 else: blocked_cnt += 1 shadow_streak = (shadow_streak + 1) if is_win else 0 if shadow_streak >= WF_SHADOW_WINS: blocked = False history = [] shadow_streak = 0 return accepted, blocked_cnt # ── MAX_POSITIONS 필터 ──────────────────────────────────────────────────────── def apply_max_pos(all_trades: list) -> tuple: open_exits, accepted, skipped = [], [], [] for trade in sorted(all_trades, key=lambda x: x[2]): buy_dt, sell_dt = trade[2], trade[3] open_exits = [s for s in open_exits if s > buy_dt] if len(open_exits) < MAX_POS: open_exits.append(sell_dt) accepted.append(trade) else: skipped.append(trade) return accepted, skipped # ── 복리 시뮬 ───────────────────────────────────────────────────────────────── def simulate(accepted: list) -> dict: portfolio = float(BUDGET) total_krw = 0.0 monthly = {} peak_pf = BUDGET max_dd = 0.0 win_cnt = 0 vel_count = sum(1 for t in accepted if t[5] == "vel") vel_wins = sum(1 for t in accepted if t[5] == "vel" and t[0]) vel_pnls = [t[1] for t in accepted if t[5] == "vel"] dist_pnls = [t[1] for t in accepted if t[5] == "dist"] for is_win, pnl, buy_dt, sell_dt, ticker, etype in accepted: pos_size = max(portfolio, MIN_BUDGET) / MAX_POS krw_profit = pos_size * pnl / 100 portfolio = max(portfolio + krw_profit, MIN_BUDGET) total_krw += krw_profit peak_pf = max(peak_pf, portfolio) dd = (peak_pf - portfolio) / peak_pf * 100 max_dd = max(max_dd, dd) win_cnt += int(is_win) ym = buy_dt.strftime("%Y-%m") m = monthly.setdefault(ym, {"t": 0, "w": 0, "krw": 0.0}) m["t"] += 1; m["w"] += int(is_win); m["krw"] += krw_profit n = len(accepted) return { "portfolio": portfolio, "total_krw": total_krw, "roi": (portfolio - BUDGET) / BUDGET * 100, "n": n, "wins": win_cnt, "wr": win_cnt / n * 100 if n else 0, "max_dd": max_dd, "monthly": monthly, "vel_count": vel_count, "vel_wins": vel_wins, "vel_wr": vel_wins / vel_count * 100 if vel_count else 0, "vel_avg_pnl": sum(vel_pnls) / len(vel_pnls) if vel_pnls else 0, "dist_avg_pnl": sum(dist_pnls) / len(dist_pnls) if dist_pnls else 0, } # ── 메인 ────────────────────────────────────────────────────────────────────── def main(): print("캐시 로드...") cache = pickle.load(open(CACHE_FILE, "rb")) top30 = pickle.load(open(TOP_FILE, "rb")) tickers = [t for t in top30[:TOP_N] if t in cache["10m"]] print(f"유효 종목: {len(tickers)}개\n") # BTC 레짐 시리즈 빌드 btc_df = cache["10m"].get("KRW-BTC") regime_s = build_regime_series(btc_df) if btc_df is not None else None if regime_s is not None: bull_pct = (regime_s == "bull").mean() * 100 bear_pct = (regime_s == "bear").mean() * 100 print(f"레짐 분포: BULL {bull_pct:.1f}% / NEUTRAL {100-bull_pct-bear_pct:.1f}% / BEAR {bear_pct:.1f}%") print(f"vel threshold: BULL={VEL_THRESH_BULL} / NEUTRAL={VEL_THRESH_NEUTRAL} / BEAR={VEL_THRESH_BEAR}\n") all_a, all_b, all_c = [], [], [] wf_a_total = wf_b_total = wf_c_total = 0 for t in tickers: df = cache["10m"][t] if len(df) < MIN_I + 50: continue raw_a = run_strategy(df, t, use_velocity=False) raw_b = run_strategy(df, t, use_velocity=True, regime_s=regime_s) raw_c = run_strategy(df, t, use_velocity=True, regime_s=regime_s, dd_control=True) fa, ba = apply_wf(raw_a) fb, bb = apply_wf(raw_b) fc, bc = apply_wf(raw_c) wf_a_total += ba; wf_b_total += bb; wf_c_total += bc all_a.extend(fa); all_b.extend(fb); all_c.extend(fc) acc_a, skp_a = apply_max_pos(all_a) acc_b, skp_b = apply_max_pos(all_b) acc_c, skp_c = apply_max_pos(all_c) ra = simulate(acc_a) rb = simulate(acc_b) rc = simulate(acc_c) # ── 날짜 범위 ───────────────────────────────────────── def date_range(acc): if acc: s = min(t[2] for t in acc).strftime("%Y-%m-%d") e = max(t[3] for t in acc).strftime("%Y-%m-%d") return f"{s} ~ {e}" return "N/A" # ── 출력 ───────────────────────────────────────────── W = 72 print("=" * W) print(f" 낙폭 제어 비교 백테스트 | 10분봉 | {len(tickers)}종목") print(f" 기간: {date_range(acc_a)}") print(f" hard_stop={HARD_STOP_PCT*100:.1f}% | tight_ts={TS_C_TIGHT*10//60}h+{TS_MIN_PCT_TIGHT:.0f}% " f"(연속 {STREAK_TIGHT_N}손절 후)") print("=" * W) print(f" {'항목':<22} {'A. 기존':>12} {'B. +속도':>12} {'C. +속도+DD제어':>14}") print(f" {'─'*64}") def row3(label, va, vb, vc, fmt="{}"): sa, sb, sc = fmt.format(va), fmt.format(vb), fmt.format(vc) try: dbc = float(str(vc).replace(",","").replace("%","").replace("원","")) \ - float(str(va).replace(",","").replace("%","").replace("원","")) dc = f" ({dbc:+.1f})" if abs(dbc) >= 0.01 else "" except Exception: dc = "" print(f" {label:<22} {sa:>12} {sb:>12} {sc:>14}{dc}") row3("총 진입", ra["n"], rb["n"], rc["n"], "{:,}건") row3(" 속도 진입", 0, rb["vel_count"], rc["vel_count"], "{:,}건") row3("WF 차단", wf_a_total, wf_b_total, wf_c_total, "{:,}건") row3("MAX_POS 스킵", len(skp_a), len(skp_b), len(skp_c), "{:,}건") print(f" {'─'*64}") row3("승률", f"{ra['wr']:.1f}%", f"{rb['wr']:.1f}%", f"{rc['wr']:.1f}%") row3(" 속도진입 승률","-", f"{rb['vel_wr']:.1f}%", f"{rc['vel_wr']:.1f}%") print(f" {'─'*64}") row3("평균 pnl (거리)",f"{ra['dist_avg_pnl']:.2f}%", f"{rb['dist_avg_pnl']:.2f}%", f"{rc['dist_avg_pnl']:.2f}%") row3("평균 pnl (속도)","-", f"{rb['vel_avg_pnl']:.2f}%", f"{rc['vel_avg_pnl']:.2f}%") print(f" {'─'*64}") row3("최종 자산", f"{ra['portfolio']:,.0f}원", f"{rb['portfolio']:,.0f}원", f"{rc['portfolio']:,.0f}원") row3("총 수익", f"{ra['total_krw']:+,.0f}원", f"{rb['total_krw']:+,.0f}원", f"{rc['total_krw']:+,.0f}원") row3("수익률", f"{ra['roi']:.2f}%", f"{rb['roi']:.2f}%", f"{rc['roi']:.2f}%") row3("최대 낙폭", f"{-ra['max_dd']:.2f}%", f"{-rb['max_dd']:.2f}%", f"{-rc['max_dd']:.2f}%") print("=" * W) # ── 월별 ───────────────────────────────────────────── print(f"\n── 월별 수익 비교 {'─'*50}") print(f" {'월':^8} │ {'A':>5} {'A%':>4} {'A수익':>10} │ " f"{'B':>5} {'B%':>4} {'B수익':>10} │ " f"{'C':>5} {'C%':>4} {'C수익':>10}") all_months = sorted(set(list(ra["monthly"]) + list(rb["monthly"]) + list(rc["monthly"]))) for ym in all_months: ma = ra["monthly"].get(ym, {"t":0,"w":0,"krw":0}) mb = rb["monthly"].get(ym, {"t":0,"w":0,"krw":0}) mc = rc["monthly"].get(ym, {"t":0,"w":0,"krw":0}) wra = ma["w"]/ma["t"]*100 if ma["t"] else 0 wrb = mb["w"]/mb["t"]*100 if mb["t"] else 0 wrc = mc["w"]/mc["t"]*100 if mc["t"] else 0 print(f" {ym:^8} │ {ma['t']:>4}건 {wra:>3.0f}% {ma['krw']:>+9,.0f}원 │ " f"{mb['t']:>4}건 {wrb:>3.0f}% {mb['krw']:>+9,.0f}원 │ " f"{mc['t']:>4}건 {wrc:>3.0f}% {mc['krw']:>+9,.0f}원") print("=" * W) if __name__ == "__main__": main()