From 27189b1ad90befbe727f5b3d7b0f10fe68f274ec Mon Sep 17 00:00:00 2001 From: joungmin Date: Tue, 3 Mar 2026 15:56:17 +0900 Subject: [PATCH] feat: add Fear & Greed filter to entry logic - core/fng.py: F&G API wrapper with 1h cache (alternative.me) - FNG_MIN_ENTRY=41 (env-configurable), blocks entry below threshold - core/strategy.py: call is_entry_allowed() before volume/regime checks - daemon/runner.py: log F&G status on every scan cycle - core/notify.py: include F&G value in buy/signal/status notifications - core/trader.py: pass current F&G value to notify_buy Backtest evidence (1y / 18 tickers / 1h candles): - No filter: 820 trades, 32.7% WR, avg +0.012%, KRW +95k - F&G >= 41: 372 trades, 39.5% WR, avg +0.462%, KRW +1.72M - Blocked 452 trades (avg -0.372%, saved ~1.68M KRW loss) Also add: - backtest_db.py: Oracle DB storage for backtest runs/results/trades - fng_1y_backtest.py, fng_adaptive_backtest.py, fng_sim_comparison.py Co-Authored-By: Claude Sonnet 4.6 --- backtest_db.py | 247 +++++++++++++++++++++++++ core/fng.py | 71 ++++++++ core/notify.py | 45 ++++- core/strategy.py | 16 +- core/trader.py | 4 +- daemon/runner.py | 13 +- fng_1y_backtest.py | 315 ++++++++++++++++++++++++++++++++ fng_adaptive_backtest.py | 317 ++++++++++++++++++++++++++++++++ fng_sim_comparison.py | 382 +++++++++++++++++++++++++++++++++++++++ 9 files changed, 1402 insertions(+), 8 deletions(-) create mode 100644 backtest_db.py create mode 100644 core/fng.py create mode 100644 fng_1y_backtest.py create mode 100644 fng_adaptive_backtest.py create mode 100644 fng_sim_comparison.py diff --git a/backtest_db.py b/backtest_db.py new file mode 100644 index 0000000..32a800d --- /dev/null +++ b/backtest_db.py @@ -0,0 +1,247 @@ +"""백테스트 결과 Oracle DB 저장 모듈. + +테이블: + backtest_runs - 실행 단위 (실행시각, 설명, 파라미터) + backtest_results - 조건별 집계 (run_id + label) + backtest_trade_log - 개별 거래 (run_id + label + 종목 + pnl + fng + ...) +""" +from __future__ import annotations + +import json +import os +from contextlib import contextmanager +from datetime import datetime +from pathlib import Path +from typing import Generator + +import oracledb +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent / ".env") + +_pool: oracledb.ConnectionPool | None = None + + +def _get_pool() -> oracledb.ConnectionPool: + global _pool + if _pool is None: + kwargs: dict = dict( + user=os.environ["ORACLE_USER"], + password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"], + min=1, + max=3, + increment=1, + ) + wallet = os.environ.get("ORACLE_WALLET") + if wallet: + kwargs["config_dir"] = wallet + _pool = oracledb.create_pool(**kwargs) + return _pool + + +@contextmanager +def _conn() -> Generator[oracledb.Connection, None, None]: + pool = _get_pool() + conn = pool.acquire() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + pool.release(conn) + + +# ── DDL ──────────────────────────────────────────────────────── +_DDL_RUNS = """ +CREATE TABLE backtest_runs ( + run_id VARCHAR2(36) DEFAULT SYS_GUID() PRIMARY KEY, + run_name VARCHAR2(200) NOT NULL, + description VARCHAR2(1000), + params_json CLOB, + created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL +) +""" + +_DDL_RESULTS = """ +CREATE TABLE backtest_results ( + id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + run_id VARCHAR2(36) NOT NULL, + label VARCHAR2(100) NOT NULL, + n_trades NUMBER, + win_rate NUMBER(6,3), + avg_pnl NUMBER(10,4), + total_pnl NUMBER(12,4), + rr NUMBER(8,4), + avg_win NUMBER(10,4), + avg_loss NUMBER(10,4), + max_dd NUMBER(10,4), + fng_lo NUMBER, + fng_hi NUMBER, + created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL, + CONSTRAINT fk_br_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id) +) +""" + +_DDL_TRADES = """ +CREATE TABLE backtest_trade_log ( + id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + run_id VARCHAR2(36) NOT NULL, + label VARCHAR2(100), + ticker VARCHAR2(20), + pnl NUMBER(10,4), + hold_h NUMBER, + fng_val NUMBER, + exit_type VARCHAR2(10), + created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL, + CONSTRAINT fk_bt_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id) +) +""" + + +def ensure_tables() -> None: + """백테스트 테이블이 없으면 생성.""" + with _conn() as conn: + cur = conn.cursor() + for tbl_name, ddl in [ + ("BACKTEST_RUNS", _DDL_RUNS), + ("BACKTEST_RESULTS", _DDL_RESULTS), + ("BACKTEST_TRADE_LOG", _DDL_TRADES), + ]: + cur.execute( + "SELECT COUNT(*) FROM user_tables WHERE table_name=:1", [tbl_name] + ) + if cur.fetchone()[0] == 0: + cur.execute(ddl) + print(f" {tbl_name} 테이블 생성 완료") + + +# ── 삽입 헬퍼 ────────────────────────────────────────────────── + +def insert_run(run_name: str, description: str = "", params: dict | None = None) -> str: + """새 백테스트 실행 레코드 삽입. run_id 반환.""" + sql = """ + INSERT INTO backtest_runs (run_name, description, params_json) + VALUES (:rname, :rdesc, :rparams) + RETURNING run_id INTO :out_id + """ + with _conn() as conn: + cur = conn.cursor() + out = cur.var(oracledb.STRING) + cur.execute(sql, { + "rname": run_name, + "rdesc": description, + "rparams": json.dumps(params or {}, ensure_ascii=False), + "out_id": out, + }) + return out.getvalue()[0] + + +def insert_result( + run_id: str, + label: str, + stats: dict, + fng_lo: int | None = None, + fng_hi: int | None = None, +) -> None: + """조건별 집계 결과 삽입.""" + sql = """ + INSERT INTO backtest_results + (run_id, label, n_trades, win_rate, avg_pnl, total_pnl, + rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi) + VALUES + (:run_id, :label, :n, :wr, :avg_pnl, :total_pnl, + :rr, :avg_win, :avg_loss, :max_dd, :fng_lo, :fng_hi) + """ + with _conn() as conn: + conn.cursor().execute(sql, { + "run_id": run_id, + "label": label, + "n": stats.get("n", 0), + "wr": round(stats.get("wr", 0), 3), + "avg_pnl": round(stats.get("avg_pnl", 0), 4), + "total_pnl": round(stats.get("total_pnl", 0), 4), + "rr": round(stats.get("rr", 0), 4), + "avg_win": round(stats.get("avg_win", 0), 4), + "avg_loss": round(stats.get("avg_loss", 0), 4), + "max_dd": round(stats.get("max_dd", 0), 4), + "fng_lo": fng_lo, + "fng_hi": fng_hi, + }) + + +def insert_trades_bulk( + run_id: str, + label: str, + ticker: str, + trades: list, +) -> None: + """개별 거래 목록 일괄 삽입.""" + if not trades: + return + sql = """ + INSERT INTO backtest_trade_log + (run_id, label, ticker, pnl, hold_h, fng_val, exit_type) + VALUES (:run_id, :label, :ticker, :pnl, :hold_h, :fng_val, :exit_type) + """ + rows = [] + for t in trades: + rows.append({ + "run_id": run_id, + "label": label, + "ticker": ticker, + "pnl": round(float(getattr(t, "pnl", 0)), 4), + "hold_h": int(getattr(t, "h", 0)), + "fng_val": int(getattr(t, "fng", 0)), + "exit_type": str(getattr(t, "exit", "")), + }) + with _conn() as conn: + conn.cursor().executemany(sql, rows) + + +# ── 조회 ─────────────────────────────────────────────────────── + +def list_runs(limit: int = 20) -> list[dict]: + """최근 백테스트 실행 목록 반환.""" + sql = """ + SELECT run_id, run_name, description, created_at + FROM backtest_runs + ORDER BY created_at DESC + FETCH FIRST :n ROWS ONLY + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"n": limit}) + rows = cur.fetchall() + return [ + {"run_id": r[0], "run_name": r[1], "description": r[2], + "created_at": r[3].strftime("%Y-%m-%d %H:%M")} + for r in rows + ] + + +def get_results(run_id: str) -> list[dict]: + """특정 run_id의 조건별 결과 반환.""" + sql = """ + SELECT label, n_trades, win_rate, avg_pnl, total_pnl, + rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi + FROM backtest_results + WHERE run_id = :run_id + ORDER BY avg_pnl DESC + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"run_id": run_id}) + cols = ["label", "n_trades", "win_rate", "avg_pnl", "total_pnl", + "rr", "avg_win", "avg_loss", "max_dd", "fng_lo", "fng_hi"] + return [dict(zip(cols, r)) for r in cur.fetchall()] + + +if __name__ == "__main__": + print("백테스트 DB 테이블 확인/생성...") + ensure_tables() + print("완료. 최근 실행 목록:") + for r in list_runs(5): + print(f" {r['created_at']} {r['run_name']}") diff --git a/core/fng.py b/core/fng.py new file mode 100644 index 0000000..d27ef46 --- /dev/null +++ b/core/fng.py @@ -0,0 +1,71 @@ +"""공포탐욕지수(F&G) 조회 모듈. + +alternative.me API로 일일 F&G 값을 가져와 메모리에 캐시한다. +캐시 TTL은 1시간 (F&G는 하루 1회 업데이트). + +환경변수: + FNG_MIN_ENTRY (기본값 41): 이 값 미만이면 진입 차단 +""" +from __future__ import annotations + +import json +import logging +import os +import time +import urllib.request +from datetime import datetime + +logger = logging.getLogger(__name__) + +FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41")) # 진입 허용 최소 F&G 값 +_FNG_API_URL = "https://api.alternative.me/fng/?limit=1&format=json" +_CACHE_TTL = 3600 # 1시간 + +_fng_value: int | None = None +_fng_cached_at: float = 0.0 +_fng_date_str: str = "" + + +def get_fng() -> int: + """오늘의 F&G 지수 반환 (0~100). API 실패 시 50(중립) 반환.""" + global _fng_value, _fng_cached_at, _fng_date_str + + now = time.time() + if _fng_value is not None and (now - _fng_cached_at) < _CACHE_TTL: + return _fng_value + + try: + with urllib.request.urlopen(_FNG_API_URL, timeout=5) as r: + data = json.loads(r.read()) + entry = data["data"][0] + _fng_value = int(entry["value"]) + _fng_cached_at = now + _fng_date_str = entry.get("timestamp", "") + logger.info( + f"[F&G] 지수={_fng_value} ({entry.get('value_classification','')}) " + f"날짜={datetime.fromtimestamp(int(_fng_date_str)).strftime('%Y-%m-%d') if _fng_date_str else '?'}" + ) + except Exception as e: + logger.warning(f"[F&G] API 조회 실패: {e} → 캐시/중립값 사용") + if _fng_value is None: + _fng_value = 50 # 폴백: 중립 + + return _fng_value # type: ignore[return-value] + + +def is_entry_allowed() -> bool: + """현재 F&G 기준으로 진입 허용 여부 반환. + + F&G ≥ FNG_MIN_ENTRY(41) 이면 True. + 극공포/공포 구간(< 41)이면 False → 진입 차단. + """ + fv = get_fng() + allowed = fv >= FNG_MIN_ENTRY + if not allowed: + label = ( + "극공포" if fv <= 25 else + "공포" if fv <= 40 else + "약공포" + ) + logger.info(f"[F&G] 진입 차단 — F&G={fv} ({label}) < {FNG_MIN_ENTRY}") + return allowed diff --git a/core/notify.py b/core/notify.py index 0209040..7693778 100644 --- a/core/notify.py +++ b/core/notify.py @@ -31,16 +31,27 @@ def _send(text: str) -> None: def notify_buy( ticker: str, price: float, amount: float, invested_krw: int, max_budget: int = 0, per_position: int = 0, + fng: int = 0, ) -> None: budget_line = ( f"운용예산: {max_budget:,}원 (포지션당 {per_position:,}원)\n" if max_budget else "" ) + fng_label = ( + "극탐욕" if fng >= 76 else + "탐욕" if fng >= 56 else + "중립" if fng >= 46 else + "약공포" if fng >= 41 else + "공포" if fng >= 26 else + "극공포" + ) if fng else "" + fng_line = f"F&G: {fng} ({fng_label})\n" if fng else "" _send( f"📈 [매수] {ticker}\n" f"가격: {price:,.2f}원\n" f"수량: {amount:.8f}\n" f"투자금: {invested_krw:,.2f}원\n" + f"{fng_line}" f"{budget_line}" ) @@ -62,12 +73,28 @@ def notify_sell( ) -def notify_signal(ticker: str, signal_price: float, vol_mult: float) -> None: +def notify_signal(ticker: str, signal_price: float, vol_mult: float, fng: int = 0) -> None: """거래량 축적 신호 감지 알림.""" + from .fng import FNG_MIN_ENTRY + fng_label = ( + "극탐욕" if fng >= 76 else + "탐욕" if fng >= 56 else + "중립" if fng >= 46 else + "약공포" if fng >= 41 else + "공포" if fng >= 26 else + "극공포" + ) if fng else "" + fng_line = f"F&G: {fng} ({fng_label})\n" if fng else "" + warn_line = ( + f"⚠️ F&G={fng} < {FNG_MIN_ENTRY} → 진입차단중\n" + if fng and fng < FNG_MIN_ENTRY else "" + ) _send( f"🔍 [축적감지] {ticker}\n" f"신호가: {signal_price:,.2f}원\n" f"거래량: {vol_mult:.1f}x 급증 + 2h 횡보\n" + f"{fng_line}" + f"{warn_line}" f"진입 목표: {signal_price * 1.048:,.2f}원 (+4.8%)" ) @@ -98,6 +125,20 @@ def notify_status( f"| 조건 TREND≥{regime['trend_pct']}% / VOL≥{regime['vol_mult']}x\n" ) + # F&G 지수 + from .fng import get_fng, FNG_MIN_ENTRY + fv = get_fng() + fng_label = ( + "극탐욕" if fv >= 76 else + "탐욕" if fv >= 56 else + "중립" if fv >= 46 else + "약공포" if fv >= 41 else + "공포" if fv >= 26 else + "극공포" + ) + fng_status = "✅진입허용" if fv >= FNG_MIN_ENTRY else "🚫진입차단" + fng_line = f"😨 F&G: {fv} ({fng_label}) {fng_status}\n" + # 1시간 이상 보유 포지션만 필터 long_positions = { ticker: pos for ticker, pos in positions.items() @@ -112,7 +153,7 @@ def notify_status( ) # 포지션 없어도 레짐 정보는 전송 - header = f"📊 [{now} 현황]\n{regime_line}{budget_info}" + header = f"📊 [{now} 현황]\n{regime_line}{fng_line}{budget_info}" if not long_positions: _send(header + "1h+ 보유 포지션 없음") diff --git a/core/strategy.py b/core/strategy.py index 00867c7..1e06f8f 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -23,6 +23,7 @@ import time import pyupbit +from .fng import FNG_MIN_ENTRY, is_entry_allowed from .market import get_current_price from .market_regime import get_regime from .notify import notify_signal @@ -122,9 +123,14 @@ def _check_vol_spike(ticker: str, vol_mult: float) -> bool: def should_buy(ticker: str) -> bool: """Volume Lead 전략. - 1단계: 거래량 급증 + 2h 횡보 → 신호가 기록 - 2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입 + 1단계: F&G 필터 — 공포탐욕지수 < FNG_MIN_ENTRY(41)이면 즉시 차단 + 2단계: 거래량 급증 + 2h 횡보 → 신호가 기록 + 3단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입 """ + # ── F&G 진입 필터 ───────────────────────────────────── + if not is_entry_allowed(): + return False + regime = get_regime() vol_mult = regime["vol_mult"] @@ -177,11 +183,13 @@ def should_buy(ticker: str) -> bool: entry_thr = _calc_entry_threshold(ratio) _accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio} + from .fng import get_fng + fng_now = get_fng() logger.info( f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원 " - f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}%)" + f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}% | F&G={fng_now})" ) - notify_signal(ticker, current, ratio) + notify_signal(ticker, current, ratio, fng=fng_now) return False # 신호 첫 발생 시는 진입 안 함 # ── 신호 있음: 상승 확인 → 진입 ───────────────────────── diff --git a/core/trader.py b/core/trader.py index 656d0c5..a662ecf 100644 --- a/core/trader.py +++ b/core/trader.py @@ -480,8 +480,10 @@ def buy(ticker: str) -> bool: f"{prefix}[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | " f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}" ) + from .fng import get_fng notify_buy(ticker, actual_price, amount, order_krw, - max_budget=MAX_BUDGET, per_position=PER_POSITION) + max_budget=MAX_BUDGET, per_position=PER_POSITION, + fng=get_fng()) return True except Exception as e: logger.error(f"매수 예외 {ticker}: {e}") diff --git a/daemon/runner.py b/daemon/runner.py index e963dce..27605b1 100644 --- a/daemon/runner.py +++ b/daemon/runner.py @@ -6,6 +6,7 @@ import threading import time from core import trader +from core.fng import FNG_MIN_ENTRY, get_fng from core.market import get_top_tickers from core.market_regime import get_regime from core.strategy import get_active_signals, should_buy @@ -73,8 +74,18 @@ def run_scanner() -> None: time.sleep(SCAN_INTERVAL) continue + # F&G 진입 필터 로그 (should_buy 내부에서 차단하지만 스캔 전 상태 기록) + fv = get_fng() + fng_label = ( + "극탐욕" if fv >= 76 else "탐욕" if fv >= 56 else + "중립" if fv >= 46 else "약공포" if fv >= 41 else + "공포" if fv >= 26 else "극공포" + ) + if fv < FNG_MIN_ENTRY: + logger.info(f"[F&G차단] F&G={fv} ({fng_label}) < {FNG_MIN_ENTRY} — 이번 스캔 진입 없음") + tickers = get_top_tickers() - logger.info(f"스캔 시작: {len(tickers)}개 종목") + logger.info(f"스캔 시작: {len(tickers)}개 종목 | F&G={fv}({fng_label})") for ticker in tickers: # 이미 보유 중인 종목 제외 diff --git a/fng_1y_backtest.py b/fng_1y_backtest.py new file mode 100644 index 0000000..8976a02 --- /dev/null +++ b/fng_1y_backtest.py @@ -0,0 +1,315 @@ +"""F&G 조건별 백테스트 - 1년치 데이터 (배치 수집) + +60일 극공포 편향을 제거하고 Bull/Neutral/Bear 다양한 구간 포함. +데이터: 1h 캔들 배치 수집 → 약 365일치 +""" +from __future__ import annotations + +import datetime, json, time, sys, urllib.request +import pandas as pd +import pyupbit +from dataclasses import dataclass + +TICKERS = [ + "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE", + "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK", + "KRW-SUI", "KRW-HBAR", + "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO", + "KRW-KAVA", "KRW-KNC", +] + +VOL_MULT = 2.0 +QUIET_2H = 2.0 +SIG_TO_H = 8 +MOM_THR = 3.0 +SIG_CANCEL = 3.0 +TRAIL_STOP = 0.015 +TIME_H = 24 +TIME_MIN = 3.0 + + +# ── 데이터 수집 ─────────────────────────────────────────────── +def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None: + """1h 캔들을 배치로 수집해 약 1년치 DataFrame 반환.""" + all_dfs = [] + end = datetime.datetime.now() + batch = 1440 # 60일치씩 + prev_oldest = None + + while True: + df = pyupbit.get_ohlcv( + ticker, interval="minute60", count=batch, + to=end.strftime("%Y-%m-%d %H:%M:%S"), + ) + if df is None or df.empty: + break + all_dfs.append(df) + oldest = df.index[0] + # 상장 초기 종목: oldest가 진전되지 않으면 더 오래된 데이터 없음 + if prev_oldest is not None and oldest >= prev_oldest: + break + prev_oldest = oldest + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + if oldest <= cutoff: + break + end = oldest + time.sleep(0.12) + + if not all_dfs: + return None + combined = pd.concat(all_dfs).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + return combined[combined.index >= cutoff] + + +def load_fng() -> dict[str, int]: + url = "https://api.alternative.me/fng/?limit=400&format=json" + with urllib.request.urlopen(url, timeout=10) as r: + data = json.loads(r.read()) + return { + datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"): + int(d["value"]) + for d in data["data"] + } + + +def fng_val(fng_map, ts): + return fng_map.get(ts.strftime("%Y-%m-%d"), 50) + + +# ── 시뮬레이션 ──────────────────────────────────────────────── +@dataclass +class Trade: + pnl: float + h: int + fng: int + exit: str + + +def simulate(df, fng_map, fng_lo=None, fng_hi=None) -> list[Trade]: + closes = df["close"].values + vols = df["volume"].values + idx = df.index + trades: list[Trade] = [] + sig_px = sig_i = None + pos_buy = pos_peak = pos_i = pos_fng = None + + for i in range(7, len(closes) - max(TIME_H + 4, 10)): + if pos_buy is not None: + cur = closes[i] + if cur > pos_peak: + pos_peak = cur + if (pos_peak - cur) / pos_peak >= TRAIL_STOP: + trades.append(Trade((cur - pos_buy) / pos_buy * 100, + i - pos_i, pos_fng, "trail")) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + if i - pos_i >= TIME_H: + pnl = (cur - pos_buy) / pos_buy * 100 + if pnl < TIME_MIN: + trades.append(Trade(pnl, i - pos_i, pos_fng, "time")) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + continue + + if sig_px is not None: + if i - sig_i > SIG_TO_H: + sig_px = sig_i = None + elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL: + sig_px = sig_i = None + + if sig_px is None: + vol_avg = vols[i - 6:i - 1].mean() + if vol_avg <= 0: + continue + if vols[i - 1] / vol_avg >= VOL_MULT: + if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H: + sig_px = closes[i] + sig_i = i + continue + + fv = fng_val(fng_map, idx[i]) + if fng_lo is not None and fv < fng_lo: + continue + if fng_hi is not None and fv > fng_hi: + continue + + if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR: + pos_buy = pos_peak = closes[i] + pos_i = i + pos_fng = fv + sig_px = sig_i = None + + return trades + + +def stats(trades): + if not trades: + return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0, + avg_win=0, avg_loss=0, max_dd=0) + wins = [t for t in trades if t.pnl > 0] + losses = [t for t in trades if t.pnl <= 0] + aw = sum(t.pnl for t in wins) / len(wins) if wins else 0 + al = sum(t.pnl for t in losses) / len(losses) if losses else 0 + cum = pk = max_dd = 0.0 + for t in trades: + cum += t.pnl + if cum > pk: pk = cum + if pk - cum > max_dd: max_dd = pk - cum + return dict( + n=len(trades), wr=len(wins) / len(trades) * 100, + avg_pnl=sum(t.pnl for t in trades) / len(trades), + total_pnl=sum(t.pnl for t in trades), + rr=abs(aw / al) if al else 0, + avg_win=aw, avg_loss=al, max_dd=max_dd, + ) + + +def main(): + print("F&G 데이터 로드...") + fng_map = load_fng() + + # F&G 연간 분포 출력 + from collections import Counter + zone_cnt = Counter() + for v in fng_map.values(): + if v <= 25: zone_cnt["극공포(0~25)"] += 1 + elif v <= 45: zone_cnt["공포(26~45)"] += 1 + elif v <= 55: zone_cnt["중립(46~55)"] += 1 + elif v <= 75: zone_cnt["탐욕(56~75)"] += 1 + else: zone_cnt["극탐욕(76~100)"] += 1 + total_days = sum(zone_cnt.values()) + print(f" 1년 F&G 분포 ({total_days}일):") + for k, v in sorted(zone_cnt.items()): + bar = "█" * (v // 5) + print(f" {k:<14} {v:>3}일 ({v/total_days*100:>4.1f}%) {bar}") + + print(f"\n종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...") + datasets = {} + for i, tk in enumerate(TICKERS): + try: + df = fetch_1y(tk, total_days=365) + if df is not None and len(df) > 100: + datasets[tk] = df + sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ") + except Exception as e: + sys.stderr.write(f"\r {tk} 실패: {e} ") + sys.stderr.write("\n") + print(f" 완료: {len(datasets)}개 종목\n") + + # ── 전체 기간 F&G 구간별 성과 ──────────────────────────── + CONFIGS = [ + (None, None, "필터 없음 (전체)"), + (None, 25, "극공포만 (0~25)"), + (26, 45, "공포만 (26~45)"), + (46, 55, "중립만 (46~55)"), + (56, 100, "탐욕+ (56~100)"), + (46, 100, "중립 이상 (46~100)"), + (26, 100, "공포 이상 (26~100)"), + ] + + print("=" * 78) + print(" F&G 조건별 성과 - 1년치 (1h 캔들 / 모멘텀 / 스탑1.5%)") + print("=" * 78) + print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} " + f"{'손익비':>6} {'총PnL':>9} {'MaxDD':>7}") + print(" " + "-" * 72) + + all_results = {} + for lo, hi, label in CONFIGS: + all_trades = [] + for df in datasets.values(): + all_trades.extend(simulate(df, fng_map, lo, hi)) + s = stats(all_trades) + all_results[label] = (s, all_trades) + if s["n"] == 0: + print(f" {label:<26} 거래 없음 (해당 구간 진입 기회 없음)") + continue + sign = "+" if s["total_pnl"] > 0 else "" + print( + f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% " + f"{s['avg_pnl']:>+7.3f}% {s['rr']:>5.2f} " + f"{sign}{s['total_pnl']:>8.1f}% -{s['max_dd']:>5.1f}%" + ) + + # ── 분기별 성과 (계절성) ────────────────────────────────── + print() + print(" 분기별 성과 (전체 필터 없음 기준):") + base_trades = all_results["필터 없음 (전체)"][1] + for df in datasets.values(): + pass # already computed + + # 전체 종목 합산 후 날짜로 분기 분리 + all_base = [] + for df in datasets.values(): + t_list = simulate(df, fng_map) + # trade에 날짜 정보 추가 + # simulate에서 idx를 참조하지 않으므로 재계산 + all_base.extend(t_list) + + # F&G 수치별 세분화 + print() + print(" F&G 10단위 구간별 세부 성과:") + print(f" {'구간':<16} {'건수':>5} {'승률':>6} {'평균PnL':>9} {'손익비':>6} {'의미'}") + print(" " + "-" * 65) + fng_zones_detail = [ + (0, 10, "극단 공포(0~10)"), + (11, 20, "극단 공포(11~20)"), + (21, 30, "극공포(21~30)"), + (31, 40, "공포(31~40)"), + (41, 50, "약공포(41~50)"), + (51, 60, "약탐욕(51~60)"), + (61, 75, "탐욕(61~75)"), + (76, 100, "극탐욕(76~100)"), + ] + base_all = all_results["필터 없음 (전체)"][1] + for lo, hi, name in fng_zones_detail: + sub = [t for t in base_all if lo <= t.fng <= hi] + if not sub: + continue + s = stats(sub) + breakeven_wr = 1 / (1 + s["rr"]) * 100 if s["rr"] > 0 else 50 + profitable = "✅ 수익" if s["avg_pnl"] > 0 else ("⚠️ BEP 근접" if s["avg_pnl"] > -0.2 else "❌ 손실") + print( + f" {name:<16} {s['n']:>5}건 {s['wr']:>5.1f}% " + f"{s['avg_pnl']:>+8.3f}% {s['rr']:>5.2f} {profitable}" + ) + + # ── 최적 F&G 구간 요약 ─────────────────────────────────── + print() + best = max( + [(label, s) for label, (s, _) in all_results.items() if s["n"] >= 50], + key=lambda x: x[1]["avg_pnl"], + ) + print(f" ★ 최적 구간: {best[0]} " + f"(거래 {best[1]['n']}건 | 승률 {best[1]['wr']:.1f}% | " + f"평균PnL {best[1]['avg_pnl']:+.3f}%)") + + # ── DB 저장 ────────────────────────────────────────────── + try: + from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk + ensure_tables() + params = { + "tickers": len(datasets), "days": 365, "candle": "1h", + "trail_stop": 0.015, "mom_thr": 3.0, "vol_mult": 2.0, + } + run_id = insert_run( + run_name="fng_1y_backtest", + description="F&G 구간별 성과 1년치 백테스트 (1h 캔들 / 모멘텀 / 스탑1.5%)", + params=params, + ) + for lo, hi, label in CONFIGS: + if label in all_results: + s, trades = all_results[label] + if s["n"] > 0: + insert_result(run_id, label, s, lo, hi) + # 전체 거래는 per-ticker 분리 없이 일괄 저장 (run_id+label로 구분) + insert_trades_bulk(run_id, label, "_all_", trades) + print(f"\n [DB 저장 완료] run_id: {run_id}") + except Exception as e: + print(f"\n [DB 저장 실패] {e}") + + +if __name__ == "__main__": + main() diff --git a/fng_adaptive_backtest.py b/fng_adaptive_backtest.py new file mode 100644 index 0000000..e81ef3a --- /dev/null +++ b/fng_adaptive_backtest.py @@ -0,0 +1,317 @@ +"""F&G 구간별 맞춤 파라미터 백테스트 + +핵심 가설: + 극공포 구간은 시장이 불안정 → 더 엄격한 진입 기준 필요 + 탐욕 구간은 상승 모멘텀이 지속 → 다소 느슨한 기준도 가능 + +테스트 방식: + 각 F&G 구간마다 다른 파라미터 조합을 적용하고 성과 비교. + 구간별 최적 파라미터 도출 → 실제 전략에 반영 + +결과를 Oracle DB에 저장. +데이터: 1년치 1h 캔들 (배치 수집) +""" +from __future__ import annotations + +import datetime +import json +import sys +import time +import urllib.request + +import pandas as pd +import pyupbit +from dataclasses import dataclass + +# ── DB 저장 ───────────────────────────────────────────────── +try: + from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk + DB_ENABLED = True +except Exception as e: + print(f" [DB 비활성화] {e}") + DB_ENABLED = False + +TICKERS = [ + "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE", + "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK", + "KRW-SUI", "KRW-HBAR", + "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO", + "KRW-KAVA", "KRW-KNC", +] + +# ── F&G 구간별 파라미터 조합 ───────────────────────────────── +# (fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min) +ADAPTIVE_CONFIGS = [ + # 기준선 (F&G 무관, 단일 파라미터) + (None, None, "기준선(전체/현행파라미터)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0), + + # ── 극공포 (0~25) 구간 ── 엄격한 기준 ── + # 극공포에서는 변동성 급증이 흔함 → 볼륨 기준 올리고, 모멘텀 강화 + (None, 25, "극공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0), + (None, 25, "극공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0), + (None, 25, "극공포/매우엄격(3x+5%+1%스탑)", 3.0, 2.0, 6, 5.0, 3.0, 0.010, 24, 3.0), + (None, 25, "극공포/넓은스탑(2x+3%+2%스탑)", 2.0, 2.0, 8, 3.0, 3.0, 0.020, 24, 3.0), + (None, 25, "극공포/짧은신호(3x+4%+4h유효)", 3.0, 2.0, 4, 4.0, 3.0, 0.015, 24, 3.0), + + # ── 공포 (26~45) ── 중간 기준 ── + (26, 45, "공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0), + (26, 45, "공포/약강화(2.5x vol+3.5%mom)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0), + (26, 45, "공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0), + + # ── 중립 이상 (46~100) ── 완화된 기준 가능 ── + (46, None, "중립이상/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0), + (46, None, "중립이상/완화(1.5x vol+2.5%mom)",1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0), + (46, None, "중립이상/엄격(2.5x+3.5%)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0), + + # ── 탐욕+ (56~100) ── + (56, None, "탐욕이상/기준", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0), + (56, None, "탐욕이상/완화(1.5x+2.5%)", 1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0), +] + + +# ── 데이터 수집 ────────────────────────────────────────────── +def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None: + all_dfs = [] + end = datetime.datetime.now() + batch = 1440 # 60일치씩 + prev_oldest = None + + while True: + df = pyupbit.get_ohlcv( + ticker, interval="minute60", count=batch, + to=end.strftime("%Y-%m-%d %H:%M:%S"), + ) + if df is None or df.empty: + break + all_dfs.append(df) + oldest = df.index[0] + # 상장 초기 종목: oldest가 진전되지 않으면 더 이상 오래된 데이터 없음 + if prev_oldest is not None and oldest >= prev_oldest: + break + prev_oldest = oldest + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + if oldest <= cutoff: + break + end = oldest + time.sleep(0.12) + + if not all_dfs: + return None + combined = pd.concat(all_dfs).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + return combined[combined.index >= cutoff] + + +def load_fng() -> dict[str, int]: + url = "https://api.alternative.me/fng/?limit=400&format=json" + with urllib.request.urlopen(url, timeout=10) as r: + data = json.loads(r.read()) + return { + datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"): + int(d["value"]) + for d in data["data"] + } + + +def fng_val(fng_map, ts) -> int: + return fng_map.get(ts.strftime("%Y-%m-%d"), 50) + + +# ── 시뮬레이션 ────────────────────────────────────────────── +@dataclass +class Trade: + pnl: float + h: int + fng: int + exit: str + + +def simulate( + df, fng_map, + fng_lo=None, fng_hi=None, + vol_mult=2.0, quiet_2h=2.0, sig_to_h=8, + mom_thr=3.0, sig_cancel=3.0, trail_stop=0.015, + time_h=24, time_min=3.0, +) -> list[Trade]: + closes = df["close"].values + vols = df["volume"].values + idx = df.index + trades: list[Trade] = [] + sig_px = sig_i = None + pos_buy = pos_peak = pos_i = pos_fng = None + + for i in range(7, len(closes) - max(time_h + 4, 10)): + if pos_buy is not None: + cur = closes[i] + if cur > pos_peak: + pos_peak = cur + if (pos_peak - cur) / pos_peak >= trail_stop: + trades.append(Trade((cur - pos_buy) / pos_buy * 100, + i - pos_i, pos_fng, "trail")) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + if i - pos_i >= time_h: + pnl = (cur - pos_buy) / pos_buy * 100 + if pnl < time_min: + trades.append(Trade(pnl, i - pos_i, pos_fng, "time")) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + continue + + if sig_px is not None: + if i - sig_i > sig_to_h: + sig_px = sig_i = None + elif (closes[i] - sig_px) / sig_px * 100 < -sig_cancel: + sig_px = sig_i = None + + if sig_px is None: + vol_avg = vols[i - 6:i - 1].mean() + if vol_avg <= 0: + continue + if vols[i - 1] / vol_avg >= vol_mult: + if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < quiet_2h: + sig_px = closes[i] + sig_i = i + continue + + fv = fng_val(fng_map, idx[i]) + if fng_lo is not None and fv < fng_lo: + continue + if fng_hi is not None and fv > fng_hi: + continue + + if (closes[i] - sig_px) / sig_px * 100 >= mom_thr: + pos_buy = pos_peak = closes[i] + pos_i = i + pos_fng = fv + sig_px = sig_i = None + + return trades + + +def stats(trades): + if not trades: + return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0, + avg_win=0, avg_loss=0, max_dd=0) + wins = [t for t in trades if t.pnl > 0] + losses = [t for t in trades if t.pnl <= 0] + aw = sum(t.pnl for t in wins) / len(wins) if wins else 0 + al = sum(t.pnl for t in losses) / len(losses) if losses else 0 + cum = pk = max_dd = 0.0 + for t in trades: + cum += t.pnl + if cum > pk: pk = cum + if pk - cum > max_dd: max_dd = pk - cum + return dict( + n=len(trades), wr=len(wins) / len(trades) * 100, + avg_pnl=sum(t.pnl for t in trades) / len(trades), + total_pnl=sum(t.pnl for t in trades), + rr=abs(aw / al) if al else 0, + avg_win=aw, avg_loss=al, max_dd=max_dd, + ) + + +def main(): + print("F&G 데이터 로드...") + fng_map = load_fng() + + print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...") + datasets = {} + for i, tk in enumerate(TICKERS): + try: + df = fetch_1y(tk, total_days=365) + if df is not None and len(df) > 100: + datasets[tk] = df + sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ") + except Exception as e: + sys.stderr.write(f"\r {tk} 실패: {e} ") + sys.stderr.write("\n") + print(f" 완료: {len(datasets)}개 종목\n") + + # ── DB 준비 ─────────────────────────────────────────── + run_id = None + if DB_ENABLED: + ensure_tables() + params = { + "tickers": len(datasets), + "days": 365, + "candle": "1h", + "stop": "trail+time", + } + run_id = insert_run( + run_name="fng_adaptive_1y", + description="F&G 구간별 맞춤 파라미터 1년 백테스트", + params=params, + ) + print(f" DB run_id: {run_id}\n") + + # ── 결과 출력 ───────────────────────────────────────── + print("=" * 92) + print(" F&G 구간별 맞춤 파라미터 성과 비교 (1년치 / 1h 캔들)") + print("=" * 92) + print(f" {'조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8} " + f"{'손익비':>5} {'총PnL':>9} {'MaxDD':>7}") + print(" " + "-" * 86) + + best_by_zone: dict[str, tuple] = {} + + for cfg in ADAPTIVE_CONFIGS: + fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min = cfg + + all_trades: list[Trade] = [] + per_ticker: dict[str, list[Trade]] = {} + for tk, df in datasets.items(): + t = simulate( + df, fng_map, + fng_lo=fng_lo, fng_hi=fng_hi, + vol_mult=vol_mult, quiet_2h=quiet_2h, sig_to_h=sig_to_h, + mom_thr=mom_thr, sig_cancel=sig_cancel, trail_stop=trail_stop, + time_h=time_h, time_min=time_min, + ) + all_trades.extend(t) + per_ticker[tk] = t + + s = stats(all_trades) + + # 구분선 (기준선 다음) + if label == "극공포/기준(2x vol+3%mom)": + print() + + if s["n"] == 0: + print(f" {label:<42} 거래 없음") + continue + + marker = " ★" if s["avg_pnl"] > 0 else "" + print( + f" {label:<42} {s['n']:>5}건 {s['wr']:>5.1f}% " + f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} " + f"{s['total_pnl']:>+8.1f}% -{s['max_dd']:>5.1f}%{marker}" + ) + + # DB 저장 + if DB_ENABLED and run_id: + insert_result(run_id, label, s, fng_lo, fng_hi) + for tk, t_list in per_ticker.items(): + insert_trades_bulk(run_id, label, tk, t_list) + + # 구간별 최고 avg_pnl 추적 + zone_key = label.split("/")[0] + if zone_key not in best_by_zone or s["avg_pnl"] > best_by_zone[zone_key][1]: + best_by_zone[zone_key] = (label, s["avg_pnl"], s) + + # ── 구간별 최적 요약 ────────────────────────────────── + print() + print(" ★ 구간별 최적 파라미터:") + print(f" {'구간':<14} {'최적 조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8}") + print(" " + "-" * 72) + for zone, (label, best_pnl, s) in best_by_zone.items(): + if s["n"] > 0: + print(f" {zone:<14} {label:<42} {s['n']:>5}건 {s['wr']:>5.1f}% {best_pnl:>+7.3f}%") + + if DB_ENABLED and run_id: + print(f"\n [DB 저장 완료] run_id: {run_id}") + + +if __name__ == "__main__": + main() diff --git a/fng_sim_comparison.py b/fng_sim_comparison.py new file mode 100644 index 0000000..8cc1197 --- /dev/null +++ b/fng_sim_comparison.py @@ -0,0 +1,382 @@ +"""F&G 필터 전후 수익 비교 시뮬레이션 + +필터 없음 vs F&G ≥ 41 필터 적용 시 1년치 성과를 직접 비교. + +표시: + - 거래 수, 승률, 평균 PnL, 총 누적 PnL + - 거래당 고정 자본 100만 원 기준 KRW 환산 손익 + - 월별 손익 흐름 (계절성 확인) + - 극공포 차단 일수 통계 + +결과는 Oracle DB(backtest_results)에 저장. +데이터: 1년치 1h 캔들 (배치 수집) +""" +from __future__ import annotations + +import datetime +import json +import sys +import time +import urllib.request +from dataclasses import dataclass + +import pandas as pd +import pyupbit + +# ── DB 저장 ───────────────────────────────────────────────── +try: + from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk + DB_ENABLED = True +except Exception as e: + print(f" [DB 비활성화] {e}") + DB_ENABLED = False + +TICKERS = [ + "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE", + "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK", + "KRW-SUI", "KRW-HBAR", + "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO", + "KRW-KAVA", "KRW-KNC", +] + +CAPITAL_PER_TRADE = 1_000_000 # 거래당 고정 자본 (KRW) + +# 전략 파라미터 (현행) +VOL_MULT = 2.0 +QUIET_2H = 2.0 +SIG_TO_H = 8 +MOM_THR = 3.0 +SIG_CANCEL = 3.0 +TRAIL_STOP = 0.015 +TIME_H = 24 +TIME_MIN = 3.0 + +FNG_MIN = 41 # 이 값 미만이면 진입 차단 + + +# ── 데이터 수집 ────────────────────────────────────────────── +def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None: + all_dfs = [] + end = datetime.datetime.now() + batch = 1440 + prev_oldest = None + + while True: + df = pyupbit.get_ohlcv( + ticker, interval="minute60", count=batch, + to=end.strftime("%Y-%m-%d %H:%M:%S"), + ) + if df is None or df.empty: + break + all_dfs.append(df) + oldest = df.index[0] + if prev_oldest is not None and oldest >= prev_oldest: + break + prev_oldest = oldest + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + if oldest <= cutoff: + break + end = oldest + time.sleep(0.12) + + if not all_dfs: + return None + combined = pd.concat(all_dfs).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days) + return combined[combined.index >= cutoff] + + +def load_fng() -> dict[str, int]: + url = "https://api.alternative.me/fng/?limit=400&format=json" + with urllib.request.urlopen(url, timeout=10) as r: + data = json.loads(r.read()) + return { + datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"): + int(d["value"]) + for d in data["data"] + } + + +def fng_val(fng_map, ts) -> int: + return fng_map.get(ts.strftime("%Y-%m-%d"), 50) + + +# ── 시뮬레이션 ────────────────────────────────────────────── +@dataclass +class Trade: + pnl: float + h: int + fng: int + exit: str + date: str # YYYY-MM + + +def simulate(df, fng_map, fng_min: int | None = None) -> list[Trade]: + closes = df["close"].values + vols = df["volume"].values + idx = df.index + trades: list[Trade] = [] + sig_px = sig_i = None + pos_buy = pos_peak = pos_i = pos_fng = None + + for i in range(7, len(closes) - max(TIME_H + 4, 10)): + if pos_buy is not None: + cur = closes[i] + if cur > pos_peak: + pos_peak = cur + if (pos_peak - cur) / pos_peak >= TRAIL_STOP: + trades.append(Trade( + (cur - pos_buy) / pos_buy * 100, + i - pos_i, pos_fng, "trail", + idx[i].strftime("%Y-%m"), + )) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + if i - pos_i >= TIME_H: + pnl = (cur - pos_buy) / pos_buy * 100 + if pnl < TIME_MIN: + trades.append(Trade( + pnl, i - pos_i, pos_fng, "time", + idx[i].strftime("%Y-%m"), + )) + pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None + continue + continue + + if sig_px is not None: + if i - sig_i > SIG_TO_H: + sig_px = sig_i = None + elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL: + sig_px = sig_i = None + + if sig_px is None: + vol_avg = vols[i - 6:i - 1].mean() + if vol_avg <= 0: + continue + if vols[i - 1] / vol_avg >= VOL_MULT: + if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H: + sig_px = closes[i] + sig_i = i + continue + + fv = fng_val(fng_map, idx[i]) + if fng_min is not None and fv < fng_min: + continue + + if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR: + pos_buy = pos_peak = closes[i] + pos_i = i + pos_fng = fv + sig_px = sig_i = None + + return trades + + +def stats(trades: list[Trade]) -> dict: + if not trades: + return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0, + avg_win=0, avg_loss=0, max_dd=0, krw_total=0) + wins = [t for t in trades if t.pnl > 0] + losses = [t for t in trades if t.pnl <= 0] + aw = sum(t.pnl for t in wins) / len(wins) if wins else 0 + al = sum(t.pnl for t in losses) / len(losses) if losses else 0 + cum = pk = max_dd = 0.0 + for t in trades: + cum += t.pnl + if cum > pk: pk = cum + if pk - cum > max_dd: max_dd = pk - cum + total_pnl = sum(t.pnl for t in trades) + return dict( + n=len(trades), wr=len(wins) / len(trades) * 100, + avg_pnl=total_pnl / len(trades), + total_pnl=total_pnl, + rr=abs(aw / al) if al else 0, + avg_win=aw, avg_loss=al, max_dd=max_dd, + krw_total=total_pnl / 100 * CAPITAL_PER_TRADE, + ) + + +def monthly_pnl(trades: list[Trade]) -> dict[str, float]: + """월별 누적 PnL(%) 반환.""" + monthly: dict[str, float] = {} + for t in trades: + monthly[t.date] = monthly.get(t.date, 0) + t.pnl + return dict(sorted(monthly.items())) + + +def main(): + print("F&G 데이터 로드...") + fng_map = load_fng() + + # F&G 분포 + block_days = sum(1 for v in fng_map.values() if v < FNG_MIN) + total_days = len(fng_map) + print(f" 1년 F&G 분포: 진입차단(< {FNG_MIN}) = {block_days}일 / {total_days}일 " + f"({block_days/total_days*100:.1f}%)") + print(f" 진입허용(≥ {FNG_MIN}) = {total_days - block_days}일 ({(total_days-block_days)/total_days*100:.1f}%)\n") + + print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...") + datasets: dict[str, pd.DataFrame] = {} + for i, tk in enumerate(TICKERS): + try: + df = fetch_1y(tk, total_days=365) + if df is not None and len(df) > 100: + datasets[tk] = df + sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ") + except Exception as e: + sys.stderr.write(f"\r {tk} 실패: {e} ") + sys.stderr.write("\n") + print(f" 완료: {len(datasets)}개 종목\n") + + # ── 두 가지 조건 시뮬레이션 ────────────────────────────── + # A: 필터 없음 (현행) + # B: F&G ≥ 41 (신규) + all_trades_A: list[Trade] = [] + all_trades_B: list[Trade] = [] + per_ticker_A: dict[str, list[Trade]] = {} + per_ticker_B: dict[str, list[Trade]] = {} + + for tk, df in datasets.items(): + ta = simulate(df, fng_map, fng_min=None) + tb = simulate(df, fng_map, fng_min=FNG_MIN) + all_trades_A.extend(ta) + all_trades_B.extend(tb) + per_ticker_A[tk] = ta + per_ticker_B[tk] = tb + + sa = stats(all_trades_A) + sb = stats(all_trades_B) + + # ── 결과 출력 ───────────────────────────────────────────── + print("=" * 80) + print(f" F&G 필터 전후 비교 (1년치 / {len(datasets)}개 종목 / 1h캔들 / 자본 {CAPITAL_PER_TRADE:,}원/거래)") + print("=" * 80) + print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} " + f"{'손익비':>5} {'총PnL':>8} {'MaxDD':>7} {'KRW손익':>14}") + print(" " + "-" * 76) + + for label, s in [("필터 없음 (현행)", sa), (f"F&G≥{FNG_MIN} 필터 (신규)", sb)]: + krw_str = f"{s['krw_total']:>+,.0f}원" + print( + f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% " + f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} " + f"{s['total_pnl']:>+7.1f}% -{s['max_dd']:>5.1f}% {krw_str:>14}" + ) + + diff_trades = sb["n"] - sa["n"] + diff_krw = sb["krw_total"] - sa["krw_total"] + diff_wr = sb["wr"] - sa["wr"] + print(f"\n 변화: 거래수 {diff_trades:+d}건 | 승률 {diff_wr:+.1f}%p | " + f"KRW손익 {diff_krw:>+,.0f}원") + + # ── 월별 손익 흐름 ──────────────────────────────────────── + print() + print(" 월별 손익 비교 (필터없음 vs F&G≥41):") + print(f" {'월':>8} {'차단일수':>6} {'필터없음':>9} {'F&G필터':>9} {'개선':>8} {'누적(필터)':>12}") + print(" " + "-" * 62) + + ma = monthly_pnl(all_trades_A) + mb = monthly_pnl(all_trades_B) + all_months = sorted(set(ma.keys()) | set(mb.keys())) + + cum_b = 0.0 + for m in all_months: + pa = ma.get(m, 0.0) + pb = mb.get(m, 0.0) + cum_b += pb + diff = pb - pa + # 해당 월 차단 일수 + yr, mo = int(m[:4]), int(m[5:]) + blocked = sum( + 1 for d, v in fng_map.items() + if d.startswith(m) and v < FNG_MIN + ) + bar = "▓" * min(int(abs(pb) / 3), 12) if pb > 0 else "░" * min(int(abs(pb) / 3), 12) + sign = "+" if pb > 0 else "" + diff_sign = "▲" if diff > 0 else ("▼" if diff < 0 else "=") + print( + f" {m} {blocked:>4}일차단 " + f"{pa:>+8.1f}% {sign}{pb:>8.1f}% " + f"{diff_sign}{abs(diff):>6.1f}% {cum_b:>+10.1f}%" + ) + + # ── 종목별 비교 (상위/하위) ─────────────────────────────── + print() + print(" 종목별 성과 비교 (필터없음 vs F&G≥41):") + print(f" {'종목':<14} {'현행거래':>6} {'현행PnL':>8} {'필터거래':>7} {'필터PnL':>8} {'개선':>8}") + print(" " + "-" * 58) + + ticker_rows = [] + for tk in sorted(datasets.keys()): + ta_list = per_ticker_A.get(tk, []) + tb_list = per_ticker_B.get(tk, []) + pa = sum(t.pnl for t in ta_list) if ta_list else 0 + pb = sum(t.pnl for t in tb_list) if tb_list else 0 + ticker_rows.append((tk, len(ta_list), pa, len(tb_list), pb, pb - pa)) + + for row in sorted(ticker_rows, key=lambda x: x[5], reverse=True): + tk, na, pa, nb, pb, delta = row + mark = "▲" if delta > 1 else ("▼" if delta < -1 else " =") + print( + f" {tk:<14} {na:>6}건 {pa:>+7.1f}% {nb:>6}건 {pb:>+7.1f}% " + f"{mark}{abs(delta):>6.1f}%" + ) + + # ── 극공포 차단 효과 분석 ───────────────────────────────── + print() + print(f" F&G < {FNG_MIN} 구간(차단) 거래 성과 분석:") + blocked_trades = [t for t in all_trades_A if t.fng < FNG_MIN] + if blocked_trades: + sb2 = stats(blocked_trades) + print(f" → 차단된 거래 수: {sb2['n']}건") + print(f" → 차단 거래 승률: {sb2['wr']:.1f}%") + print(f" → 차단 거래 평균 PnL: {sb2['avg_pnl']:+.3f}%") + print(f" → 차단으로 절약된 손실: {sb2['krw_total']:>+,.0f}원 " + f"({CAPITAL_PER_TRADE:,}원 × {sb2['n']}거래 기준)") + else: + print(" → 차단된 거래 없음") + + # ── 최적 임계값 확인 ───────────────────────────────────── + print() + print(f" F&G 임계값별 성과 비교 (현행 기준 비교):") + print(f" {'임계값':>8} {'거래':>5} {'승률':>6} {'평균PnL':>9} {'KRW손익':>14}") + print(" " + "-" * 52) + for thr in [25, 30, 35, 41, 45, 50]: + filtered = [t for t in all_trades_A if t.fng >= thr] + if not filtered: + continue + sf = stats(filtered) + marker = " ◀ 채택" if thr == FNG_MIN else "" + print( + f" {thr:>5}이상 {sf['n']:>5}건 {sf['wr']:>5.1f}% " + f"{sf['avg_pnl']:>+8.3f}% {sf['krw_total']:>+14,.0f}원{marker}" + ) + + # ── DB 저장 ─────────────────────────────────────────────── + if DB_ENABLED: + try: + ensure_tables() + params = { + "tickers": len(datasets), "days": 365, "candle": "1h", + "trail_stop": TRAIL_STOP, "mom_thr": MOM_THR, + "fng_min_new": FNG_MIN, "capital_per_trade": CAPITAL_PER_TRADE, + } + run_id = insert_run( + "fng_sim_comparison", + f"F&G 필터 전후 비교 시뮬레이션 (1년치 / F&G≥{FNG_MIN})", + params, + ) + insert_result(run_id, "필터 없음 (현행)", sa, None, None) + insert_result(run_id, f"F&G≥{FNG_MIN} 필터 (신규)", sb, FNG_MIN, None) + for tk, t_list in per_ticker_A.items(): + insert_trades_bulk(run_id, "필터없음", tk, t_list) + for tk, t_list in per_ticker_B.items(): + insert_trades_bulk(run_id, f"fng_ge{FNG_MIN}", tk, t_list) + print(f"\n [DB 저장 완료] run_id: {run_id}") + except Exception as e: + print(f"\n [DB 저장 실패] {e}") + + +if __name__ == "__main__": + main()