Compare commits

..

2 Commits

Author SHA1 Message Date
joungmin
bfe0b4d40c fix: reduce F&G log noise and skip scan loop when blocked
- fng.py: downgrade per-ticker block log to DEBUG
- runner.py: skip entire scan (continue) when F&G < FNG_MIN_ENTRY
  instead of iterating 20 tickers each blocked individually

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 15:57:12 +09:00
joungmin
27189b1ad9 feat: add Fear & Greed filter to entry logic
- core/fng.py: F&G API wrapper with 1h cache (alternative.me)
  - FNG_MIN_ENTRY=41 (env-configurable), blocks entry below threshold
- core/strategy.py: call is_entry_allowed() before volume/regime checks
- daemon/runner.py: log F&G status on every scan cycle
- core/notify.py: include F&G value in buy/signal/status notifications
- core/trader.py: pass current F&G value to notify_buy

Backtest evidence (1y / 18 tickers / 1h candles):
  - No filter:   820 trades, 32.7% WR, avg +0.012%, KRW +95k
  - F&G >= 41:   372 trades, 39.5% WR, avg +0.462%, KRW +1.72M
  - Blocked 452 trades (avg -0.372%, saved ~1.68M KRW loss)

Also add:
- backtest_db.py: Oracle DB storage for backtest runs/results/trades
- fng_1y_backtest.py, fng_adaptive_backtest.py, fng_sim_comparison.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-03 15:56:17 +09:00
9 changed files with 1406 additions and 8 deletions

247
backtest_db.py Normal file
View File

@@ -0,0 +1,247 @@
"""백테스트 결과 Oracle DB 저장 모듈.
테이블:
backtest_runs - 실행 단위 (실행시각, 설명, 파라미터)
backtest_results - 조건별 집계 (run_id + label)
backtest_trade_log - 개별 거래 (run_id + label + 종목 + pnl + fng + ...)
"""
from __future__ import annotations
import json
import os
from contextlib import contextmanager
from datetime import datetime
from pathlib import Path
from typing import Generator
import oracledb
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
_pool: oracledb.ConnectionPool | None = None
def _get_pool() -> oracledb.ConnectionPool:
global _pool
if _pool is None:
kwargs: dict = dict(
user=os.environ["ORACLE_USER"],
password=os.environ["ORACLE_PASSWORD"],
dsn=os.environ["ORACLE_DSN"],
min=1,
max=3,
increment=1,
)
wallet = os.environ.get("ORACLE_WALLET")
if wallet:
kwargs["config_dir"] = wallet
_pool = oracledb.create_pool(**kwargs)
return _pool
@contextmanager
def _conn() -> Generator[oracledb.Connection, None, None]:
pool = _get_pool()
conn = pool.acquire()
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
pool.release(conn)
# ── DDL ────────────────────────────────────────────────────────
_DDL_RUNS = """
CREATE TABLE backtest_runs (
run_id VARCHAR2(36) DEFAULT SYS_GUID() PRIMARY KEY,
run_name VARCHAR2(200) NOT NULL,
description VARCHAR2(1000),
params_json CLOB,
created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""
_DDL_RESULTS = """
CREATE TABLE backtest_results (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
run_id VARCHAR2(36) NOT NULL,
label VARCHAR2(100) NOT NULL,
n_trades NUMBER,
win_rate NUMBER(6,3),
avg_pnl NUMBER(10,4),
total_pnl NUMBER(12,4),
rr NUMBER(8,4),
avg_win NUMBER(10,4),
avg_loss NUMBER(10,4),
max_dd NUMBER(10,4),
fng_lo NUMBER,
fng_hi NUMBER,
created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT fk_br_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id)
)
"""
_DDL_TRADES = """
CREATE TABLE backtest_trade_log (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
run_id VARCHAR2(36) NOT NULL,
label VARCHAR2(100),
ticker VARCHAR2(20),
pnl NUMBER(10,4),
hold_h NUMBER,
fng_val NUMBER,
exit_type VARCHAR2(10),
created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
CONSTRAINT fk_bt_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id)
)
"""
def ensure_tables() -> None:
"""백테스트 테이블이 없으면 생성."""
with _conn() as conn:
cur = conn.cursor()
for tbl_name, ddl in [
("BACKTEST_RUNS", _DDL_RUNS),
("BACKTEST_RESULTS", _DDL_RESULTS),
("BACKTEST_TRADE_LOG", _DDL_TRADES),
]:
cur.execute(
"SELECT COUNT(*) FROM user_tables WHERE table_name=:1", [tbl_name]
)
if cur.fetchone()[0] == 0:
cur.execute(ddl)
print(f" {tbl_name} 테이블 생성 완료")
# ── 삽입 헬퍼 ──────────────────────────────────────────────────
def insert_run(run_name: str, description: str = "", params: dict | None = None) -> str:
"""새 백테스트 실행 레코드 삽입. run_id 반환."""
sql = """
INSERT INTO backtest_runs (run_name, description, params_json)
VALUES (:rname, :rdesc, :rparams)
RETURNING run_id INTO :out_id
"""
with _conn() as conn:
cur = conn.cursor()
out = cur.var(oracledb.STRING)
cur.execute(sql, {
"rname": run_name,
"rdesc": description,
"rparams": json.dumps(params or {}, ensure_ascii=False),
"out_id": out,
})
return out.getvalue()[0]
def insert_result(
run_id: str,
label: str,
stats: dict,
fng_lo: int | None = None,
fng_hi: int | None = None,
) -> None:
"""조건별 집계 결과 삽입."""
sql = """
INSERT INTO backtest_results
(run_id, label, n_trades, win_rate, avg_pnl, total_pnl,
rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi)
VALUES
(:run_id, :label, :n, :wr, :avg_pnl, :total_pnl,
:rr, :avg_win, :avg_loss, :max_dd, :fng_lo, :fng_hi)
"""
with _conn() as conn:
conn.cursor().execute(sql, {
"run_id": run_id,
"label": label,
"n": stats.get("n", 0),
"wr": round(stats.get("wr", 0), 3),
"avg_pnl": round(stats.get("avg_pnl", 0), 4),
"total_pnl": round(stats.get("total_pnl", 0), 4),
"rr": round(stats.get("rr", 0), 4),
"avg_win": round(stats.get("avg_win", 0), 4),
"avg_loss": round(stats.get("avg_loss", 0), 4),
"max_dd": round(stats.get("max_dd", 0), 4),
"fng_lo": fng_lo,
"fng_hi": fng_hi,
})
def insert_trades_bulk(
run_id: str,
label: str,
ticker: str,
trades: list,
) -> None:
"""개별 거래 목록 일괄 삽입."""
if not trades:
return
sql = """
INSERT INTO backtest_trade_log
(run_id, label, ticker, pnl, hold_h, fng_val, exit_type)
VALUES (:run_id, :label, :ticker, :pnl, :hold_h, :fng_val, :exit_type)
"""
rows = []
for t in trades:
rows.append({
"run_id": run_id,
"label": label,
"ticker": ticker,
"pnl": round(float(getattr(t, "pnl", 0)), 4),
"hold_h": int(getattr(t, "h", 0)),
"fng_val": int(getattr(t, "fng", 0)),
"exit_type": str(getattr(t, "exit", "")),
})
with _conn() as conn:
conn.cursor().executemany(sql, rows)
# ── 조회 ───────────────────────────────────────────────────────
def list_runs(limit: int = 20) -> list[dict]:
"""최근 백테스트 실행 목록 반환."""
sql = """
SELECT run_id, run_name, description, created_at
FROM backtest_runs
ORDER BY created_at DESC
FETCH FIRST :n ROWS ONLY
"""
with _conn() as conn:
cur = conn.cursor()
cur.execute(sql, {"n": limit})
rows = cur.fetchall()
return [
{"run_id": r[0], "run_name": r[1], "description": r[2],
"created_at": r[3].strftime("%Y-%m-%d %H:%M")}
for r in rows
]
def get_results(run_id: str) -> list[dict]:
"""특정 run_id의 조건별 결과 반환."""
sql = """
SELECT label, n_trades, win_rate, avg_pnl, total_pnl,
rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi
FROM backtest_results
WHERE run_id = :run_id
ORDER BY avg_pnl DESC
"""
with _conn() as conn:
cur = conn.cursor()
cur.execute(sql, {"run_id": run_id})
cols = ["label", "n_trades", "win_rate", "avg_pnl", "total_pnl",
"rr", "avg_win", "avg_loss", "max_dd", "fng_lo", "fng_hi"]
return [dict(zip(cols, r)) for r in cur.fetchall()]
if __name__ == "__main__":
print("백테스트 DB 테이블 확인/생성...")
ensure_tables()
print("완료. 최근 실행 목록:")
for r in list_runs(5):
print(f" {r['created_at']} {r['run_name']}")

71
core/fng.py Normal file
View File

@@ -0,0 +1,71 @@
"""공포탐욕지수(F&G) 조회 모듈.
alternative.me API로 일일 F&G 값을 가져와 메모리에 캐시한다.
캐시 TTL은 1시간 (F&G는 하루 1회 업데이트).
환경변수:
FNG_MIN_ENTRY (기본값 41): 이 값 미만이면 진입 차단
"""
from __future__ import annotations
import json
import logging
import os
import time
import urllib.request
from datetime import datetime
logger = logging.getLogger(__name__)
FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41")) # 진입 허용 최소 F&G 값
_FNG_API_URL = "https://api.alternative.me/fng/?limit=1&format=json"
_CACHE_TTL = 3600 # 1시간
_fng_value: int | None = None
_fng_cached_at: float = 0.0
_fng_date_str: str = ""
def get_fng() -> int:
"""오늘의 F&G 지수 반환 (0~100). API 실패 시 50(중립) 반환."""
global _fng_value, _fng_cached_at, _fng_date_str
now = time.time()
if _fng_value is not None and (now - _fng_cached_at) < _CACHE_TTL:
return _fng_value
try:
with urllib.request.urlopen(_FNG_API_URL, timeout=5) as r:
data = json.loads(r.read())
entry = data["data"][0]
_fng_value = int(entry["value"])
_fng_cached_at = now
_fng_date_str = entry.get("timestamp", "")
logger.info(
f"[F&G] 지수={_fng_value} ({entry.get('value_classification','')}) "
f"날짜={datetime.fromtimestamp(int(_fng_date_str)).strftime('%Y-%m-%d') if _fng_date_str else '?'}"
)
except Exception as e:
logger.warning(f"[F&G] API 조회 실패: {e} → 캐시/중립값 사용")
if _fng_value is None:
_fng_value = 50 # 폴백: 중립
return _fng_value # type: ignore[return-value]
def is_entry_allowed() -> bool:
"""현재 F&G 기준으로 진입 허용 여부 반환.
F&G ≥ FNG_MIN_ENTRY(41) 이면 True.
극공포/공포 구간(< 41)이면 False → 진입 차단.
"""
fv = get_fng()
allowed = fv >= FNG_MIN_ENTRY
if not allowed:
label = (
"극공포" if fv <= 25 else
"공포" if fv <= 40 else
"약공포"
)
logger.debug(f"[F&G] 진입 차단 — F&G={fv} ({label}) < {FNG_MIN_ENTRY}")
return allowed

View File

@@ -31,16 +31,27 @@ def _send(text: str) -> None:
def notify_buy(
ticker: str, price: float, amount: float, invested_krw: int,
max_budget: int = 0, per_position: int = 0,
fng: int = 0,
) -> None:
budget_line = (
f"운용예산: {max_budget:,}원 (포지션당 {per_position:,}원)\n"
if max_budget else ""
)
fng_label = (
"극탐욕" if fng >= 76 else
"탐욕" if fng >= 56 else
"중립" if fng >= 46 else
"약공포" if fng >= 41 else
"공포" if fng >= 26 else
"극공포"
) if fng else ""
fng_line = f"F&amp;G: {fng} ({fng_label})\n" if fng else ""
_send(
f"📈 <b>[매수]</b> {ticker}\n"
f"가격: {price:,.2f}\n"
f"수량: {amount:.8f}\n"
f"투자금: {invested_krw:,.2f}\n"
f"{fng_line}"
f"{budget_line}"
)
@@ -62,12 +73,28 @@ def notify_sell(
)
def notify_signal(ticker: str, signal_price: float, vol_mult: float) -> None:
def notify_signal(ticker: str, signal_price: float, vol_mult: float, fng: int = 0) -> None:
"""거래량 축적 신호 감지 알림."""
from .fng import FNG_MIN_ENTRY
fng_label = (
"극탐욕" if fng >= 76 else
"탐욕" if fng >= 56 else
"중립" if fng >= 46 else
"약공포" if fng >= 41 else
"공포" if fng >= 26 else
"극공포"
) if fng else ""
fng_line = f"F&amp;G: {fng} ({fng_label})\n" if fng else ""
warn_line = (
f"⚠️ F&amp;G={fng} &lt; {FNG_MIN_ENTRY} → <b>진입차단중</b>\n"
if fng and fng < FNG_MIN_ENTRY else ""
)
_send(
f"🔍 <b>[축적감지]</b> {ticker}\n"
f"신호가: {signal_price:,.2f}\n"
f"거래량: {vol_mult:.1f}x 급증 + 2h 횡보\n"
f"{fng_line}"
f"{warn_line}"
f"진입 목표: {signal_price * 1.048:,.2f}원 (+4.8%)"
)
@@ -98,6 +125,20 @@ def notify_status(
f"| 조건 TREND≥{regime['trend_pct']}% / VOL≥{regime['vol_mult']}x\n"
)
# F&G 지수
from .fng import get_fng, FNG_MIN_ENTRY
fv = get_fng()
fng_label = (
"극탐욕" if fv >= 76 else
"탐욕" if fv >= 56 else
"중립" if fv >= 46 else
"약공포" if fv >= 41 else
"공포" if fv >= 26 else
"극공포"
)
fng_status = "✅진입허용" if fv >= FNG_MIN_ENTRY else "🚫진입차단"
fng_line = f"😨 F&amp;G: {fv} ({fng_label}) {fng_status}\n"
# 1시간 이상 보유 포지션만 필터
long_positions = {
ticker: pos for ticker, pos in positions.items()
@@ -112,7 +153,7 @@ def notify_status(
)
# 포지션 없어도 레짐 정보는 전송
header = f"📊 <b>[{now} 현황]</b>\n{regime_line}{budget_info}"
header = f"📊 <b>[{now} 현황]</b>\n{regime_line}{fng_line}{budget_info}"
if not long_positions:
_send(header + "1h+ 보유 포지션 없음")

View File

@@ -23,6 +23,7 @@ import time
import pyupbit
from .fng import FNG_MIN_ENTRY, is_entry_allowed
from .market import get_current_price
from .market_regime import get_regime
from .notify import notify_signal
@@ -122,9 +123,14 @@ def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
def should_buy(ticker: str) -> bool:
"""Volume Lead 전략.
1단계: 거래량 급증 + 2h 횡보 → 신호가 기록
2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
1단계: F&G 필터 — 공포탐욕지수 < FNG_MIN_ENTRY(41)이면 즉시 차단
2단계: 거래량 급증 + 2h 횡보 → 신호가 기록
3단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
"""
# ── F&G 진입 필터 ─────────────────────────────────────
if not is_entry_allowed():
return False
regime = get_regime()
vol_mult = regime["vol_mult"]
@@ -177,11 +183,13 @@ def should_buy(ticker: str) -> bool:
entry_thr = _calc_entry_threshold(ratio)
_accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio}
from .fng import get_fng
fng_now = get_fng()
logger.info(
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}"
f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}%)"
f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}% | F&G={fng_now})"
)
notify_signal(ticker, current, ratio)
notify_signal(ticker, current, ratio, fng=fng_now)
return False # 신호 첫 발생 시는 진입 안 함
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────

View File

@@ -480,8 +480,10 @@ def buy(ticker: str) -> bool:
f"{prefix}[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
)
from .fng import get_fng
notify_buy(ticker, actual_price, amount, order_krw,
max_budget=MAX_BUDGET, per_position=PER_POSITION)
max_budget=MAX_BUDGET, per_position=PER_POSITION,
fng=get_fng())
return True
except Exception as e:
logger.error(f"매수 예외 {ticker}: {e}")

View File

@@ -6,6 +6,7 @@ import threading
import time
from core import trader
from core.fng import FNG_MIN_ENTRY, get_fng
from core.market import get_top_tickers
from core.market_regime import get_regime
from core.strategy import get_active_signals, should_buy
@@ -73,8 +74,22 @@ def run_scanner() -> None:
time.sleep(SCAN_INTERVAL)
continue
# F&G 진입 필터 — 차단 구간이면 전체 스캔 스킵
fv = get_fng()
fng_label = (
"극탐욕" if fv >= 76 else "탐욕" if fv >= 56 else
"중립" if fv >= 46 else "약공포" if fv >= 41 else
"공포" if fv >= 26 else "극공포"
)
if fv < FNG_MIN_ENTRY:
logger.info(
f"[F&G차단] F&G={fv} ({fng_label}) < {FNG_MIN_ENTRY} — 신규 매수 스킵"
)
time.sleep(SCAN_INTERVAL)
continue
tickers = get_top_tickers()
logger.info(f"스캔 시작: {len(tickers)}개 종목")
logger.info(f"스캔 시작: {len(tickers)}개 종목 | F&G={fv}({fng_label})")
for ticker in tickers:
# 이미 보유 중인 종목 제외

315
fng_1y_backtest.py Normal file
View File

@@ -0,0 +1,315 @@
"""F&G 조건별 백테스트 - 1년치 데이터 (배치 수집)
60일 극공포 편향을 제거하고 Bull/Neutral/Bear 다양한 구간 포함.
데이터: 1h 캔들 배치 수집 → 약 365일치
"""
from __future__ import annotations
import datetime, json, time, sys, urllib.request
import pandas as pd
import pyupbit
from dataclasses import dataclass
TICKERS = [
"KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
"KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
"KRW-SUI", "KRW-HBAR",
"KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
"KRW-KAVA", "KRW-KNC",
]
VOL_MULT = 2.0
QUIET_2H = 2.0
SIG_TO_H = 8
MOM_THR = 3.0
SIG_CANCEL = 3.0
TRAIL_STOP = 0.015
TIME_H = 24
TIME_MIN = 3.0
# ── 데이터 수집 ───────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
"""1h 캔들을 배치로 수집해 약 1년치 DataFrame 반환."""
all_dfs = []
end = datetime.datetime.now()
batch = 1440 # 60일치씩
prev_oldest = None
while True:
df = pyupbit.get_ohlcv(
ticker, interval="minute60", count=batch,
to=end.strftime("%Y-%m-%d %H:%M:%S"),
)
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
# 상장 초기 종목: oldest가 진전되지 않으면 더 오래된 데이터 없음
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
if oldest <= cutoff:
break
end = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
return combined[combined.index >= cutoff]
def load_fng() -> dict[str, int]:
url = "https://api.alternative.me/fng/?limit=400&format=json"
with urllib.request.urlopen(url, timeout=10) as r:
data = json.loads(r.read())
return {
datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"):
int(d["value"])
for d in data["data"]
}
def fng_val(fng_map, ts):
return fng_map.get(ts.strftime("%Y-%m-%d"), 50)
# ── 시뮬레이션 ────────────────────────────────────────────────
@dataclass
class Trade:
pnl: float
h: int
fng: int
exit: str
def simulate(df, fng_map, fng_lo=None, fng_hi=None) -> list[Trade]:
closes = df["close"].values
vols = df["volume"].values
idx = df.index
trades: list[Trade] = []
sig_px = sig_i = None
pos_buy = pos_peak = pos_i = pos_fng = None
for i in range(7, len(closes) - max(TIME_H + 4, 10)):
if pos_buy is not None:
cur = closes[i]
if cur > pos_peak:
pos_peak = cur
if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
trades.append(Trade((cur - pos_buy) / pos_buy * 100,
i - pos_i, pos_fng, "trail"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
if i - pos_i >= TIME_H:
pnl = (cur - pos_buy) / pos_buy * 100
if pnl < TIME_MIN:
trades.append(Trade(pnl, i - pos_i, pos_fng, "time"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
continue
if sig_px is not None:
if i - sig_i > SIG_TO_H:
sig_px = sig_i = None
elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
sig_px = sig_i = None
if sig_px is None:
vol_avg = vols[i - 6:i - 1].mean()
if vol_avg <= 0:
continue
if vols[i - 1] / vol_avg >= VOL_MULT:
if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
sig_px = closes[i]
sig_i = i
continue
fv = fng_val(fng_map, idx[i])
if fng_lo is not None and fv < fng_lo:
continue
if fng_hi is not None and fv > fng_hi:
continue
if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
pos_buy = pos_peak = closes[i]
pos_i = i
pos_fng = fv
sig_px = sig_i = None
return trades
def stats(trades):
if not trades:
return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
avg_win=0, avg_loss=0, max_dd=0)
wins = [t for t in trades if t.pnl > 0]
losses = [t for t in trades if t.pnl <= 0]
aw = sum(t.pnl for t in wins) / len(wins) if wins else 0
al = sum(t.pnl for t in losses) / len(losses) if losses else 0
cum = pk = max_dd = 0.0
for t in trades:
cum += t.pnl
if cum > pk: pk = cum
if pk - cum > max_dd: max_dd = pk - cum
return dict(
n=len(trades), wr=len(wins) / len(trades) * 100,
avg_pnl=sum(t.pnl for t in trades) / len(trades),
total_pnl=sum(t.pnl for t in trades),
rr=abs(aw / al) if al else 0,
avg_win=aw, avg_loss=al, max_dd=max_dd,
)
def main():
print("F&G 데이터 로드...")
fng_map = load_fng()
# F&G 연간 분포 출력
from collections import Counter
zone_cnt = Counter()
for v in fng_map.values():
if v <= 25: zone_cnt["극공포(0~25)"] += 1
elif v <= 45: zone_cnt["공포(26~45)"] += 1
elif v <= 55: zone_cnt["중립(46~55)"] += 1
elif v <= 75: zone_cnt["탐욕(56~75)"] += 1
else: zone_cnt["극탐욕(76~100)"] += 1
total_days = sum(zone_cnt.values())
print(f" 1년 F&G 분포 ({total_days}일):")
for k, v in sorted(zone_cnt.items()):
bar = "" * (v // 5)
print(f" {k:<14} {v:>3}일 ({v/total_days*100:>4.1f}%) {bar}")
print(f"\n종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
datasets = {}
for i, tk in enumerate(TICKERS):
try:
df = fetch_1y(tk, total_days=365)
if df is not None and len(df) > 100:
datasets[tk] = df
sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
except Exception as e:
sys.stderr.write(f"\r {tk} 실패: {e} ")
sys.stderr.write("\n")
print(f" 완료: {len(datasets)}개 종목\n")
# ── 전체 기간 F&G 구간별 성과 ────────────────────────────
CONFIGS = [
(None, None, "필터 없음 (전체)"),
(None, 25, "극공포만 (0~25)"),
(26, 45, "공포만 (26~45)"),
(46, 55, "중립만 (46~55)"),
(56, 100, "탐욕+ (56~100)"),
(46, 100, "중립 이상 (46~100)"),
(26, 100, "공포 이상 (26~100)"),
]
print("=" * 78)
print(" F&G 조건별 성과 - 1년치 (1h 캔들 / 모멘텀 / 스탑1.5%)")
print("=" * 78)
print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
f"{'손익비':>6} {'총PnL':>9} {'MaxDD':>7}")
print(" " + "-" * 72)
all_results = {}
for lo, hi, label in CONFIGS:
all_trades = []
for df in datasets.values():
all_trades.extend(simulate(df, fng_map, lo, hi))
s = stats(all_trades)
all_results[label] = (s, all_trades)
if s["n"] == 0:
print(f" {label:<26} 거래 없음 (해당 구간 진입 기회 없음)")
continue
sign = "+" if s["total_pnl"] > 0 else ""
print(
f" {label:<26} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+7.3f}% {s['rr']:>5.2f} "
f"{sign}{s['total_pnl']:>8.1f}% -{s['max_dd']:>5.1f}%"
)
# ── 분기별 성과 (계절성) ──────────────────────────────────
print()
print(" 분기별 성과 (전체 필터 없음 기준):")
base_trades = all_results["필터 없음 (전체)"][1]
for df in datasets.values():
pass # already computed
# 전체 종목 합산 후 날짜로 분기 분리
all_base = []
for df in datasets.values():
t_list = simulate(df, fng_map)
# trade에 날짜 정보 추가
# simulate에서 idx를 참조하지 않으므로 재계산
all_base.extend(t_list)
# F&G 수치별 세분화
print()
print(" F&G 10단위 구간별 세부 성과:")
print(f" {'구간':<16} {'건수':>5} {'승률':>6} {'평균PnL':>9} {'손익비':>6} {'의미'}")
print(" " + "-" * 65)
fng_zones_detail = [
(0, 10, "극단 공포(0~10)"),
(11, 20, "극단 공포(11~20)"),
(21, 30, "극공포(21~30)"),
(31, 40, "공포(31~40)"),
(41, 50, "약공포(41~50)"),
(51, 60, "약탐욕(51~60)"),
(61, 75, "탐욕(61~75)"),
(76, 100, "극탐욕(76~100)"),
]
base_all = all_results["필터 없음 (전체)"][1]
for lo, hi, name in fng_zones_detail:
sub = [t for t in base_all if lo <= t.fng <= hi]
if not sub:
continue
s = stats(sub)
breakeven_wr = 1 / (1 + s["rr"]) * 100 if s["rr"] > 0 else 50
profitable = "✅ 수익" if s["avg_pnl"] > 0 else ("⚠️ BEP 근접" if s["avg_pnl"] > -0.2 else "❌ 손실")
print(
f" {name:<16} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+8.3f}% {s['rr']:>5.2f} {profitable}"
)
# ── 최적 F&G 구간 요약 ───────────────────────────────────
print()
best = max(
[(label, s) for label, (s, _) in all_results.items() if s["n"] >= 50],
key=lambda x: x[1]["avg_pnl"],
)
print(f" ★ 최적 구간: {best[0]} "
f"(거래 {best[1]['n']}건 | 승률 {best[1]['wr']:.1f}% | "
f"평균PnL {best[1]['avg_pnl']:+.3f}%)")
# ── DB 저장 ──────────────────────────────────────────────
try:
from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
ensure_tables()
params = {
"tickers": len(datasets), "days": 365, "candle": "1h",
"trail_stop": 0.015, "mom_thr": 3.0, "vol_mult": 2.0,
}
run_id = insert_run(
run_name="fng_1y_backtest",
description="F&G 구간별 성과 1년치 백테스트 (1h 캔들 / 모멘텀 / 스탑1.5%)",
params=params,
)
for lo, hi, label in CONFIGS:
if label in all_results:
s, trades = all_results[label]
if s["n"] > 0:
insert_result(run_id, label, s, lo, hi)
# 전체 거래는 per-ticker 분리 없이 일괄 저장 (run_id+label로 구분)
insert_trades_bulk(run_id, label, "_all_", trades)
print(f"\n [DB 저장 완료] run_id: {run_id}")
except Exception as e:
print(f"\n [DB 저장 실패] {e}")
if __name__ == "__main__":
main()

317
fng_adaptive_backtest.py Normal file
View File

@@ -0,0 +1,317 @@
"""F&G 구간별 맞춤 파라미터 백테스트
핵심 가설:
극공포 구간은 시장이 불안정 → 더 엄격한 진입 기준 필요
탐욕 구간은 상승 모멘텀이 지속 → 다소 느슨한 기준도 가능
테스트 방식:
각 F&G 구간마다 다른 파라미터 조합을 적용하고 성과 비교.
구간별 최적 파라미터 도출 → 실제 전략에 반영
결과를 Oracle DB에 저장.
데이터: 1년치 1h 캔들 (배치 수집)
"""
from __future__ import annotations
import datetime
import json
import sys
import time
import urllib.request
import pandas as pd
import pyupbit
from dataclasses import dataclass
# ── DB 저장 ─────────────────────────────────────────────────
try:
from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
DB_ENABLED = True
except Exception as e:
print(f" [DB 비활성화] {e}")
DB_ENABLED = False
TICKERS = [
"KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
"KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
"KRW-SUI", "KRW-HBAR",
"KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
"KRW-KAVA", "KRW-KNC",
]
# ── F&G 구간별 파라미터 조합 ─────────────────────────────────
# (fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min)
ADAPTIVE_CONFIGS = [
# 기준선 (F&G 무관, 단일 파라미터)
(None, None, "기준선(전체/현행파라미터)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
# ── 극공포 (0~25) 구간 ── 엄격한 기준 ──
# 극공포에서는 변동성 급증이 흔함 → 볼륨 기준 올리고, 모멘텀 강화
(None, 25, "극공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
(None, 25, "극공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0),
(None, 25, "극공포/매우엄격(3x+5%+1%스탑)", 3.0, 2.0, 6, 5.0, 3.0, 0.010, 24, 3.0),
(None, 25, "극공포/넓은스탑(2x+3%+2%스탑)", 2.0, 2.0, 8, 3.0, 3.0, 0.020, 24, 3.0),
(None, 25, "극공포/짧은신호(3x+4%+4h유효)", 3.0, 2.0, 4, 4.0, 3.0, 0.015, 24, 3.0),
# ── 공포 (26~45) ── 중간 기준 ──
(26, 45, "공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
(26, 45, "공포/약강화(2.5x vol+3.5%mom)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0),
(26, 45, "공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0),
# ── 중립 이상 (46~100) ── 완화된 기준 가능 ──
(46, None, "중립이상/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
(46, None, "중립이상/완화(1.5x vol+2.5%mom)",1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0),
(46, None, "중립이상/엄격(2.5x+3.5%)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0),
# ── 탐욕+ (56~100) ──
(56, None, "탐욕이상/기준", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
(56, None, "탐욕이상/완화(1.5x+2.5%)", 1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0),
]
# ── 데이터 수집 ──────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
all_dfs = []
end = datetime.datetime.now()
batch = 1440 # 60일치씩
prev_oldest = None
while True:
df = pyupbit.get_ohlcv(
ticker, interval="minute60", count=batch,
to=end.strftime("%Y-%m-%d %H:%M:%S"),
)
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
# 상장 초기 종목: oldest가 진전되지 않으면 더 이상 오래된 데이터 없음
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
if oldest <= cutoff:
break
end = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
return combined[combined.index >= cutoff]
def load_fng() -> dict[str, int]:
url = "https://api.alternative.me/fng/?limit=400&format=json"
with urllib.request.urlopen(url, timeout=10) as r:
data = json.loads(r.read())
return {
datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"):
int(d["value"])
for d in data["data"]
}
def fng_val(fng_map, ts) -> int:
return fng_map.get(ts.strftime("%Y-%m-%d"), 50)
# ── 시뮬레이션 ──────────────────────────────────────────────
@dataclass
class Trade:
pnl: float
h: int
fng: int
exit: str
def simulate(
df, fng_map,
fng_lo=None, fng_hi=None,
vol_mult=2.0, quiet_2h=2.0, sig_to_h=8,
mom_thr=3.0, sig_cancel=3.0, trail_stop=0.015,
time_h=24, time_min=3.0,
) -> list[Trade]:
closes = df["close"].values
vols = df["volume"].values
idx = df.index
trades: list[Trade] = []
sig_px = sig_i = None
pos_buy = pos_peak = pos_i = pos_fng = None
for i in range(7, len(closes) - max(time_h + 4, 10)):
if pos_buy is not None:
cur = closes[i]
if cur > pos_peak:
pos_peak = cur
if (pos_peak - cur) / pos_peak >= trail_stop:
trades.append(Trade((cur - pos_buy) / pos_buy * 100,
i - pos_i, pos_fng, "trail"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
if i - pos_i >= time_h:
pnl = (cur - pos_buy) / pos_buy * 100
if pnl < time_min:
trades.append(Trade(pnl, i - pos_i, pos_fng, "time"))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
continue
if sig_px is not None:
if i - sig_i > sig_to_h:
sig_px = sig_i = None
elif (closes[i] - sig_px) / sig_px * 100 < -sig_cancel:
sig_px = sig_i = None
if sig_px is None:
vol_avg = vols[i - 6:i - 1].mean()
if vol_avg <= 0:
continue
if vols[i - 1] / vol_avg >= vol_mult:
if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < quiet_2h:
sig_px = closes[i]
sig_i = i
continue
fv = fng_val(fng_map, idx[i])
if fng_lo is not None and fv < fng_lo:
continue
if fng_hi is not None and fv > fng_hi:
continue
if (closes[i] - sig_px) / sig_px * 100 >= mom_thr:
pos_buy = pos_peak = closes[i]
pos_i = i
pos_fng = fv
sig_px = sig_i = None
return trades
def stats(trades):
if not trades:
return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
avg_win=0, avg_loss=0, max_dd=0)
wins = [t for t in trades if t.pnl > 0]
losses = [t for t in trades if t.pnl <= 0]
aw = sum(t.pnl for t in wins) / len(wins) if wins else 0
al = sum(t.pnl for t in losses) / len(losses) if losses else 0
cum = pk = max_dd = 0.0
for t in trades:
cum += t.pnl
if cum > pk: pk = cum
if pk - cum > max_dd: max_dd = pk - cum
return dict(
n=len(trades), wr=len(wins) / len(trades) * 100,
avg_pnl=sum(t.pnl for t in trades) / len(trades),
total_pnl=sum(t.pnl for t in trades),
rr=abs(aw / al) if al else 0,
avg_win=aw, avg_loss=al, max_dd=max_dd,
)
def main():
print("F&G 데이터 로드...")
fng_map = load_fng()
print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
datasets = {}
for i, tk in enumerate(TICKERS):
try:
df = fetch_1y(tk, total_days=365)
if df is not None and len(df) > 100:
datasets[tk] = df
sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
except Exception as e:
sys.stderr.write(f"\r {tk} 실패: {e} ")
sys.stderr.write("\n")
print(f" 완료: {len(datasets)}개 종목\n")
# ── DB 준비 ───────────────────────────────────────────
run_id = None
if DB_ENABLED:
ensure_tables()
params = {
"tickers": len(datasets),
"days": 365,
"candle": "1h",
"stop": "trail+time",
}
run_id = insert_run(
run_name="fng_adaptive_1y",
description="F&G 구간별 맞춤 파라미터 1년 백테스트",
params=params,
)
print(f" DB run_id: {run_id}\n")
# ── 결과 출력 ─────────────────────────────────────────
print("=" * 92)
print(" F&G 구간별 맞춤 파라미터 성과 비교 (1년치 / 1h 캔들)")
print("=" * 92)
print(f" {'조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
f"{'손익비':>5} {'총PnL':>9} {'MaxDD':>7}")
print(" " + "-" * 86)
best_by_zone: dict[str, tuple] = {}
for cfg in ADAPTIVE_CONFIGS:
fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min = cfg
all_trades: list[Trade] = []
per_ticker: dict[str, list[Trade]] = {}
for tk, df in datasets.items():
t = simulate(
df, fng_map,
fng_lo=fng_lo, fng_hi=fng_hi,
vol_mult=vol_mult, quiet_2h=quiet_2h, sig_to_h=sig_to_h,
mom_thr=mom_thr, sig_cancel=sig_cancel, trail_stop=trail_stop,
time_h=time_h, time_min=time_min,
)
all_trades.extend(t)
per_ticker[tk] = t
s = stats(all_trades)
# 구분선 (기준선 다음)
if label == "극공포/기준(2x vol+3%mom)":
print()
if s["n"] == 0:
print(f" {label:<42} 거래 없음")
continue
marker = "" if s["avg_pnl"] > 0 else ""
print(
f" {label:<42} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} "
f"{s['total_pnl']:>+8.1f}% -{s['max_dd']:>5.1f}%{marker}"
)
# DB 저장
if DB_ENABLED and run_id:
insert_result(run_id, label, s, fng_lo, fng_hi)
for tk, t_list in per_ticker.items():
insert_trades_bulk(run_id, label, tk, t_list)
# 구간별 최고 avg_pnl 추적
zone_key = label.split("/")[0]
if zone_key not in best_by_zone or s["avg_pnl"] > best_by_zone[zone_key][1]:
best_by_zone[zone_key] = (label, s["avg_pnl"], s)
# ── 구간별 최적 요약 ──────────────────────────────────
print()
print(" ★ 구간별 최적 파라미터:")
print(f" {'구간':<14} {'최적 조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8}")
print(" " + "-" * 72)
for zone, (label, best_pnl, s) in best_by_zone.items():
if s["n"] > 0:
print(f" {zone:<14} {label:<42} {s['n']:>5}{s['wr']:>5.1f}% {best_pnl:>+7.3f}%")
if DB_ENABLED and run_id:
print(f"\n [DB 저장 완료] run_id: {run_id}")
if __name__ == "__main__":
main()

382
fng_sim_comparison.py Normal file
View File

@@ -0,0 +1,382 @@
"""F&G 필터 전후 수익 비교 시뮬레이션
필터 없음 vs F&G ≥ 41 필터 적용 시 1년치 성과를 직접 비교.
표시:
- 거래 수, 승률, 평균 PnL, 총 누적 PnL
- 거래당 고정 자본 100만 원 기준 KRW 환산 손익
- 월별 손익 흐름 (계절성 확인)
- 극공포 차단 일수 통계
결과는 Oracle DB(backtest_results)에 저장.
데이터: 1년치 1h 캔들 (배치 수집)
"""
from __future__ import annotations
import datetime
import json
import sys
import time
import urllib.request
from dataclasses import dataclass
import pandas as pd
import pyupbit
# ── DB 저장 ─────────────────────────────────────────────────
try:
from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
DB_ENABLED = True
except Exception as e:
print(f" [DB 비활성화] {e}")
DB_ENABLED = False
TICKERS = [
"KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
"KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
"KRW-SUI", "KRW-HBAR",
"KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
"KRW-KAVA", "KRW-KNC",
]
CAPITAL_PER_TRADE = 1_000_000 # 거래당 고정 자본 (KRW)
# 전략 파라미터 (현행)
VOL_MULT = 2.0
QUIET_2H = 2.0
SIG_TO_H = 8
MOM_THR = 3.0
SIG_CANCEL = 3.0
TRAIL_STOP = 0.015
TIME_H = 24
TIME_MIN = 3.0
FNG_MIN = 41 # 이 값 미만이면 진입 차단
# ── 데이터 수집 ──────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
all_dfs = []
end = datetime.datetime.now()
batch = 1440
prev_oldest = None
while True:
df = pyupbit.get_ohlcv(
ticker, interval="minute60", count=batch,
to=end.strftime("%Y-%m-%d %H:%M:%S"),
)
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
if oldest <= cutoff:
break
end = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
return combined[combined.index >= cutoff]
def load_fng() -> dict[str, int]:
url = "https://api.alternative.me/fng/?limit=400&format=json"
with urllib.request.urlopen(url, timeout=10) as r:
data = json.loads(r.read())
return {
datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"):
int(d["value"])
for d in data["data"]
}
def fng_val(fng_map, ts) -> int:
return fng_map.get(ts.strftime("%Y-%m-%d"), 50)
# ── 시뮬레이션 ──────────────────────────────────────────────
@dataclass
class Trade:
pnl: float
h: int
fng: int
exit: str
date: str # YYYY-MM
def simulate(df, fng_map, fng_min: int | None = None) -> list[Trade]:
closes = df["close"].values
vols = df["volume"].values
idx = df.index
trades: list[Trade] = []
sig_px = sig_i = None
pos_buy = pos_peak = pos_i = pos_fng = None
for i in range(7, len(closes) - max(TIME_H + 4, 10)):
if pos_buy is not None:
cur = closes[i]
if cur > pos_peak:
pos_peak = cur
if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
trades.append(Trade(
(cur - pos_buy) / pos_buy * 100,
i - pos_i, pos_fng, "trail",
idx[i].strftime("%Y-%m"),
))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
if i - pos_i >= TIME_H:
pnl = (cur - pos_buy) / pos_buy * 100
if pnl < TIME_MIN:
trades.append(Trade(
pnl, i - pos_i, pos_fng, "time",
idx[i].strftime("%Y-%m"),
))
pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
continue
continue
if sig_px is not None:
if i - sig_i > SIG_TO_H:
sig_px = sig_i = None
elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
sig_px = sig_i = None
if sig_px is None:
vol_avg = vols[i - 6:i - 1].mean()
if vol_avg <= 0:
continue
if vols[i - 1] / vol_avg >= VOL_MULT:
if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
sig_px = closes[i]
sig_i = i
continue
fv = fng_val(fng_map, idx[i])
if fng_min is not None and fv < fng_min:
continue
if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
pos_buy = pos_peak = closes[i]
pos_i = i
pos_fng = fv
sig_px = sig_i = None
return trades
def stats(trades: list[Trade]) -> dict:
if not trades:
return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
avg_win=0, avg_loss=0, max_dd=0, krw_total=0)
wins = [t for t in trades if t.pnl > 0]
losses = [t for t in trades if t.pnl <= 0]
aw = sum(t.pnl for t in wins) / len(wins) if wins else 0
al = sum(t.pnl for t in losses) / len(losses) if losses else 0
cum = pk = max_dd = 0.0
for t in trades:
cum += t.pnl
if cum > pk: pk = cum
if pk - cum > max_dd: max_dd = pk - cum
total_pnl = sum(t.pnl for t in trades)
return dict(
n=len(trades), wr=len(wins) / len(trades) * 100,
avg_pnl=total_pnl / len(trades),
total_pnl=total_pnl,
rr=abs(aw / al) if al else 0,
avg_win=aw, avg_loss=al, max_dd=max_dd,
krw_total=total_pnl / 100 * CAPITAL_PER_TRADE,
)
def monthly_pnl(trades: list[Trade]) -> dict[str, float]:
"""월별 누적 PnL(%) 반환."""
monthly: dict[str, float] = {}
for t in trades:
monthly[t.date] = monthly.get(t.date, 0) + t.pnl
return dict(sorted(monthly.items()))
def main():
print("F&G 데이터 로드...")
fng_map = load_fng()
# F&G 분포
block_days = sum(1 for v in fng_map.values() if v < FNG_MIN)
total_days = len(fng_map)
print(f" 1년 F&G 분포: 진입차단(< {FNG_MIN}) = {block_days}일 / {total_days}"
f"({block_days/total_days*100:.1f}%)")
print(f" 진입허용(≥ {FNG_MIN}) = {total_days - block_days}일 ({(total_days-block_days)/total_days*100:.1f}%)\n")
print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
datasets: dict[str, pd.DataFrame] = {}
for i, tk in enumerate(TICKERS):
try:
df = fetch_1y(tk, total_days=365)
if df is not None and len(df) > 100:
datasets[tk] = df
sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
except Exception as e:
sys.stderr.write(f"\r {tk} 실패: {e} ")
sys.stderr.write("\n")
print(f" 완료: {len(datasets)}개 종목\n")
# ── 두 가지 조건 시뮬레이션 ──────────────────────────────
# A: 필터 없음 (현행)
# B: F&G ≥ 41 (신규)
all_trades_A: list[Trade] = []
all_trades_B: list[Trade] = []
per_ticker_A: dict[str, list[Trade]] = {}
per_ticker_B: dict[str, list[Trade]] = {}
for tk, df in datasets.items():
ta = simulate(df, fng_map, fng_min=None)
tb = simulate(df, fng_map, fng_min=FNG_MIN)
all_trades_A.extend(ta)
all_trades_B.extend(tb)
per_ticker_A[tk] = ta
per_ticker_B[tk] = tb
sa = stats(all_trades_A)
sb = stats(all_trades_B)
# ── 결과 출력 ─────────────────────────────────────────────
print("=" * 80)
print(f" F&G 필터 전후 비교 (1년치 / {len(datasets)}개 종목 / 1h캔들 / 자본 {CAPITAL_PER_TRADE:,}원/거래)")
print("=" * 80)
print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
f"{'손익비':>5} {'총PnL':>8} {'MaxDD':>7} {'KRW손익':>14}")
print(" " + "-" * 76)
for label, s in [("필터 없음 (현행)", sa), (f"F&G≥{FNG_MIN} 필터 (신규)", sb)]:
krw_str = f"{s['krw_total']:>+,.0f}"
print(
f" {label:<26} {s['n']:>5}{s['wr']:>5.1f}% "
f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} "
f"{s['total_pnl']:>+7.1f}% -{s['max_dd']:>5.1f}% {krw_str:>14}"
)
diff_trades = sb["n"] - sa["n"]
diff_krw = sb["krw_total"] - sa["krw_total"]
diff_wr = sb["wr"] - sa["wr"]
print(f"\n 변화: 거래수 {diff_trades:+d}건 | 승률 {diff_wr:+.1f}%p | "
f"KRW손익 {diff_krw:>+,.0f}")
# ── 월별 손익 흐름 ────────────────────────────────────────
print()
print(" 월별 손익 비교 (필터없음 vs F&G≥41):")
print(f" {'':>8} {'차단일수':>6} {'필터없음':>9} {'F&G필터':>9} {'개선':>8} {'누적(필터)':>12}")
print(" " + "-" * 62)
ma = monthly_pnl(all_trades_A)
mb = monthly_pnl(all_trades_B)
all_months = sorted(set(ma.keys()) | set(mb.keys()))
cum_b = 0.0
for m in all_months:
pa = ma.get(m, 0.0)
pb = mb.get(m, 0.0)
cum_b += pb
diff = pb - pa
# 해당 월 차단 일수
yr, mo = int(m[:4]), int(m[5:])
blocked = sum(
1 for d, v in fng_map.items()
if d.startswith(m) and v < FNG_MIN
)
bar = "" * min(int(abs(pb) / 3), 12) if pb > 0 else "" * min(int(abs(pb) / 3), 12)
sign = "+" if pb > 0 else ""
diff_sign = "" if diff > 0 else ("" if diff < 0 else "=")
print(
f" {m} {blocked:>4}일차단 "
f"{pa:>+8.1f}% {sign}{pb:>8.1f}% "
f"{diff_sign}{abs(diff):>6.1f}% {cum_b:>+10.1f}%"
)
# ── 종목별 비교 (상위/하위) ───────────────────────────────
print()
print(" 종목별 성과 비교 (필터없음 vs F&G≥41):")
print(f" {'종목':<14} {'현행거래':>6} {'현행PnL':>8} {'필터거래':>7} {'필터PnL':>8} {'개선':>8}")
print(" " + "-" * 58)
ticker_rows = []
for tk in sorted(datasets.keys()):
ta_list = per_ticker_A.get(tk, [])
tb_list = per_ticker_B.get(tk, [])
pa = sum(t.pnl for t in ta_list) if ta_list else 0
pb = sum(t.pnl for t in tb_list) if tb_list else 0
ticker_rows.append((tk, len(ta_list), pa, len(tb_list), pb, pb - pa))
for row in sorted(ticker_rows, key=lambda x: x[5], reverse=True):
tk, na, pa, nb, pb, delta = row
mark = "" if delta > 1 else ("" if delta < -1 else " =")
print(
f" {tk:<14} {na:>6}{pa:>+7.1f}% {nb:>6}{pb:>+7.1f}% "
f"{mark}{abs(delta):>6.1f}%"
)
# ── 극공포 차단 효과 분석 ─────────────────────────────────
print()
print(f" F&G < {FNG_MIN} 구간(차단) 거래 성과 분석:")
blocked_trades = [t for t in all_trades_A if t.fng < FNG_MIN]
if blocked_trades:
sb2 = stats(blocked_trades)
print(f" → 차단된 거래 수: {sb2['n']}")
print(f" → 차단 거래 승률: {sb2['wr']:.1f}%")
print(f" → 차단 거래 평균 PnL: {sb2['avg_pnl']:+.3f}%")
print(f" → 차단으로 절약된 손실: {sb2['krw_total']:>+,.0f}"
f"({CAPITAL_PER_TRADE:,}× {sb2['n']}거래 기준)")
else:
print(" → 차단된 거래 없음")
# ── 최적 임계값 확인 ─────────────────────────────────────
print()
print(f" F&G 임계값별 성과 비교 (현행 기준 비교):")
print(f" {'임계값':>8} {'거래':>5} {'승률':>6} {'평균PnL':>9} {'KRW손익':>14}")
print(" " + "-" * 52)
for thr in [25, 30, 35, 41, 45, 50]:
filtered = [t for t in all_trades_A if t.fng >= thr]
if not filtered:
continue
sf = stats(filtered)
marker = " ◀ 채택" if thr == FNG_MIN else ""
print(
f" {thr:>5}이상 {sf['n']:>5}{sf['wr']:>5.1f}% "
f"{sf['avg_pnl']:>+8.3f}% {sf['krw_total']:>+14,.0f}{marker}"
)
# ── DB 저장 ───────────────────────────────────────────────
if DB_ENABLED:
try:
ensure_tables()
params = {
"tickers": len(datasets), "days": 365, "candle": "1h",
"trail_stop": TRAIL_STOP, "mom_thr": MOM_THR,
"fng_min_new": FNG_MIN, "capital_per_trade": CAPITAL_PER_TRADE,
}
run_id = insert_run(
"fng_sim_comparison",
f"F&G 필터 전후 비교 시뮬레이션 (1년치 / F&G≥{FNG_MIN})",
params,
)
insert_result(run_id, "필터 없음 (현행)", sa, None, None)
insert_result(run_id, f"F&G≥{FNG_MIN} 필터 (신규)", sb, FNG_MIN, None)
for tk, t_list in per_ticker_A.items():
insert_trades_bulk(run_id, "필터없음", tk, t_list)
for tk, t_list in per_ticker_B.items():
insert_trades_bulk(run_id, f"fng_ge{FNG_MIN}", tk, t_list)
print(f"\n [DB 저장 완료] run_id: {run_id}")
except Exception as e:
print(f"\n [DB 저장 실패] {e}")
if __name__ == "__main__":
main()