From cfbacdacbcf9a25dd0877866f9ee305751089e76 Mon Sep 17 00:00:00 2001 From: joungmin Date: Wed, 4 Mar 2026 09:28:13 +0900 Subject: [PATCH] feat: rewrite strategy to 10m vol-lead with undying signal + watch alert - core/strategy.py: full rewrite to Volume Lead strategy - 10m candle direct detection (no 40m resampling) - F&G 3-tier vol threshold: <=40->6x, 41-50->5x, >50->blocked - Undying signal: price drop does not cancel signal (sig_p fixed) - Vol refresh: stronger vol_r updates signal price and timer - Watch alert: 4x-6x approaching threshold notifies via Telegram - WATCH_VOL_THRESH=4.0, WATCH_COOLDOWN_MIN=30, WATCH_VOL_JUMP=0.5 - daemon/runner.py: remove FNG_MIN_ENTRY block and Bear regime block - Only FNG_MAX_ENTRY(>50) blocks scan (greed/extreme greed) - Fast-poll loop cleaned of regime check - core/notify.py: add notify_watch() for near-signal Telegram alerts - Shows vol_r, distance to threshold, price, quiet pct - tests/: add 1y data collection and simulation scripts - collect_1y_data.py, refresh_cache.py - sim_10m_vol.py, sim_current.py, sim_regime_1y.py - sim_regime_sweep.py, sim_vol_override.py Co-Authored-By: Claude Sonnet 4.6 --- core/notify.py | 34 ++- core/strategy.py | 267 ++++++++------------- daemon/runner.py | 49 ++-- tests/collect_1y_data.py | 153 ++++++++++++ tests/refresh_cache.py | 81 +++++++ tests/sim_10m_vol.py | 382 ++++++++++++++++++++++++++++++ tests/sim_45m40.py | 4 +- tests/sim_current.py | 448 +++++++++++++++++++++++++++++++++++ tests/sim_regime_1y.py | 474 ++++++++++++++++++++++++++++++++++++++ tests/sim_regime_sweep.py | 322 ++++++++++++++++++++++++++ tests/sim_vol_override.py | 353 ++++++++++++++++++++++++++++ 11 files changed, 2358 insertions(+), 209 deletions(-) create mode 100644 tests/collect_1y_data.py create mode 100644 tests/refresh_cache.py create mode 100644 tests/sim_10m_vol.py create mode 100644 tests/sim_current.py create mode 100644 tests/sim_regime_1y.py create mode 100644 tests/sim_regime_sweep.py create mode 100644 tests/sim_vol_override.py diff --git a/core/notify.py b/core/notify.py index 7693778..c913d6e 100644 --- a/core/notify.py +++ b/core/notify.py @@ -75,7 +75,6 @@ def notify_sell( def notify_signal(ticker: str, signal_price: float, vol_mult: float, fng: int = 0) -> None: """거래량 축적 신호 감지 알림.""" - from .fng import FNG_MIN_ENTRY fng_label = ( "극탐욕" if fng >= 76 else "탐욕" if fng >= 56 else @@ -85,17 +84,36 @@ def notify_signal(ticker: str, signal_price: float, vol_mult: float, fng: int = "극공포" ) if fng else "" fng_line = f"F&G: {fng} ({fng_label})\n" if fng else "" - warn_line = ( - f"⚠️ F&G={fng} < {FNG_MIN_ENTRY} → 진입차단중\n" - if fng and fng < FNG_MIN_ENTRY else "" - ) + from .strategy import TREND_AFTER_VOL + target = signal_price * (1 + TREND_AFTER_VOL / 100) _send( f"🔍 [축적감지] {ticker}\n" f"신호가: {signal_price:,.2f}원\n" - f"거래량: {vol_mult:.1f}x 급증 + 2h 횡보\n" + f"거래량: {vol_mult:.1f}x 급증 + 횡보\n" + f"{fng_line}" + f"진입 목표: {target:,.2f}원 (+{TREND_AFTER_VOL}%)" + ) + + +def notify_watch( + ticker: str, price: float, vol_r: float, vth: float, chg: float, fng: int = 0 +) -> None: + """거래량 근접 관찰 알림 (신호 임계값에 가까워진 종목).""" + fng_label = ( + "극탐욕" if fng >= 76 else + "탐욕" if fng >= 56 else + "중립" if fng >= 46 else + "약공포" if fng >= 41 else + "공포" if fng >= 26 else + "극공포" + ) if fng else "" + fng_line = f"F&G: {fng} ({fng_label})\n" if fng else "" + need = vth - vol_r + _send( + f"👀 [관찰] {ticker}\n" + f"거래량: {vol_r:.1f}x (신호까지 +{need:.1f}x 부족)\n" + f"현재가: {price:,.2f}원 | 횡보: {chg:.1f}%\n" f"{fng_line}" - f"{warn_line}" - f"진입 목표: {signal_price * 1.048:,.2f}원 (+4.8%)" ) diff --git a/core/strategy.py b/core/strategy.py index 1e06f8f..d61d58e 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -1,18 +1,20 @@ -"""Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입. +"""Volume Lead 전략: 10분봉 거래량 급증 + 횡보 감지 후 +THRESH% 상승 시 진입. 흐름: - 1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND - 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) + 1. 직전 완성 10분봉 거래량 > 로컬 LV봉(280분) 평균 × VOL_THRESH AND + QN봉(120분) 이전 종가 대비 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) → 신호가(signal_price) + 거래량비율(vol_ratio) 기록 - 2. signal_price 대비 +임계값% 이상 상승 시 진입 - 임계값은 vol_ratio 강도에 따라 자동 조정 (강한 신호 → 낮은 임계값 → 조기 진입) - - vol_ratio ≥ 5.0x → +1.0% - - vol_ratio ≥ 3.5x → +2.0% - - vol_ratio ≥ 2.5x → +3.0% - - 기본 → +TREND_AFTER_VOL% - 3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화 + * 더 강한 vol(> 기존 sig vol_ratio)이 오면 sig_p 갱신 + 2. signal_price 대비 +TREND_AFTER_VOL%(4.8%) 이상 상승 시 진입 + 3. 신호불사: 가격이 신호가 아래로 내려가도 신호 유지 (sig_p 고정, 만료까지 대기) + 4. SIGNAL_TIMEOUT_MIN(480분=8h) 초과 시 신호 초기화 -캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용 +거래량 임계값 + 진입 차단 (F&G 기반 3구간): + - F&G ≤ FNG_FEAR_THRESHOLD(40): VOL_THRESH_FEAR(6.0x) ← 공포/극공포 + - F&G 41 ~ FNG_MAX_ENTRY(50): VOL_THRESH_NORMAL(5.0x) ← 중립 + - F&G > FNG_MAX_ENTRY(50): 진입 차단 ← 탐욕/극탐욕 + +캔들: minute10 데이터 직접 사용 (40분봉 리샘플링 없음) """ from __future__ import annotations @@ -23,61 +25,37 @@ import time import pyupbit -from .fng import FNG_MIN_ENTRY, is_entry_allowed +from .fng import get_fng from .market import get_current_price -from .market_regime import get_regime -from .notify import notify_signal -from .price_db import get_price_n_hours_ago +from .notify import notify_signal, notify_watch logger = logging.getLogger(__name__) -# 축적 감지 파라미터 -PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 2h 횡보 기준 (%) -TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계값 (신호가 대비 %) -SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h) +# 10분봉 직접 사용 파라미터 +LOCAL_VOL_CANDLES = int(os.getenv("LOCAL_VOL_CANDLES", "28")) # 로컬 vol 평균 구간 (280분) +QUIET_CANDLES = int(os.getenv("QUIET_CANDLES", "12")) # 횡보 체크 구간 (120분) +PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 횡보 기준 (%) +TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "4.8")) # 진입 임계값 (신호가 대비 %) +SIGNAL_TIMEOUT_MIN = int(os.getenv("SIGNAL_TIMEOUT_MIN", "480")) # 신호 유효 시간 (분=8h) -# 거래량 파라미터 -LOCAL_VOL_CANDLES = 7 # 5h를 40분봉으로 환산 (int(5 * 60/40) = 7) -VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) +# F&G 기반 거래량 임계값 + 진입 차단 +VOL_THRESH_NORMAL = float(os.getenv("VOL_THRESH_NORMAL", "5.0")) # 중립 구간 (F&G 41~FNG_MAX_ENTRY) +VOL_THRESH_FEAR = float(os.getenv("VOL_THRESH_FEAR", "6.0")) # 공포/극공포 (F&G ≤ FNG_FEAR_THRESHOLD) +FNG_FEAR_THRESHOLD = int(os.getenv("FNG_FEAR_THRESHOLD", "40")) # 공포 기준 (이하 → FEAR 임계값) +FNG_MAX_ENTRY = int(os.getenv("FNG_MAX_ENTRY", "50")) # 진입 허용 최대 F&G (초과 → 차단) -# 40분봉 리샘플링 파라미터 -_CANDLE_MIN = 40 -_FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉 +# 관찰 알림 (신호 임계값에 근접했지만 미달인 종목) +WATCH_VOL_THRESH = float(os.getenv("WATCH_VOL_THRESH", "4.0")) # 관찰 시작 임계값 +WATCH_COOLDOWN_MIN = int(os.getenv("WATCH_COOLDOWN_MIN", "30")) # 같은 종목 재알림 최소 간격 (분) +WATCH_VOL_JUMP = float(os.getenv("WATCH_VOL_JUMP", "0.5")) # 쿨다운 무시 vol 상승폭 -# 신호 강도별 진입 임계값 단계 (vol_ratio 최소값, 진입 임계값%) -# 강한 신호일수록 낮은 임계값으로 조기 진입 -_ENTRY_TIERS: list[tuple[float, float]] = [ - (5.0, 1.0), # 매우 강한 신호 (≥5x) → +1.0% 즉시 진입 - (3.5, 2.0), # 강한 신호 (≥3.5x) → +2.0% 조기 진입 - (2.5, 3.0), # 중간 신호 (≥2.5x) → +3.0% 반조기 진입 -] -# 위 조건 미충족 시 TREND_AFTER_VOL 사용 - -# 속도(velocity) 기반 조기 진입 파라미터 -# 신호 후 가격이 빠르게 상승 중이면 거리 임계값 도달 전에 선진입 -VELOCITY_THRESHOLD = float(os.getenv("VELOCITY_THRESHOLD", "0.10")) # %/분 (0.10 = 6%/h) -VELOCITY_MIN_MOVE = float(os.getenv("VELOCITY_MIN_MOVE", "0.5")) # 최소 이동 % (잡음 제거) -VELOCITY_MIN_AGE_M = float(os.getenv("VELOCITY_MIN_AGE_M", "5.0")) # 최소 경과 시간(분) - - -def _calc_entry_threshold(vol_ratio: float) -> float: - """거래량 비율에 따른 진입 임계값 반환. 강한 신호일수록 낮은 값.""" - for min_ratio, threshold in _ENTRY_TIERS: - if vol_ratio >= min_ratio: - return threshold - return TREND_AFTER_VOL - - -def _resample_40m(df): - """minute10 DataFrame → 40분봉으로 리샘플링.""" - return ( - df.resample("40min") - .agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}) - .dropna(subset=["close"]) - ) +# 10분봉 조회 수 +_FETCH_10M = LOCAL_VOL_CANDLES + QUIET_CANDLES + 5 # 45봉 # 축적 신호 상태: ticker → {"price": float, "time": float(unix), "vol_ratio": float} _accum_signals: dict[str, dict] = {} +# 관찰 알림 상태: ticker → {"time": float, "vol_ratio": float} +_watch_notified: dict[str, dict] = {} def get_active_signals() -> dict[str, dict]: @@ -89,50 +67,23 @@ def get_active_signals() -> dict[str, dict]: return dict(_accum_signals) -def _check_vol_spike(ticker: str, vol_mult: float) -> bool: - """직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인.""" - try: - df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) - except Exception: - return False - if df10 is None or len(df10) < _CANDLE_MIN // 10 * 2: - return False - - df = _resample_40m(df10) - if len(df) < LOCAL_VOL_CANDLES + 1: - return False - - recent_vol = df["volume"].iloc[-2] # 직전 완성된 40분봉 - local_avg = df["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() # 이전 7봉(≈5h) 평균 - if local_avg <= 0: - return False - - ratio = recent_vol / local_avg - result = ratio >= vol_mult - if result: - logger.debug( - f"[거래량↑] {ticker} 40m={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)" - ) - else: - logger.debug( - f"[거래량✗] {ticker} {ratio:.2f}x < {vol_mult}x" - ) - return result - - def should_buy(ticker: str) -> bool: - """Volume Lead 전략. + """Volume Lead 전략 (10분봉 직접 감지). - 1단계: F&G 필터 — 공포탐욕지수 < FNG_MIN_ENTRY(41)이면 즉시 차단 - 2단계: 거래량 급증 + 2h 횡보 → 신호가 기록 + 1단계: F&G 값으로 vol 임계값 동적 설정 (≤40→6x, >40→5x) + 2단계: 10분봉 거래량 급증 + QN봉 횡보 → 신호가 기록 (더 강한 vol이면 갱신) 3단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입 + 신호불사: 가격이 신호가 아래로 내려가도 신호 유지 (sig_p 고정) """ - # ── F&G 진입 필터 ───────────────────────────────────── - if not is_entry_allowed(): + fng = get_fng() + + # F&G 탐욕/극탐욕 구간 진입 차단 + if fng > FNG_MAX_ENTRY: + logger.debug(f"[F&G차단] {ticker} F&G={fng} > {FNG_MAX_ENTRY} (탐욕) → 진입 금지") return False - regime = get_regime() - vol_mult = regime["vol_mult"] + # F&G 구간별 vol 임계값 + vth = VOL_THRESH_FEAR if fng <= FNG_FEAR_THRESHOLD else VOL_THRESH_NORMAL current = get_current_price(ticker) if not current: @@ -140,101 +91,81 @@ def should_buy(ticker: str) -> bool: now = time.time() - # ── 기존 신호 유효성 확인 ──────────────────────────────── + # ── 신호 만료 체크 ──────────────────────────────────── sig = _accum_signals.get(ticker) if sig is not None: - age_h = (now - sig["time"]) / 3600 - if age_h > SIGNAL_TIMEOUT_H: + age_min = (now - sig["time"]) / 60 + if age_min > SIGNAL_TIMEOUT_MIN: del _accum_signals[ticker] sig = None - logger.debug(f"[축적타임아웃] {ticker} {age_h:.1f}h 경과 → 신호 초기화") + logger.debug(f"[축적타임아웃] {ticker} {age_min:.0f}분 경과 → 신호 초기화") - # ── 신호 없음: 축적 조건 체크 ──────────────────────────── - if sig is None: - # 2h 가격 횡보 확인 (DB 가격 활용) - price_2h = get_price_n_hours_ago(ticker, 2) - if price_2h is None: - return False + # ── 10분봉 데이터 조회 ──────────────────────────────── + try: + df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) + except Exception: + return False + if df10 is None or len(df10) < LOCAL_VOL_CANDLES + QUIET_CANDLES: + return False - quiet = abs(current - price_2h) / price_2h * 100 < PRICE_QUIET_PCT + # ── 거래량 비율 계산 (직전 완성봉 기준) ─────────────── + vol_prev = float(df10["volume"].iloc[-2]) + vol_avg = float(df10["volume"].iloc[-(LOCAL_VOL_CANDLES + 2):-2].mean()) + vol_r = vol_prev / vol_avg if vol_avg > 0 else 0.0 - if not quiet: - logger.debug( - f"[횡보✗] {ticker} 2h변동={(current - price_2h) / price_2h * 100:+.1f}% " - f"(기준={PRICE_QUIET_PCT}%)" + # ── 횡보 체크 (QN봉 이전 종가 기준) ────────────────── + close_qn = float(df10["close"].iloc[-(QUIET_CANDLES + 1)]) + chg = abs(current - close_qn) / close_qn * 100 if close_qn > 0 else 999.0 + + # ── 관찰 알림: WATCH_VOL_THRESH ≤ vol_r < vth + 횡보 ── + if WATCH_VOL_THRESH <= vol_r < vth and chg < PRICE_QUIET_PCT: + prev = _watch_notified.get(ticker) + age_min = (now - prev["time"]) / 60 if prev else 999.0 + vol_jump = vol_r - prev["vol_ratio"] if prev else vol_r + if prev is None or age_min >= WATCH_COOLDOWN_MIN or vol_jump >= WATCH_VOL_JUMP: + _watch_notified[ticker] = {"time": now, "vol_ratio": vol_r} + logger.info( + f"[관찰] {ticker} vol={vol_r:.2f}x (신호기준={vth:.1f}x) + 횡보({chg:.1f}%) | F&G={fng}" ) - return False + notify_watch(ticker, current, vol_r, vth, chg, fng=fng) + elif vol_r < WATCH_VOL_THRESH: + _watch_notified.pop(ticker, None) - # 거래량 급증 확인 - if not _check_vol_spike(ticker, vol_mult): - return False + # ── vol 스파이크 + 횡보 → 신호 설정/갱신 ──────────── + if vol_r >= vth and chg < PRICE_QUIET_PCT: + if sig is None or vol_r > sig.get("vol_ratio", 0.0): + _accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": vol_r} + sig = _accum_signals[ticker] + logger.info( + f"[축적감지] {ticker} 10m vol={vol_r:.2f}x ≥ {vth:.1f}x + 횡보({chg:.1f}%) " + f"→ 신호가={current:,.2f}원 (F&G={fng})" + ) + notify_signal(ticker, current, vol_r, fng=fng) - # 거래량 비율 계산 후 신호 기록 - ratio = 0.0 - try: - df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) - df_h = _resample_40m(df10) if df10 is not None else None - if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1: - recent_vol = df_h["volume"].iloc[-2] - local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() - ratio = recent_vol / local_avg if local_avg > 0 else 0.0 - except Exception: - pass - - entry_thr = _calc_entry_threshold(ratio) - _accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio} - from .fng import get_fng - fng_now = get_fng() - logger.info( - f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원 " - f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}% | F&G={fng_now})" - ) - notify_signal(ticker, current, ratio, fng=fng_now) - return False # 신호 첫 발생 시는 진입 안 함 - - # ── 신호 있음: 상승 확인 → 진입 ───────────────────────── - signal_price = sig["price"] - vol_ratio = sig.get("vol_ratio", 0.0) - entry_thr = _calc_entry_threshold(vol_ratio) - move_pct = (current - signal_price) / signal_price * 100 - age_min = (now - sig["time"]) / 60 - - if current < signal_price: - # 신호가 이하 하락 → 축적 실패 - del _accum_signals[ticker] + if sig is None: logger.debug( - f"[축적실패] {ticker} 신호가={signal_price:,.2f} → 현재={current:,.2f} (하락) → 초기화" + f"[축적✗] {ticker} vol={vol_r:.2f}x (기준={vth:.1f}x) / 횡보={chg:.1f}%" ) return False - # ── 거리 기반 진입 ───────────────────────────────────── - if move_pct >= entry_thr: + # ── 진입 확인: 신호가 대비 +TREND_AFTER_VOL% 이상 ── + signal_price = sig["price"] + vol_ratio = sig["vol_ratio"] + move_pct = (current - signal_price) / signal_price * 100 + age_min = (now - sig["time"]) / 60 + + if move_pct >= TREND_AFTER_VOL: del _accum_signals[ticker] logger.info( f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 " - f"(+{move_pct:.1f}% ≥ {entry_thr:.1f}% | 거래량={vol_ratio:.2f}x)" + f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}% | 거래량={vol_ratio:.2f}x | F&G={fng})" ) return True - # ── 속도 기반 조기 진입 ──────────────────────────────── - # 신호 후 N분 이상 경과 + 최소 이동 + 분당 상승률 기준 충족 시 선진입 - if age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE: - velocity = move_pct / age_min # %/분 - if velocity >= VELOCITY_THRESHOLD: - del _accum_signals[ticker] - logger.info( - f"[속도진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 " - f"(+{move_pct:.1f}% | {velocity:.3f}%/분 ≥ {VELOCITY_THRESHOLD}%/분 | " - f"경과={age_min:.1f}분 | 거래량={vol_ratio:.2f}x)" - ) - return True - + # 신호불사: 가격 하락해도 신호 유지 (sig_p 고정, 만료까지 대기) logger.debug( f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} " - f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}% | " - f"속도={move_pct/age_min:.3f}%/분 | 경과={age_min:.1f}분)" - if age_min > 0 else - f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} " - f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}%)" + f"({move_pct:+.1f}% / 목표={TREND_AFTER_VOL}% | " + f"거래량={vol_ratio:.2f}x | 경과={age_min:.0f}분)" ) return False diff --git a/daemon/runner.py b/daemon/runner.py index f3b2517..5efa585 100644 --- a/daemon/runner.py +++ b/daemon/runner.py @@ -6,10 +6,9 @@ import threading import time from core import trader -from core.fng import FNG_MIN_ENTRY, get_fng +from core.fng import get_fng from core.market import get_top_tickers -from core.market_regime import get_regime -from core.strategy import get_active_signals, should_buy +from core.strategy import FNG_MAX_ENTRY, get_active_signals, should_buy logger = logging.getLogger(__name__) @@ -28,21 +27,19 @@ def _fast_poll_loop() -> None: try: signals = get_active_signals() if signals: - regime = get_regime() - if regime["name"] != "bear": - positions = trader.get_positions() - for ticker in list(signals): - if ticker in positions: - continue - if len(positions) >= trader.MAX_POSITIONS: - break - try: - if should_buy(ticker): - logger.info(f"[빠른감시] 매수 신호: {ticker}") - trader.buy(ticker) - time.sleep(0.1) - except Exception as e: - logger.error(f"[빠른감시] 오류 {ticker}: {e}") + positions = trader.get_positions() + for ticker in list(signals): + if ticker in positions: + continue + if len(positions) >= trader.MAX_POSITIONS: + break + try: + if should_buy(ticker): + logger.info(f"[빠른감시] 매수 신호: {ticker}") + trader.buy(ticker) + time.sleep(0.1) + except Exception as e: + logger.error(f"[빠른감시] 오류 {ticker}: {e}") except Exception as e: logger.error(f"신호 감시 루프 오류: {e}") @@ -64,26 +61,16 @@ def run_scanner() -> None: time.sleep(SCAN_INTERVAL) continue - # Bear 레짐 시 신규 매수 완전 차단 - regime = get_regime() - if regime["name"] == "bear": - logger.info( - f"[Bear차단] 레짐={regime['emoji']} BEAR " - f"(score={regime['score']:+.2f}%) — 신규 매수 스킵" - ) - time.sleep(SCAN_INTERVAL) - continue - - # F&G 진입 필터 — 차단 구간이면 전체 스캔 스킵 + # F&G 탐욕/극탐욕 구간 → 전체 스캔 스킵 (strategy.py와 동일 기준) fv = get_fng() fng_label = ( "극탐욕" if fv >= 76 else "탐욕" if fv >= 56 else "중립" if fv >= 46 else "약공포" if fv >= 41 else "공포" if fv >= 26 else "극공포" ) - if fv < FNG_MIN_ENTRY: + if fv > FNG_MAX_ENTRY: logger.info( - f"[F&G차단] F&G={fv} ({fng_label}) < {FNG_MIN_ENTRY} — 신규 매수 스킵" + f"[F&G차단] F&G={fv} ({fng_label}) > {FNG_MAX_ENTRY} — 탐욕 구간 스캔 스킵" ) time.sleep(SCAN_INTERVAL) continue diff --git a/tests/collect_1y_data.py b/tests/collect_1y_data.py new file mode 100644 index 0000000..bc3aa52 --- /dev/null +++ b/tests/collect_1y_data.py @@ -0,0 +1,153 @@ +"""1년치 데이터 수집 — 10분봉 OHLCV + F&G 히스토리. + +생성 파일: + data/sim1y_cache.pkl — {"10m": {ticker: DataFrame}} (10분봉, 365일) + data/fng_1y.json — {"YYYY-MM-DD": int, ...} (Fear & Greed 1년치) + +소요 시간: 약 10~15분 (20종목 × 263 API 호출) +""" +import os as _os, sys as _sys +_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + +import json +import pickle +import time +import urllib.request +from datetime import datetime, timedelta +from pathlib import Path + +import pandas as pd +import pyupbit +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") + +# ── 설정 ───────────────────────────────────────────────── +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" +FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" +TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl" +DAYS = 365 +TOP_N = 20 + + +# ── 10분봉 수집 ─────────────────────────────────────────── +def fetch_10m(ticker: str, days: int) -> "pd.DataFrame | None": + target_start = datetime.now() - timedelta(days=days) + all_dfs, to, prev_oldest = [], None, None + while True: + kwargs = dict(ticker=ticker, interval="minute10", count=200) + if to: + kwargs["to"] = to.strftime("%Y-%m-%d %H:%M:%S") + try: + df = pyupbit.get_ohlcv(**kwargs) + except Exception: + time.sleep(0.5) + break + if df is None or df.empty: + break + all_dfs.append(df) + oldest = df.index[0] + if prev_oldest is not None and oldest >= prev_oldest: + break + prev_oldest = oldest + if oldest <= target_start: + break + to = oldest + time.sleep(0.12) + if not all_dfs: + return None + combined = pd.concat(all_dfs).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + return combined[combined.index >= target_start] + + +# ── F&G 1년치 수집 ──────────────────────────────────────── +def fetch_fng(limit: int = 400) -> dict: + url = f"https://api.alternative.me/fng/?limit={limit}&format=json" + with urllib.request.urlopen(url, timeout=15) as r: + data = json.loads(r.read()) + result = {} + for e in data["data"]: + dt = datetime.fromtimestamp(int(e["timestamp"])) + result[dt.strftime("%Y-%m-%d")] = int(e["value"]) + return result + + +# ── 메인 ───────────────────────────────────────────────── +def main(): + # ── 종목 목록 ───────────────────────────────────────── + try: + from core.market import get_top_tickers + tickers = get_top_tickers()[:TOP_N] + print(f"Top{TOP_N} 종목 API 조회: {tickers}\n") + # top30 파일 갱신 + pickle.dump(tickers, open(TOP30_FILE, "wb")) + except Exception as e: + print(f" [경고] 종목 API 실패: {e}") + if TOP30_FILE.exists(): + tickers = pickle.load(open(TOP30_FILE, "rb"))[:TOP_N] + print(f" 기존 top30 파일 사용: {tickers}\n") + else: + print(" [오류] 종목 목록 없음. 종료.") + return + + # ── F&G 1년치 ───────────────────────────────────────── + print("F&G 1년치 수집...") + try: + fng_map = fetch_fng(limit=400) + sorted_dates = sorted(fng_map.keys()) + print(f" 기간: {sorted_dates[0]} ~ {sorted_dates[-1]} ({len(fng_map)}일)") + # 분포 + zones = {"극공포(≤25)": 0, "공포(26~40)": 0, "중립(41~55)": 0, + "탐욕(56~75)": 0, "극탐욕(76+)": 0} + for v in fng_map.values(): + if v <= 25: zones["극공포(≤25)"] += 1 + elif v <= 40: zones["공포(26~40)"] += 1 + elif v <= 55: zones["중립(41~55)"] += 1 + elif v <= 75: zones["탐욕(56~75)"] += 1 + else: zones["극탐욕(76+)"] += 1 + total = sum(zones.values()) + for name, cnt in zones.items(): + print(f" {name:12} {cnt:>3}일 ({cnt/total*100:.1f}%)") + json.dump(fng_map, open(FNG_FILE, "w")) + print(f" 저장: {FNG_FILE}\n") + except Exception as e: + print(f" [오류] F&G 수집 실패: {e}\n") + fng_map = {} + + # ── 10분봉 1년치 ────────────────────────────────────── + print(f"10분봉 {DAYS}일치 수집 중 ({len(tickers)}종목)...") + print(f" 예상 소요: {len(tickers) * 265 * 0.12 / 60:.0f}~{len(tickers) * 265 * 0.15 / 60:.0f}분\n") + + data = {"10m": {}} + for i, ticker in enumerate(tickers, 1): + start_t = time.time() + df = fetch_10m(ticker, DAYS) + elapsed = time.time() - start_t + if df is not None and len(df) > 500: + data["10m"][ticker] = df + candles = len(df) + period = f"{df.index[0].strftime('%Y-%m-%d')}~{df.index[-1].strftime('%Y-%m-%d')}" + print(f" {i:>2}/{len(tickers)} {ticker:<15} {candles:>6}봉 {period} ({elapsed:.0f}s)") + else: + print(f" {i:>2}/{len(tickers)} {ticker:<15} 데이터 부족 ({elapsed:.0f}s)") + time.sleep(0.15) + + # ── 저장 ────────────────────────────────────────────── + print(f"\n수집 완료: {len(data['10m'])}종목") + if data["10m"]: + sample = next(iter(data["10m"].values())) + print(f"기간: {sample.index[0].strftime('%Y-%m-%d')} ~ {sample.index[-1].strftime('%Y-%m-%d')}") + print(f"봉 수: {len(sample)}개 (10분봉)") + # 파일 크기 추정 + import sys + size_mb = sys.getsizeof(pickle.dumps(data)) / 1024 / 1024 + print(f"예상 크기: {size_mb:.1f} MB") + + pickle.dump(data, open(CACHE_FILE, "wb")) + print(f"\n캐시 저장: {CACHE_FILE}") + print("완료!") + + +if __name__ == "__main__": + main() diff --git a/tests/refresh_cache.py b/tests/refresh_cache.py new file mode 100644 index 0000000..6292021 --- /dev/null +++ b/tests/refresh_cache.py @@ -0,0 +1,81 @@ +"""10분봉 캐시 갱신 스크립트 — 최신 45일 데이터를 Upbit API로 재수집.""" + +import os, sys, pickle, time +from pathlib import Path +from datetime import datetime, timedelta + +import pyupbit +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") +sys.path.insert(0, str(Path(__file__).parent.parent)) + +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl" +TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl" +SIM_DAYS = 45 +TOP_N = 20 + + +def fetch_10m(ticker: str, days: int) -> "pd.DataFrame | None": + import pandas as pd + target_start = datetime.now() - timedelta(days=days) + all_dfs, to, prev_oldest = [], None, None + while True: + kwargs = dict(ticker=ticker, interval="minute10", count=200) + if to: + kwargs["to"] = to.strftime("%Y-%m-%d %H:%M:%S") + try: + df = pyupbit.get_ohlcv(**kwargs) + except Exception: + time.sleep(0.5) + break + if df is None or df.empty: + break + all_dfs.append(df) + oldest = df.index[0] + if prev_oldest is not None and oldest >= prev_oldest: + break + prev_oldest = oldest + if oldest <= target_start: + break + to = oldest + time.sleep(0.12) + if not all_dfs: + return None + combined = pd.concat(all_dfs).sort_index() + combined = combined[~combined.index.duplicated(keep="last")] + return combined[combined.index >= target_start] + + +def main(): + # 현재 Top20 종목 가져오기 + from core.market import get_top_tickers + print("Top20 종목 조회...") + tickers = get_top_tickers()[:TOP_N] + print(f" {tickers}\n") + + data = {"10m": {}} + for i, ticker in enumerate(tickers, 1): + print(f"\r {i:>2}/{len(tickers)} {ticker} ", end="", flush=True) + df = fetch_10m(ticker, SIM_DAYS) + if df is not None and len(df) > 100: + data["10m"][ticker] = df + time.sleep(0.15) + + print(f"\n\n종목: {len(data['10m'])}개") + if data["10m"]: + sample = next(iter(data["10m"].values())) + print(f"기간: {sample.index[0].strftime('%Y-%m-%d')} ~ {sample.index[-1].strftime('%Y-%m-%d')}") + print(f"레코드: {len(sample)}개") + + # 저장 + pickle.dump(data, open(CACHE_FILE, "wb")) + print(f"\n캐시 저장: {CACHE_FILE}") + + # top30 갱신 + pickle.dump(tickers, open(TOP30_FILE, "wb")) + print(f"종목 저장: {TOP30_FILE}") + + +if __name__ == "__main__": + main() diff --git a/tests/sim_10m_vol.py b/tests/sim_10m_vol.py new file mode 100644 index 0000000..d0d7d9e --- /dev/null +++ b/tests/sim_10m_vol.py @@ -0,0 +1,382 @@ +"""10분봉 vol 감지 vs 40분봉 vol 감지 비교 시뮬레이션. + +40분봉 집계 시 10분봉 spike가 희석되는 문제를 해결하기 위해 +신호 감지를 10분봉 기준으로 실행하고 40분봉 전략과 노이즈/수익 비교. + +비교 모드 (각 필터 조합 × 2개 봉 단위): + 10분봉 detection: + A. 10m 필터없음 + B. 10m F&G≥41 + BEAR차단N5 + C. 10m vol≥5x 오버라이드 (F&G+레짐 무시) + 40분봉 detection (기준선): + D. 40m 필터없음 + E. 40m F&G≥41 + BEAR차단N5 + F. 40m vol≥5x 오버라이드 + +데이터: data/sim1y_cache.pkl (10분봉 1년치) +""" +import os as _os, sys as _sys +_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + +import json +import pickle +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") + +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" +FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" +TOP_N = 20 + +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 +FEE = 0.0005 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 +THRESH = 4.8 +QUIET_PCT = 2.0 +BEAR_THRESHOLD = -0.5 +BULL_THRESHOLD = 1.5 +FNG_MIN_ENTRY = 41 + +# ── 40분봉 파라미터 ──────────────────────────────── +P40 = dict( + local_vol_n = 7, + quiet_n = 3, + signal_to_n = 12, + atr_n = 7, + ts_n = 12, + time_stop_pct = 3.0, + vol_mult = 2.0, +) + +# ── 10분봉 파라미터 (벽시계 기준 동등) ─────────────── +# LOCAL_VOL_N: 40m×7=280min → 10min×28 +# QUIET_N: 40m×3=120min → 10min×12 +# SIGNAL_TO_N: 40m×12=480min → 10min×48 +# ATR_N: 40m×7=280min → 10min×28 +# TS_N: 40m×12=480min → 10min×48 +P10 = dict( + local_vol_n = 28, + quiet_n = 12, + signal_to_n = 48, + atr_n = 28, + ts_n = 48, + time_stop_pct = 3.0, + vol_mult = 2.0, +) + +REGIME_N = 5 # 40분봉 기준 +REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, + "KRW-SOL": 0.15, "KRW-XRP": 0.15} +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 +VOL_OVERRIDE_THRESH = 5.0 + + +# ───────────────────────────────────────────────────────────────────────────── +def resample_40m(df): + return (df.resample("40min") + .agg({"open":"first","high":"max","low":"min", + "close":"last","volume":"sum"}) + .dropna(subset=["close"])) + + +def build_regime_series(dfs40): + weighted = None + for ticker, w in REGIME_WEIGHTS.items(): + if ticker not in dfs40: continue + pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100 + weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) + return weighted if weighted is not None else pd.Series(dtype=float) + + +def regime_to_10m(regime_40m: pd.Series, df_10m: pd.DataFrame) -> pd.Series: + """40분봉 레짐 시리즈를 10분봉 인덱스에 ffill 매핑.""" + combined = regime_40m.reindex( + regime_40m.index.union(df_10m.index) + ).ffill() + return combined.reindex(df_10m.index) + + +def calc_atr(df, buy_idx, atr_n): + sub = df.iloc[max(0, buy_idx - atr_n - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-atr_n:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +def simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n, time_stop_pct): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= ts_n and pnl_now < time_stop_pct: + pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + last = df.iloc[-1]["close"] + pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[-1], pnl + + +def run_strategy(df, ticker, regime_series, fng_map, p, + use_fng, use_regime, vol_override_thresh): + """ + 공통 전략 함수. df = 봉 단위 OHLCV (10분봉 또는 40분봉). + regime_series: df 인덱스와 정렬된 레짐 시리즈. + + 우선순위: + ① 포지션 청산 + ② 축적 신호 감지 (필터 무관, 항상 실행) + ③ 진입: vol_strong → 모든 필터 skip; 아니면 F&G+레짐 체크 + """ + local_vol_n = p["local_vol_n"] + quiet_n = p["quiet_n"] + signal_to_n = p["signal_to_n"] + atr_n = p["atr_n"] + ts_n = p["ts_n"] + time_stop_pct = p["time_stop_pct"] + vol_mult = p["vol_mult"] + + trades = [] + sig_i = sig_p = sig_vr = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(local_vol_n + 2, quiet_n + 1) + + while i < len(df): + ts = df.index[i] + row = df.iloc[i] + cur = row["close"] + + # ── ① 포지션 청산 ───────────────────────────────── + if in_pos: + is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, + ts_n, time_stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) + in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i + continue + + # 신호 만료 체크 + if sig_i is not None and (i - sig_i) > signal_to_n: + sig_i = sig_p = sig_vr = None + + # ── ② 축적 신호 감지 (항상 실행) ────────────────────── + if sig_i is None: + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-1-local_vol_n:i-1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + close_qh = df.iloc[i-quiet_n]["close"] + chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 + if chg_qh < QUIET_PCT and vol_r >= vol_mult: + sig_i = i; sig_p = cur; sig_vr = vol_r + i += 1 + continue + + # 신호 이후 가격 하락 → 초기화 + if cur < sig_p: + sig_i = sig_p = sig_vr = None + i += 1 + continue + + # ── ③ 진입 체크 ───────────────────────────────────── + vol_strong = (vol_override_thresh > 0 + and sig_vr is not None + and sig_vr >= vol_override_thresh) + + if not vol_strong: + # F&G 필터 + if use_fng and fng_map: + fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50) + if fv < FNG_MIN_ENTRY: + i += 1 + continue + # 레짐 BEAR 차단 + if use_regime and not regime_series.empty and ts in regime_series.index: + v = regime_series.loc[ts] + if not pd.isna(v) and float(v) < BEAR_THRESHOLD: + i += 1 + continue + + move_pct = (cur - sig_p) / sig_p * 100 + if move_pct >= THRESH: + in_pos = True; buy_idx = i; buy_price = cur + stop_pct = calc_atr(df, i, atr_n) + sig_i = sig_p = sig_vr = None + i += 1 + return trades + + +def apply_wf(trades): + history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 + for t in trades: + is_win = int(t[0]) + if not blocked: + accepted.append(t); history.append(is_win) + if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: + blocked = True; shadow = 0 + else: + cnt += 1 + if is_win: + shadow += 1 + if shadow >= WF_SHADOW_WINS: + blocked = False; history = []; shadow = 0 + else: + shadow = 0 + return accepted, cnt + + +def apply_max_pos(trades): + open_exits = []; accepted = []; skipped = [] + for t in trades: + buy_dt, sell_dt = t[2], t[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt); accepted.append(t) + else: + skipped.append(t) + return accepted, skipped + + +def run_compound(accepted): + portfolio = float(BUDGET); total_krw = 0.0 + wins = 0; peak = BUDGET; max_dd = 0.0; pf = float(BUDGET) + for is_win, pnl, buy_dt, sell_dt, ticker in accepted: + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + wins += int(is_win) + pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET) + peak = max(peak, pf) + max_dd = max(max_dd, (peak - pf) / peak * 100) + return { + "portfolio": portfolio, "total_krw": total_krw, + "roi_pct": (portfolio - BUDGET) / BUDGET * 100, + "total": len(accepted), "wins": wins, + "wr": wins / len(accepted) * 100 if accepted else 0, + "max_dd": max_dd, + } + + +def sim_mode(dfs_per_ticker, regime_map, fng_map, p, + use_fng, use_regime, vol_override_thresh): + """ + dfs_per_ticker: {ticker: DataFrame (10m 또는 40m)} + regime_map: {ticker: regime Series (같은 인덱스로 정렬됨)} + """ + all_trades = []; wf_total = 0 + for ticker, df in dfs_per_ticker.items(): + rs = regime_map.get(ticker, pd.Series(dtype=float)) + raw = run_strategy(df, ticker, rs, fng_map, p, + use_fng, use_regime, vol_override_thresh) + filtered, blocked = apply_wf(raw) + wf_total += blocked + all_trades.extend(filtered) + all_trades.sort(key=lambda x: x[2]) + accepted, skipped = apply_max_pos(all_trades) + return run_compound(accepted), wf_total, len(skipped) + + +def fmt(r, wf, skip): + if r["total"] == 0: + return "진입없음" + return (f"{r['total']:>5}건 {r['wr']:>4.1f}% " + f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 " + f"-{r['max_dd']:>4.1f}% wf:{wf} skip:{skip}") + + +def main(): + print("캐시 로드 중...") + cache = pickle.load(open(CACHE_FILE, "rb")) + fng_map = json.loads(FNG_FILE.read_text()) + + tickers = [t for t in list(cache["10m"].keys())[:TOP_N] + if len(cache["10m"][t]) > 500] + print(f" 종목: {len(tickers)}개") + + dfs10 = {t: cache["10m"][t] for t in tickers} + dfs40 = {t: resample_40m(df) for t, df in dfs10.items()} + + # 레짐 (40분봉 기반) + regime_40m = build_regime_series(dfs40) + + # 40분봉용 regime map + regime_map_40 = {t: regime_40m for t in tickers} + + # 10분봉용 regime map (ffill 매핑) + regime_map_10 = { + t: regime_to_10m(regime_40m, dfs10[t]) + for t in tickers + } + + sample = next(iter(dfs10.values())) + start_dt = sample.index[0].strftime("%Y-%m-%d") + end_dt = sample.index[-1].strftime("%Y-%m-%d") + + print(f"\n{'='*80}") + print(f" 10분봉 vs 40분봉 vol 감지 비교 | {start_dt} ~ {end_dt} | {len(tickers)}종목") + print(f" vol override: ≥{VOL_OVERRIDE_THRESH}x | F&G≥{FNG_MIN_ENTRY} | BEAR차단N{REGIME_N}") + print(f"{'='*80}") + print(f" {'모드':<32} │ {'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>6}") + print(f" {'─'*76}") + + # ── 10분봉 모드들 ───────────────────────────────────────────────────── + print(f"\n [10분봉 vol 감지 — local_vol_n={P10['local_vol_n']}봉({P10['local_vol_n']*10}분) quiet_n={P10['quiet_n']}봉({P10['quiet_n']*10}분)]") + + modes_10 = [ + ("A. 10m 필터없음", False, False, 0.0), + ("B. 10m F&G+BEAR차단", True, True, 0.0), + (f"C. 10m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH), + ] + for label, uf, ur, vt in modes_10: + r, wf, skip = sim_mode(dfs10, regime_map_10, fng_map, P10, uf, ur, vt) + print(f" {label:<32} │ {fmt(r, wf, skip)}") + + # ── 40분봉 모드들 (기준선) ────────────────────────────────────────── + print(f"\n [40분봉 vol 감지 — local_vol_n={P40['local_vol_n']}봉({P40['local_vol_n']*40}분) quiet_n={P40['quiet_n']}봉({P40['quiet_n']*40}분)]") + + modes_40 = [ + ("D. 40m 필터없음", False, False, 0.0), + ("E. 40m F&G+BEAR차단", True, True, 0.0), + (f"F. 40m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH), + ] + for label, uf, ur, vt in modes_40: + r, wf, skip = sim_mode(dfs40, regime_map_40, fng_map, P40, uf, ur, vt) + print(f" {label:<32} │ {fmt(r, wf, skip)}") + + print(f"\n{'='*80}") + + # ── vol≥5x 신호 품질 비교 (10m vs 40m) ───────────────────────────── + print("\n [vol≥5x 오버라이드 신호 품질 비교 — 필터 없음 조건에서 override 효과]") + print(f" {'모드':<32} │ {'진입':>5} {'승률':>5} │ {'수익률':>8} {'낙폭':>6}") + for label, dfs, rmap, p in [ + ("10m vol≥5x (filter none)", dfs10, regime_map_10, P10), + ("40m vol≥5x (filter none)", dfs40, regime_map_40, P40), + ]: + r, wf, skip = sim_mode(dfs, rmap, fng_map, p, False, False, VOL_OVERRIDE_THRESH) + if r["total"]: + print(f" {label:<32} │ {r['total']:>5}건 {r['wr']:>4.1f}% " + f"{r['roi_pct']:>+7.2f}% -{r['max_dd']:>4.1f}%") + print(f"{'='*80}") + + +if __name__ == "__main__": + main() diff --git a/tests/sim_45m40.py b/tests/sim_45m40.py index 8538b12..70cfb53 100644 --- a/tests/sim_45m40.py +++ b/tests/sim_45m40.py @@ -15,8 +15,8 @@ load_dotenv(dotenv_path=Path(__file__).parent / ".env") sys.path.insert(0, str(Path(__file__).parent)) # ── 파라미터 ───────────────────────────────────────────── -CACHE_FILE = Path("sim10m_cache.pkl") -TOP30_FILE = Path("top30_tickers.pkl") +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl" +TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl" TOP_N = 20 BUDGET = 15_000_000 diff --git a/tests/sim_current.py b/tests/sim_current.py new file mode 100644 index 0000000..44571c8 --- /dev/null +++ b/tests/sim_current.py @@ -0,0 +1,448 @@ +"""현재 전략 기준 45일 복리 시뮬레이션 — 40분봉. + +sim_45m40.py의 검증된 코어 로직을 기반으로 +현재 전략 추가사항만 반영: + + F&G 필터 (FNG_MIN_ENTRY=41) + + 시장 레짐 필터 (BEAR < -0.5% → 차단, BULL ≥ 1.5% → vol_mult 완화) + + 신호 강도별 진입 임계값 티어 (5x→1%, 3.5x→2%, 2.5x→3%, 기본→5%) + + 속도 기반 조기 진입 (0.10%/분) +""" + +import json +import os +import pickle +import sys +import urllib.request +import datetime +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") +sys.path.insert(0, str(Path(__file__).parent.parent)) + +# ── 파라미터 ───────────────────────────────────────────── +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl" +TOP_N = 20 + +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 + +FEE = 0.0005 +TIME_STOP_MIN_PCT= 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 + +VOL_MULT_NEUTRAL = 2.0 # NEUTRAL 레짐 +VOL_MULT_BULL = 1.5 # BULL 레짐 +QUIET_PCT = 2.0 +THRESH_BASE = 5.0 # 기본 진입 임계값 (TREND_AFTER_VOL) + +# 신호 강도별 임계값 티어 +ENTRY_TIERS = [(5.0, 1.0), (3.5, 2.0), (2.5, 3.0)] + +# 속도 진입 +VELOCITY_THRESHOLD = 0.10 # %/분 +VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % +VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분 + +# F&G +FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41")) + +# 레짐 +BEAR_THRESHOLD = -0.5 +BULL_THRESHOLD = 1.5 +REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, + "KRW-SOL": 0.15, "KRW-XRP": 0.15} + +# 40분봉 봉수 환산 +LOCAL_VOL_N = 7 # 5h +QUIET_N = 3 # 2h +SIGNAL_TO_N = 12 # 8h +ATR_N = 7 +TS_N = 12 # 8h (타임스탑) +REGIME_N = 3 # 2h (레짐 추세) + + +# ── F&G 히스토리 ───────────────────────────────────────── +def load_fng_history() -> dict: + try: + url = "https://api.alternative.me/fng/?limit=90&format=json" + with urllib.request.urlopen(url, timeout=10) as r: + data = json.loads(r.read()) + result = {} + for e in data["data"]: + dt = datetime.datetime.fromtimestamp(int(e["timestamp"])) + result[dt.strftime("%Y-%m-%d")] = int(e["value"]) + return result + except Exception as ex: + print(f" [경고] F&G 로드 실패: {ex} → 필터 비활성화") + return {} + + +# ── 리샘플링 ───────────────────────────────────────────── +def resample_40m(df: pd.DataFrame) -> pd.DataFrame: + return ( + df.resample("40min") + .agg({"open": "first", "high": "max", "low": "min", + "close": "last", "volume": "sum"}) + .dropna(subset=["close"]) + ) + + +# ── 레짐 시리즈 ────────────────────────────────────────── +def build_regime_series(dfs: dict) -> pd.Series: + weighted = None + for ticker, w in REGIME_WEIGHTS.items(): + if ticker not in dfs: + continue + pct = dfs[ticker]["close"].pct_change(REGIME_N) * 100 + weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) + return weighted if weighted is not None else pd.Series(dtype=float) + + +# ── 임계값 ─────────────────────────────────────────────── +def calc_entry_threshold(vol_ratio: float) -> float: + for min_r, thr in ENTRY_TIERS: + if vol_ratio >= min_r: + return thr + return THRESH_BASE + + +# ── ATR ────────────────────────────────────────────────── +def calc_atr(df: pd.DataFrame, buy_idx: int) -> float: + sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +# ── 포지션 시뮬 (기존 sim_45m40.py와 동일) ─────────────── +def simulate_pos(df: pd.DataFrame, buy_idx: int, + buy_price: float, stop_pct: float): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + ts = df.index[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, ts, pnl, "trailing_stop" + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, ts, pnl, "time_stop" + last = df.iloc[-1]["close"] + pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[-1], pnl, "end_of_data" + + +# ── vol-lead 전략 (현재 전략 파라미터 전체 반영) ────────── +def run_vol_lead(df: pd.DataFrame, ticker: str, + fng_map: dict, regime_series: pd.Series) -> list: + trades = [] + sig_i = None # 신호 봉 인덱스 + sig_p = None # 신호가 + sig_vr = 0.0 # 신호 vol_ratio + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(LOCAL_VOL_N + 2, QUIET_N + 1) + + while i < len(df): + ts = df.index[i] + row = df.iloc[i] + cur = row["close"] + + # ── 포지션 보유 중: 청산 체크 ───────────────────── + if in_pos: + is_win, sdt, pnl, reason = simulate_pos(df, buy_idx, buy_price, stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, reason)) + in_pos = False + sig_i = sig_p = None + i = next_i + continue + + # ── F&G 필터 ────────────────────────────────────── + date_str = ts.strftime("%Y-%m-%d") + if fng_map: + fv = fng_map.get(date_str, 50) + if fv < FNG_MIN_ENTRY: + sig_i = sig_p = None # 신호 초기화 + i += 1 + continue + + # ── 레짐 필터 ───────────────────────────────────── + score = 0.0 + if not regime_series.empty and ts in regime_series.index: + v = regime_series.loc[ts] + score = float(v) if not pd.isna(v) else 0.0 + + if score < BEAR_THRESHOLD: + sig_i = sig_p = None + i += 1 + continue + + vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_NEUTRAL + + # ── 신호 타임아웃 ────────────────────────────────── + if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: + sig_i = sig_p = None + + # ── 신호 있음: 진입 체크 ────────────────────────── + if sig_i is not None: + move_pct = (cur - sig_p) / sig_p * 100 + age_min = (i - sig_i) * 40 # 봉수 → 분 + entry_thr = calc_entry_threshold(sig_vr) + + if cur < sig_p: + # 신호가 이하 하락 → 초기화 + sig_i = sig_p = None + elif move_pct >= entry_thr: + # 거리 기반 진입 + in_pos = True + buy_idx = i + buy_price = cur + stop_pct = calc_atr(df, i) + sig_i = sig_p = None + elif age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE: + velocity = move_pct / age_min + if velocity >= VELOCITY_THRESHOLD: + # 속도 기반 조기 진입 + in_pos = True + buy_idx = i + buy_price = cur + stop_pct = calc_atr(df, i) + sig_i = sig_p = None + i += 1 + continue + + # ── 신호 없음: 축적 조건 체크 ──────────────────── + vol_p = df.iloc[i - 1]["volume"] + vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + + close_qh = df.iloc[i - QUIET_N]["close"] + chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 + quiet = chg_qh < QUIET_PCT + spike = vol_r >= vol_mult + + if quiet and spike: + if sig_i is None: + sig_i = i + sig_p = cur + sig_vr = vol_r + else: + if sig_i is not None and cur < sig_p: + sig_i = sig_p = None + + i += 1 + return trades + + +# ── WF 필터 (기존 동일) ────────────────────────────────── +def apply_wf(trades: list) -> tuple: + history = [] + shadow_streak = 0 + blocked = False + accepted = [] + blocked_cnt = 0 + + for trade in trades: + is_win = int(trade[0]) + if not blocked: + accepted.append(trade) + history.append(is_win) + if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: + blocked = True + shadow_streak = 0 + else: + blocked_cnt += 1 + if is_win: + shadow_streak += 1 + if shadow_streak >= WF_SHADOW_WINS: + blocked = False + history = [] + shadow_streak = 0 + else: + shadow_streak = 0 + return accepted, blocked_cnt + + +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 + + +# ── MAX_POSITIONS (기존 동일) ──────────────────────────── +def apply_max_positions(all_trades: list) -> tuple: + open_exits, accepted, skipped = [], [], [] + for trade in all_trades: + buy_dt, sell_dt = trade[2], trade[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt) + accepted.append(trade) + else: + skipped.append(trade) + return accepted, skipped + + +# ── 복리 시뮬 (기존 동일) ─────────────────────────────── +def simulate(accepted: list) -> dict: + portfolio = float(BUDGET) + total_krw = 0.0 + monthly = {} + trade_log = [] + reason_cnt = {} + + for trade in accepted: + is_win, pnl, buy_dt, sell_dt, ticker, reason = trade + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + reason_cnt[reason] = reason_cnt.get(reason, 0) + 1 + + ym = buy_dt.strftime("%Y-%m") + if ym not in monthly: + monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} + monthly[ym]["trades"] += 1 + monthly[ym]["wins"] += int(is_win) + monthly[ym]["pnl_krw"] += krw_profit + + trade_log.append({ + "buy_dt": buy_dt, "sell_dt": sell_dt, "ticker": ticker, + "is_win": is_win, "pnl_pct": pnl, "reason": reason, + "pos_size": pos_size, "krw_profit": krw_profit, + "portfolio": portfolio, + }) + + wins = sum(1 for t in accepted if t[0]) + return { + "portfolio": portfolio, "total_krw": total_krw, + "roi_pct": (portfolio - BUDGET) / BUDGET * 100, + "total": len(accepted), "wins": wins, + "wr": wins / len(accepted) * 100 if accepted else 0, + "monthly": monthly, "trade_log": trade_log, + "reason_cnt": reason_cnt, + } + + +# ── 메인 ───────────────────────────────────────────────── +def main(): + print("=" * 62) + print("현재 전략 기준 시뮬 (F&G + 레짐 + 티어임계 + 속도진입)") + print("=" * 62) + + print("F&G 히스토리 로드...") + fng_map = load_fng_history() + if fng_map: + vals = sorted(fng_map.items()) + print(f" {vals[0][0]} ~ {vals[-1][0]} ({len(fng_map)}일)") + else: + print(" F&G 데이터 없음 — 필터 비활성화") + + print("캐시 로드...") + cache = pickle.load(open(CACHE_FILE, "rb")) + tickers = [t for t in list(cache["10m"].keys())[:TOP_N] + if len(cache["10m"][t]) > 200] + print(f" 종목: {len(tickers)}개\n") + + dfs_40m = {t: resample_40m(cache["10m"][t]) for t in tickers} + + print("레짐 시리즈 계산...") + regime_series = build_regime_series(dfs_40m) + + sample_df = next(iter(dfs_40m.values())) + start_date = sample_df.index[0].strftime("%Y-%m-%d") + end_date = sample_df.index[-1].strftime("%Y-%m-%d") + print(f" 기간: {start_date} ~ {end_date}\n") + + # F&G 차단 일수 + if fng_map: + period_dates = [d for d in fng_map if start_date <= d <= end_date] + fng_blocked = sum(1 for d in period_dates if fng_map.get(d, 50) < FNG_MIN_ENTRY) + fng_allowed = len(period_dates) - fng_blocked + else: + fng_blocked = fng_allowed = 0 + + all_trades = [] + wf_blocked = 0 + for ticker in tickers: + df40 = dfs_40m[ticker] + raw = run_vol_lead(df40, ticker, fng_map, regime_series) + filtered, blocked = apply_wf(raw) + wf_blocked += blocked + all_trades.extend(filtered) + + all_trades.sort(key=lambda x: x[2]) # buy_dt 기준 정렬 + accepted, skipped = apply_max_positions(all_trades) + result = simulate(accepted) + + # 최대 낙폭 + peak = BUDGET + max_dd = 0.0 + for t in result["trade_log"]: + peak = max(peak, t["portfolio"]) + dd = (peak - t["portfolio"]) / peak * 100 + max_dd = max(max_dd, dd) + + total = result["total"] + wins = result["wins"] + + print(f"{'='*62}") + print(f" 기간: {start_date} ~ {end_date} ({len(tickers)}종목 / 40분봉)") + print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 FNG≥{FNG_MIN_ENTRY})") + print(f"{'='*62}") + print(f" 신호 발생: {len(all_trades)+wf_blocked:>4}건 (WF 차단: {wf_blocked}건)") + print(f" 실제 진입: {total:>4}건 ({len(skipped)}건 MAX_POS 스킵)") + print(f" 승 / 패: {wins}승 {total-wins}패 (승률 {result['wr']:.1f}%)" + if total else " 진입 없음") + print(f" {'─'*52}") + print(f" 초기 예산: {BUDGET:>15,}원") + print(f" 최종 자산: {result['portfolio']:>15,.0f}원") + print(f" 순수익: {result['total_krw']:>+15,.0f}원") + print(f" 수익률: {result['roi_pct']:>+14.2f}%") + print(f" 최대 낙폭: {-max_dd:>+14.2f}%" + f" ({-max_dd/100*BUDGET:>+,.0f}원)") + monthly_krw = [m["pnl_krw"] for m in result["monthly"].values()] + avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0 + print(f" 월평균 수익: {avg_m:>+13,.0f}원") + + print(f"\n── 청산 사유 {'─'*44}") + label_map = {"trailing_stop": "트레일링스탑", "time_stop": "타임스탑", + "end_of_data": "데이터종료"} + for r, cnt in sorted(result["reason_cnt"].items(), key=lambda x: -x[1]): + print(f" {label_map.get(r, r):12}: {cnt:>3}건") + + print(f"\n── 월별 수익 {'─'*44}") + print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │" + f" {'월수익(KRW)':>13} {'누적수익(KRW)':>14}") + cum = 0.0 + for ym, m in sorted(result["monthly"].items()): + wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0 + cum += m["pnl_krw"] + print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │" + f" {m['pnl_krw']:>+13,.0f}원 {cum:>+14,.0f}원") + + print(f"\n── 파라미터 {'─'*46}") + print(f" F&G≥{FNG_MIN_ENTRY} 레짐BEAR<{BEAR_THRESHOLD}% BULL≥{BULL_THRESHOLD}%") + print(f" VOL: {VOL_MULT_NEUTRAL}x(중립)/{VOL_MULT_BULL}x(강세) 횡보<{QUIET_PCT}%") + print(f" 임계: 5x→1% / 3.5x→2% / 2.5x→3% / 기본→{THRESH_BASE}%") + print(f" 속도: ≥{VELOCITY_THRESHOLD}%/분 (≥{VELOCITY_MIN_MOVE}% / ≥{VELOCITY_MIN_AGE_M}분)") + print(f" ATR: ×{ATR_MULT} ({ATR_MIN*100:.0f}~{ATR_MAX*100:.0f}%) 타임스탑: 8h/{TIME_STOP_MIN_PCT}%") + print(f"{'='*62}") + + +if __name__ == "__main__": + main() diff --git a/tests/sim_regime_1y.py b/tests/sim_regime_1y.py new file mode 100644 index 0000000..b90752a --- /dev/null +++ b/tests/sim_regime_1y.py @@ -0,0 +1,474 @@ +"""레짐 기반 1년 시뮬레이션 — BULL 진입 vs Bear차단 vs 필터없음. + +sim_45m40.py 검증된 코어 로직 기반. +데이터: data/sim1y_cache.pkl (10분봉 1년치) + data/fng_1y.json (F&G 1년치) + +비교 구성: + 1. 필터 없음 — 레짐/F&G 무관 진입 + 2. BEAR 차단 — 레짐 score < -0.5% 이면 차단 (현재 전략) + 3. BULL 진입만 — 레짐 score ≥ 1.5% 일 때만 진입 ← 사용자 제안 + 4. BULL 진입 + F&G≥41 — BULL 조건에 F&G 필터 추가 +""" +import os as _os, sys as _sys +_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + +import json +import pickle +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") + +# ── 데이터 경로 ─────────────────────────────────────────── +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" +FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" +TOP_N = 20 + +# ── 전략 파라미터 (sim_45m40.py 동일) ──────────────────── +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 + +FEE = 0.0005 +TIME_STOP_MIN_PCT = 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 + +VOL_MULT_DEFAULT = 2.0 # 기본 (NEUTRAL / 필터없음) +VOL_MULT_BULL = 1.5 # BULL 레짐 완화 +QUIET_PCT = 2.0 +THRESH = 4.8 # sim_45m40.py 기준값 + +# 40분봉 봉수 환산 +LOCAL_VOL_N = 7 # 5h +QUIET_N = 3 # 2h +SIGNAL_TO_N = 12 # 8h +ATR_N = 7 +TS_N = 12 # 8h (타임스탑) +REGIME_N = 3 # 2h (레짐 추세) + +# 레짐 임계 +BEAR_THRESHOLD = -0.5 +BULL_THRESHOLD = 1.5 + +# 레짐 계산 가중치 +REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, + "KRW-SOL": 0.15, "KRW-XRP": 0.15} + +# WF 파라미터 +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 + + +# ── 리샘플링 ───────────────────────────────────────────── +def resample_40m(df: pd.DataFrame) -> pd.DataFrame: + return ( + df.resample("40min") + .agg({"open": "first", "high": "max", "low": "min", + "close": "last", "volume": "sum"}) + .dropna(subset=["close"]) + ) + + +# ── 레짐 시리즈 ────────────────────────────────────────── +def build_regime_series(dfs40: dict) -> pd.Series: + weighted = None + for ticker, w in REGIME_WEIGHTS.items(): + if ticker not in dfs40: + continue + pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100 + weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) + return weighted if weighted is not None else pd.Series(dtype=float) + + +# ── ATR ────────────────────────────────────────────────── +def calc_atr(df: pd.DataFrame, buy_idx: int) -> float: + sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +# ── 포지션 시뮬 ────────────────────────────────────────── +def simulate_pos(df: pd.DataFrame, buy_idx: int, + buy_price: float, stop_pct: float): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + ts = df.index[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, ts, pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, ts, pnl + last = df.iloc[-1]["close"] + pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[-1], pnl + + +# ── 전략 실행 ───────────────────────────────────────────── +def run_strategy(df: pd.DataFrame, ticker: str, + regime_series: pd.Series, fng_map: dict, + mode: str) -> list: + """ + mode: + 'none' — 레짐/F&G 필터 없음 + 'bear_off' — BEAR 차단만 (score < BEAR_THRESHOLD 시 스킵) + 'bull_only'— BULL 진입만 (score >= BULL_THRESHOLD 일 때만) + 'bull_fng' — BULL + F&G≥41 + """ + trades = [] + sig_i = sig_p = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(LOCAL_VOL_N + 2, QUIET_N + 1) + + while i < len(df): + ts = df.index[i] + row = df.iloc[i] + cur = row["close"] + + # ── 포지션 보유 중 ──────────────────────────────── + if in_pos: + is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) + in_pos = False + sig_i = sig_p = None + i = next_i + continue + + # ── 레짐 스코어 계산 ───────────────────────────── + score = 0.0 + if not regime_series.empty and ts in regime_series.index: + v = regime_series.loc[ts] + score = float(v) if not pd.isna(v) else 0.0 + + # ── 모드별 진입 필터 ───────────────────────────── + if mode == "bear_off": + if score < BEAR_THRESHOLD: + sig_i = sig_p = None + i += 1 + continue + vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT + + elif mode == "bull_only": + if score < BULL_THRESHOLD: + sig_i = sig_p = None + i += 1 + continue + vol_mult = VOL_MULT_BULL + + elif mode == "bull_fng": + if score < BULL_THRESHOLD: + sig_i = sig_p = None + i += 1 + continue + date_str = ts.strftime("%Y-%m-%d") + fv = fng_map.get(date_str, 50) if fng_map else 50 + if fv < 41: + sig_i = sig_p = None + i += 1 + continue + vol_mult = VOL_MULT_BULL + + else: # 'none' + vol_mult = VOL_MULT_DEFAULT + + # ── 신호 타임아웃 ───────────────────────────────── + if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: + sig_i = sig_p = None + + # ── 신호 있음: 진입 체크 ────────────────────────── + if sig_i is not None: + move_pct = (cur - sig_p) / sig_p * 100 + if cur < sig_p: + sig_i = sig_p = None + elif move_pct >= THRESH: + in_pos = True + buy_idx = i + buy_price = cur + stop_pct = calc_atr(df, i) + sig_i = sig_p = None + i += 1 + continue + + # ── 신호 없음: 축적 조건 체크 ──────────────────── + vol_p = df.iloc[i - 1]["volume"] + vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + + close_qh = df.iloc[i - QUIET_N]["close"] + chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 + quiet = chg_qh < QUIET_PCT + spike = vol_r >= vol_mult + + if quiet and spike: + if sig_i is None: + sig_i = i + sig_p = cur + else: + if sig_i is not None and cur < sig_p: + sig_i = sig_p = None + + i += 1 + return trades + + +# ── WF 필터 ────────────────────────────────────────────── +def apply_wf(trades: list) -> tuple: + history = [] + shadow_streak = 0 + blocked = False + accepted = [] + blocked_cnt = 0 + for trade in trades: + is_win = int(trade[0]) + if not blocked: + accepted.append(trade) + history.append(is_win) + if len(history) >= WF_WINDOW: + wr = sum(history[-WF_WINDOW:]) / WF_WINDOW + if wr < WF_MIN_WIN_RATE: + blocked = True + shadow_streak = 0 + else: + blocked_cnt += 1 + if is_win: + shadow_streak += 1 + if shadow_streak >= WF_SHADOW_WINS: + blocked = False + history = [] + shadow_streak = 0 + else: + shadow_streak = 0 + return accepted, blocked_cnt + + +# ── MAX_POSITIONS ──────────────────────────────────────── +def apply_max_positions(all_trades: list) -> tuple: + open_exits, accepted, skipped = [], [], [] + for trade in all_trades: + buy_dt, sell_dt = trade[2], trade[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt) + accepted.append(trade) + else: + skipped.append(trade) + return accepted, skipped + + +# ── 복리 시뮬 ──────────────────────────────────────────── +def run_compound(accepted: list) -> dict: + portfolio = float(BUDGET) + total_krw = 0.0 + monthly = {} + trade_log = [] + + for is_win, pnl, buy_dt, sell_dt, ticker in accepted: + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + + ym = buy_dt.strftime("%Y-%m") + if ym not in monthly: + monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} + monthly[ym]["trades"] += 1 + monthly[ym]["wins"] += int(is_win) + monthly[ym]["pnl_krw"] += krw_profit + trade_log.append({"buy_dt": buy_dt, "sell_dt": sell_dt, + "ticker": ticker, "is_win": is_win, + "pnl_pct": pnl, "portfolio": portfolio}) + + wins = sum(1 for t in accepted if t[0]) + return { + "portfolio": portfolio, + "total_krw": total_krw, + "roi_pct": (portfolio - BUDGET) / BUDGET * 100, + "total": len(accepted), + "wins": wins, + "wr": wins / len(accepted) * 100 if accepted else 0, + "monthly": monthly, + "trade_log": trade_log, + } + + +# ── 결과 출력 ──────────────────────────────────────────── +def print_result(label: str, result: dict, skipped: int, wf_blocked: int): + r = result + peak = BUDGET + max_dd = 0.0 + for t in r["trade_log"]: + peak = max(peak, t["portfolio"]) + dd = (peak - t["portfolio"]) / peak * 100 + max_dd = max(max_dd, dd) + + monthly_krw = [m["pnl_krw"] for m in r["monthly"].values()] + avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0 + + total = r["total"] + wins = r["wins"] + + print(f"\n{'─'*60}") + print(f" [{label}]") + print(f" 진입: {total}건 (WF차단: {wf_blocked} / MAX_POS스킵: {skipped})") + if total: + print(f" 승패: {wins}승 {total-wins}패 (승률 {r['wr']:.1f}%)") + print(f" 초기 예산: {BUDGET:>15,}원") + print(f" 최종 자산: {r['portfolio']:>15,.0f}원") + print(f" 순수익: {r['total_krw']:>+15,.0f}원") + print(f" 수익률: {r['roi_pct']:>+14.2f}%") + print(f" 최대 낙폭: {-max_dd:>+14.2f}%") + print(f" 월평균 수익: {avg_m:>+13,.0f}원") + + +def print_monthly(result: dict): + print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │ {'월수익(KRW)':>13} {'누적수익(KRW)':>14}") + cum = 0.0 + for ym, m in sorted(result["monthly"].items()): + wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0 + cum += m["pnl_krw"] + print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │" + f" {m['pnl_krw']:>+13,.0f}원 {cum:>+14,.0f}원") + + +# ── 메인 ───────────────────────────────────────────────── +def main(): + # ── 데이터 로드 ─────────────────────────────────────── + if not CACHE_FILE.exists(): + print(f"[오류] 캐시 없음: {CACHE_FILE}") + print(" 먼저 tests/collect_1y_data.py 를 실행하세요.") + return + + print("캐시 로드 중...") + cache = pickle.load(open(CACHE_FILE, "rb")) + all_tickers = list(cache["10m"].keys())[:TOP_N] + tickers = [t for t in all_tickers if len(cache["10m"][t]) > 500] + print(f" 유효 종목: {len(tickers)}개") + + # F&G 로드 + fng_map: dict = {} + if FNG_FILE.exists(): + fng_map = json.load(open(FNG_FILE)) + fng_dates = sorted(fng_map.keys()) + print(f" F&G: {fng_dates[0]} ~ {fng_dates[-1]} ({len(fng_map)}일)") + else: + print(" [경고] F&G 데이터 없음") + + # 리샘플링 + dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers} + + # 레짐 시리즈 + regime_series = build_regime_series(dfs40) + + # 기간 정보 + sample = next(iter(dfs40.values())) + start_dt = sample.index[0].strftime("%Y-%m-%d") + end_dt = sample.index[-1].strftime("%Y-%m-%d") + + # 레짐 분포 계산 + if not regime_series.empty: + valid = regime_series.dropna() + bull_cnt = (valid >= BULL_THRESHOLD).sum() + bear_cnt = (valid < BEAR_THRESHOLD).sum() + neut_cnt = len(valid) - bull_cnt - bear_cnt + total_cnt = len(valid) + print(f"\n 레짐 분포 ({total_cnt}봉 기준):") + print(f" BULL (≥{BULL_THRESHOLD}%) : {bull_cnt:>6}봉 ({bull_cnt/total_cnt*100:.1f}%)") + print(f" NEUTRAL : {neut_cnt:>6}봉 ({neut_cnt/total_cnt*100:.1f}%)") + print(f" BEAR (<{BEAR_THRESHOLD}%) : {bear_cnt:>6}봉 ({bear_cnt/total_cnt*100:.1f}%)") + + # F&G 분포 (해당 기간) + if fng_map: + period_fng = {k: v for k, v in fng_map.items() if start_dt <= k <= end_dt} + zones = {"극공포(≤25)": 0, "공포(26~40)": 0, "중립+(≥41)": 0} + for v in period_fng.values(): + if v <= 25: zones["극공포(≤25)"] += 1 + elif v <= 40: zones["공포(26~40)"] += 1 + else: zones["중립+(≥41)"] += 1 + tot = sum(zones.values()) + print(f"\n F&G 분포 (동 기간 {tot}일):") + for name, cnt in zones.items(): + print(f" {name:12} {cnt:>3}일 ({cnt/tot*100:.1f}%)") + + print(f"\n{'='*60}") + print(f" 레짐 BULL 진입 시뮬 | 1년 | {len(tickers)}종목 | 40분봉") + print(f" 기간: {start_dt} ~ {end_dt}") + print(f"{'='*60}") + + # ── 4가지 시뮬 실행 ─────────────────────────────────── + CONFIGS = [ + ("none", "필터 없음"), + ("bear_off", "BEAR 차단 (현재)"), + ("bull_only","BULL 진입만"), + ("bull_fng", "BULL + F&G≥41"), + ] + + results = {} + for mode, label in CONFIGS: + all_trades = [] + wf_total = 0 + for ticker in tickers: + df40 = dfs40[ticker] + raw = run_strategy(df40, ticker, regime_series, fng_map, mode) + filtered, blocked = apply_wf(raw) + wf_total += blocked + all_trades.extend(filtered) + + all_trades.sort(key=lambda x: x[2]) + accepted, skipped = apply_max_positions(all_trades) + result = run_compound(accepted) + results[label] = result + print_result(label, result, len(skipped), wf_total) + + # ── 요약 비교 ───────────────────────────────────────── + print(f"\n{'='*60}") + print(f" 요약 비교") + print(f"{'='*60}") + print(f" {'구성':<22} {'진입':>5} {'승률':>6} {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}") + print(f" {'─'*58}") + for mode, label in CONFIGS: + r = results[label] + total = r["total"] + if total == 0: + print(f" {label:<22} {'진입없음':>34}") + continue + peak = BUDGET + max_dd = 0.0 + for t in r["trade_log"]: + peak = max(peak, t["portfolio"]) + dd = (peak - t["portfolio"]) / peak * 100 + max_dd = max(max_dd, dd) + print( + f" {label:<22} {total:>5}건 {r['wr']:>5.1f}% " + f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{max_dd:.1f}%" + ) + + # ── 월별 상세 (BULL 진입만) ─────────────────────────── + print(f"\n{'='*60}") + print(f" 월별 상세 — BULL 진입만") + print(f"{'='*60}") + print_monthly(results["BULL 진입만"]) + print(f"\n 월별 상세 — BEAR 차단 (현재)") + print_monthly(results["BEAR 차단 (현재)"]) + print(f"{'='*60}") + + +if __name__ == "__main__": + main() diff --git a/tests/sim_regime_sweep.py b/tests/sim_regime_sweep.py new file mode 100644 index 0000000..4b0fd9f --- /dev/null +++ b/tests/sim_regime_sweep.py @@ -0,0 +1,322 @@ +"""레짐 REGIME_N 스윕 — BULL 진입 기준 봉수 최적화. + +REGIME_N (pct_change 봉수) 를 1~8봉(40분~320분) 으로 변화시키며 +BULL 진입만 / BEAR 차단 / 필터없음 비교. + +데이터: data/sim1y_cache.pkl (10분봉 1년치) +""" +import os as _os, sys as _sys +_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + +import json +import pickle +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") + +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" +FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" +TOP_N = 20 + +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 +FEE = 0.0005 +TIME_STOP_MIN_PCT = 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 +VOL_MULT_DEFAULT = 2.0 +VOL_MULT_BULL = 1.5 +QUIET_PCT = 2.0 +THRESH = 4.8 +LOCAL_VOL_N = 7 +QUIET_N = 3 +SIGNAL_TO_N = 12 +ATR_N = 7 +TS_N = 12 +BEAR_THRESHOLD = -0.5 +BULL_THRESHOLD = 1.5 +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 +REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, + "KRW-SOL": 0.15, "KRW-XRP": 0.15} + + +def resample_40m(df): + return (df.resample("40min") + .agg({"open":"first","high":"max","low":"min", + "close":"last","volume":"sum"}) + .dropna(subset=["close"])) + + +def build_regime_series(dfs40, regime_n): + weighted = None + for ticker, w in REGIME_WEIGHTS.items(): + if ticker not in dfs40: + continue + pct = dfs40[ticker]["close"].pct_change(regime_n) * 100 + weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) + return weighted if weighted is not None else pd.Series(dtype=float) + + +def calc_atr(df, buy_idx): + sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +def simulate_pos(df, buy_idx, buy_price, stop_pct): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + last = df.iloc[-1]["close"] + pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[-1], pnl + + +def run_strategy(df, ticker, regime_series, mode): + trades = [] + sig_i = sig_p = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(LOCAL_VOL_N + 2, QUIET_N + 1) + + while i < len(df): + ts = df.index[i] + row = df.iloc[i] + cur = row["close"] + + if in_pos: + is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) + in_pos = False; sig_i = sig_p = None; i = next_i + continue + + score = 0.0 + if not regime_series.empty and ts in regime_series.index: + v = regime_series.loc[ts] + score = float(v) if not pd.isna(v) else 0.0 + + if mode == "bear_off": + if score < BEAR_THRESHOLD: + sig_i = sig_p = None; i += 1; continue + vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT + elif mode == "bull_only": + if score < BULL_THRESHOLD: + sig_i = sig_p = None; i += 1; continue + vol_mult = VOL_MULT_BULL + else: + vol_mult = VOL_MULT_DEFAULT + + if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: + sig_i = sig_p = None + + if sig_i is not None: + move_pct = (cur - sig_p) / sig_p * 100 + if cur < sig_p: + sig_i = sig_p = None + elif move_pct >= THRESH: + in_pos = True; buy_idx = i; buy_price = cur + stop_pct = calc_atr(df, i); sig_i = sig_p = None + i += 1; continue + + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + close_qh = df.iloc[i-QUIET_N]["close"] + chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 + if chg_qh < QUIET_PCT and vol_r >= vol_mult: + if sig_i is None: + sig_i = i; sig_p = cur + else: + if sig_i is not None and cur < sig_p: + sig_i = sig_p = None + i += 1 + return trades + + +def apply_wf(trades): + history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 + for t in trades: + is_win = int(t[0]) + if not blocked: + accepted.append(t); history.append(is_win) + if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: + blocked = True; shadow = 0 + else: + cnt += 1 + if is_win: + shadow += 1 + if shadow >= WF_SHADOW_WINS: + blocked = False; history = []; shadow = 0 + else: + shadow = 0 + return accepted, cnt + + +def apply_max_pos(trades): + open_exits = []; accepted = []; skipped = [] + for t in trades: + buy_dt, sell_dt = t[2], t[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt); accepted.append(t) + else: + skipped.append(t) + return accepted, skipped + + +def run_compound(accepted): + portfolio = float(BUDGET); total_krw = 0.0; monthly = {} + for is_win, pnl, buy_dt, sell_dt, ticker in accepted: + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + ym = buy_dt.strftime("%Y-%m") + if ym not in monthly: + monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} + monthly[ym]["trades"] += 1 + monthly[ym]["wins"] += int(is_win) + monthly[ym]["pnl_krw"] += krw_profit + wins = sum(1 for t in accepted if t[0]) + peak = BUDGET; max_dd = 0.0 + pf = float(BUDGET) + for is_win, pnl, buy_dt, sell_dt, ticker in accepted: + pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET) + peak = max(peak, pf); max_dd = max(max_dd, (peak-pf)/peak*100) + return { + "portfolio": portfolio, "total_krw": total_krw, + "roi_pct": (portfolio-BUDGET)/BUDGET*100, + "total": len(accepted), "wins": wins, + "wr": wins/len(accepted)*100 if accepted else 0, + "monthly": monthly, "max_dd": max_dd, + } + + +def sim_one(dfs40, regime_n, mode): + rs = build_regime_series(dfs40, regime_n) + all_trades = []; wf_total = 0 + for ticker, df40 in dfs40.items(): + raw = run_strategy(df40, ticker, rs, mode) + filtered, blocked = apply_wf(raw) + wf_total += blocked + all_trades.extend(filtered) + all_trades.sort(key=lambda x: x[2]) + accepted, skipped = apply_max_pos(all_trades) + result = run_compound(accepted) + # BULL 비율 + if not rs.empty: + valid = rs.dropna() + bull_pct = (valid >= BULL_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0 + bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0 + else: + bull_pct = bear_pct = 0 + return result, bull_pct, bear_pct, wf_total, len(skipped) + + +def main(): + print("캐시 로드 중...") + cache = pickle.load(open(CACHE_FILE, "rb")) + tickers = [t for t in list(cache["10m"].keys())[:TOP_N] + if len(cache["10m"][t]) > 500] + print(f" 종목: {len(tickers)}개\n") + dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers} + + sample = next(iter(dfs40.values())) + start_dt = sample.index[0].strftime("%Y-%m-%d") + end_dt = sample.index[-1].strftime("%Y-%m-%d") + + SWEEP_N = [1, 2, 3, 4, 5, 6, 8, 10] # 40분 ~ 400분 (6.7h) + + # ── BULL 진입만 스윕 ────────────────────────────────── + print(f"{'='*72}") + print(f" REGIME_N 스윕 (40분봉 × N봉 변화율 기준 | BULL≥{BULL_THRESHOLD}%)") + print(f" 기간: {start_dt} ~ {end_dt} / {len(tickers)}종목") + print(f"{'='*72}") + print(f" {'N봉':>4} {'시간':>5} │ {'BULL%':>6} {'BEAR%':>6} │ " + f"{'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}") + print(f" {'─'*68}") + + bull_results = {} + for n in SWEEP_N: + r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bull_only") + bull_results[n] = r + mins = n * 40 + h = mins // 60 + m = mins % 60 + time_label = f"{h}h{m:02d}m" if m else f"{h}h" + if r["total"] == 0: + print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " + f"{'진입없음':>34}") + else: + print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " + f"{r['total']:>5}건 {r['wr']:>4.1f}% │ " + f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%") + + # ── BEAR 차단 스윕 ──────────────────────────────────── + print(f"\n{'='*72}") + print(f" REGIME_N 스윕 (BEAR 차단 모드 | BEAR<{BEAR_THRESHOLD}%)") + print(f"{'='*72}") + print(f" {'N봉':>4} {'시간':>5} │ {'BULL%':>6} {'BEAR%':>6} │ " + f"{'진입':>5} {'승률':>5} │ {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}") + print(f" {'─'*68}") + + bear_results = {} + for n in SWEEP_N: + r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bear_off") + bear_results[n] = r + mins = n * 40 + h = mins // 60; m = mins % 60 + time_label = f"{h}h{m:02d}m" if m else f"{h}h" + print(f" {n:>4}봉 {time_label:>5} │ {bull_pct:>5.1f}% {bear_pct:>5.1f}% │ " + f"{r['total']:>5}건 {r['wr']:>4.1f}% │ " + f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%") + + # ── 베이스라인 (필터없음) ───────────────────────────── + r_none, _, _, _, _ = sim_one(dfs40, 1, "none") + print(f"\n 베이스라인 (필터없음): {r_none['total']}건 {r_none['wr']:.1f}% " + f"{r_none['roi_pct']:+.2f}% {r_none['total_krw']:+,.0f}원 -{r_none['max_dd']:.1f}%") + + # ── 최적 BULL 구간 ──────────────────────────────────── + valid_bull = {n: r for n, r in bull_results.items() if r["total"] >= 5} + if valid_bull: + best_n = max(valid_bull, key=lambda n: valid_bull[n]["roi_pct"]) + best_r = valid_bull[best_n] + print(f"\n ★ BULL 진입 최적 N: {best_n}봉({best_n*40}분) " + f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}건 " + f"승률 {best_r['wr']:.1f}%") + + valid_bear = {n: r for n, r in bear_results.items() if r["total"] >= 5} + if valid_bear: + best_n = max(valid_bear, key=lambda n: valid_bear[n]["roi_pct"]) + best_r = valid_bear[best_n] + print(f" ★ BEAR 차단 최적 N: {best_n}봉({best_n*40}분) " + f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}건 " + f"승률 {best_r['wr']:.1f}%") + print(f"{'='*72}") + + +if __name__ == "__main__": + main() diff --git a/tests/sim_vol_override.py b/tests/sim_vol_override.py new file mode 100644 index 0000000..7c876c5 --- /dev/null +++ b/tests/sim_vol_override.py @@ -0,0 +1,353 @@ +"""볼륨 강도 기반 레짐+F&G 오버라이드 시뮬 — 1년치. + +우선순위 로직: + 1순위: vol_ratio ≥ VOL_OVERRIDE_THRESH → 레짐/F&G 무관 즉시 진입 허용 + 2순위: F&G < FNG_MIN_ENTRY → 차단 + 3순위: 레짐 BEAR → 차단 + 4순위: 일반 vol-lead 로직 + +비교 구성: + 1. 필터 없음 + 2. F&G≥41 + BEAR차단N5 (현재 전략 레짐 적용) + 3. F&G≥41 + BEAR차단N5 + vol≥5x 오버라이드 (레짐+F&G 동시 오버라이드) + 4. F&G≥41 + BEAR차단N5 + vol≥4x 오버라이드 + 5. F&G≥41 + BEAR차단N5 + vol≥3x 오버라이드 + +데이터: data/sim1y_cache.pkl / data/fng_1y.json +""" +import os as _os, sys as _sys +_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) + +import json +import pickle +from pathlib import Path +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") + +CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" +FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" +TOP_N = 20 + +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 +FEE = 0.0005 +TIME_STOP_MIN_PCT = 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 +VOL_MULT_DEFAULT = 2.0 +VOL_MULT_BULL = 1.5 +QUIET_PCT = 2.0 +THRESH = 4.8 +LOCAL_VOL_N = 7 +QUIET_N = 3 +SIGNAL_TO_N = 12 +ATR_N = 7 +TS_N = 12 +BEAR_THRESHOLD = -0.5 +BULL_THRESHOLD = 1.5 +REGIME_N = 5 +FNG_MIN_ENTRY = 41 +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 +REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30, + "KRW-SOL": 0.15, "KRW-XRP": 0.15} + + +def resample_40m(df): + return (df.resample("40min") + .agg({"open":"first","high":"max","low":"min", + "close":"last","volume":"sum"}) + .dropna(subset=["close"])) + + +def build_regime_series(dfs40): + weighted = None + for ticker, w in REGIME_WEIGHTS.items(): + if ticker not in dfs40: continue + pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100 + weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0) + return weighted if weighted is not None else pd.Series(dtype=float) + + +def calc_atr(df, buy_idx): + sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx] + if len(sub) < 3: return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +def simulate_pos(df, buy_idx, buy_price, stop_pct): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + if row["high"] > peak: peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[i], pnl + last = df.iloc[-1]["close"] + pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100 + return pnl > 0, df.index[-1], pnl + + +def run_strategy(df, ticker, regime_series, fng_map, + use_fng, use_regime, vol_override_thresh): + """ + 우선순위 구조: + ① 포지션 청산 체크 + ② 볼륨 스파이크 감지 → 신호 기록 (F&G/레짐 무관, 항상 실행) + ③ 진입 시점에서: + vol_strong(sig_vr≥thresh) → F&G+레짐 필터 전부 건너뜀 + 아니면 → F&G≥41 AND 레짐 BEAR 아닐 때만 진입 허용 + """ + trades = [] + sig_i = sig_p = sig_vr = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(LOCAL_VOL_N + 2, QUIET_N + 1) + + while i < len(df): + ts = df.index[i] + row = df.iloc[i] + cur = row["close"] + + # ── ① 포지션 청산 ──────────────────────────────── + if in_pos: + is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker)) + in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i + continue + + # 신호 타임아웃 + if sig_i is not None and (i - sig_i) > SIGNAL_TO_N: + sig_i = sig_p = sig_vr = None + + # ── ② 신호 없을 때: 축적 감지 (필터 무관, 항상) ── + # F&G=14 극공포여도 vol 스파이크면 신호 기록 → ③에서 override 결정 + if sig_i is None: + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + close_qh = df.iloc[i-QUIET_N]["close"] + chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999 + if chg_qh < QUIET_PCT and vol_r >= VOL_MULT_DEFAULT: + sig_i = i; sig_p = cur; sig_vr = vol_r + i += 1 + continue + + # 신호가 이하 하락 → 초기화 + if cur < sig_p: + sig_i = sig_p = sig_vr = None + i += 1 + continue + + # ── ③ 진입 체크 — vol_strong이면 필터 전부 스킵 ── + vol_strong = (vol_override_thresh > 0 + and sig_vr is not None + and sig_vr >= vol_override_thresh) + + if not vol_strong: + # F&G 필터 (신호 유지, 진입만 보류) + if use_fng and fng_map: + fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50) + if fv < FNG_MIN_ENTRY: + i += 1; continue + + # 레짐 필터 (신호 유지, 진입만 보류) + if use_regime and not regime_series.empty and ts in regime_series.index: + v = regime_series.loc[ts] + score = float(v) if not pd.isna(v) else 0.0 + if score < BEAR_THRESHOLD: + i += 1; continue + + move_pct = (cur - sig_p) / sig_p * 100 + if move_pct >= THRESH: + in_pos = True; buy_idx = i; buy_price = cur + stop_pct = calc_atr(df, i); sig_i = sig_p = sig_vr = None + + i += 1 + return trades + + +def apply_wf(trades): + history = []; shadow = 0; blocked = False; accepted = []; cnt = 0 + for t in trades: + is_win = int(t[0]) + if not blocked: + accepted.append(t); history.append(is_win) + if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: + blocked = True; shadow = 0 + else: + cnt += 1 + if is_win: + shadow += 1 + if shadow >= WF_SHADOW_WINS: blocked = False; history = []; shadow = 0 + else: + shadow = 0 + return accepted, cnt + + +def apply_max_pos(trades): + open_exits = []; accepted = []; skipped = [] + for t in trades: + buy_dt, sell_dt = t[2], t[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt); accepted.append(t) + else: + skipped.append(t) + return accepted, skipped + + +def run_compound(accepted): + portfolio = float(BUDGET); total_krw = 0.0; monthly = {}; trade_log = [] + for is_win, pnl, buy_dt, sell_dt, ticker in accepted: + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + ym = buy_dt.strftime("%Y-%m") + if ym not in monthly: + monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0} + monthly[ym]["trades"] += 1; monthly[ym]["wins"] += int(is_win) + monthly[ym]["pnl_krw"] += krw_profit + trade_log.append({"portfolio": portfolio}) + + wins = sum(1 for t in accepted if t[0]) + peak = BUDGET; max_dd = 0.0 + for t in trade_log: + peak = max(peak, t["portfolio"]) + max_dd = max(max_dd, (peak - t["portfolio"]) / peak * 100) + return {"portfolio": portfolio, "total_krw": total_krw, + "roi_pct": (portfolio-BUDGET)/BUDGET*100, + "total": len(accepted), "wins": wins, + "wr": wins/len(accepted)*100 if accepted else 0, + "monthly": monthly, "max_dd": max_dd} + + +def sim_one(dfs40, regime_series, fng_map, use_fng, use_regime, vol_override): + all_trades = []; wf_total = 0 + for ticker, df40 in dfs40.items(): + raw = run_strategy(df40, ticker, regime_series, fng_map, + use_fng, use_regime, vol_override) + filtered, blocked = apply_wf(raw) + wf_total += blocked; all_trades.extend(filtered) + all_trades.sort(key=lambda x: x[2]) + accepted, skipped = apply_max_pos(all_trades) + return run_compound(accepted), wf_total, len(skipped) + + +def print_monthly(result, label): + print(f"\n ── 월별 상세: {label}") + print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │ {'월수익(KRW)':>13} {'누적(KRW)':>14}") + cum = 0.0 + for ym, m in sorted(result["monthly"].items()): + wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0 + cum += m["pnl_krw"] + flag = " ✓" if m["pnl_krw"] > 0 else "" + print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │" + f" {m['pnl_krw']:>+13,.0f}원 {cum:>+13,.0f}원{flag}") + + +def main(): + print("캐시 로드 중...") + cache = pickle.load(open(CACHE_FILE, "rb")) + tickers = [t for t in list(cache["10m"].keys())[:TOP_N] + if len(cache["10m"][t]) > 500] + print(f" 종목: {len(tickers)}개") + + fng_map = {} + if FNG_FILE.exists(): + fng_map = json.load(open(FNG_FILE)) + dates = sorted(fng_map.keys()) + print(f" F&G: {dates[0]} ~ {dates[-1]} ({len(fng_map)}일)") + + dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers} + regime_series = build_regime_series(dfs40) + + sample = next(iter(dfs40.values())) + start_dt = sample.index[0].strftime("%Y-%m-%d") + end_dt = sample.index[-1].strftime("%Y-%m-%d") + + # 필터 적용 일수 통계 + if fng_map: + period_fng = {k: v for k, v in fng_map.items() if start_dt <= k <= end_dt} + fng_blocked = sum(1 for v in period_fng.values() if v < FNG_MIN_ENTRY) + fng_allowed = len(period_fng) - fng_blocked + print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 ≥{FNG_MIN_ENTRY})") + valid = regime_series.dropna() + bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100 + print(f" 레짐 BEAR: {bear_pct:.1f}%봉 (REGIME_N={REGIME_N}봉={REGIME_N*40}분)\n") + + # ── 시뮬 구성 ───────────────────────────────────────── + CONFIGS = [ + # (use_fng, use_regime, vol_override, label) + (False, False, 0, "① 필터 없음"), + (True, True, 0, f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), + (True, True, 5.0, f"③ [1순위:vol≥5x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), + (True, True, 4.0, f"④ [1순위:vol≥4x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), + (True, True, 3.0, f"⑤ [1순위:vol≥3x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"), + ] + + print(f"{'='*72}") + print(f" vol 오버라이드 (레짐+F&G 동시) 시뮬 | 1년 | {len(tickers)}종목") + print(f" 기간: {start_dt} ~ {end_dt}") + print(f" 우선순위: vol≥Nx(오버라이드) > F&G필터 > 레짐필터 > vol-lead 로직") + print(f"{'='*72}") + print(f" {'구성':<48} {'진입':>5} {'승률':>5} {'수익률':>8} {'순수익':>12} {'낙폭':>6}") + print(f" {'─'*70}") + + results = {} + for use_fng, use_regime, vol_ov, label in CONFIGS: + r, wf_b, skip = sim_one(dfs40, regime_series, fng_map, + use_fng, use_regime, vol_ov) + results[label] = r + n = r["total"] + print(f" {label:<48} {n:>5}건 {r['wr']:>4.1f}%" + f" {r['roi_pct']:>+7.2f}% {r['total_krw']:>+11,.0f}원 -{r['max_dd']:.1f}%") + + # ── 월별 상세 ───────────────────────────────────────── + print(f"\n{'='*72}") + for use_fng, use_regime, vol_ov, label in CONFIGS: + if label in results: + print_monthly(results[label], label) + + # ── 비교 요약 ───────────────────────────────────────── + print(f"\n{'='*72}") + base_label = f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}" + base_r = results.get(base_label) + if base_r: + print(f" 오버라이드 효과 (vs {base_label}):") + for _, _, vol_ov, label in CONFIGS[2:]: + r = results.get(label) + if r and r["total"] > 0: + d_roi = r["roi_pct"] - base_r["roi_pct"] + d_n = r["total"] - base_r["total"] + d_wr = r["wr"] - base_r["wr"] + d_dd = r["max_dd"] - base_r["max_dd"] + print(f" vol≥{vol_ov:.0f}x: 수익률 {d_roi:>+.2f}%p " + f"진입 {d_n:>+d}건 승률 {d_wr:>+.1f}%p 낙폭 {d_dd:>+.1f}%p") + + best_label = max(results, key=lambda k: results[k]["roi_pct"]) + best = results[best_label] + print(f"\n ★ 최고 수익률: {best_label}") + print(f" 수익률 {best['roi_pct']:+.2f}% / 순수익 {best['total_krw']:+,.0f}원 " + f"/ 낙폭 -{best['max_dd']:.1f}%") + print(f"{'='*72}") + + +if __name__ == "__main__": + main()