Files
upbit-trader/archive/tests/fng_1y_backtest.py
joungmin 6e0c4508fa refactor: MVC 구조 분리 + 미사용 파일 archive 정리
- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리:
  - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal)
  - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회)
  - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건
- type hints, Google docstring, 구체적 예외 타입 적용
- 50줄 초과 함수 분리 (process_signal, restore_positions)
- 미사용 파일 58개 archive/ 폴더로 이동
- README.md 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 20:46:47 +09:00

319 lines
12 KiB
Python

"""F&G 조건별 백테스트 - 1년치 데이터 (배치 수집)
60일 극공포 편향을 제거하고 Bull/Neutral/Bear 다양한 구간 포함.
데이터: 1h 캔들 배치 수집 → 약 365일치
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
from __future__ import annotations
import datetime, json, time, sys, urllib.request
import pandas as pd
import pyupbit
from dataclasses import dataclass
TICKERS = [
"KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
"KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
"KRW-SUI", "KRW-HBAR",
"KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
"KRW-KAVA", "KRW-KNC",
]
VOL_MULT = 2.0
QUIET_2H = 2.0
SIG_TO_H = 8
MOM_THR = 3.0
SIG_CANCEL = 3.0
TRAIL_STOP = 0.015
TIME_H = 24
TIME_MIN = 3.0
# ── 데이터 수집 ───────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
"""1h 캔들을 배치로 수집해 약 1년치 DataFrame 반환."""
all_dfs = []
end = datetime.datetime.now()
batch = 1440 # 60일치씩
prev_oldest = None
while True:
df = pyupbit.get_ohlcv(
ticker, interval="minute60", count=batch,
to=end.strftime("%Y-%m-%d %H:%M:%S"),
)
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
# 상장 초기 종목: oldest가 진전되지 않으면 더 오래된 데이터 없음
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
if oldest <= cutoff:
break
end = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
return combined[combined.index >= cutoff]
def load_fng() -> dict[str, int]:
url = "https://api.alternative.me/fng/?limit=400&format=json"
with urllib.request.urlopen(url, timeout=10) as r:
data = json.loads(r.read())
return {
datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"):
int(d["value"])
for d in data["data"]
}
def fng_val(fng_map, ts):
return fng_map.get(ts.strftime("%Y-%m-%d"), 50)
# ── 시뮬레이션 ────────────────────────────────────────────────
@dataclass
class Trade:
pnl: float
h: int
fng: int
exit: str
def simulate(df, fng_map, fng_lo=None, fng_hi=None) -> list[Trade]:
closes = df["close"].values
vols = df["volume"].values
idx = df.index
trades: list[Trade] = []
sig_px = sig_i = None
pos_buy = pos_peak = pos_i = pos_fng = None
for i in range(7, len(closes) - max(TIME_H + 4, 10)):
if pos_buy is not None:
cur = closes[i]
if cur > pos_peak:
pos_peak = cur
if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
trades.append(Trade((cur - pos_buy) / pos_buy * 100,
i - pos_i, pos_fng, "trail"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
if i - pos_i >= TIME_H:
pnl = (cur - pos_buy) / pos_buy * 100
if pnl < TIME_MIN:
trades.append(Trade(pnl, i - pos_i, pos_fng, "time"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
continue
if sig_px is not None:
if i - sig_i > SIG_TO_H:
sig_px = sig_i = None
elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
sig_px = sig_i = None
if sig_px is None:
vol_avg = vols[i - 6:i - 1].mean()
if vol_avg <= 0:
continue
if vols[i - 1] / vol_avg >= VOL_MULT:
if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
sig_px = closes[i]
sig_i = i
continue
fv = fng_val(fng_map, idx[i])
if fng_lo is not None and fv < fng_lo:
continue
if fng_hi is not None and fv > fng_hi:
continue
if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
pos_buy = pos_peak = closes[i]
pos_i = i
pos_fng = fv
sig_px = sig_i = None
return trades
def stats(trades):
if not trades:
return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
avg_win=0, avg_loss=0, max_dd=0)
wins = [t for t in trades if t.pnl > 0]
losses = [t for t in trades if t.pnl <= 0]
aw = sum(t.pnl for t in wins) / len(wins) if wins else 0
al = sum(t.pnl for t in losses) / len(losses) if losses else 0
cum = pk = max_dd = 0.0
for t in trades:
cum += t.pnl
if cum > pk: pk = cum
if pk - cum > max_dd: max_dd = pk - cum
return dict(
n=len(trades), wr=len(wins) / len(trades) * 100,
avg_pnl=sum(t.pnl for t in trades) / len(trades),
total_pnl=sum(t.pnl for t in trades),
rr=abs(aw / al) if al else 0,
avg_win=aw, avg_loss=al, max_dd=max_dd,
)
def main():
print("F&G 데이터 로드...")
fng_map = load_fng()
# F&G 연간 분포 출력
from collections import Counter
zone_cnt = Counter()
for v in fng_map.values():
if v <= 25: zone_cnt["극공포(0~25)"] += 1
elif v <= 45: zone_cnt["공포(26~45)"] += 1
elif v <= 55: zone_cnt["중립(46~55)"] += 1
elif v <= 75: zone_cnt["탐욕(56~75)"] += 1
else: zone_cnt["극탐욕(76~100)"] += 1
total_days = sum(zone_cnt.values())
print(f" 1년 F&G 분포 ({total_days}일):")
for k, v in sorted(zone_cnt.items()):
bar = "" * (v // 5)
print(f" {k:<14} {v:>3}일 ({v/total_days*100:>4.1f}%) {bar}")
print(f"\n종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
datasets = {}
for i, tk in enumerate(TICKERS):
try:
df = fetch_1y(tk, total_days=365)
if df is not None and len(df) > 100:
datasets[tk] = df
sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
except Exception as e:
sys.stderr.write(f"\r {tk} 실패: {e} ")
sys.stderr.write("\n")
print(f" 완료: {len(datasets)}개 종목\n")
# ── 전체 기간 F&G 구간별 성과 ────────────────────────────
CONFIGS = [
(None, None, "필터 없음 (전체)"),
(None, 25, "극공포만 (0~25)"),
(26, 45, "공포만 (26~45)"),
(46, 55, "중립만 (46~55)"),
(56, 100, "탐욕+ (56~100)"),
(46, 100, "중립 이상 (46~100)"),
(26, 100, "공포 이상 (26~100)"),
]
print("=" * 78)
print(" F&G 조건별 성과 - 1년치 (1h 캔들 / 모멘텀 / 스탑1.5%)")
print("=" * 78)
print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
f"{'손익비':>6} {'총PnL':>9} {'MaxDD':>7}")
print(" " + "-" * 72)
all_results = {}
for lo, hi, label in CONFIGS:
all_trades = []
for df in datasets.values():
all_trades.extend(simulate(df, fng_map, lo, hi))
s = stats(all_trades)
all_results[label] = (s, all_trades)
if s["n"] == 0:
print(f" {label:<26} 거래 없음 (해당 구간 진입 기회 없음)")
continue
sign = "+" if s["total_pnl"] > 0 else ""
print(
f" {label:<26} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+7.3f}% {s['rr']:>5.2f} "
f"{sign}{s['total_pnl']:>8.1f}% -{s['max_dd']:>5.1f}%"
)
# ── 분기별 성과 (계절성) ──────────────────────────────────
print()
print(" 분기별 성과 (전체 필터 없음 기준):")
base_trades = all_results["필터 없음 (전체)"][1]
for df in datasets.values():
pass # already computed
# 전체 종목 합산 후 날짜로 분기 분리
all_base = []
for df in datasets.values():
t_list = simulate(df, fng_map)
# trade에 날짜 정보 추가
# simulate에서 idx를 참조하지 않으므로 재계산
all_base.extend(t_list)
# F&G 수치별 세분화
print()
print(" F&G 10단위 구간별 세부 성과:")
print(f" {'구간':<16} {'건수':>5} {'승률':>6} {'평균PnL':>9} {'손익비':>6} {'의미'}")
print(" " + "-" * 65)
fng_zones_detail = [
(0, 10, "극단 공포(0~10)"),
(11, 20, "극단 공포(11~20)"),
(21, 30, "극공포(21~30)"),
(31, 40, "공포(31~40)"),
(41, 50, "약공포(41~50)"),
(51, 60, "약탐욕(51~60)"),
(61, 75, "탐욕(61~75)"),
(76, 100, "극탐욕(76~100)"),
]
base_all = all_results["필터 없음 (전체)"][1]
for lo, hi, name in fng_zones_detail:
sub = [t for t in base_all if lo <= t.fng <= hi]
if not sub:
continue
s = stats(sub)
breakeven_wr = 1 / (1 + s["rr"]) * 100 if s["rr"] > 0 else 50
profitable = "✅ 수익" if s["avg_pnl"] > 0 else ("⚠️ BEP 근접" if s["avg_pnl"] > -0.2 else "❌ 손실")
print(
f" {name:<16} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+8.3f}% {s['rr']:>5.2f} {profitable}"
)
# ── 최적 F&G 구간 요약 ───────────────────────────────────
print()
best = max(
[(label, s) for label, (s, _) in all_results.items() if s["n"] >= 50],
key=lambda x: x[1]["avg_pnl"],
)
print(f" ★ 최적 구간: {best[0]} "
f"(거래 {best[1]['n']}건 | 승률 {best[1]['wr']:.1f}% | "
f"평균PnL {best[1]['avg_pnl']:+.3f}%)")
# ── DB 저장 ──────────────────────────────────────────────
try:
from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
ensure_tables()
params = {
"tickers": len(datasets), "days": 365, "candle": "1h",
"trail_stop": 0.015, "mom_thr": 3.0, "vol_mult": 2.0,
}
run_id = insert_run(
run_name="fng_1y_backtest",
description="F&G 구간별 성과 1년치 백테스트 (1h 캔들 / 모멘텀 / 스탑1.5%)",
params=params,
)
for lo, hi, label in CONFIGS:
if label in all_results:
s, trades = all_results[label]
if s["n"] > 0:
insert_result(run_id, label, s, lo, hi)
# 전체 거래는 per-ticker 분리 없이 일괄 저장 (run_id+label로 구분)
insert_trades_bulk(run_id, label, "_all_", trades)
print(f"\n [DB 저장 완료] run_id: {run_id}")
except Exception as e:
print(f"\n [DB 저장 실패] {e}")
if __name__ == "__main__":
main()