diff --git a/backtest_db.py b/backtest_db.py
new file mode 100644
index 0000000..32a800d
--- /dev/null
+++ b/backtest_db.py
@@ -0,0 +1,247 @@
+"""백테스트 결과 Oracle DB 저장 모듈.
+
+테이블:
+ backtest_runs - 실행 단위 (실행시각, 설명, 파라미터)
+ backtest_results - 조건별 집계 (run_id + label)
+ backtest_trade_log - 개별 거래 (run_id + label + 종목 + pnl + fng + ...)
+"""
+from __future__ import annotations
+
+import json
+import os
+from contextlib import contextmanager
+from datetime import datetime
+from pathlib import Path
+from typing import Generator
+
+import oracledb
+from dotenv import load_dotenv
+
+load_dotenv(dotenv_path=Path(__file__).parent / ".env")
+
+_pool: oracledb.ConnectionPool | None = None
+
+
def _get_pool() -> oracledb.ConnectionPool:
    """Lazily create and return the process-wide Oracle connection pool."""
    global _pool
    if _pool is not None:
        return _pool
    pool_args: dict = {
        "user": os.environ["ORACLE_USER"],
        "password": os.environ["ORACLE_PASSWORD"],
        "dsn": os.environ["ORACLE_DSN"],
        "min": 1,
        "max": 3,
        "increment": 1,
    }
    # Optional wallet directory (e.g. Autonomous DB mTLS wallet)
    wallet_dir = os.environ.get("ORACLE_WALLET")
    if wallet_dir:
        pool_args["config_dir"] = wallet_dir
    _pool = oracledb.create_pool(**pool_args)
    return _pool
+
+
@contextmanager
def _conn() -> Generator[oracledb.Connection, None, None]:
    """Yield a pooled connection; commit on success, rollback on error.

    The connection is always released back to the pool, and any exception
    raised inside the ``with`` body is re-raised after rollback.
    """
    pool = _get_pool()
    conn = pool.acquire()
    try:
        yield conn
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        pool.release(conn)
+
+
# ── DDL ────────────────────────────────────────────────────────
# backtest_runs: one row per backtest execution; run_id is a DB-generated GUID.
_DDL_RUNS = """
CREATE TABLE backtest_runs (
    run_id       VARCHAR2(36) DEFAULT SYS_GUID() PRIMARY KEY,
    run_name     VARCHAR2(200) NOT NULL,
    description  VARCHAR2(1000),
    params_json  CLOB,
    created_at   TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""

# backtest_results: per-condition aggregates, FK to backtest_runs.
_DDL_RESULTS = """
CREATE TABLE backtest_results (
    id          NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    run_id      VARCHAR2(36) NOT NULL,
    label       VARCHAR2(100) NOT NULL,
    n_trades    NUMBER,
    win_rate    NUMBER(6,3),
    avg_pnl     NUMBER(10,4),
    total_pnl   NUMBER(12,4),
    rr          NUMBER(8,4),
    avg_win     NUMBER(10,4),
    avg_loss    NUMBER(10,4),
    max_dd      NUMBER(10,4),
    fng_lo      NUMBER,
    fng_hi      NUMBER,
    created_at  TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
    CONSTRAINT fk_br_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id)
)
"""

# backtest_trade_log: one row per simulated trade, FK to backtest_runs.
_DDL_TRADES = """
CREATE TABLE backtest_trade_log (
    id          NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    run_id      VARCHAR2(36) NOT NULL,
    label       VARCHAR2(100),
    ticker      VARCHAR2(20),
    pnl         NUMBER(10,4),
    hold_h      NUMBER,
    fng_val     NUMBER,
    exit_type   VARCHAR2(10),
    created_at  TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL,
    CONSTRAINT fk_bt_run FOREIGN KEY (run_id) REFERENCES backtest_runs(run_id)
)
"""
+
+
def ensure_tables() -> None:
    """Create the backtest tables if they do not exist yet (idempotent)."""
    ddl_by_table = (
        ("BACKTEST_RUNS", _DDL_RUNS),
        ("BACKTEST_RESULTS", _DDL_RESULTS),
        ("BACKTEST_TRADE_LOG", _DDL_TRADES),
    )
    with _conn() as conn:
        cur = conn.cursor()
        for table_name, ddl in ddl_by_table:
            # user_tables stores names in uppercase, so probe with uppercase
            cur.execute(
                "SELECT COUNT(*) FROM user_tables WHERE table_name=:1", [table_name]
            )
            (exists,) = cur.fetchone()
            if not exists:
                cur.execute(ddl)
                print(f" {table_name} 테이블 생성 완료")
+
+
+# ── 삽입 헬퍼 ──────────────────────────────────────────────────
+
def insert_run(run_name: str, description: str = "", params: dict | None = None) -> str:
    """Insert a new backtest-run record and return the generated run_id.

    `params` is serialized to JSON (CLOB column) so a run can be
    reproduced later from its stored parameters.
    """
    sql = """
    INSERT INTO backtest_runs (run_name, description, params_json)
    VALUES (:rname, :rdesc, :rparams)
    RETURNING run_id INTO :out_id
    """
    with _conn() as conn:
        cur = conn.cursor()
        out = cur.var(oracledb.STRING)  # out-bind receiving the DB-generated GUID
        cur.execute(sql, {
            "rname": run_name,
            "rdesc": description,
            "rparams": json.dumps(params or {}, ensure_ascii=False),
            "out_id": out,
        })
        # DML RETURNING binds hold a list (one entry per affected row)
        return out.getvalue()[0]
+
+
def insert_result(
    run_id: str,
    label: str,
    stats: dict,
    fng_lo: int | None = None,
    fng_hi: int | None = None,
) -> None:
    """Insert one aggregated result row for (run_id, label).

    Missing keys in `stats` default to 0; win rate is rounded to 3
    decimals, every other metric to 4.
    """
    sql = """
    INSERT INTO backtest_results
    (run_id, label, n_trades, win_rate, avg_pnl, total_pnl,
     rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi)
    VALUES
    (:run_id, :label, :n, :wr, :avg_pnl, :total_pnl,
     :rr, :avg_win, :avg_loss, :max_dd, :fng_lo, :fng_hi)
    """
    binds = {
        "run_id": run_id,
        "label": label,
        "n": stats.get("n", 0),
        "wr": round(stats.get("wr", 0), 3),
        "fng_lo": fng_lo,
        "fng_hi": fng_hi,
    }
    for metric in ("avg_pnl", "total_pnl", "rr", "avg_win", "avg_loss", "max_dd"):
        binds[metric] = round(stats.get(metric, 0), 4)
    with _conn() as conn:
        conn.cursor().execute(sql, binds)
+
+
def insert_trades_bulk(
    run_id: str,
    label: str,
    ticker: str,
    trades: list,
) -> None:
    """Bulk-insert individual trade rows for (run_id, label, ticker).

    No-op when `trades` is empty. Trade objects are read via getattr so
    any object exposing pnl / h / fng / exit attributes works.
    """
    if not trades:
        return
    sql = """
    INSERT INTO backtest_trade_log
    (run_id, label, ticker, pnl, hold_h, fng_val, exit_type)
    VALUES (:run_id, :label, :ticker, :pnl, :hold_h, :fng_val, :exit_type)
    """
    rows = [
        {
            "run_id": run_id,
            "label": label,
            "ticker": ticker,
            "pnl": round(float(getattr(trade, "pnl", 0)), 4),
            "hold_h": int(getattr(trade, "h", 0)),
            "fng_val": int(getattr(trade, "fng", 0)),
            "exit_type": str(getattr(trade, "exit", "")),
        }
        for trade in trades
    ]
    with _conn() as conn:
        conn.cursor().executemany(sql, rows)
+
+
+# ── 조회 ───────────────────────────────────────────────────────
+
def list_runs(limit: int = 20) -> list[dict]:
    """Return the most recent backtest runs, newest first.

    NOTE(review): a bind variable inside FETCH FIRST requires Oracle 12c+
    — presumably fine here; confirm against the target DB version.
    """
    sql = """
    SELECT run_id, run_name, description, created_at
    FROM backtest_runs
    ORDER BY created_at DESC
    FETCH FIRST :n ROWS ONLY
    """
    with _conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, {"n": limit})
        rows = cur.fetchall()
        # created_at comes back as a datetime → format for display
        return [
            {"run_id": r[0], "run_name": r[1], "description": r[2],
             "created_at": r[3].strftime("%Y-%m-%d %H:%M")}
            for r in rows
        ]
+
+
def get_results(run_id: str) -> list[dict]:
    """Return per-condition aggregate rows for one run, best avg PnL first."""
    columns = (
        "label", "n_trades", "win_rate", "avg_pnl", "total_pnl",
        "rr", "avg_win", "avg_loss", "max_dd", "fng_lo", "fng_hi",
    )
    sql = """
    SELECT label, n_trades, win_rate, avg_pnl, total_pnl,
           rr, avg_win, avg_loss, max_dd, fng_lo, fng_hi
    FROM backtest_results
    WHERE run_id = :run_id
    ORDER BY avg_pnl DESC
    """
    with _conn() as conn:
        cur = conn.cursor()
        cur.execute(sql, {"run_id": run_id})
        return [dict(zip(columns, row)) for row in cur.fetchall()]
+
+
if __name__ == "__main__":
    # Manual smoke test: create missing tables, then print recent runs.
    print("백테스트 DB 테이블 확인/생성...")
    ensure_tables()
    print("완료. 최근 실행 목록:")
    for r in list_runs(5):
        print(f"  {r['created_at']}  {r['run_name']}")
diff --git a/core/fng.py b/core/fng.py
new file mode 100644
index 0000000..d27ef46
--- /dev/null
+++ b/core/fng.py
@@ -0,0 +1,71 @@
+"""공포탐욕지수(F&G) 조회 모듈.
+
+alternative.me API로 일일 F&G 값을 가져와 메모리에 캐시한다.
+캐시 TTL은 1시간 (F&G는 하루 1회 업데이트).
+
+환경변수:
+ FNG_MIN_ENTRY (기본값 41): 이 값 미만이면 진입 차단
+"""
+from __future__ import annotations
+
+import json
+import logging
+import os
+import time
+import urllib.request
+from datetime import datetime
+
+logger = logging.getLogger(__name__)
+
+FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41")) # 진입 허용 최소 F&G 값
+_FNG_API_URL = "https://api.alternative.me/fng/?limit=1&format=json"
+_CACHE_TTL = 3600 # 1시간
+
+_fng_value: int | None = None
+_fng_cached_at: float = 0.0
+_fng_date_str: str = ""
+
+
def get_fng() -> int:
    """Return today's Fear & Greed index (0~100); 50 (neutral) on API failure.

    The value is cached in module globals for _CACHE_TTL seconds. On API
    failure the last fetched value is reused when available; note the cache
    timestamp is NOT refreshed on failure, so during an outage the API is
    retried on every call (5s timeout each).
    """
    global _fng_value, _fng_cached_at, _fng_date_str

    now = time.time()
    # serve from cache while still fresh
    if _fng_value is not None and (now - _fng_cached_at) < _CACHE_TTL:
        return _fng_value

    try:
        with urllib.request.urlopen(_FNG_API_URL, timeout=5) as r:
            data = json.loads(r.read())
        entry = data["data"][0]
        _fng_value = int(entry["value"])
        _fng_cached_at = now
        _fng_date_str = entry.get("timestamp", "")
        logger.info(
            f"[F&G] 지수={_fng_value} ({entry.get('value_classification','')}) "
            f"날짜={datetime.fromtimestamp(int(_fng_date_str)).strftime('%Y-%m-%d') if _fng_date_str else '?'}"
        )
    except Exception as e:
        logger.warning(f"[F&G] API 조회 실패: {e} → 캐시/중립값 사용")
        if _fng_value is None:
            _fng_value = 50  # fallback: neutral
    # mypy: _fng_value is guaranteed non-None on every path above
    return _fng_value  # type: ignore[return-value]
+
+
def is_entry_allowed() -> bool:
    """Return True when the current F&G index permits new entries.

    Entries are allowed when F&G >= FNG_MIN_ENTRY (default 41); fear
    zones below the threshold are blocked and logged.
    """
    value = get_fng()
    if value >= FNG_MIN_ENTRY:
        return True
    # Blocked: classify the fear zone for the log line.
    if value <= 25:
        zone = "극공포"
    elif value <= 40:
        zone = "공포"
    else:
        zone = "약공포"  # reachable only if FNG_MIN_ENTRY is raised above 41
    logger.info(f"[F&G] 진입 차단 — F&G={value} ({zone}) < {FNG_MIN_ENTRY}")
    return False
diff --git a/core/notify.py b/core/notify.py
index 0209040..7693778 100644
--- a/core/notify.py
+++ b/core/notify.py
@@ -31,16 +31,27 @@ def _send(text: str) -> None:
def notify_buy(
    ticker: str, price: float, amount: float, invested_krw: int,
    max_budget: int = 0, per_position: int = 0,
    fng: int = 0,
) -> None:
    """Send a buy-fill notification, with optional budget and F&G lines.

    fng=0 (falsy) omits the F&G line entirely, so callers predating the
    F&G feature keep their original message format.
    """
    budget_line = (
        f"운용예산: {max_budget:,}원 (포지션당 {per_position:,}원)\n"
        if max_budget else ""
    )
    # Map the 0~100 index to a sentiment label (same bands as notify_signal).
    fng_label = (
        "극탐욕" if fng >= 76 else
        "탐욕" if fng >= 56 else
        "중립" if fng >= 46 else
        "약공포" if fng >= 41 else
        "공포" if fng >= 26 else
        "극공포"
    ) if fng else ""
    fng_line = f"F&G: {fng} ({fng_label})\n" if fng else ""
    _send(
        f"📈 [매수] {ticker}\n"
        f"가격: {price:,.2f}원\n"
        f"수량: {amount:.8f}\n"
        f"투자금: {invested_krw:,.2f}원\n"
        f"{fng_line}"
        f"{budget_line}"
    )
@@ -62,12 +73,28 @@ def notify_sell(
)
-def notify_signal(ticker: str, signal_price: float, vol_mult: float) -> None:
def notify_signal(ticker: str, signal_price: float, vol_mult: float, fng: int = 0) -> None:
    """Volume-accumulation signal alert, with optional F&G context."""
    # Local import — presumably to avoid a circular import at module load; confirm.
    from .fng import FNG_MIN_ENTRY
    # Same sentiment bands as notify_buy; fng=0 suppresses the line.
    fng_label = (
        "극탐욕" if fng >= 76 else
        "탐욕" if fng >= 56 else
        "중립" if fng >= 46 else
        "약공포" if fng >= 41 else
        "공포" if fng >= 26 else
        "극공포"
    ) if fng else ""
    fng_line = f"F&G: {fng} ({fng_label})\n" if fng else ""
    # Warn when the signal fired while entries are blocked by the F&G filter.
    warn_line = (
        f"⚠️ F&G={fng} < {FNG_MIN_ENTRY} → 진입차단중\n"
        if fng and fng < FNG_MIN_ENTRY else ""
    )
    _send(
        f"🔍 [축적감지] {ticker}\n"
        f"신호가: {signal_price:,.2f}원\n"
        f"거래량: {vol_mult:.1f}x 급증 + 2h 횡보\n"
        f"{fng_line}"
        f"{warn_line}"
        f"진입 목표: {signal_price * 1.048:,.2f}원 (+4.8%)"
    )
@@ -98,6 +125,20 @@ def notify_status(
f"| 조건 TREND≥{regime['trend_pct']}% / VOL≥{regime['vol_mult']}x\n"
)
+ # F&G 지수
+ from .fng import get_fng, FNG_MIN_ENTRY
+ fv = get_fng()
+ fng_label = (
+ "극탐욕" if fv >= 76 else
+ "탐욕" if fv >= 56 else
+ "중립" if fv >= 46 else
+ "약공포" if fv >= 41 else
+ "공포" if fv >= 26 else
+ "극공포"
+ )
+ fng_status = "✅진입허용" if fv >= FNG_MIN_ENTRY else "🚫진입차단"
+ fng_line = f"😨 F&G: {fv} ({fng_label}) {fng_status}\n"
+
# 1시간 이상 보유 포지션만 필터
long_positions = {
ticker: pos for ticker, pos in positions.items()
@@ -112,7 +153,7 @@ def notify_status(
)
# 포지션 없어도 레짐 정보는 전송
- header = f"📊 [{now} 현황]\n{regime_line}{budget_info}"
+ header = f"📊 [{now} 현황]\n{regime_line}{fng_line}{budget_info}"
if not long_positions:
_send(header + "1h+ 보유 포지션 없음")
diff --git a/core/strategy.py b/core/strategy.py
index 00867c7..1e06f8f 100644
--- a/core/strategy.py
+++ b/core/strategy.py
@@ -23,6 +23,7 @@ import time
import pyupbit
+from .fng import FNG_MIN_ENTRY, is_entry_allowed
from .market import get_current_price
from .market_regime import get_regime
from .notify import notify_signal
@@ -122,9 +123,14 @@ def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
def should_buy(ticker: str) -> bool:
"""Volume Lead 전략.
- 1단계: 거래량 급증 + 2h 횡보 → 신호가 기록
- 2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
+ 1단계: F&G 필터 — 공포탐욕지수 < FNG_MIN_ENTRY(41)이면 즉시 차단
+ 2단계: 거래량 급증 + 2h 횡보 → 신호가 기록
+ 3단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
"""
+ # ── F&G 진입 필터 ─────────────────────────────────────
+ if not is_entry_allowed():
+ return False
+
regime = get_regime()
vol_mult = regime["vol_mult"]
@@ -177,11 +183,13 @@ def should_buy(ticker: str) -> bool:
entry_thr = _calc_entry_threshold(ratio)
_accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio}
+ from .fng import get_fng
+ fng_now = get_fng()
logger.info(
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원 "
- f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}%)"
+ f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}% | F&G={fng_now})"
)
- notify_signal(ticker, current, ratio)
+ notify_signal(ticker, current, ratio, fng=fng_now)
return False # 신호 첫 발생 시는 진입 안 함
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
diff --git a/core/trader.py b/core/trader.py
index 656d0c5..a662ecf 100644
--- a/core/trader.py
+++ b/core/trader.py
@@ -480,8 +480,10 @@ def buy(ticker: str) -> bool:
f"{prefix}[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
)
+ from .fng import get_fng
notify_buy(ticker, actual_price, amount, order_krw,
- max_budget=MAX_BUDGET, per_position=PER_POSITION)
+ max_budget=MAX_BUDGET, per_position=PER_POSITION,
+ fng=get_fng())
return True
except Exception as e:
logger.error(f"매수 예외 {ticker}: {e}")
diff --git a/daemon/runner.py b/daemon/runner.py
index e963dce..27605b1 100644
--- a/daemon/runner.py
+++ b/daemon/runner.py
@@ -6,6 +6,7 @@ import threading
import time
from core import trader
+from core.fng import FNG_MIN_ENTRY, get_fng
from core.market import get_top_tickers
from core.market_regime import get_regime
from core.strategy import get_active_signals, should_buy
@@ -73,8 +74,18 @@ def run_scanner() -> None:
time.sleep(SCAN_INTERVAL)
continue
+ # F&G 진입 필터 로그 (should_buy 내부에서 차단하지만 스캔 전 상태 기록)
+ fv = get_fng()
+ fng_label = (
+ "극탐욕" if fv >= 76 else "탐욕" if fv >= 56 else
+ "중립" if fv >= 46 else "약공포" if fv >= 41 else
+ "공포" if fv >= 26 else "극공포"
+ )
+ if fv < FNG_MIN_ENTRY:
+ logger.info(f"[F&G차단] F&G={fv} ({fng_label}) < {FNG_MIN_ENTRY} — 이번 스캔 진입 없음")
+
tickers = get_top_tickers()
- logger.info(f"스캔 시작: {len(tickers)}개 종목")
+ logger.info(f"스캔 시작: {len(tickers)}개 종목 | F&G={fv}({fng_label})")
for ticker in tickers:
# 이미 보유 중인 종목 제외
diff --git a/fng_1y_backtest.py b/fng_1y_backtest.py
new file mode 100644
index 0000000..8976a02
--- /dev/null
+++ b/fng_1y_backtest.py
@@ -0,0 +1,315 @@
+"""F&G 조건별 백테스트 - 1년치 데이터 (배치 수집)
+
+60일 극공포 편향을 제거하고 Bull/Neutral/Bear 다양한 구간 포함.
+데이터: 1h 캔들 배치 수집 → 약 365일치
+"""
+from __future__ import annotations
+
+import datetime, json, time, sys, urllib.request
+import pandas as pd
+import pyupbit
+from dataclasses import dataclass
+
+TICKERS = [
+ "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
+ "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
+ "KRW-SUI", "KRW-HBAR",
+ "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
+ "KRW-KAVA", "KRW-KNC",
+]
+
+VOL_MULT = 2.0
+QUIET_2H = 2.0
+SIG_TO_H = 8
+MOM_THR = 3.0
+SIG_CANCEL = 3.0
+TRAIL_STOP = 0.015
+TIME_H = 24
+TIME_MIN = 3.0
+
+
+# ── 데이터 수집 ───────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
    """Collect ~`total_days` of 1h candles for `ticker` by paging backwards.

    pyupbit caps a single request, so we fetch 1440 hourly candles (~60
    days) at a time from `now` backwards until we pass the cutoff or the
    exchange has no older data. Returns None when nothing was fetched.
    """
    # Fix the cutoff once up front — the original recomputed it from
    # datetime.now() on every iteration (and again at the end), so the
    # window drifted slightly while the loop ran.
    cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
    all_dfs = []
    end = datetime.datetime.now()
    batch = 1440  # 60 days of hourly candles per request
    prev_oldest = None

    while True:
        df = pyupbit.get_ohlcv(
            ticker, interval="minute60", count=batch,
            to=end.strftime("%Y-%m-%d %H:%M:%S"),
        )
        if df is None or df.empty:
            break
        all_dfs.append(df)
        oldest = df.index[0]
        # Recently listed coins: if `oldest` stops advancing there is no
        # older data — bail out to avoid looping forever.
        if prev_oldest is not None and oldest >= prev_oldest:
            break
        prev_oldest = oldest
        if oldest <= cutoff:
            break
        end = oldest
        time.sleep(0.12)  # stay under the exchange rate limit

    if not all_dfs:
        return None
    combined = pd.concat(all_dfs).sort_index()
    # Overlapping pages → drop duplicate timestamps, keep the newest fetch.
    combined = combined[~combined.index.duplicated(keep="last")]
    return combined[combined.index >= cutoff]
+
+
def load_fng() -> dict[str, int]:
    """Fetch ~400 days of daily F&G values, keyed by 'YYYY-MM-DD'."""
    url = "https://api.alternative.me/fng/?limit=400&format=json"
    with urllib.request.urlopen(url, timeout=10) as resp:
        payload = json.loads(resp.read())
    values: dict[str, int] = {}
    for entry in payload["data"]:
        day = datetime.datetime.fromtimestamp(int(entry["timestamp"]))
        values[day.strftime("%Y-%m-%d")] = int(entry["value"])
    return values
+
+
def fng_val(fng_map, ts):
    """Look up the daily F&G value for timestamp `ts`; default neutral 50."""
    day_key = ts.strftime("%Y-%m-%d")
    return fng_map.get(day_key, 50)
+
+
+# ── 시뮬레이션 ────────────────────────────────────────────────
@dataclass
class Trade:
    """One simulated round-trip trade."""
    pnl: float  # profit/loss in percent
    h: int      # holding period in hours (1h candles)
    fng: int    # F&G value on the entry day
    exit: str   # exit reason: "trail" or "time"
+
+
def simulate(df, fng_map, fng_lo=None, fng_hi=None) -> list[Trade]:
    """Replay the volume-lead strategy over one ticker's 1h candles.

    fng_lo/fng_hi optionally gate NEW entries to an inclusive F&G band;
    the filter never force-closes an already open position.
    """
    closes = df["close"].values
    vols = df["volume"].values
    idx = df.index
    trades: list[Trade] = []
    sig_px = sig_i = None                         # pending signal price / bar index
    pos_buy = pos_peak = pos_i = pos_fng = None   # open-position state

    # Start at 7 so the 5-bar volume average has history; stop early so a
    # time-based exit always has room to resolve before the data ends.
    for i in range(7, len(closes) - max(TIME_H + 4, 10)):
        if pos_buy is not None:
            # ── manage the open position ──
            cur = closes[i]
            if cur > pos_peak:
                pos_peak = cur
            # trailing stop: give back TRAIL_STOP from the peak → exit
            if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
                trades.append(Trade((cur - pos_buy) / pos_buy * 100,
                                    i - pos_i, pos_fng, "trail"))
                pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                continue
            # time exit: after TIME_H bars cut the trade unless it is
            # already up at least TIME_MIN percent (let winners run)
            if i - pos_i >= TIME_H:
                pnl = (cur - pos_buy) / pos_buy * 100
                if pnl < TIME_MIN:
                    trades.append(Trade(pnl, i - pos_i, pos_fng, "time"))
                    pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                    continue
            continue

        if sig_px is not None:
            # expire stale signals; cancel on a deep pullback below the signal
            if i - sig_i > SIG_TO_H:
                sig_px = sig_i = None
            elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
                sig_px = sig_i = None

        if sig_px is None:
            # ── look for a new accumulation signal ──
            vol_avg = vols[i - 6:i - 1].mean()
            if vol_avg <= 0:
                continue
            # volume spike vs the 5-bar average + quiet 2h price action
            if vols[i - 1] / vol_avg >= VOL_MULT:
                if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
                    sig_px = closes[i]
                    sig_i = i
            continue

        # ── signal armed: F&G gate, then momentum confirmation ──
        fv = fng_val(fng_map, idx[i])
        if fng_lo is not None and fv < fng_lo:
            continue
        if fng_hi is not None and fv > fng_hi:
            continue

        if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
            pos_buy = pos_peak = closes[i]
            pos_i = i
            pos_fng = fv
            sig_px = sig_i = None

    return trades
+
+
def stats(trades):
    """Aggregate trades into summary metrics (win rate, R/R, max drawdown)."""
    if not trades:
        return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
                    avg_win=0, avg_loss=0, max_dd=0)
    pnls = [t.pnl for t in trades]
    win_pnls = [p for p in pnls if p > 0]
    loss_pnls = [p for p in pnls if p <= 0]
    avg_win = sum(win_pnls) / len(win_pnls) if win_pnls else 0
    avg_loss = sum(loss_pnls) / len(loss_pnls) if loss_pnls else 0
    # Max drawdown on the cumulative-PnL curve (percent points).
    cum = peak = max_dd = 0.0
    for p in pnls:
        cum += p
        peak = max(peak, cum)
        max_dd = max(max_dd, peak - cum)
    return dict(
        n=len(pnls), wr=len(win_pnls) / len(pnls) * 100,
        avg_pnl=sum(pnls) / len(pnls),
        total_pnl=sum(pnls),
        rr=abs(avg_win / avg_loss) if avg_loss else 0,
        avg_win=avg_win, avg_loss=avg_loss, max_dd=max_dd,
    )
+
+
def main():
    """Run the 1-year F&G zone backtest, print reports, persist to Oracle.

    Fixes vs. the first draft: the unfinished "분기별 성과" section printed a
    header and then re-ran simulate() over every dataset into a variable
    that was never used (expensive dead code) — removed; the unused
    `breakeven_wr` computation was dropped; the best-zone summary now
    guards against max() on an empty candidate list.
    """
    print("F&G 데이터 로드...")
    fng_map = load_fng()

    # F&G distribution over the year — sanity check that all regimes appear.
    from collections import Counter
    zone_cnt = Counter()
    for v in fng_map.values():
        if v <= 25: zone_cnt["극공포(0~25)"] += 1
        elif v <= 45: zone_cnt["공포(26~45)"] += 1
        elif v <= 55: zone_cnt["중립(46~55)"] += 1
        elif v <= 75: zone_cnt["탐욕(56~75)"] += 1
        else: zone_cnt["극탐욕(76~100)"] += 1
    total_days = sum(zone_cnt.values())
    print(f" 1년 F&G 분포 ({total_days}일):")
    for k, v in sorted(zone_cnt.items()):
        bar = "█" * (v // 5)
        print(f"  {k:<14} {v:>3}일 ({v/total_days*100:>4.1f}%) {bar}")

    print(f"\n종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
    datasets = {}
    for i, tk in enumerate(TICKERS):
        try:
            df = fetch_1y(tk, total_days=365)
            if df is not None and len(df) > 100:
                datasets[tk] = df
                sys.stderr.write(f"\r  {i+1}/{len(TICKERS)} {tk} ({len(df)}h)  ")
        except Exception as e:
            sys.stderr.write(f"\r  {tk} 실패: {e}  ")
    sys.stderr.write("\n")
    print(f" 완료: {len(datasets)}개 종목\n")

    # ── 전체 기간 F&G 구간별 성과 ────────────────────────────
    CONFIGS = [
        (None, None, "필터 없음 (전체)"),
        (None, 25, "극공포만 (0~25)"),
        (26, 45, "공포만 (26~45)"),
        (46, 55, "중립만 (46~55)"),
        (56, 100, "탐욕+ (56~100)"),
        (46, 100, "중립 이상 (46~100)"),
        (26, 100, "공포 이상 (26~100)"),
    ]

    print("=" * 78)
    print(" F&G 조건별 성과 - 1년치 (1h 캔들 / 모멘텀 / 스탑1.5%)")
    print("=" * 78)
    print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
          f"{'손익비':>6} {'총PnL':>9} {'MaxDD':>7}")
    print(" " + "-" * 72)

    all_results = {}
    for lo, hi, label in CONFIGS:
        all_trades = []
        for df in datasets.values():
            all_trades.extend(simulate(df, fng_map, lo, hi))
        s = stats(all_trades)
        all_results[label] = (s, all_trades)
        if s["n"] == 0:
            print(f" {label:<26} 거래 없음 (해당 구간 진입 기회 없음)")
            continue
        sign = "+" if s["total_pnl"] > 0 else ""
        print(
            f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% "
            f"{s['avg_pnl']:>+7.3f}% {s['rr']:>5.2f} "
            f"{sign}{s['total_pnl']:>8.1f}% -{s['max_dd']:>5.1f}%"
        )

    # ── F&G 수치별 세분화 ─────────────────────────────────────
    # Sliced from the no-filter trade list using each trade's entry-day F&G.
    print()
    print(" F&G 10단위 구간별 세부 성과:")
    print(f" {'구간':<16} {'건수':>5} {'승률':>6} {'평균PnL':>9} {'손익비':>6} {'의미'}")
    print(" " + "-" * 65)
    fng_zones_detail = [
        (0, 10, "극단 공포(0~10)"),
        (11, 20, "극단 공포(11~20)"),
        (21, 30, "극공포(21~30)"),
        (31, 40, "공포(31~40)"),
        (41, 50, "약공포(41~50)"),
        (51, 60, "약탐욕(51~60)"),
        (61, 75, "탐욕(61~75)"),
        (76, 100, "극탐욕(76~100)"),
    ]
    base_all = all_results["필터 없음 (전체)"][1]
    for lo, hi, name in fng_zones_detail:
        sub = [t for t in base_all if lo <= t.fng <= hi]
        if not sub:
            continue
        s = stats(sub)
        profitable = "✅ 수익" if s["avg_pnl"] > 0 else ("⚠️ BEP 근접" if s["avg_pnl"] > -0.2 else "❌ 손실")
        print(
            f" {name:<16} {s['n']:>5}건 {s['wr']:>5.1f}% "
            f"{s['avg_pnl']:>+8.3f}% {s['rr']:>5.2f} {profitable}"
        )

    # ── 최적 F&G 구간 요약 ───────────────────────────────────
    print()
    # Guard: max() on an empty list raises ValueError when no config
    # accumulated at least 50 trades.
    candidates = [(label, s) for label, (s, _) in all_results.items() if s["n"] >= 50]
    if candidates:
        best = max(candidates, key=lambda x: x[1]["avg_pnl"])
        print(f" ★ 최적 구간: {best[0]} "
              f"(거래 {best[1]['n']}건 | 승률 {best[1]['wr']:.1f}% | "
              f"평균PnL {best[1]['avg_pnl']:+.3f}%)")

    # ── DB 저장 ──────────────────────────────────────────────
    try:
        from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
        ensure_tables()
        params = {
            "tickers": len(datasets), "days": 365, "candle": "1h",
            "trail_stop": 0.015, "mom_thr": 3.0, "vol_mult": 2.0,
        }
        run_id = insert_run(
            run_name="fng_1y_backtest",
            description="F&G 구간별 성과 1년치 백테스트 (1h 캔들 / 모멘텀 / 스탑1.5%)",
            params=params,
        )
        for lo, hi, label in CONFIGS:
            if label in all_results:
                s, trades = all_results[label]
                if s["n"] > 0:
                    insert_result(run_id, label, s, lo, hi)
                    # 전체 거래는 per-ticker 분리 없이 일괄 저장 (run_id+label로 구분)
                    insert_trades_bulk(run_id, label, "_all_", trades)
        print(f"\n [DB 저장 완료] run_id: {run_id}")
    except Exception as e:
        print(f"\n [DB 저장 실패] {e}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/fng_adaptive_backtest.py b/fng_adaptive_backtest.py
new file mode 100644
index 0000000..e81ef3a
--- /dev/null
+++ b/fng_adaptive_backtest.py
@@ -0,0 +1,317 @@
+"""F&G 구간별 맞춤 파라미터 백테스트
+
+핵심 가설:
+ 극공포 구간은 시장이 불안정 → 더 엄격한 진입 기준 필요
+ 탐욕 구간은 상승 모멘텀이 지속 → 다소 느슨한 기준도 가능
+
+테스트 방식:
+ 각 F&G 구간마다 다른 파라미터 조합을 적용하고 성과 비교.
+ 구간별 최적 파라미터 도출 → 실제 전략에 반영
+
+결과를 Oracle DB에 저장.
+데이터: 1년치 1h 캔들 (배치 수집)
+"""
+from __future__ import annotations
+
+import datetime
+import json
+import sys
+import time
+import urllib.request
+
+import pandas as pd
+import pyupbit
+from dataclasses import dataclass
+
+# ── DB 저장 ─────────────────────────────────────────────────
+try:
+ from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
+ DB_ENABLED = True
+except Exception as e:
+ print(f" [DB 비활성화] {e}")
+ DB_ENABLED = False
+
+TICKERS = [
+ "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
+ "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
+ "KRW-SUI", "KRW-HBAR",
+ "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
+ "KRW-KAVA", "KRW-KNC",
+]
+
+# ── F&G 구간별 파라미터 조합 ─────────────────────────────────
+# (fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min)
+ADAPTIVE_CONFIGS = [
+ # 기준선 (F&G 무관, 단일 파라미터)
+ (None, None, "기준선(전체/현행파라미터)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
+
+ # ── 극공포 (0~25) 구간 ── 엄격한 기준 ──
+ # 극공포에서는 변동성 급증이 흔함 → 볼륨 기준 올리고, 모멘텀 강화
+ (None, 25, "극공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
+ (None, 25, "극공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0),
+ (None, 25, "극공포/매우엄격(3x+5%+1%스탑)", 3.0, 2.0, 6, 5.0, 3.0, 0.010, 24, 3.0),
+ (None, 25, "극공포/넓은스탑(2x+3%+2%스탑)", 2.0, 2.0, 8, 3.0, 3.0, 0.020, 24, 3.0),
+ (None, 25, "극공포/짧은신호(3x+4%+4h유효)", 3.0, 2.0, 4, 4.0, 3.0, 0.015, 24, 3.0),
+
+ # ── 공포 (26~45) ── 중간 기준 ──
+ (26, 45, "공포/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
+ (26, 45, "공포/약강화(2.5x vol+3.5%mom)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0),
+ (26, 45, "공포/엄격(3x vol+4%mom)", 3.0, 2.0, 8, 4.0, 3.0, 0.010, 24, 3.0),
+
+ # ── 중립 이상 (46~100) ── 완화된 기준 가능 ──
+ (46, None, "중립이상/기준(2x vol+3%mom)", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
+ (46, None, "중립이상/완화(1.5x vol+2.5%mom)",1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0),
+ (46, None, "중립이상/엄격(2.5x+3.5%)", 2.5, 2.0, 8, 3.5, 3.0, 0.015, 24, 3.0),
+
+ # ── 탐욕+ (56~100) ──
+ (56, None, "탐욕이상/기준", 2.0, 2.0, 8, 3.0, 3.0, 0.015, 24, 3.0),
+ (56, None, "탐욕이상/완화(1.5x+2.5%)", 1.5, 2.0, 8, 2.5, 3.0, 0.015, 24, 3.0),
+]
+
+
+# ── 데이터 수집 ──────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
    """Page backwards through 1h candles to build ~`total_days` of history.

    Returns None when nothing could be fetched.
    """
    all_dfs = []
    end = datetime.datetime.now()
    batch = 1440  # 60 days of hourly candles per request
    prev_oldest = None

    while True:
        df = pyupbit.get_ohlcv(
            ticker, interval="minute60", count=batch,
            to=end.strftime("%Y-%m-%d %H:%M:%S"),
        )
        if df is None or df.empty:
            break
        all_dfs.append(df)
        oldest = df.index[0]
        # Recently listed coins: if `oldest` stops advancing there is no
        # older data — bail out to avoid looping forever.
        if prev_oldest is not None and oldest >= prev_oldest:
            break
        prev_oldest = oldest
        cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
        if oldest <= cutoff:
            break
        end = oldest
        time.sleep(0.12)  # rate-limit protection

    if not all_dfs:
        return None
    combined = pd.concat(all_dfs).sort_index()
    # Overlapping pages → drop duplicate timestamps, keep the newest fetch.
    combined = combined[~combined.index.duplicated(keep="last")]
    cutoff = datetime.datetime.now() - datetime.timedelta(days=total_days)
    return combined[combined.index >= cutoff]
+
+
def load_fng() -> dict[str, int]:
    """Fetch ~400 days of daily F&G values, keyed by 'YYYY-MM-DD'."""
    url = "https://api.alternative.me/fng/?limit=400&format=json"
    with urllib.request.urlopen(url, timeout=10) as r:
        data = json.loads(r.read())
    return {
        datetime.datetime.fromtimestamp(int(d["timestamp"])).strftime("%Y-%m-%d"):
            int(d["value"])
        for d in data["data"]
    }
+
+
def fng_val(fng_map, ts) -> int:
    """F&G value for `ts`'s calendar date; 50 (neutral) when missing."""
    return fng_map.get(ts.strftime("%Y-%m-%d"), 50)
+
+
+# ── 시뮬레이션 ──────────────────────────────────────────────
@dataclass
class Trade:
    """One simulated round-trip trade."""
    pnl: float  # profit/loss in percent
    h: int      # holding period in hours (1h candles)
    fng: int    # F&G value on the entry day
    exit: str   # exit reason: "trail" or "time"
+
+
def simulate(
    df, fng_map,
    fng_lo=None, fng_hi=None,
    vol_mult=2.0, quiet_2h=2.0, sig_to_h=8,
    mom_thr=3.0, sig_cancel=3.0, trail_stop=0.015,
    time_h=24, time_min=3.0,
) -> list[Trade]:
    """Replay the volume-lead strategy with per-call parameters.

    Same state machine as the fixed-parameter backtests, but every
    threshold is an argument so F&G zones can be swept with different
    parameter sets. fng_lo/fng_hi gate NEW entries only (inclusive band).
    """
    closes = df["close"].values
    vols = df["volume"].values
    idx = df.index
    trades: list[Trade] = []
    sig_px = sig_i = None                         # pending signal price / bar index
    pos_buy = pos_peak = pos_i = pos_fng = None   # open-position state

    # Start at 7 so the 5-bar volume average has history; stop early so a
    # time-based exit always has room to resolve before the data ends.
    for i in range(7, len(closes) - max(time_h + 4, 10)):
        if pos_buy is not None:
            # ── manage the open position ──
            cur = closes[i]
            if cur > pos_peak:
                pos_peak = cur
            # trailing stop: give back trail_stop from the peak → exit
            if (pos_peak - cur) / pos_peak >= trail_stop:
                trades.append(Trade((cur - pos_buy) / pos_buy * 100,
                                    i - pos_i, pos_fng, "trail"))
                pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                continue
            # time exit: after time_h bars cut the trade unless it is
            # already up at least time_min percent (let winners run)
            if i - pos_i >= time_h:
                pnl = (cur - pos_buy) / pos_buy * 100
                if pnl < time_min:
                    trades.append(Trade(pnl, i - pos_i, pos_fng, "time"))
                    pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                    continue
            continue

        if sig_px is not None:
            # expire stale signals; cancel on a deep pullback below the signal
            if i - sig_i > sig_to_h:
                sig_px = sig_i = None
            elif (closes[i] - sig_px) / sig_px * 100 < -sig_cancel:
                sig_px = sig_i = None

        if sig_px is None:
            # ── look for a new accumulation signal ──
            vol_avg = vols[i - 6:i - 1].mean()
            if vol_avg <= 0:
                continue
            # volume spike vs the 5-bar average + quiet 2h price action
            if vols[i - 1] / vol_avg >= vol_mult:
                if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < quiet_2h:
                    sig_px = closes[i]
                    sig_i = i
            continue

        # ── signal armed: F&G gate, then momentum confirmation ──
        fv = fng_val(fng_map, idx[i])
        if fng_lo is not None and fv < fng_lo:
            continue
        if fng_hi is not None and fv > fng_hi:
            continue

        if (closes[i] - sig_px) / sig_px * 100 >= mom_thr:
            pos_buy = pos_peak = closes[i]
            pos_i = i
            pos_fng = fv
            sig_px = sig_i = None

    return trades
+
+
def stats(trades):
    """Summarize a trade list: count, win rate, avg/total PnL, R/R, max DD."""
    if not trades:
        return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
                    avg_win=0, avg_loss=0, max_dd=0)
    pnl_values = [t.pnl for t in trades]
    winners = [p for p in pnl_values if p > 0]
    losers = [p for p in pnl_values if p <= 0]
    mean_win = sum(winners) / len(winners) if winners else 0
    mean_loss = sum(losers) / len(losers) if losers else 0
    # Max drawdown of the cumulative-PnL curve (percent points).
    running = high = drawdown = 0.0
    for p in pnl_values:
        running += p
        high = max(high, running)
        drawdown = max(drawdown, high - running)
    return dict(
        n=len(pnl_values), wr=len(winners) / len(pnl_values) * 100,
        avg_pnl=sum(pnl_values) / len(pnl_values),
        total_pnl=sum(pnl_values),
        rr=abs(mean_win / mean_loss) if mean_loss else 0,
        avg_win=mean_win, avg_loss=mean_loss, max_dd=drawdown,
    )
+
+
def main():
    """Run the per-F&G-zone parameter sweep and store results in Oracle."""
    print("F&G 데이터 로드...")
    fng_map = load_fng()

    print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
    datasets = {}
    for i, tk in enumerate(TICKERS):
        try:
            df = fetch_1y(tk, total_days=365)
            # require >100 hourly candles so the simulation window is meaningful
            if df is not None and len(df) > 100:
                datasets[tk] = df
                sys.stderr.write(f"\r  {i+1}/{len(TICKERS)} {tk} ({len(df)}h)  ")
        except Exception as e:
            sys.stderr.write(f"\r  {tk} 실패: {e}  ")
    sys.stderr.write("\n")
    print(f" 완료: {len(datasets)}개 종목\n")

    # ── DB 준비 ───────────────────────────────────────────
    run_id = None
    if DB_ENABLED:
        ensure_tables()
        params = {
            "tickers": len(datasets),
            "days": 365,
            "candle": "1h",
            "stop": "trail+time",
        }
        run_id = insert_run(
            run_name="fng_adaptive_1y",
            description="F&G 구간별 맞춤 파라미터 1년 백테스트",
            params=params,
        )
        print(f" DB run_id: {run_id}\n")

    # ── 결과 출력 ─────────────────────────────────────────
    print("=" * 92)
    print(" F&G 구간별 맞춤 파라미터 성과 비교 (1년치 / 1h 캔들)")
    print("=" * 92)
    print(f" {'조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
          f"{'손익비':>5} {'총PnL':>9} {'MaxDD':>7}")
    print(" " + "-" * 86)

    best_by_zone: dict[str, tuple] = {}

    for cfg in ADAPTIVE_CONFIGS:
        # unpack one parameter row (see ADAPTIVE_CONFIGS header comment)
        fng_lo, fng_hi, label, vol_mult, quiet_2h, sig_to_h, mom_thr, sig_cancel, trail_stop, time_h, time_min = cfg

        all_trades: list[Trade] = []
        per_ticker: dict[str, list[Trade]] = {}
        for tk, df in datasets.items():
            t = simulate(
                df, fng_map,
                fng_lo=fng_lo, fng_hi=fng_hi,
                vol_mult=vol_mult, quiet_2h=quiet_2h, sig_to_h=sig_to_h,
                mom_thr=mom_thr, sig_cancel=sig_cancel, trail_stop=trail_stop,
                time_h=time_h, time_min=time_min,
            )
            all_trades.extend(t)
            per_ticker[tk] = t

        s = stats(all_trades)

        # visual separator between the baseline row and the zone rows
        if label == "극공포/기준(2x vol+3%mom)":
            print()

        if s["n"] == 0:
            print(f" {label:<42} 거래 없음")
            continue

        marker = " ★" if s["avg_pnl"] > 0 else ""
        print(
            f" {label:<42} {s['n']:>5}건 {s['wr']:>5.1f}% "
            f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} "
            f"{s['total_pnl']:>+8.1f}% -{s['max_dd']:>5.1f}%{marker}"
        )

        # DB 저장 (aggregates + per-ticker trade logs)
        if DB_ENABLED and run_id:
            insert_result(run_id, label, s, fng_lo, fng_hi)
            for tk, t_list in per_ticker.items():
                insert_trades_bulk(run_id, label, tk, t_list)

        # track the best avg PnL per zone (zone = label text before the "/")
        zone_key = label.split("/")[0]
        if zone_key not in best_by_zone or s["avg_pnl"] > best_by_zone[zone_key][1]:
            best_by_zone[zone_key] = (label, s["avg_pnl"], s)

    # ── 구간별 최적 요약 ──────────────────────────────────
    print()
    print(" ★ 구간별 최적 파라미터:")
    print(f" {'구간':<14} {'최적 조건':<42} {'거래':>5} {'승률':>6} {'평균PnL':>8}")
    print(" " + "-" * 72)
    for zone, (label, best_pnl, s) in best_by_zone.items():
        if s["n"] > 0:
            print(f" {zone:<14} {label:<42} {s['n']:>5}건 {s['wr']:>5.1f}% {best_pnl:>+7.3f}%")

    if DB_ENABLED and run_id:
        print(f"\n [DB 저장 완료] run_id: {run_id}")
+
+
+if __name__ == "__main__":
+ main()
diff --git a/fng_sim_comparison.py b/fng_sim_comparison.py
new file mode 100644
index 0000000..8cc1197
--- /dev/null
+++ b/fng_sim_comparison.py
@@ -0,0 +1,382 @@
+"""F&G 필터 전후 수익 비교 시뮬레이션
+
+필터 없음 vs F&G ≥ 41 필터 적용 시 1년치 성과를 직접 비교.
+
+표시:
+ - 거래 수, 승률, 평균 PnL, 총 누적 PnL
+ - 거래당 고정 자본 100만 원 기준 KRW 환산 손익
+ - 월별 손익 흐름 (계절성 확인)
+ - 극공포 차단 일수 통계
+
+결과는 Oracle DB(backtest_results)에 저장.
+데이터: 1년치 1h 캔들 (배치 수집)
+"""
+from __future__ import annotations
+
+import datetime
+import json
+import sys
+import time
+import urllib.request
+from dataclasses import dataclass
+
+import pandas as pd
+import pyupbit
+
+# ── DB 저장 ─────────────────────────────────────────────────
+try:
+ from backtest_db import ensure_tables, insert_run, insert_result, insert_trades_bulk
+ DB_ENABLED = True
+except Exception as e:
+ print(f" [DB 비활성화] {e}")
+ DB_ENABLED = False
+
+TICKERS = [
+ "KRW-BTC", "KRW-ETH", "KRW-XRP", "KRW-SOL", "KRW-DOGE",
+ "KRW-ADA", "KRW-DOT", "KRW-NEAR", "KRW-AVAX", "KRW-LINK",
+ "KRW-SUI", "KRW-HBAR",
+ "KRW-VIRTUAL", "KRW-SXP", "KRW-CFG", "KRW-HOLO",
+ "KRW-KAVA", "KRW-KNC",
+]
+
+CAPITAL_PER_TRADE = 1_000_000 # 거래당 고정 자본 (KRW)
+
+# 전략 파라미터 (현행)
+VOL_MULT = 2.0
+QUIET_2H = 2.0
+SIG_TO_H = 8
+MOM_THR = 3.0
+SIG_CANCEL = 3.0
+TRAIL_STOP = 0.015
+TIME_H = 24
+TIME_MIN = 3.0
+
+FNG_MIN = 41 # 이 값 미만이면 진입 차단
+
+
+# ── 데이터 수집 ──────────────────────────────────────────────
def fetch_1y(ticker: str, total_days: int = 365) -> pd.DataFrame | None:
    """Fetch roughly `total_days` of hourly OHLCV candles for `ticker`.

    Pages backwards through pyupbit in 1440-candle batches until the
    requested window is covered, then de-duplicates overlapping candles
    and trims to the cutoff.

    Returns None when nothing could be fetched.
    """
    all_dfs: list[pd.DataFrame] = []
    end = datetime.datetime.now()
    # Compute the cutoff once up front: the original recomputed
    # datetime.now() inside the loop and again for the final trim, so the
    # two cutoffs drifted apart slightly during a long fetch.
    cutoff = end - datetime.timedelta(days=total_days)
    batch = 1440
    prev_oldest = None

    while True:
        df = pyupbit.get_ohlcv(
            ticker, interval="minute60", count=batch,
            to=end.strftime("%Y-%m-%d %H:%M:%S"),
        )
        if df is None or df.empty:
            break
        all_dfs.append(df)
        oldest = df.index[0]
        # Guard against a non-advancing cursor (API returning the same page).
        if prev_oldest is not None and oldest >= prev_oldest:
            break
        prev_oldest = oldest
        if oldest <= cutoff:
            break
        end = oldest
        time.sleep(0.12)  # stay under the exchange rate limit

    if not all_dfs:
        return None
    combined = pd.concat(all_dfs).sort_index()
    # Overlapping pages duplicate the boundary candle; keep the newest copy.
    combined = combined[~combined.index.duplicated(keep="last")]
    return combined[combined.index >= cutoff]
+
+
def load_fng() -> dict[str, int]:
    """Download ~400 days of Fear & Greed index values, keyed by YYYY-MM-DD."""
    url = "https://api.alternative.me/fng/?limit=400&format=json"
    with urllib.request.urlopen(url, timeout=10) as resp:
        payload = json.loads(resp.read())
    # Later entries with the same day key overwrite earlier ones, matching
    # plain dict insertion semantics.
    values: dict[str, int] = {}
    for entry in payload["data"]:
        day = datetime.datetime.fromtimestamp(int(entry["timestamp"])).strftime("%Y-%m-%d")
        values[day] = int(entry["value"])
    return values
+
+
def fng_val(fng_map, ts) -> int:
    """Look up the F&G value for timestamp `ts`'s day; default to neutral 50."""
    day_key = ts.strftime("%Y-%m-%d")
    return fng_map.get(day_key, 50)
+
+
+# ── 시뮬레이션 ──────────────────────────────────────────────
@dataclass
class Trade:
    """One simulated round-trip trade."""
    pnl: float  # profit/loss of the trade, in percent
    h: int      # holding time in candles (1h each)
    fng: int    # Fear & Greed value on the entry day
    exit: str   # exit reason: "trail" (trailing stop) or "time" (time exit)
    date: str   # YYYY-MM of the exit candle (used for monthly aggregation)
+
+
def simulate(df: pd.DataFrame, fng_map: dict[str, int], fng_min: int | None = None) -> list[Trade]:
    """Backtest the volume-spike momentum strategy on one ticker's 1h candles.

    fng_min: entries are skipped when the entry day's F&G value is below this
    threshold; None disables the filter (baseline condition).

    Each candle passes through a small state machine:
      1. position open -> exit on a TRAIL_STOP drawdown from the peak, or a
         time exit after TIME_H candles when PnL < TIME_MIN (otherwise the
         position is left open to keep running);
      2. signal armed  -> expire after SIG_TO_H candles, or cancel when price
         drops more than SIG_CANCEL% below the signal price;
      3. no signal     -> arm one on a volume spike (previous candle's volume
         >= VOL_MULT x the 5-candle average) while the 2h price move stays
         under QUIET_2H%;
      4. signal armed + price >= MOM_THR% above the signal price -> enter.
    """
    closes = df["close"].values
    vols = df["volume"].values
    idx = df.index
    trades: list[Trade] = []
    sig_px = sig_i = None  # armed signal: price / candle index
    pos_buy = pos_peak = pos_i = pos_fng = None  # open-position state

    # Start at 7 so the volume lookback window is valid; stop early to leave
    # headroom near the end of the series — presumably so a late entry can
    # still reach its exits (TODO confirm the +4 margin is intentional).
    for i in range(7, len(closes) - max(TIME_H + 4, 10)):
        if pos_buy is not None:
            cur = closes[i]
            if cur > pos_peak:
                pos_peak = cur
            # Trailing stop: close once price falls TRAIL_STOP below the peak.
            if (pos_peak - cur) / pos_peak >= TRAIL_STOP:
                trades.append(Trade(
                    (cur - pos_buy) / pos_buy * 100,
                    i - pos_i, pos_fng, "trail",
                    idx[i].strftime("%Y-%m"),
                ))
                pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                continue
            # Time exit: after TIME_H candles, cut the trade only when it has
            # not reached TIME_MIN% — stronger trades stay open.
            if i - pos_i >= TIME_H:
                pnl = (cur - pos_buy) / pos_buy * 100
                if pnl < TIME_MIN:
                    trades.append(Trade(
                        pnl, i - pos_i, pos_fng, "time",
                        idx[i].strftime("%Y-%m"),
                    ))
                    pos_buy = pos_peak = pos_i = pos_fng = sig_px = sig_i = None
                    continue
            continue

        # Expire or invalidate a stale armed signal.
        if sig_px is not None:
            if i - sig_i > SIG_TO_H:
                sig_px = sig_i = None
            elif (closes[i] - sig_px) / sig_px * 100 < -SIG_CANCEL:
                sig_px = sig_i = None

        # No signal armed: look for a volume spike in a quiet price window.
        if sig_px is None:
            vol_avg = vols[i - 6:i - 1].mean()
            if vol_avg <= 0:
                continue
            if vols[i - 1] / vol_avg >= VOL_MULT:
                if abs(closes[i] - closes[i - 2]) / closes[i - 2] * 100 < QUIET_2H:
                    sig_px = closes[i]
                    sig_i = i
            continue

        # F&G entry filter — the condition this simulation compares.
        fv = fng_val(fng_map, idx[i])
        if fng_min is not None and fv < fng_min:
            continue

        # Momentum confirmation: enter once price runs MOM_THR% above signal.
        if (closes[i] - sig_px) / sig_px * 100 >= MOM_THR:
            pos_buy = pos_peak = closes[i]
            pos_i = i
            pos_fng = fv
            sig_px = sig_i = None

    return trades
+
+
def stats(trades: list[Trade]) -> dict:
    """Aggregate a trade list into summary metrics.

    Returns n, win rate (%), average/total PnL (%), reward/risk ratio,
    average win/loss (%), max drawdown on the cumulative-PnL curve, and the
    total converted to KRW at CAPITAL_PER_TRADE per trade.
    """
    if not trades:
        return dict(n=0, wr=0, avg_pnl=0, total_pnl=0, rr=0,
                    avg_win=0, avg_loss=0, max_dd=0, krw_total=0)

    pnls = [t.pnl for t in trades]
    win_pnls = [p for p in pnls if p > 0]
    loss_pnls = [p for p in pnls if p <= 0]
    avg_win = sum(win_pnls) / len(win_pnls) if win_pnls else 0
    avg_loss = sum(loss_pnls) / len(loss_pnls) if loss_pnls else 0

    # Max drawdown: largest peak-to-trough dip of the running PnL sum.
    running = 0.0
    peak = 0.0
    max_dd = 0.0
    for p in pnls:
        running += p
        peak = max(peak, running)
        max_dd = max(max_dd, peak - running)

    total = sum(pnls)
    count = len(pnls)
    return dict(
        n=count, wr=len(win_pnls) / count * 100,
        avg_pnl=total / count,
        total_pnl=total,
        rr=abs(avg_win / avg_loss) if avg_loss else 0,
        avg_win=avg_win, avg_loss=avg_loss, max_dd=max_dd,
        krw_total=total / 100 * CAPITAL_PER_TRADE,
    )
+
+
def monthly_pnl(trades: list[Trade]) -> dict[str, float]:
    """Sum each trade's PnL (%) into its YYYY-MM bucket, sorted by month."""
    buckets: dict[str, float] = {}
    for trade in trades:
        buckets[trade.date] = buckets.get(trade.date, 0) + trade.pnl
    return {month: buckets[month] for month in sorted(buckets)}
+
+
def main():
    """Run the F&G-filter A/B comparison backtest.

    Loads the Fear & Greed series and ~1 year of hourly candles per ticker,
    simulates the strategy with and without the F&G >= FNG_MIN entry filter,
    prints a comparison report (overall, monthly, per-ticker, and a threshold
    sweep), and stores the results in Oracle when the DB module is available.

    Fixes vs the first draft: removed dead locals (`yr`/`mo` and the unused
    `bar` gauge) and a needless f-string prefix; output is unchanged.
    """
    print("F&G 데이터 로드...")
    fng_map = load_fng()

    # F&G distribution: how often the filter would have blocked entries.
    block_days = sum(1 for v in fng_map.values() if v < FNG_MIN)
    total_days = len(fng_map)
    print(f" 1년 F&G 분포: 진입차단(< {FNG_MIN}) = {block_days}일 / {total_days}일 "
          f"({block_days/total_days*100:.1f}%)")
    print(f" 진입허용(≥ {FNG_MIN}) = {total_days - block_days}일 ({(total_days-block_days)/total_days*100:.1f}%)\n")

    print(f"종목 1년치 데이터 수집 중 ({len(TICKERS)}개)...")
    datasets: dict[str, pd.DataFrame] = {}
    for i, tk in enumerate(TICKERS):
        try:
            df = fetch_1y(tk, total_days=365)
            if df is not None and len(df) > 100:
                datasets[tk] = df
                sys.stderr.write(f"\r {i+1}/{len(TICKERS)} {tk} ({len(df)}h) ")
        except Exception as e:
            sys.stderr.write(f"\r {tk} 실패: {e} ")
    sys.stderr.write("\n")
    print(f" 완료: {len(datasets)}개 종목\n")

    # ── Simulate both conditions ─────────────────────────────
    # A: no filter (current behaviour)
    # B: F&G >= FNG_MIN (proposed)
    all_trades_A: list[Trade] = []
    all_trades_B: list[Trade] = []
    per_ticker_A: dict[str, list[Trade]] = {}
    per_ticker_B: dict[str, list[Trade]] = {}

    for tk, df in datasets.items():
        ta = simulate(df, fng_map, fng_min=None)
        tb = simulate(df, fng_map, fng_min=FNG_MIN)
        all_trades_A.extend(ta)
        all_trades_B.extend(tb)
        per_ticker_A[tk] = ta
        per_ticker_B[tk] = tb

    sa = stats(all_trades_A)
    sb = stats(all_trades_B)

    # ── Headline comparison ──────────────────────────────────
    print("=" * 80)
    print(f" F&G 필터 전후 비교 (1년치 / {len(datasets)}개 종목 / 1h캔들 / 자본 {CAPITAL_PER_TRADE:,}원/거래)")
    print("=" * 80)
    print(f" {'조건':<26} {'거래':>5} {'승률':>6} {'평균PnL':>8} "
          f"{'손익비':>5} {'총PnL':>8} {'MaxDD':>7} {'KRW손익':>14}")
    print(" " + "-" * 76)

    for label, s in [("필터 없음 (현행)", sa), (f"F&G≥{FNG_MIN} 필터 (신규)", sb)]:
        krw_str = f"{s['krw_total']:>+,.0f}원"
        print(
            f" {label:<26} {s['n']:>5}건 {s['wr']:>5.1f}% "
            f"{s['avg_pnl']:>+7.3f}% {s['rr']:>4.2f} "
            f"{s['total_pnl']:>+7.1f}% -{s['max_dd']:>5.1f}% {krw_str:>14}"
        )

    diff_trades = sb["n"] - sa["n"]
    diff_krw = sb["krw_total"] - sa["krw_total"]
    diff_wr = sb["wr"] - sa["wr"]
    print(f"\n 변화: 거래수 {diff_trades:+d}건 | 승률 {diff_wr:+.1f}%p | "
          f"KRW손익 {diff_krw:>+,.0f}원")

    # ── Monthly PnL flow (seasonality check) ─────────────────
    print()
    print(" 월별 손익 비교 (필터없음 vs F&G≥41):")
    print(f" {'월':>8} {'차단일수':>6} {'필터없음':>9} {'F&G필터':>9} {'개선':>8} {'누적(필터)':>12}")
    print(" " + "-" * 62)

    ma = monthly_pnl(all_trades_A)
    mb = monthly_pnl(all_trades_B)
    all_months = sorted(set(ma.keys()) | set(mb.keys()))

    cum_b = 0.0
    for m in all_months:
        pa = ma.get(m, 0.0)
        pb = mb.get(m, 0.0)
        cum_b += pb
        diff = pb - pa
        # Days within this month that the filter would have blocked.
        blocked = sum(
            1 for d, v in fng_map.items()
            if d.startswith(m) and v < FNG_MIN
        )
        sign = "+" if pb > 0 else ""
        diff_sign = "▲" if diff > 0 else ("▼" if diff < 0 else "=")
        print(
            f" {m} {blocked:>4}일차단 "
            f"{pa:>+8.1f}% {sign}{pb:>8.1f}% "
            f"{diff_sign}{abs(diff):>6.1f}% {cum_b:>+10.1f}%"
        )

    # ── Per-ticker comparison, best improvement first ────────
    print()
    print(" 종목별 성과 비교 (필터없음 vs F&G≥41):")
    print(f" {'종목':<14} {'현행거래':>6} {'현행PnL':>8} {'필터거래':>7} {'필터PnL':>8} {'개선':>8}")
    print(" " + "-" * 58)

    ticker_rows = []
    for tk in sorted(datasets.keys()):
        ta_list = per_ticker_A.get(tk, [])
        tb_list = per_ticker_B.get(tk, [])
        pa = sum(t.pnl for t in ta_list) if ta_list else 0
        pb = sum(t.pnl for t in tb_list) if tb_list else 0
        ticker_rows.append((tk, len(ta_list), pa, len(tb_list), pb, pb - pa))

    for row in sorted(ticker_rows, key=lambda x: x[5], reverse=True):
        tk, na, pa, nb, pb, delta = row
        mark = "▲" if delta > 1 else ("▼" if delta < -1 else " =")
        print(
            f" {tk:<14} {na:>6}건 {pa:>+7.1f}% {nb:>6}건 {pb:>+7.1f}% "
            f"{mark}{abs(delta):>6.1f}%"
        )

    # ── What the filter actually blocked ─────────────────────
    print()
    print(f" F&G < {FNG_MIN} 구간(차단) 거래 성과 분석:")
    blocked_trades = [t for t in all_trades_A if t.fng < FNG_MIN]
    if blocked_trades:
        sb2 = stats(blocked_trades)
        print(f" → 차단된 거래 수: {sb2['n']}건")
        print(f" → 차단 거래 승률: {sb2['wr']:.1f}%")
        print(f" → 차단 거래 평균 PnL: {sb2['avg_pnl']:+.3f}%")
        print(f" → 차단으로 절약된 손실: {sb2['krw_total']:>+,.0f}원 "
              f"({CAPITAL_PER_TRADE:,}원 × {sb2['n']}거래 기준)")
    else:
        print(" → 차단된 거래 없음")

    # ── Threshold sweep around the chosen FNG_MIN ────────────
    print()
    print(" F&G 임계값별 성과 비교 (현행 기준 비교):")
    print(f" {'임계값':>8} {'거래':>5} {'승률':>6} {'평균PnL':>9} {'KRW손익':>14}")
    print(" " + "-" * 52)
    for thr in [25, 30, 35, 41, 45, 50]:
        filtered = [t for t in all_trades_A if t.fng >= thr]
        if not filtered:
            continue
        sf = stats(filtered)
        marker = " ◀ 채택" if thr == FNG_MIN else ""
        print(
            f" {thr:>5}이상 {sf['n']:>5}건 {sf['wr']:>5.1f}% "
            f"{sf['avg_pnl']:>+8.3f}% {sf['krw_total']:>+14,.0f}원{marker}"
        )

    # ── Persist to Oracle (best effort; failure only warns) ──
    if DB_ENABLED:
        try:
            ensure_tables()
            params = {
                "tickers": len(datasets), "days": 365, "candle": "1h",
                "trail_stop": TRAIL_STOP, "mom_thr": MOM_THR,
                "fng_min_new": FNG_MIN, "capital_per_trade": CAPITAL_PER_TRADE,
            }
            run_id = insert_run(
                "fng_sim_comparison",
                f"F&G 필터 전후 비교 시뮬레이션 (1년치 / F&G≥{FNG_MIN})",
                params,
            )
            insert_result(run_id, "필터 없음 (현행)", sa, None, None)
            insert_result(run_id, f"F&G≥{FNG_MIN} 필터 (신규)", sb, FNG_MIN, None)
            for tk, t_list in per_ticker_A.items():
                insert_trades_bulk(run_id, "필터없음", tk, t_list)
            for tk, t_list in per_ticker_B.items():
                insert_trades_bulk(run_id, f"fng_ge{FNG_MIN}", tk, t_list)
            print(f"\n [DB 저장 완료] run_id: {run_id}")
        except Exception as e:
            print(f"\n [DB 저장 실패] {e}")
+
+
# Script entry point: run the F&G filter comparison.
if __name__ == "__main__":
    main()