Files
upbit-trader/tests/fng_sim_comparison.py
joungmin 6b2c962ed8 refactor: reorganize project structure into tests/, data/, logs/
- Move all backtest/simulation scripts to tests/
  - Add sys.path.insert to each script for correct import resolution
- Move pkl cache files to data/ (git-ignored)
- Move log files to logs/ (git-ignored)
- Update main.py: trading.log path → logs/trading.log
- Add ecosystem.config.js: pm2 log paths → logs/pm2*.log
- Update .gitignore: ignore data/ and logs/ instead of *.pkl/*.log
- core/fng.py: increase cache TTL 3600→86400s (API updates daily at KST 09:00)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 16:08:50 +09:00

386 lines
14 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""F&G 필터 전후 수익 비교 시뮬레이션
필터 없음 vs F&G ≥ 41 필터 적용 시 1년치 성과를 직접 비교.
표시:
- 거래 수, 승률, 평균 PnL, 총 누적 PnL
- 거래당 고정 자본 100만 원 기준 KRW 환산 손익
- 월별 손익 흐름 (계절성 확인)
- 극공포 차단 일수 통계
결과는 Oracle DB(backtest_results)에 저장.
데이터: 1년치 1h 캔들 (배치 수집)
"""
from __future__ import annotations  # must precede every other statement except the module docstring

import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))

import datetime
import json
import sys
import time
import urllib.request
from dataclasses import dataclass

import pandas as pd
import pyupbit
# ── DB 저장 ─────────────────────────────────────────────────
# Optional persistence layer: when the backtest_db helpers cannot be imported
# (for any reason), report why and run the simulation without saving results.
try:
    from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
except Exception as e:
    print(f" [DB 비활성화] {e}")
    DB_ENABLED = False
else:
    DB_ENABLED = True
# Market codes that main() downloads (via fetch_1y) and simulates.
TICKERS = [
"KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
"KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
"KRW-SUI", "KRW-HBAR",
"KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
"KRW-KAVA", "KRW-KNC",
]
CAPITAL_PER_TRADE = 1_000_000 # fixed capital per trade (KRW)
# Strategy parameters (current live settings); all are read by simulate().
# Signal: previous bar's volume must be >= VOL_MULT x the mean of the 5 bars before it.
VOL_MULT = 2.0
# Signal: absolute % change between the current close and the close 2 bars back must be below this.
QUIET_2H = 2.0
# A pending signal is dropped once more than this many bars have passed since it was set.
SIG_TO_H = 8
# Entry: price must have risen at least this many % above the signal price.
MOM_THR = 3.0
# A pending signal is dropped if price falls more than this many % below the signal price.
SIG_CANCEL = 3.0
# Trailing stop: exit when price is this fraction (0.015 = 1.5%) or more below the peak since entry.
TRAIL_STOP = 0.015
# Time stop: after holding at least TIME_H bars, exit if PnL is still below TIME_MIN %.
TIME_H = 24
TIME_MIN = 3.0
FNG_MIN = 41 # entries are blocked while the F&G value is below this
# ── 데이터 수집 ──────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
    """Download about ``total_days`` of 1-hour candles for ``ticker``.

    Candles are requested from pyupbit in batches of 1440 rows, walking
    backwards in time: each request ends at the oldest timestamp received so
    far. Paging stops when a request returns nothing, when the oldest
    timestamp stops moving backwards, or when the cutoff date is reached.

    Args:
        ticker: Market code passed straight to ``pyupbit.get_ohlcv``.
        total_days: How many days back from "now" to keep.

    Returns:
        A DataFrame sorted by index, de-duplicated on the index (last
        occurrence wins) and trimmed to rows at or after the cutoff, or
        ``None`` when no batch returned any data.
    """
    batch = 1440
    # Take "now" once so the paging stop condition and the final trim use the
    # exact same cutoff instead of several slightly different clock readings.
    now = datetime.datetime.now()
    cutoff = now - datetime.timedelta(days=total_days)

    frames = []
    end = now
    prev_oldest = None
    while True:
        df = pyupbit.get_ohlcv(
            ticker, interval="minute60", count=batch,
            to=end.strftime("%Y-%m-%d %H:%M:%S"),
        )
        if df is None or df.empty:
            break
        frames.append(df)
        oldest = df.index[0]
        # No progress backwards in time: stop instead of looping forever.
        if prev_oldest is not None and oldest >= prev_oldest:
            break
        prev_oldest = oldest
        if oldest <= cutoff:
            break
        end = oldest
        # Short pause between requests (presumably to respect the API rate limit).
        time.sleep(0.12)

    if not frames:
        return None
    combined = pd.concat(frames).sort_index()
    # Consecutive batches can overlap at their boundary; keep one row per timestamp.
    combined = combined[~combined.index.duplicated(keep="last")]
    return combined[combined.index >= cutoff]
def load_fng(url: str = "https://api.alternative.me/fng/?limit=400&format=json") -> dict[str, int]:
    """Fetch the Fear & Greed index history and key it by calendar date.

    Args:
        url: Endpoint returning JSON with a top-level ``"data"`` list whose
            items carry ``"timestamp"`` (Unix seconds) and ``"value"``.
            Defaults to the alternative.me endpoint with the last 400 entries.

    Returns:
        Mapping of ``"YYYY-MM-DD"`` to the integer index value.

    Raises:
        urllib.error.URLError: If the request fails or times out (10 s).
        KeyError / ValueError: If the payload does not have the expected shape.
    """
    with urllib.request.urlopen(url, timeout=10) as r:
        data = json.loads(r.read())
    # Convert with an explicit UTC zone so the date keys do not depend on the
    # timezone of the machine running the script.
    utc = datetime.timezone.utc
    return {
        datetime.datetime.fromtimestamp(int(d["timestamp"]), tz=utc).strftime("%Y-%m-%d"):
            int(d["value"])
        for d in data["data"]
    }
def fng_val(fng_map, ts) -> int:
    """Look up the F&G value for the calendar day of ``ts``.

    ``ts`` only needs a ``strftime`` method. Days missing from ``fng_map``
    fall back to 50.
    """
    day_key = ts.strftime("%Y-%m-%d")
    if day_key in fng_map:
        return fng_map[day_key]
    return 50
# ── 시뮬레이션 ──────────────────────────────────────────────
@dataclass
class Trade:
    """One closed trade produced by simulate()."""
    # Profit or loss in percent of the entry price.
    pnl: float
    # Number of bars the position was held (exit bar index minus entry bar index).
    h: int
    # F&G value looked up for the entry bar.
    fng: int
    # Exit reason as set by simulate(): "trail" or "time".
    exit: str
    # Month of the exit bar.
    date: str # YYYY-MM
def simulate(df, fng_map, fng_min: int | None = None) -> list[Trade]:
    """Replay the signal -> entry -> exit strategy over one candle series.

    Args:
        df: Candle frame; the "close" and "volume" columns and the index are
            read. The index entries must support ``strftime``.
        fng_map: Mapping of "YYYY-MM-DD" to F&G value, as used by fng_val().
        fng_min: When not None, entries are skipped on bars whose F&G value is
            below this threshold. None disables the filter.

    Returns:
        Closed trades in the order they were exited. A position still open
        when the loop ends is not recorded.
    """
    closes = df["close"].values
    vols = df["volume"].values
    idx = df.index
    trades: list[Trade] = []
    # Pending signal: price and bar index at which it was raised.
    sig_px = sig_i = None
    # Open position: entry price, highest close since entry, entry bar index, F&G at entry.
    pos_buy = pos_peak = pos_i = pos_fng = None
    # Starts at bar 7 and stops max(TIME_H + 4, 10) bars before the end of the data.
    for i in range(7, len(closes) - max(TIME_H + 4, 10)):
        # ── Phase 1: a position is open, so only exits are evaluated ──
        if pos_buy is not None:
            cur = closes[i]
            if cur > pos_peak:
                pos_peak = cur
            # Trailing stop: drawdown from the peak, as a fraction.
            if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
                trades.append(Trade(
                    (cur - pos_buy) / pos_buy * 100,
                    i - pos_i, pos_fng, "trail",
                    idx[i].strftime("%Y-%m"),
                ))
                pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                continue
            # Time stop: held long enough without reaching TIME_MIN % profit.
            if i - pos_i >= TIME_H:
                pnl = (cur - pos_buy) / pos_buy * 100
                if pnl < TIME_MIN:
                    trades.append(Trade(
                        pnl, i - pos_i, pos_fng, "time",
                        idx[i].strftime("%Y-%m"),
                    ))
                    pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                    continue
            # Still holding: no signal or entry logic while in a position.
            continue
        # ── Phase 2: expire or cancel a pending signal ──
        if sig_px is not None:
            if i - sig_i > SIG_TO_H:
                sig_px = sig_i = None
            elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
                sig_px = sig_i = None
        # ── Phase 3: no pending signal, so look for a new one ──
        if sig_px is None:
            # Mean volume of the five bars i-6 .. i-2 (the slice end is exclusive).
            vol_avg = vols[i - 6:i - 1].mean()
            if vol_avg <= 0:
                continue
            # Volume spike on the previous bar while price stayed within QUIET_2H %.
            if vols[i - 1] / vol_avg >= VOL_MULT:
                if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
                    sig_px = closes[i]
                    sig_i = i
            # Entry is never evaluated on the bar that raised (or failed to raise) a signal.
            continue
        # ── Phase 4: a signal is pending, so check the F&G gate and momentum entry ──
        fv = fng_val(fng_map, idx[i])
        # A blocked bar skips the entry check but leaves the signal pending.
        if fng_min is not None and fv < fng_min:
            continue
        if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
            pos_buy = pos_peak = closes[i]
            pos_i = i
            pos_fng = fv
            sig_px = sig_i = None
    return trades
def stats(trades: list[Trade]) -> dict:
    """Summarise a list of trades.

    Returns a dict with: ``n`` (trade count), ``wr`` (win rate, %),
    ``avg_pnl`` / ``total_pnl`` (%), ``rr`` (|average win / average loss|),
    ``avg_win`` / ``avg_loss`` (%), ``max_dd`` (largest peak-to-trough drop of
    the running PnL sum, in the given trade order) and ``krw_total``
    (total PnL converted to KRW at CAPITAL_PER_TRADE per trade).
    An empty input yields all zeros.
    """
    if not trades:
        return {
            "n": 0, "wr": 0, "avg_pnl": 0, "total_pnl": 0, "rr": 0,
            "avg_win": 0, "avg_loss": 0, "max_dd": 0, "krw_total": 0,
        }

    pnls = [t.pnl for t in trades]
    count = len(pnls)
    gains = [p for p in pnls if p > 0]
    drops = [p for p in pnls if p <= 0]
    mean_gain = sum(gains) / len(gains) if gains else 0
    mean_drop = sum(drops) / len(drops) if drops else 0

    # Walk the cumulative PnL curve, tracking its high-water mark and the
    # deepest fall below it.
    running = peak = worst_drop = 0.0
    for p in pnls:
        running += p
        peak = max(peak, running)
        worst_drop = max(worst_drop, peak - running)

    total = sum(pnls)
    return {
        "n": count,
        "wr": len(gains) / count * 100,
        "avg_pnl": total / count,
        "total_pnl": total,
        "rr": abs(mean_gain / mean_drop) if mean_drop else 0,
        "avg_win": mean_gain,
        "avg_loss": mean_drop,
        "max_dd": worst_drop,
        "krw_total": total / 100 * CAPITAL_PER_TRADE,
    }
def monthly_pnl(trades: list[Trade]) -> dict[str, float]:
    """Return the summed PnL (%) per month, ordered by month key."""
    totals: dict[str, float] = {}
    for trade in trades:
        month = trade.date
        totals.setdefault(month, 0)
        totals[month] += trade.pnl
    return {month: totals[month] for month in sorted(totals)}
def _collect_datasets(total_days: int = 365) -> dict[str, pd.DataFrame]:
    """Fetch hourly candles for every entry in TICKERS, keeping frames with more than 100 rows."""
    datasets: dict[str, pd.DataFrame] = {}
    for i, tk in enumerate(TICKERS):
        try:
            df = fetch_1y(tk, total_days=total_days)
        except Exception as e:
            # Best effort: one failing ticker must not abort the whole run.
            sys.stderr.write(f"\r {tk} 실패: {e} ")
            continue
        if df is None or len(df) <= 100:
            continue
        datasets[tk] = df
        sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
    sys.stderr.write("\n")
    return datasets


def _print_headline(sa: dict, sb: dict, n_tickers: int) -> None:
    """Print the side-by-side summary table for the unfiltered and filtered runs."""
    print("=" * 80)
    print(f" F&G 필터 전후 비교 (1년치 / {n_tickers}개 종목 / 1h캔들 / 자본 {CAPITAL_PER_TRADE:,}원/거래)")
    print("=" * 80)
    print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
          f"{'손익비':>5} {'총PnL':>8} {'MaxDD':>7} {'KRW손익':>14}")
    print(" " + "-" * 76)
    for label, s in [("필터 없음 (현행)", sa), (f"F&G≥{FNG_MIN} 필터 (신규)", sb)]:
        krw_str = f"{s['krw_total']:>+,.0f}"
        print(
            f" {label:<26} {s['n']:>5}{s['wr']:>5.1f}% "
            f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} "
            f"{s['total_pnl']:>+7.1f}% -{s['max_dd']:>5.1f}% {krw_str:>14}"
        )
    diff_trades = sb["n"] - sa["n"]
    diff_krw = sb["krw_total"] - sa["krw_total"]
    diff_wr = sb["wr"] - sa["wr"]
    print(f"\n 변화: 거래수 {diff_trades:+d}건 | 승률 {diff_wr:+.1f}%p | "
          f"KRW손익 {diff_krw:>+,.0f}")


def _print_monthly(trades_a: list[Trade], trades_b: list[Trade], fng_map: dict[str, int]) -> None:
    """Print per-month PnL for both runs, blocked-day counts and the filtered cumulative PnL."""
    print()
    print(f" 월별 손익 비교 (필터없음 vs F&G≥{FNG_MIN}):")
    print(f" {'':>8} {'차단일수':>6} {'필터없음':>9} {'F&G필터':>9} {'개선':>8} {'누적(필터)':>12}")
    print(" " + "-" * 62)
    ma = monthly_pnl(trades_a)
    mb = monthly_pnl(trades_b)
    cum_b = 0.0
    for m in sorted(set(ma) | set(mb)):
        pa = ma.get(m, 0.0)
        pb = mb.get(m, 0.0)
        cum_b += pb
        diff = pb - pa
        # Days of this month ("YYYY-MM" prefix) on which entries were blocked.
        blocked = sum(
            1 for d, v in fng_map.items()
            if d.startswith(m) and v < FNG_MIN
        )
        # The magnitude is printed with abs(), so the marker carries the direction.
        diff_sign = "▲" if diff > 0 else ("▼" if diff < 0 else "=")
        print(
            f" {m} {blocked:>4}일차단 "
            f"{pa:>+8.1f}% {pb:>+8.1f}% "
            f"{diff_sign}{abs(diff):>6.1f}% {cum_b:>+10.1f}%"
        )


def _print_per_ticker(per_a: dict[str, list[Trade]], per_b: dict[str, list[Trade]]) -> None:
    """Print per-ticker trade counts and total PnL for both runs, best improvement first."""
    print()
    print(f" 종목별 성과 비교 (필터없음 vs F&G≥{FNG_MIN}):")
    print(f" {'종목':<14} {'현행거래':>6} {'현행PnL':>8} {'필터거래':>7} {'필터PnL':>8} {'개선':>8}")
    print(" " + "-" * 58)
    ticker_rows = []
    for tk in sorted(per_a):
        ta_list = per_a.get(tk, [])
        tb_list = per_b.get(tk, [])
        pa = sum(t.pnl for t in ta_list)
        pb = sum(t.pnl for t in tb_list)
        ticker_rows.append((tk, len(ta_list), pa, len(tb_list), pb, pb - pa))
    for tk, na, pa, nb, pb, delta in sorted(ticker_rows, key=lambda x: x[5], reverse=True):
        # Changes within +/-1 percentage point are shown as unchanged.
        mark = " ▲" if delta > 1 else (" ▼" if delta < -1 else " =")
        print(
            f" {tk:<14} {na:>6}{pa:>+7.1f}% {nb:>6}{pb:>+7.1f}% "
            f"{mark}{abs(delta):>6.1f}%"
        )


def _print_blocked_analysis(trades_a: list[Trade]) -> None:
    """Print stats for the unfiltered trades whose entry F&G was below FNG_MIN."""
    print()
    print(f" F&G < {FNG_MIN} 구간(차단) 거래 성과 분석:")
    blocked_trades = [t for t in trades_a if t.fng < FNG_MIN]
    if not blocked_trades:
        print(" → 차단된 거래 없음")
        return
    sb2 = stats(blocked_trades)
    print(f" → 차단된 거래 수: {sb2['n']}")
    print(f" → 차단 거래 승률: {sb2['wr']:.1f}%")
    print(f" → 차단 거래 평균 PnL: {sb2['avg_pnl']:+.3f}%")
    print(f" → 차단으로 절약된 손실: {sb2['krw_total']:>+,.0f}"
          f"({CAPITAL_PER_TRADE:,}× {sb2['n']}거래 기준)")


def _print_threshold_scan(trades_a: list[Trade]) -> None:
    """Print stats of the unfiltered trades re-cut at several F&G thresholds."""
    print()
    print(" F&G 임계값별 성과 비교 (현행 기준 비교):")
    print(f" {'임계값':>8} {'거래':>5} {'승률':>6} {'평균PnL':>9} {'KRW손익':>14}")
    print(" " + "-" * 52)
    # FNG_MIN is always included so the adopted threshold is always marked.
    # NOTE: this re-cuts the unfiltered trades by entry F&G; it does not
    # re-run simulate() with each threshold.
    for thr in sorted({25, 30, 35, 41, 45, 50, FNG_MIN}):
        filtered = [t for t in trades_a if t.fng >= thr]
        if not filtered:
            continue
        sf = stats(filtered)
        marker = " ◀ 채택" if thr == FNG_MIN else ""
        print(
            f" {thr:>5}이상 {sf['n']:>5}{sf['wr']:>5.1f}% "
            f"{sf['avg_pnl']:>+8.3f}% {sf['krw_total']:>+14,.0f}{marker}"
        )


def _save_to_db(n_tickers: int, days: int, sa: dict, sb: dict,
                per_a: dict[str, list[Trade]], per_b: dict[str, list[Trade]]) -> None:
    """Store the run, both summary rows and every trade through the backtest_db helpers."""
    try:
        ensure_tables()
        params = {
            "tickers": n_tickers, "days": days, "candle": "1h",
            "trail_stop": TRAIL_STOP, "mom_thr": MOM_THR,
            "fng_min_new": FNG_MIN, "capital_per_trade": CAPITAL_PER_TRADE,
        }
        run_id = insert_run(
            "fng_sim_comparison",
            f"F&G 필터 전후 비교 시뮬레이션 (1년치 / F&G≥{FNG_MIN})",
            params,
        )
        insert_result(run_id, "필터 없음 (현행)", sa, None, None)
        insert_result(run_id, f"F&G≥{FNG_MIN} 필터 (신규)", sb, FNG_MIN, None)
        for tk, t_list in per_a.items():
            insert_trades_bulk(run_id, "필터없음", tk, t_list)
        for tk, t_list in per_b.items():
            insert_trades_bulk(run_id, f"fng_ge{FNG_MIN}", tk, t_list)
        print(f"\n [DB 저장 완료] run_id: {run_id}")
    except Exception as e:
        # Saving is best effort: the printed report is already complete.
        print(f"\n [DB 저장 실패] {e}")


def main():
    """Run the comparison: no filter (A) versus F&G >= FNG_MIN filter (B).

    Loads the F&G history, downloads one year of hourly candles per ticker,
    simulates both variants on every ticker, prints the summary, monthly,
    per-ticker, blocked-trade and threshold tables, and stores the results
    when the DB helpers were imported successfully.
    """
    days = 365

    print("F&G 데이터 로드...")
    fng_map = load_fng()
    # F&G distribution over the loaded history.
    block_days = sum(1 for v in fng_map.values() if v < FNG_MIN)
    total_days = len(fng_map)
    allow_days = total_days - block_days
    # Guard against an empty F&G payload so the report does not crash on 0 / 0.
    block_pct = block_days / total_days * 100 if total_days else 0.0
    allow_pct = allow_days / total_days * 100 if total_days else 0.0
    print(f" 1년 F&G 분포: 진입차단(< {FNG_MIN}) = {block_days}일 / {total_days}"
          f"({block_pct:.1f}%)")
    print(f" 진입허용(≥ {FNG_MIN}) = {allow_days}일 ({allow_pct:.1f}%)\n")

    print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
    datasets = _collect_datasets(total_days=days)
    print(f" 완료: {len(datasets)}개 종목\n")

    # A: no filter (current), B: F&G >= FNG_MIN (new).
    all_trades_A: list[Trade] = []
    all_trades_B: list[Trade] = []
    per_ticker_A: dict[str, list[Trade]] = {}
    per_ticker_B: dict[str, list[Trade]] = {}
    for tk, df in datasets.items():
        ta = simulate(df, fng_map, fng_min=None)
        tb = simulate(df, fng_map, fng_min=FNG_MIN)
        all_trades_A.extend(ta)
        all_trades_B.extend(tb)
        per_ticker_A[tk] = ta
        per_ticker_B[tk] = tb
    sa = stats(all_trades_A)
    sb = stats(all_trades_B)

    _print_headline(sa, sb, len(datasets))
    _print_monthly(all_trades_A, all_trades_B, fng_map)
    _print_per_ticker(per_ticker_A, per_ticker_B)
    _print_blocked_analysis(all_trades_A)
    _print_threshold_scan(all_trades_A)

    if DB_ENABLED:
        _save_to_db(len(datasets), days, sa, sb, per_ticker_A, per_ticker_B)
# Run the comparison only when executed as a script, not when imported.
if __name__ == "__main__":
    main()