"""F&G 조건별 백테스트 - 1년치 데이터 (배치 수집) 60일 극공포 편향을 제거하고 Bull/Neutral/Bear 다양한 구간 포함. 데이터: 1h 캔들 배치 수집 → 약 365일치 """ from __future__ import annotations import datetime, json, time, sys, urllib.request import pandas as pd import pyupbit from dataclasses import dataclass TICKERS = [ "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE", "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK", "KRW-SUI", "KRW-HBAR", "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO", "KRW-KAVA", "KRW-KNC", ] VOL_MULT = 2.0 QUIET_2H = 2.0 SIG_TO_H = 8 MOM_THR = 3.0 SIG_CANCEL = 3.0 TRAIL_STOP = 0.015 TIME_H = 24 TIME_MIN = 3.0 # ── 데이터 수집 ─────────────────────────────────────────────── def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None: """1h 캔들을 배치로 수집해 약 1년치 DataFrame 반환.""" all_dfs = [] end = datetime.datetime.now() batch = 1440 # 60일치씩 prev_oldest = None while True: df = pyupbit.get_ohlcv( ticker, interval="minute60", count=batch, to=end.strftime("%Y-%m-%d %H:%M:%S"), ) if df is None or df.empty: break all_dfs.append(df) oldest = df.index[0] # 상장 초기 종목: oldest가 진전되지 않으면 더 오래된 데이터 없음 if prev_oldest is not None and oldest >= prev_oldest: break prev_oldest = oldest cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) if oldest <= cutoff: break end = oldest time.sleep(0.12) if not all_dfs: return None combined = pd.concat(all_dfs).sort_index() combined = combined[~combined.index.duplicated(keep="last")] cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) return combined[combined.index >= cutoff] def load_fng() -> dict[str, int]: url = "https://api.alternative.me/fng/?limit=400&format=json" with urllib.request.urlopen(url, timeout=10) as r: data = json.loads(r.read()) return { datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"): int(d["value"]) for d in data["data"] } def fng_val(fng_map, ts): return fng_map.get(ts.strftime("%Y-%m-%d"), 50) # ── 시뮬레이션 ──────────────────────────────────────────────── @dataclass class Trade: pnl: float h: int fng: int exit: str def simulate(df, fng_map, fng_lo=None, fng_hi=None) -> list[Trade]: closes = df["close"].values vols = df["volume"].values idx = df.index trades: list[Trade] = [] sig_px = sig_i = None pos_buy = pos_peak = pos_i = pos_fng = None for i in range(7, len(closes) - max(TIME_H + 4, 10)): if pos_buy is not None: cur = closes[i] if cur > pos_peak: pos_peak = cur if (pos_peak - cur) / pos_peak >= TRAIL_STOP: trades.append(Trade((cur - pos_buy) / pos_buy * 100, i - pos_i, pos_fng, "trail")) pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None continue if i - pos_i >= TIME_H: pnl = (cur - pos_buy) / pos_buy * 100 if pnl < TIME_MIN: trades.append(Trade(pnl, i - pos_i, pos_fng, "time")) pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None continue continue if sig_px is not None: if i - sig_i > SIG_TO_H: sig_px = sig_i = None elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL: sig_px = sig_i = None if sig_px is None: vol_avg = vols[i - 6:i - 1].mean() if vol_avg <= 0: continue if vols[i - 1] / vol_avg >= VOL_MULT: if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H: sig_px = closes[i] sig_i = i continue fv = fng_val(fng_map, idx[i]) if fng_lo is not None and fv < fng_lo: continue if fng_hi is not None and fv > fng_hi: continue if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR: pos_buy = pos_peak = closes[i] pos_i = i pos_fng = fv sig_px = sig_i = None return trades def stats(trades): if not trades: return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0, avg_win=0, avg_loss=0, max_dd=0) wins = [t for t in trades if t.pnl > 0] losses = [t for t in trades if t.pnl <= 0] aw = sum(t.pnl for t in wins) / len(wins) if wins else 0 al = sum(t.pnl for t in losses) / len(losses) if losses else 0 cum = pk = max_dd = 0.0 for t in trades: cum += t.pnl if cum > pk: pk = cum if pk - cum > max_dd: max_dd = pk - cum return dict( n=len(trades), wr=len(wins) / len(trades) * 100, avg_pnl=sum(t.pnl for t in trades) / len(trades), total_pnl=sum(t.pnl for t in trades), rr=abs(aw / al) if al else 0, avg_win=aw, avg_loss=al, max_dd=max_dd, ) def main(): print("F&G 데이터 로드...") fng_map = load_fng() # F&G 연간 분포 출력 from collections import Counter zone_cnt = Counter() for v in fng_map.values(): if v <= 25: zone_cnt["극공포(0~25)"] += 1 elif v <= 45: zone_cnt["공포(26~45)"] += 1 elif v <= 55: zone_cnt["중립(46~55)"] += 1 elif v <= 75: zone_cnt["탐욕(56~75)"] += 1 else: zone_cnt["극탐욕(76~100)"] += 1 total_days = sum(zone_cnt.values()) print(f" 1년 F&G 분포 ({total_days}일):") for k, v in sorted(zone_cnt.items()): bar = "█" * (v // 5) print(f" {k:<14} {v:>3}일 ({v/total_days*100:>4.1f}%) {bar}") print(f"\n종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...") datasets = {} for i, tk in enumerate(TICKERS): try: df = fetch_1y(tk, total_days=365) if df is not None and len(df) > 100: datasets[tk] = df sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ") except Exception as e: sys.stderr.write(f"\r {tk} 실패: {e} ") sys.stderr.write("\n") print(f" 완료: {len(datasets)}개 종목\n") # ── 전체 기간 F&G 구간별 성과 ──────────────────────────── CONFIGS = [ (None, None, "필터 없음 (전체)"), (None, 25, "극공포만 (0~25)"), (26, 45, "공포만 (26~45)"), (46, 55, "중립만 (46~55)"), (56, 100, "탐욕+ (56~100)"), (46, 100, "중립 이상 (46~100)"), (26, 100, "공포 이상 (26~100)"), ] print("=" * 78) print(" F&G 조건별 성과 - 1년치 (1h 캔들 / 모멘텀 / 스탑1.5%)") print("=" * 78) print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} " f"{'손익비':>6} {'총PnL':>9} {'MaxDD':>7}") print(" " + "-" * 72) all_results = {} for lo, hi, label in CONFIGS: all_trades = [] for df in datasets.values(): all_trades.extend(simulate(df, fng_map, lo, hi)) s = stats(all_trades) all_results[label] = (s, all_trades) if s["n"] == 0: print(f" {label:<26} 거래 없음 (해당 구간 진입 기회 없음)") continue sign = "+" if s["total_pnl"] > 0 else "" print( f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% " f"{s['avg_pnl']:>+7.3f}% {s['rr']:>5.2f} " f"{sign}{s['total_pnl']:>8.1f}% -{s['max_dd']:>5.1f}%" ) # ── 분기별 성과 (계절성) ────────────────────────────────── print() print(" 분기별 성과 (전체 필터 없음 기준):") base_trades = all_results["필터 없음 (전체)"][1] for df in datasets.values(): pass # already computed # 전체 종목 합산 후 날짜로 분기 분리 all_base = [] for df in datasets.values(): t_list = simulate(df, fng_map) # trade에 날짜 정보 추가 # simulate에서 idx를 참조하지 않으므로 재계산 all_base.extend(t_list) # F&G 수치별 세분화 print() print(" F&G 10단위 구간별 세부 성과:") print(f" {'구간':<16} {'건수':>5} {'승률':>6} {'평균PnL':>9} {'손익비':>6} {'의미'}") print(" " + "-" * 65) fng_zones_detail = [ (0, 10, "극단 공포(0~10)"), (11, 20, "극단 공포(11~20)"), (21, 30, "극공포(21~30)"), (31, 40, "공포(31~40)"), (41, 50, "약공포(41~50)"), (51, 60, "약탐욕(51~60)"), (61, 75, "탐욕(61~75)"), (76, 100, "극탐욕(76~100)"), ] base_all = all_results["필터 없음 (전체)"][1] for lo, hi, name in fng_zones_detail: sub = [t for t in base_all if lo <= t.fng <= hi] if not sub: continue s = stats(sub) breakeven_wr = 1 / (1 + s["rr"]) * 100 if s["rr"] > 0 else 50 profitable = "✅ 수익" if s["avg_pnl"] > 0 else ("⚠️ BEP 근접" if s["avg_pnl"] > -0.2 else "❌ 손실") print( f" {name:<16} {s['n']:>5}건 {s['wr']:>5.1f}% " f"{s['avg_pnl']:>+8.3f}% {s['rr']:>5.2f} {profitable}" ) # ── 최적 F&G 구간 요약 ─────────────────────────────────── print() best = max( [(label, s) for label, (s, _) in all_results.items() if s["n"] >= 50], key=lambda x: x[1]["avg_pnl"], ) print(f" ★ 최적 구간: {best[0]} " f"(거래 {best[1]['n']}건 | 승률 {best[1]['wr']:.1f}% | " f"평균PnL {best[1]['avg_pnl']:+.3f}%)") # ── DB 저장 ────────────────────────────────────────────── try: from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk ensure_tables() params = { "tickers": len(datasets), "days": 365, "candle": "1h", "trail_stop": 0.015, "mom_thr": 3.0, "vol_mult": 2.0, } run_id = insert_run( run_name="fng_1y_backtest", description="F&G 구간별 성과 1년치 백테스트 (1h 캔들 / 모멘텀 / 스탑1.5%)", params=params, ) for lo, hi, label in CONFIGS: if label in all_results: s, trades = all_results[label] if s["n"] > 0: insert_result(run_id, label, s, lo, hi) # 전체 거래는 per-ticker 분리 없이 일괄 저장 (run_id+label로 구분) insert_trades_bulk(run_id, label, "_all_", trades) print(f"\n [DB 저장 완료] run_id: {run_id}") except Exception as e: print(f"\n [DB 저장 실패] {e}") if __name__ == "__main__": main()