"""F&G 필터 전후 수익 비교 시뮬레이션 필터 없음 vs F&G ≥ 41 필터 적용 시 1년치 성과를 직접 비교. 표시: - 거래 수, 승률, 평균 PnL, 총 누적 PnL - 거래당 고정 자본 100만 원 기준 KRW 환산 손익 - 월별 손익 흐름 (계절성 확인) - 극공포 차단 일수 통계 결과는 Oracle DB(backtest_results)에 저장. 데이터: 1년치 1h 캔들 (배치 수집) """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) from __future__ import annotations import datetime import json import sys import time import urllib.request from dataclasses import dataclass import pandas as pd import pyupbit # ── DB 저장 ───────────────────────────────────────────────── try: from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk DB_ENABLED = True except Exception as e: print(f" [DB 비활성화] {e}") DB_ENABLED = False TICKERS = [ "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE", "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK", "KRW-SUI", "KRW-HBAR", "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO", "KRW-KAVA", "KRW-KNC", ] CAPITAL_PER_TRADE = 1_000_000 # 거래당 고정 자본 (KRW) # 전략 파라미터 (현행) VOL_MULT = 2.0 QUIET_2H = 2.0 SIG_TO_H = 8 MOM_THR = 3.0 SIG_CANCEL = 3.0 TRAIL_STOP = 0.015 TIME_H = 24 TIME_MIN = 3.0 FNG_MIN = 41 # 이 값 미만이면 진입 차단 # ── 데이터 수집 ────────────────────────────────────────────── def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None: all_dfs = [] end = datetime.datetime.now() batch = 1440 prev_oldest = None while True: df = pyupbit.get_ohlcv( ticker, interval="minute60", count=batch, to=end.strftime("%Y-%m-%d %H:%M:%S"), ) if df is None or df.empty: break all_dfs.append(df) oldest = df.index[0] if prev_oldest is not None and oldest >= prev_oldest: break prev_oldest = oldest cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) if oldest <= cutoff: break end = oldest time.sleep(0.12) if not all_dfs: return None combined = pd.concat(all_dfs).sort_index() combined = combined[~combined.index.duplicated(keep="last")] cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) return combined[combined.index >= cutoff] def load_fng() -> dict[str, int]: url = "https://api.alternative.me/fng/?limit=400&format=json" with urllib.request.urlopen(url, timeout=10) as r: data = json.loads(r.read()) return { datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"): int(d["value"]) for d in data["data"] } def fng_val(fng_map, ts) -> int: return fng_map.get(ts.strftime("%Y-%m-%d"), 50) # ── 시뮬레이션 ────────────────────────────────────────────── @dataclass class Trade: pnl: float h: int fng: int exit: str date: str # YYYY-MM def simulate(df, fng_map, fng_min: int | None = None) -> list[Trade]: closes = df["close"].values vols = df["volume"].values idx = df.index trades: list[Trade] = [] sig_px = sig_i = None pos_buy = pos_peak = pos_i = pos_fng = None for i in range(7, len(closes) - max(TIME_H + 4, 10)): if pos_buy is not None: cur = closes[i] if cur > pos_peak: pos_peak = cur if (pos_peak - cur) / pos_peak >= TRAIL_STOP: trades.append(Trade( (cur - pos_buy) / pos_buy * 100, i - pos_i, pos_fng, "trail", idx[i].strftime("%Y-%m"), )) pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None continue if i - pos_i >= TIME_H: pnl = (cur - pos_buy) / pos_buy * 100 if pnl < TIME_MIN: trades.append(Trade( pnl, i - pos_i, pos_fng, "time", idx[i].strftime("%Y-%m"), )) pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None continue continue if sig_px is not None: if i - sig_i > SIG_TO_H: sig_px = sig_i = None elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL: sig_px = sig_i = None if sig_px is None: vol_avg = vols[i - 6:i - 1].mean() if vol_avg <= 0: continue if vols[i - 1] / vol_avg >= VOL_MULT: if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H: sig_px = closes[i] sig_i = i continue fv = fng_val(fng_map, idx[i]) if fng_min is not None and fv < fng_min: continue if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR: pos_buy = pos_peak = closes[i] pos_i = i pos_fng = fv sig_px = sig_i = None return trades def stats(trades: list[Trade]) -> dict: if not trades: return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0, avg_win=0, avg_loss=0, max_dd=0, krw_total=0) wins = [t for t in trades if t.pnl > 0] losses = [t for t in trades if t.pnl <= 0] aw = sum(t.pnl for t in wins) / len(wins) if wins else 0 al = sum(t.pnl for t in losses) / len(losses) if losses else 0 cum = pk = max_dd = 0.0 for t in trades: cum += t.pnl if cum > pk: pk = cum if pk - cum > max_dd: max_dd = pk - cum total_pnl = sum(t.pnl for t in trades) return dict( n=len(trades), wr=len(wins) / len(trades) * 100, avg_pnl=total_pnl / len(trades), total_pnl=total_pnl, rr=abs(aw / al) if al else 0, avg_win=aw, avg_loss=al, max_dd=max_dd, krw_total=total_pnl / 100 * CAPITAL_PER_TRADE, ) def monthly_pnl(trades: list[Trade]) -> dict[str, float]: """월별 누적 PnL(%) 반환.""" monthly: dict[str, float] = {} for t in trades: monthly[t.date] = monthly.get(t.date, 0) + t.pnl return dict(sorted(monthly.items())) def main(): print("F&G 데이터 로드...") fng_map = load_fng() # F&G 분포 block_days = sum(1 for v in fng_map.values() if v < FNG_MIN) total_days = len(fng_map) print(f" 1년 F&G 분포: 진입차단(< {FNG_MIN}) = {block_days}일 / {total_days}일 " f"({block_days/total_days*100:.1f}%)") print(f" 진입허용(≥ {FNG_MIN}) = {total_days - block_days}일 ({(total_days-block_days)/total_days*100:.1f}%)\n") print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...") datasets: dict[str, pd.DataFrame] = {} for i, tk in enumerate(TICKERS): try: df = fetch_1y(tk, total_days=365) if df is not None and len(df) > 100: datasets[tk] = df sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ") except Exception as e: sys.stderr.write(f"\r {tk} 실패: {e} ") sys.stderr.write("\n") print(f" 완료: {len(datasets)}개 종목\n") # ── 두 가지 조건 시뮬레이션 ────────────────────────────── # A: 필터 없음 (현행) # B: F&G ≥ 41 (신규) all_trades_A: list[Trade] = [] all_trades_B: list[Trade] = [] per_ticker_A: dict[str, list[Trade]] = {} per_ticker_B: dict[str, list[Trade]] = {} for tk, df in datasets.items(): ta = simulate(df, fng_map, fng_min=None) tb = simulate(df, fng_map, fng_min=FNG_MIN) all_trades_A.extend(ta) all_trades_B.extend(tb) per_ticker_A[tk] = ta per_ticker_B[tk] = tb sa = stats(all_trades_A) sb = stats(all_trades_B) # ── 결과 출력 ───────────────────────────────────────────── print("=" * 80) print(f" F&G 필터 전후 비교 (1년치 / {len(datasets)}개 종목 / 1h캔들 / 자본 {CAPITAL_PER_TRADE:,}원/거래)") print("=" * 80) print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} " f"{'손익비':>5} {'총PnL':>8} {'MaxDD':>7} {'KRW손익':>14}") print(" " + "-" * 76) for label, s in [("필터 없음 (현행)", sa), (f"F&G≥{FNG_MIN} 필터 (신규)", sb)]: krw_str = f"{s['krw_total']:>+,.0f}원" print( f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% " f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} " f"{s['total_pnl']:>+7.1f}% -{s['max_dd']:>5.1f}% {krw_str:>14}" ) diff_trades = sb["n"] - sa["n"] diff_krw = sb["krw_total"] - sa["krw_total"] diff_wr = sb["wr"] - sa["wr"] print(f"\n 변화: 거래수 {diff_trades:+d}건 | 승률 {diff_wr:+.1f}%p | " f"KRW손익 {diff_krw:>+,.0f}원") # ── 월별 손익 흐름 ──────────────────────────────────────── print() print(" 월별 손익 비교 (필터없음 vs F&G≥41):") print(f" {'월':>8} {'차단일수':>6} {'필터없음':>9} {'F&G필터':>9} {'개선':>8} {'누적(필터)':>12}") print(" " + "-" * 62) ma = monthly_pnl(all_trades_A) mb = monthly_pnl(all_trades_B) all_months = sorted(set(ma.keys()) | set(mb.keys())) cum_b = 0.0 for m in all_months: pa = ma.get(m, 0.0) pb = mb.get(m, 0.0) cum_b += pb diff = pb - pa # 해당 월 차단 일수 yr, mo = int(m[:4]), int(m[5:]) blocked = sum( 1 for d, v in fng_map.items() if d.startswith(m) and v < FNG_MIN ) bar = "▓" * min(int(abs(pb) / 3), 12) if pb > 0 else "░" * min(int(abs(pb) / 3), 12) sign = "+" if pb > 0 else "" diff_sign = "▲" if diff > 0 else ("▼" if diff < 0 else "=") print( f" {m} {blocked:>4}일차단 " f"{pa:>+8.1f}% {sign}{pb:>8.1f}% " f"{diff_sign}{abs(diff):>6.1f}% {cum_b:>+10.1f}%" ) # ── 종목별 비교 (상위/하위) ─────────────────────────────── print() print(" 종목별 성과 비교 (필터없음 vs F&G≥41):") print(f" {'종목':<14} {'현행거래':>6} {'현행PnL':>8} {'필터거래':>7} {'필터PnL':>8} {'개선':>8}") print(" " + "-" * 58) ticker_rows = [] for tk in sorted(datasets.keys()): ta_list = per_ticker_A.get(tk, []) tb_list = per_ticker_B.get(tk, []) pa = sum(t.pnl for t in ta_list) if ta_list else 0 pb = sum(t.pnl for t in tb_list) if tb_list else 0 ticker_rows.append((tk, len(ta_list), pa, len(tb_list), pb, pb - pa)) for row in sorted(ticker_rows, key=lambda x: x[5], reverse=True): tk, na, pa, nb, pb, delta = row mark = "▲" if delta > 1 else ("▼" if delta < -1 else " =") print( f" {tk:<14} {na:>6}건 {pa:>+7.1f}% {nb:>6}건 {pb:>+7.1f}% " f"{mark}{abs(delta):>6.1f}%" ) # ── 극공포 차단 효과 분석 ───────────────────────────────── print() print(f" F&G < {FNG_MIN} 구간(차단) 거래 성과 분석:") blocked_trades = [t for t in all_trades_A if t.fng < FNG_MIN] if blocked_trades: sb2 = stats(blocked_trades) print(f" → 차단된 거래 수: {sb2['n']}건") print(f" → 차단 거래 승률: {sb2['wr']:.1f}%") print(f" → 차단 거래 평균 PnL: {sb2['avg_pnl']:+.3f}%") print(f" → 차단으로 절약된 손실: {sb2['krw_total']:>+,.0f}원 " f"({CAPITAL_PER_TRADE:,}원 × {sb2['n']}거래 기준)") else: print(" → 차단된 거래 없음") # ── 최적 임계값 확인 ───────────────────────────────────── print() print(f" F&G 임계값별 성과 비교 (현행 기준 비교):") print(f" {'임계값':>8} {'거래':>5} {'승률':>6} {'평균PnL':>9} {'KRW손익':>14}") print(" " + "-" * 52) for thr in [25, 30, 35, 41, 45, 50]: filtered = [t for t in all_trades_A if t.fng >= thr] if not filtered: continue sf = stats(filtered) marker = " ◀ 채택" if thr == FNG_MIN else "" print( f" {thr:>5}이상 {sf['n']:>5}건 {sf['wr']:>5.1f}% " f"{sf['avg_pnl']:>+8.3f}% {sf['krw_total']:>+14,.0f}원{marker}" ) # ── DB 저장 ─────────────────────────────────────────────── if DB_ENABLED: try: ensure_tables() params = { "tickers": len(datasets), "days": 365, "candle": "1h", "trail_stop": TRAIL_STOP, "mom_thr": MOM_THR, "fng_min_new": FNG_MIN, "capital_per_trade": CAPITAL_PER_TRADE, } run_id = insert_run( "fng_sim_comparison", f"F&G 필터 전후 비교 시뮬레이션 (1년치 / F&G≥{FNG_MIN})", params, ) insert_result(run_id, "필터 없음 (현행)", sa, None, None) insert_result(run_id, f"F&G≥{FNG_MIN} 필터 (신규)", sb, FNG_MIN, None) for tk, t_list in per_ticker_A.items(): insert_trades_bulk(run_id, "필터없음", tk, t_list) for tk, t_list in per_ticker_B.items(): insert_trades_bulk(run_id, f"fng_ge{FNG_MIN}", tk, t_list) print(f"\n [DB 저장 완료] run_id: {run_id}") except Exception as e: print(f"\n [DB 저장 실패] {e}") if __name__ == "__main__": main()