diff --git a/STRATEGY.md b/STRATEGY.md index 4218081..c00c422 100644 --- a/STRATEGY.md +++ b/STRATEGY.md @@ -15,7 +15,7 @@ ## 진입 조건 (2단계) ### 1단계: 매집 신호 감지 -다음 두 조건 동시 충족 시 `signal_price` 기록: +다음 두 조건 동시 충족 시 `signal_price` + `vol_ratio` 기록: | 조건 | 파라미터 | 기본값 | |------|----------|--------| @@ -29,12 +29,40 @@ > **2h 횡보 체크**: Oracle DB에 저장된 실시간 가격 기록을 조회 (`get_price_n_hours_ago`) > **거래량 체크**: `minute10` → 40분봉 resample → 직전 완성봉 vs 이전 7봉 평균 -### 2단계: 추세 확인 후 진입 -`signal_price` 대비 +`TREND_AFTER_VOL`% 이상 상승 확인 시 매수: +### 2단계: 추세 확인 후 진입 (거리 기반 OR 속도 기반 — 먼저 충족되는 조건으로 진입) + +**A. 거리 기반**: `signal_price` 대비 +임계값% 이상 상승 시 매수 (vol_ratio에 따라 동적 조정) + +| vol_ratio | 진입 임계값 | 설명 | +|-----------|------------|------| +| ≥ 5.0x | +1.0% | 매우 강한 신호 | +| ≥ 3.5x | +2.0% | 강한 신호 | +| ≥ 2.5x | +3.0% | 중간 신호 | +| < 2.5x | +`TREND_AFTER_VOL`% | 기본 임계값 | + +**B. 속도 기반 (조기 진입)**: 신호 후 가격 상승 속도가 `VELOCITY_THRESHOLD` 이상이면 즉시 진입 | 파라미터 | 기본값 | 설명 | |----------|--------|------| -| `TREND_AFTER_VOL` | 4.8% | 신호가 대비 진입 임계값 | +| `TREND_AFTER_VOL` | 4.8% | 거리 기반 기본 임계값 | +| `VELOCITY_THRESHOLD` | 0.10%/분 | 속도 기준 (6%/h — 가파른 상승 감지) | +| `VELOCITY_MIN_MOVE` | 0.5% | 속도 체크 전 최소 이동 % | +| `VELOCITY_MIN_AGE_M` | 5분 | 속도 체크 전 최소 경과 시간 | + +> **예시 (BTC 23:20 신호)**: 23:30에 +1.26%/6분 = 0.21%/분 → 속도 기준 충족 → 조기 진입 +> 실제 진입(00:34, 100,840,000) 대비 약 1시간 빠른 23:30(97,286,000) 진입 가능 + +### 신호 감시 스레드 (Fast Poll) + +신호 감지 후 전체 스캔 60초를 기다리지 않고 해당 종목만 빠르게 폴링. + +| 파라미터 | 기본값 | 설명 | +|----------|--------|------| +| `SCAN_INTERVAL` | 60초 | 전체 시장 스캔 주기 | +| `SIGNAL_POLL_INTERVAL` | 15초 | 신호 종목 집중 감시 주기 | + +- 신호 발생 시 별도 스레드(`signal-fast-poll`)가 15초마다 해당 종목만 체크 +- 목표 임계값(거리 or 속도) 도달 즉시 매수 → 60초 지연 제거 --- @@ -62,7 +90,7 @@ | `WF_MIN_WIN_RATE` | 0.01 | 최소 승률 임계값 (1%) | | `WF_SHADOW_WINS` | 2 | 차단 해제 조건 (가상 N연승) | -- 직전 2건 모두 손실 → 해당 종목 진입 차단 +- 직전 4건 승률 < 1% → 해당 종목 진입 차단 - 차단 후 가상 추적으로 2연승 달성 시 자동 복귀 - **WF 차단 상태는 Oracle DB(`wf_state` 테이블)에 영속 저장** → 재시작 후에도 복원 @@ -76,14 +104,27 @@ ## 시장 레짐 적응 -| 레짐 | BTC 1h 변동 | 거래량 기준 | -|------|------------|------------| -| BULL | +5% 이상 | 1.5x | -| NEUTRAL | ±5% 이내 | 2.0x | -| BEAR | -5% 이하 | 진입 차단 | +BTC·ETH·SOL·XRP 가중평균 **2h 추세 score**로 레짐 결정. -- BEAR 레짐 감지 시 신규 진입 전면 차단 -- 레짐별 `vol_mult` 조정으로 민감도 제어 +| 종목 | 가중치 | +|------|--------| +| KRW-BTC | 40% | +| KRW-ETH | 30% | +| KRW-SOL | 15% | +| KRW-XRP | 15% | + +| 레짐 | score 기준 | vol_mult | 신규 진입 | +|------|-----------|----------|---------| +| BULL | ≥ +1.5% | 1.5x | 허용 | +| NEUTRAL | -0.5% ~ +1.5% | 2.0x | 허용 | +| BEAR | < -0.5% | 3.5x | **전면 차단** | + +- BEAR 레짐 감지 시 신규 매수 전면 차단 (기존 포지션 청산은 정상 진행) +- 레짐 캐시 TTL: 10분 (API 호출 최소화) +- 현재가는 매 레짐 계산 시 Oracle DB(`price_history`)에 저장 → 2h 전 가격 조회에 재활용 + +> **2026-03-03 조정**: BEAR 기준 -1.0% → **-0.5%**로 강화 +> 완만한 하락장(score ≈ -0.4%)에서 NEUTRAL로 오판하던 문제 수정 --- @@ -96,6 +137,11 @@ TREND_AFTER_VOL=4.8 # 진입 임계값 (신호가 대비 %) SIGNAL_TIMEOUT_H=8.0 # 신호 유효 시간 (h) VOLUME_MULTIPLIER=2.0 # 거래량 배수 기준 +# 속도 기반 조기 진입 +VELOCITY_THRESHOLD=0.10 # %/분 (0.10 = 6%/h) +VELOCITY_MIN_MOVE=0.5 # 최소 이동 % (잡음 제거) +VELOCITY_MIN_AGE_M=5.0 # 최소 경과 시간 (분) + # 청산 TIME_STOP_HOURS=8 # 타임스탑 보유 시간 TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률 @@ -104,8 +150,11 @@ TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률 MAX_BUDGET=15000000 # 초기 운용 예산 MAX_POSITIONS=3 # 최대 동시 보유 종목 +# 감시 주기 +SIGNAL_POLL_INTERVAL=15 # 신호 종목 빠른 감시 (초) + # WF 필터 -WF_WINDOW=2 +WF_WINDOW=4 WF_MIN_WIN_RATE=0.01 WF_SHADOW_WINS=2 ``` @@ -167,22 +216,35 @@ WF_SHADOW_WINS=2 | 2.5% | 50.8% | +256% | -5.3% | 1.77% | | 4.0% | 45.9% | -52% | -29.1% | 3.11% | +### D. 속도 진입 효과 비교 — 10분봉 (`velocity_backtest.py`) +> 기간: 2026-01-19 ~ 2026-03-02 / 10분봉 캐시 / 20종목 + +| 설정 | 속도진입 | 승률 | 수익률 | 최대낙폭 | +|------|---------|------|--------|---------| +| A: 거리 기반만 | 0건 | 34.7% | +8.83% | -8.35% | +| B: +속도(0.10) | 89건 | 33.6% | +13.36% | **-4.50%** | +| B: +속도(0.15) | 39건 | 35.7% | +17.19% | -7.84% | + +- **0.10 채택**: 낙폭 -8.35% → -4.50% 개선, 수익률 +4.5%p +- 속도진입 BTC 예시: 0.21%/분 → 0.10 기준 충족 (0.20도 아슬하게 충족) + --- ## 주요 파일 | 파일 | 역할 | |------|------| -| `core/strategy.py` | 진입 신호 로직 (40분봉 vol-lead) | +| `core/strategy.py` | 진입 신호 로직 (40분봉 vol-lead + 속도 기반 조기 진입) | | `core/monitor.py` | ATR 트레일링 스탑 + 타임스탑 (40분봉 ATR) | | `core/trader.py` | 주문 실행 + 복리 예산 관리 | -| `core/market_regime.py` | 시장 레짐 감지 | +| `core/market_regime.py` | 시장 레짐 감지 (BTC/ETH/SOL/XRP 가중 2h 추세) | | `core/price_db.py` | 가격 DB + WF 상태 영속화 | +| `daemon/runner.py` | 전체 스캔 루프 + 신호 종목 fast-poll 스레드 | | `ohlcv_db.py` | OHLCV 시계열 DB 캐시 관리 | | `sim_365.py` | 365일 복리 시뮬레이션 (1h봉, DB) | | `sim_45m40.py` | 45일 복리 시뮬레이션 (40분봉, 캐시) | +| `velocity_backtest.py` | 속도 진입 효과 비교 백테스트 (A vs B vs C) | | `atr_sweep.py` | ATR_MAX_STOP 파라미터 스윕 | -| `sim10m.py` | 10분봉 vs 1h봉 전략 비교 시뮬 | | `interval_sweep.py` | 봉 단위별 성과 비교 (10/20/30/40/50/60분) | --- @@ -196,6 +258,9 @@ python sim_45m40.py # 365일 복리 시뮬 — 1h봉 (DB에서 로드) python sim_365.py +# 속도 진입 효과 비교 +python velocity_backtest.py + # 봉 단위별 비교 (10m 캐시 필요) python interval_sweep.py @@ -208,3 +273,15 @@ python ohlcv_db.py status # 신규 봉 증분 업데이트 python ohlcv_db.py update ``` + +--- + +## 변경 이력 + +| 날짜 | 변경 내용 | +|------|---------| +| 2026-03-03 | BEAR_THRESHOLD -1.0% → **-0.5%** 강화 (완만한 하락장 오판 수정) | +| 2026-03-03 | 속도 기반 조기 진입 추가 (`VELOCITY_THRESHOLD=0.10%/분`) | +| 2026-03-03 | 신호 종목 fast-poll 스레드 추가 (`SIGNAL_POLL_INTERVAL=15s`) | +| 2026-03-03 | `sell_reason` 컬럼 VARCHAR2(100→500) 자동 확장 | +| 2026-03-03 | vol_ratio 강도별 진입 임계값 티어 추가 (5x→1%, 3.5x→2%, 2.5x→3%) | diff --git a/core/market_regime.py b/core/market_regime.py index 0185c52..b4c25cf 100644 --- a/core/market_regime.py +++ b/core/market_regime.py @@ -26,7 +26,7 @@ LEADERS: dict[str, float] = { TREND_HOURS = 2 # 2h 추세 기준 BULL_THRESHOLD = 1.5 # score ≥ 1.5% → Bull -BEAR_THRESHOLD = -1.0 # score < -1.0% → Bear +BEAR_THRESHOLD = -0.5 # score < -0.5% → Bear # 레짐별 매수 조건 파라미터 REGIME_PARAMS: dict[str, dict] = { diff --git a/core/price_db.py b/core/price_db.py index 07bd3ca..23604d9 100644 --- a/core/price_db.py +++ b/core/price_db.py @@ -170,6 +170,13 @@ def ensure_trade_results_table() -> None: except oracledb.DatabaseError as e: if e.args[0].code not in (955, 1408): raise + # sell_reason 컬럼이 100 BYTE 이하이면 500으로 확장 + try: + conn.cursor().execute( + "ALTER TABLE trade_results MODIFY sell_reason VARCHAR2(500)" + ) + except oracledb.DatabaseError: + pass # 이미 500 이상이거나 컬럼 없으면 무시 def record_trade( diff --git a/core/strategy.py b/core/strategy.py index 9a7240a..00867c7 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -3,8 +3,13 @@ 흐름: 1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) - → 신호가(signal_price) 기록 - 2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입 + → 신호가(signal_price) + 거래량비율(vol_ratio) 기록 + 2. signal_price 대비 +임계값% 이상 상승 시 진입 + 임계값은 vol_ratio 강도에 따라 자동 조정 (강한 신호 → 낮은 임계값 → 조기 진입) + - vol_ratio ≥ 5.0x → +1.0% + - vol_ratio ≥ 3.5x → +2.0% + - vol_ratio ≥ 2.5x → +3.0% + - 기본 → +TREND_AFTER_VOL% 3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화 캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용 @@ -38,6 +43,29 @@ VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) _CANDLE_MIN = 40 _FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉 +# 신호 강도별 진입 임계값 단계 (vol_ratio 최소값, 진입 임계값%) +# 강한 신호일수록 낮은 임계값으로 조기 진입 +_ENTRY_TIERS: list[tuple[float, float]] = [ + (5.0, 1.0), # 매우 강한 신호 (≥5x) → +1.0% 즉시 진입 + (3.5, 2.0), # 강한 신호 (≥3.5x) → +2.0% 조기 진입 + (2.5, 3.0), # 중간 신호 (≥2.5x) → +3.0% 반조기 진입 +] +# 위 조건 미충족 시 TREND_AFTER_VOL 사용 + +# 속도(velocity) 기반 조기 진입 파라미터 +# 신호 후 가격이 빠르게 상승 중이면 거리 임계값 도달 전에 선진입 +VELOCITY_THRESHOLD = float(os.getenv("VELOCITY_THRESHOLD", "0.10")) # %/분 (0.10 = 6%/h) +VELOCITY_MIN_MOVE = float(os.getenv("VELOCITY_MIN_MOVE", "0.5")) # 최소 이동 % (잡음 제거) +VELOCITY_MIN_AGE_M = float(os.getenv("VELOCITY_MIN_AGE_M", "5.0")) # 최소 경과 시간(분) + + +def _calc_entry_threshold(vol_ratio: float) -> float: + """거래량 비율에 따른 진입 임계값 반환. 강한 신호일수록 낮은 값.""" + for min_ratio, threshold in _ENTRY_TIERS: + if vol_ratio >= min_ratio: + return threshold + return TREND_AFTER_VOL + def _resample_40m(df): """minute10 DataFrame → 40분봉으로 리샘플링.""" @@ -47,10 +75,19 @@ def _resample_40m(df): .dropna(subset=["close"]) ) -# 축적 신호 상태: ticker → {"price": float, "time": float(unix)} +# 축적 신호 상태: ticker → {"price": float, "time": float(unix), "vol_ratio": float} _accum_signals: dict[str, dict] = {} +def get_active_signals() -> dict[str, dict]: + """현재 활성화된 신호 딕셔너리 반환 (fast-poll 루프용). + + Returns: + {ticker: {"price": float, "time": float, "vol_ratio": float}} + """ + return dict(_accum_signals) + + def _check_vol_spike(ticker: str, vol_mult: float) -> bool: """직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인.""" try: @@ -126,27 +163,33 @@ def should_buy(ticker: str) -> bool: if not _check_vol_spike(ticker, vol_mult): return False - # 축적 신호 기록 - _accum_signals[ticker] = {"price": current, "time": now} - logger.info( - f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원" - ) - # 거래량 비율 계산 후 알림 전송 + # 거래량 비율 계산 후 신호 기록 + ratio = 0.0 try: df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) df_h = _resample_40m(df10) if df10 is not None else None if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1: recent_vol = df_h["volume"].iloc[-2] local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() - ratio = recent_vol / local_avg if local_avg > 0 else 0 - notify_signal(ticker, current, ratio) + ratio = recent_vol / local_avg if local_avg > 0 else 0.0 except Exception: - notify_signal(ticker, current, 0.0) + pass + + entry_thr = _calc_entry_threshold(ratio) + _accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio} + logger.info( + f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원 " + f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}%)" + ) + notify_signal(ticker, current, ratio) return False # 신호 첫 발생 시는 진입 안 함 # ── 신호 있음: 상승 확인 → 진입 ───────────────────────── signal_price = sig["price"] + vol_ratio = sig.get("vol_ratio", 0.0) + entry_thr = _calc_entry_threshold(vol_ratio) move_pct = (current - signal_price) / signal_price * 100 + age_min = (now - sig["time"]) / 60 if current < signal_price: # 신호가 이하 하락 → 축적 실패 @@ -156,16 +199,34 @@ def should_buy(ticker: str) -> bool: ) return False - if move_pct >= TREND_AFTER_VOL: + # ── 거리 기반 진입 ───────────────────────────────────── + if move_pct >= entry_thr: del _accum_signals[ticker] logger.info( f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 " - f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}%)" + f"(+{move_pct:.1f}% ≥ {entry_thr:.1f}% | 거래량={vol_ratio:.2f}x)" ) return True + # ── 속도 기반 조기 진입 ──────────────────────────────── + # 신호 후 N분 이상 경과 + 최소 이동 + 분당 상승률 기준 충족 시 선진입 + if age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE: + velocity = move_pct / age_min # %/분 + if velocity >= VELOCITY_THRESHOLD: + del _accum_signals[ticker] + logger.info( + f"[속도진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 " + f"(+{move_pct:.1f}% | {velocity:.3f}%/분 ≥ {VELOCITY_THRESHOLD}%/분 | " + f"경과={age_min:.1f}분 | 거래량={vol_ratio:.2f}x)" + ) + return True + logger.debug( f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} " - f"(+{move_pct:.1f}% / 목표={TREND_AFTER_VOL}%)" + f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}% | " + f"속도={move_pct/age_min:.3f}%/분 | 경과={age_min:.1f}분)" + if age_min > 0 else + f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} " + f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}%)" ) return False diff --git a/daemon/runner.py b/daemon/runner.py index 5119a0d..e963dce 100644 --- a/daemon/runner.py +++ b/daemon/runner.py @@ -1,21 +1,60 @@ -"""매수 기회 스캔 루프 - 60초마다 전체 시장 스캔.""" +"""매수 기회 스캔 루프 - 60초마다 전체 시장 스캔 + 신호 종목 빠른 폴링.""" import logging +import os +import threading import time from core import trader from core.market import get_top_tickers from core.market_regime import get_regime -from core.strategy import should_buy +from core.strategy import get_active_signals, should_buy logger = logging.getLogger(__name__) -SCAN_INTERVAL = 60 # 초 +SCAN_INTERVAL = 60 # 전체 시장 스캔 주기 (초) +SIGNAL_POLL_INTERVAL = int(os.getenv("SIGNAL_POLL_INTERVAL", "15")) # 신호 종목 빠른 감시 주기 (초) + + +def _fast_poll_loop() -> None: + """활성 신호 종목을 SIGNAL_POLL_INTERVAL 초마다 빠르게 체크. + + 신호 감지 후 전체 스캔 60초를 기다리지 않고, 신호 종목만 빠르게 감시하여 + 목표 임계값 도달 시 즉시 매수한다. + """ + logger.info(f"신호 감시 시작 (주기={SIGNAL_POLL_INTERVAL}초)") + while True: + try: + signals = get_active_signals() + if signals: + regime = get_regime() + if regime["name"] != "bear": + positions = trader.get_positions() + for ticker in list(signals): + if ticker in positions: + continue + if len(positions) >= trader.MAX_POSITIONS: + break + try: + if should_buy(ticker): + logger.info(f"[빠른감시] 매수 신호: {ticker}") + trader.buy(ticker) + time.sleep(0.1) + except Exception as e: + logger.error(f"[빠른감시] 오류 {ticker}: {e}") + except Exception as e: + logger.error(f"신호 감시 루프 오류: {e}") + + time.sleep(SIGNAL_POLL_INTERVAL) def run_scanner() -> None: """메인 스캔 루프.""" - logger.info(f"스캐너 시작 (주기={SCAN_INTERVAL}초)") + # 신호 종목 빠른 감시 스레드 시작 + t = threading.Thread(target=_fast_poll_loop, daemon=True, name="signal-fast-poll") + t.start() + + logger.info(f"스캐너 시작 (스캔={SCAN_INTERVAL}초 | 신호감시={SIGNAL_POLL_INTERVAL}초)") while True: try: # 포지션 꽉 찼으면 스캔 스킵 diff --git a/sim10m.py b/sim10m.py new file mode 100644 index 0000000..a8e5afc --- /dev/null +++ b/sim10m.py @@ -0,0 +1,308 @@ +"""sim10m.py — 10분봉 + 극단적 거래량 즉시 진입 전략 시뮬. + +기존 전략 (1h봉 vol-lead) vs 신규 전략 (10분봉 + 100x 이상 거래량 즉시 진입) 비교. +데이터: 최근 45일 Upbit API +""" + +import pickle +import time +import sys +from pathlib import Path +from datetime import datetime, timedelta + +import pandas as pd +import pyupbit +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent / ".env") +sys.path.insert(0, str(Path(__file__).parent)) + +# ── 파라미터 ────────────────────────────────────────────── +SIM_DAYS = 45 +TOP30_FILE = Path("top30_tickers.pkl") +CACHE_FILE = Path("sim10m_cache.pkl") +TOP_N = 20 + +FEE = 0.0005 +TIME_STOP_MIN_PCT = 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 + +# A: 기존 1h봉 +A_LOCAL_VOL = 5 # 봉수 (= 5h) +A_VOL_MULT = 2.0 +A_QUIET_PCT = 2.0 +A_THRESH = 4.8 +A_SIGNAL_TO = 8 # 신호 유효 봉수 (= 8h) +A_ATR_CANDLES = 5 +A_TIME_STOP = 8 # 타임스탑 봉수 (= 8h) + +# B: 신규 10분봉 +B_LOCAL_VOL = 30 # 봉수 (5h = 30 × 10min) +B_VOL_MULT = 2.0 +B_EXTREME_VOL = 100 # 이 이상 → 횡보 조건 면제, 다음 봉 즉시 진입 +B_QUIET_CANDLES = 12 # 2h = 12봉 +B_QUIET_PCT = 2.0 +B_THRESH = 4.8 +B_SIGNAL_TO = 48 # 신호 유효 봉수 (8h = 48봉) +B_ATR_CANDLES = 30 # ATR 봉수 (5h = 30봉) +B_TIME_STOP = 48 # 타임스탑 봉수 (8h = 48봉) + + +# ── 데이터 로드 ────────────────────────────────────────── +def fetch_ohlcv(ticker, interval, days): + """OHLCV 과거 데이터 페이징 로드.""" + target_start = datetime.now() - timedelta(days=days) + all_dfs, to, prev_oldest = [], None, None + while True: + kwargs = dict(ticker=ticker, interval=interval, count=200) + if to: + kwargs["to"] = to + df = pyupbit.get_ohlcv(**kwargs) + if df is None or df.empty: + break + df.index = df.index.tz_localize(None) + oldest = df.index[0] + # 더 이상 과거로 못 가면 중단 (상장일에 도달) + if prev_oldest is not None and oldest >= prev_oldest: + all_dfs.append(df) + break + all_dfs.append(df) + prev_oldest = oldest + if oldest <= target_start: + break + to = oldest.strftime("%Y-%m-%d %H:%M:%S") + time.sleep(0.15) + if not all_dfs: + return None + result = pd.concat(all_dfs).sort_index().drop_duplicates() + return result[result.index >= target_start] + + +def load_data(tickers): + if CACHE_FILE.exists(): + print(f"캐시 로드: {CACHE_FILE}") + return pickle.load(open(CACHE_FILE, "rb")) + + data = {"1h": {}, "10m": {}} + for idx, ticker in enumerate(tickers, 1): + print(f" [{idx}/{len(tickers)}] {ticker}...", end=" ", flush=True) + df1h = fetch_ohlcv(ticker, "minute60", SIM_DAYS) + df10m = fetch_ohlcv(ticker, "minute10", SIM_DAYS) + n1h = len(df1h) if df1h is not None else 0 + n10m = len(df10m) if df10m is not None else 0 + print(f"1h={n1h}봉 10m={n10m}봉") + if df1h is not None and n1h >= 50: data["1h"][ticker] = df1h + if df10m is not None and n10m >= 200: data["10m"][ticker] = df10m + + pickle.dump(data, open(CACHE_FILE, "wb")) + print(f"캐시 저장: {CACHE_FILE}\n") + return data + + +# ── ATR 계산 (시뮬용) ───────────────────────────────────── +def calc_atr(df, buy_idx, n): + sub = df.iloc[max(0, buy_idx - n - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-n:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +# ── 포지션 시뮬 ────────────────────────────────────────── +def simulate_pos(df, buy_idx, buy_price, stop_pct, time_stop_candles): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + ts = df.index[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, sp, ts, f"트레일링({pnl:+.1f}%)", pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= time_stop_candles and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"] * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, row["close"], ts, "타임스탑", pnl + last = df.iloc[-1]["close"] + pnl = (last * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, last, df.index[-1], "데이터종료", pnl + + +# ── 전략 A: 기존 1h봉 vol-lead ────────────────────────── +def run_a(df): + trades = [] + sig_i = sig_p = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(A_LOCAL_VOL + 2, 3) + + while i < len(df): + if in_pos: + is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, A_TIME_STOP) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, "일반")) + in_pos = False + sig_i = sig_p = None + i = next_i + continue + + close = df.iloc[i]["close"] + chg_2h = abs(close - df.iloc[i-2]["close"]) / df.iloc[i-2]["close"] * 100 + quiet = chg_2h < A_QUIET_PCT + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-A_LOCAL_VOL-1:i-1]["volume"].mean() + spike = vol_avg > 0 and vol_p >= vol_avg * A_VOL_MULT + + if quiet and spike: + if sig_i is None: sig_i, sig_p = i, close + else: + if sig_i is not None and close < sig_p: sig_i = sig_p = None + if sig_i is not None and (i - sig_i) > A_SIGNAL_TO: + sig_i = sig_p = None + if sig_i is not None and (close - sig_p) / sig_p * 100 >= A_THRESH: + in_pos = True; buy_idx = i; buy_price = close + stop_pct = calc_atr(df, i, A_ATR_CANDLES) + sig_i = sig_p = None + i += 1 + return trades + + +# ── 전략 B: 10분봉 + 극단적 거래량 즉시 진입 ──────────── +def run_b(df): + # trade tuple: (is_win, pnl, buy_dt, sell_dt, entry_type) + # entry_type: '일반' | '극단' + trades = [] + sig_i = sig_p = None + extreme_pending = False + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(B_LOCAL_VOL + 2, B_QUIET_CANDLES + 1) + + while i < len(df): + if in_pos: + is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, B_TIME_STOP) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, entry_type)) + in_pos = False + sig_i = sig_p = None + extreme_pending = False + i = next_i + continue + + close = df.iloc[i]["close"] + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-B_LOCAL_VOL-1:i-1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + + # 극단적 거래량 → 다음 봉 진입 대기 설정 + if not extreme_pending and vol_r >= B_EXTREME_VOL: + extreme_pending = True + i += 1 + continue + + # 극단적 거래량 다음 봉 → 즉시 진입 + if extreme_pending: + in_pos = True; buy_idx = i; buy_price = close + stop_pct = calc_atr(df, i, B_ATR_CANDLES) + entry_type = "극단" + extreme_pending = False + sig_i = sig_p = None + i += 1 + continue + + # 일반 vol-lead + close_2h = df.iloc[i - B_QUIET_CANDLES]["close"] + chg_2h = abs(close - close_2h) / close_2h * 100 + quiet = chg_2h < B_QUIET_PCT + spike = vol_r >= B_VOL_MULT + + if quiet and spike: + if sig_i is None: sig_i, sig_p = i, close + else: + if sig_i is not None and close < sig_p: sig_i = sig_p = None + if sig_i is not None and (i - sig_i) > B_SIGNAL_TO: + sig_i = sig_p = None + if sig_i is not None and (close - sig_p) / sig_p * 100 >= B_THRESH: + in_pos = True; buy_idx = i; buy_price = close + stop_pct = calc_atr(df, i, B_ATR_CANDLES) + entry_type = "일반" + sig_i = sig_p = None + i += 1 + return trades + + +# ── 통계 계산 ───────────────────────────────────────────── +def calc_stats(trades): + if not trades: + return {"n": 0, "wr": 0.0, "cum": 0.0, "dd": 0.0} + wins = sum(1 for t in trades if t[0]) + cum = peak = dd = 0.0 + for t in sorted(trades, key=lambda x: x[2]): + cum += t[1] + peak = max(peak, cum) + dd = max(dd, peak - cum) + return {"n": len(trades), "wr": wins / len(trades) * 100, "cum": cum, "dd": dd} + + +# ── 메인 ───────────────────────────────────────────────── +def main(): + top30 = pickle.load(open(TOP30_FILE, "rb")) + tickers = top30[:TOP_N] + + print(f"{'='*60}") + print(f"10분봉 전략 시뮬 | 최근 {SIM_DAYS}일 | {TOP_N}종목") + print(f" A: 기존 1h봉 vol-lead") + print(f" B: 10분봉 vol-lead + 극단거래량({B_EXTREME_VOL}x) 즉시 진입") + print(f"{'='*60}\n") + + print(f"데이터 로드 중...") + data = load_data(tickers) + + v1h = [t for t in tickers if t in data["1h"] and len(data["1h"][t]) >= 50] + v10m = [t for t in tickers if t in data["10m"] and len(data["10m"][t]) >= 200] + print(f"유효: 1h={len(v1h)}종목 / 10m={len(v10m)}종목\n") + + # 전략 A + a_all = [] + for t in v1h: + a_all.extend(run_a(data["1h"][t])) + + # 전략 B + b_all = [] + for t in v10m: + b_all.extend(run_b(data["10m"][t])) + + b_extreme = [t for t in b_all if t[4] == "극단"] + b_normal = [t for t in b_all if t[4] == "일반"] + + sa = calc_stats(a_all) + sb = calc_stats(b_all) + sbe = calc_stats(b_extreme) + sbn = calc_stats(b_normal) + + print(f"{'='*65}") + print(f"{'전략':18} {'거래수':>6} {'승률':>6} {'누적PnL%':>10} {'최대낙폭%':>10}") + print(f"{'─'*65}") + print(f"{'A (1h vol-lead)':18} {sa['n']:>6}건 {sa['wr']:>5.1f}% {sa['cum']:>+9.2f}% {-sa['dd']:>+9.2f}%") + print(f"{'B 전체 (10m)':18} {sb['n']:>6}건 {sb['wr']:>5.1f}% {sb['cum']:>+9.2f}% {-sb['dd']:>+9.2f}%") + print(f"{' └ 극단거래량':18} {sbe['n']:>6}건 {sbe['wr']:>5.1f}% {sbe['cum']:>+9.2f}% {-sbe['dd']:>+9.2f}%") + print(f"{' └ 일반vol-lead':18} {sbn['n']:>6}건 {sbn['wr']:>5.1f}% {sbn['cum']:>+9.2f}% {-sbn['dd']:>+9.2f}%") + print(f"{'='*65}") + + # 극단 거래량 진입 상세 + if b_extreme: + print(f"\n── 극단거래량 진입 상세 ({len(b_extreme)}건) ─────────────────────") + print(f" {'매수시각':<18} {'PnL%':>7} {'청산'}") + for t in sorted(b_extreme, key=lambda x: x[2])[:20]: + mark = "✅" if t[0] else "❌" + print(f" {str(t[2])[:16]:<18} {t[1]:>+6.1f}% {mark} {t[4]}") + + +if __name__ == "__main__": + main() diff --git a/velocity_backtest.py b/velocity_backtest.py new file mode 100644 index 0000000..593224b --- /dev/null +++ b/velocity_backtest.py @@ -0,0 +1,458 @@ +"""velocity_backtest.py — 속도 진입 효과 비교 백테스트. + +전략 A: 기존 거리 기반 (signal_price 대비 +THRESH% 도달 시 진입) +전략 B: 거리 + 속도 기반 (velocity >= 레짐별 VELOCITY_THRESHOLD 시 조기 진입) + BULL → vel_thresh = 0.10 (공격적) + NEUTRAL → vel_thresh = 0.15 (보수적) + BEAR → vel_thresh = 0.20 (더 높음) + +레짐 판단: KRW-BTC 1h 변동률 (캐시 데이터 활용) +10분봉 캐시(sim10m_cache.pkl)를 사용. +신호 감지: 40분봉 vol spike + 2h 횡보 (10분봉 합산/슬라이스로 계산) +진입/청산: 10분봉 단위로 체크 (실제 시스템의 15초 폴링 근사) +""" + +import pickle +from pathlib import Path +import pandas as pd + +# ── 파라미터 ────────────────────────────────────────────────────────────────── +CACHE_FILE = Path("sim10m_cache.pkl") +TOP_FILE = Path("top30_tickers.pkl") +TOP_N = 20 + +BUDGET = 15_000_000 +MIN_BUDGET = BUDGET * 3 // 10 +MAX_POS = 3 +FEE = 0.0005 + +# 전략 파라미터 +VOL_MULT = 2.0 # 거래량 배수 기준 +QUIET_PCT = 2.0 # 2h 횡보 기준 (%) +THRESH = 4.8 # 거리 기반 진입 임계값 (%) + +# 10분봉 기준 캔들 수 +QUIET_C = 12 # 2h = 12 × 10분 +VOL40_C = 4 # 40분봉 1개 = 4 × 10분봉 +LOCAL_C = 7 # 로컬 평균 40분봉 7개 = 28 × 10분봉 +TIMEOUT_C = 48 # 신호 타임아웃 8h = 48 × 10분봉 +TS_C = 48 # 타임스탑 8h = 48 × 10분봉 +ATR_C = 28 # ATR 5h = 7 × 40분 = 28 × 10분봉 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 +TS_MIN_PCT = 3.0 + +MIN_I = LOCAL_C * VOL40_C + VOL40_C + QUIET_C + 2 # = 42 + +# 속도 기반 진입 파라미터 (레짐별) +VEL_THRESH_BULL = 0.10 # BULL: 0.10%/분 (공격적) +VEL_THRESH_NEUTRAL = 0.15 # NEUTRAL: 0.15%/분 (보수적) +VEL_THRESH_BEAR = 0.20 # BEAR: 0.20%/분 (더 높음) +VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % (잡음 제거) +VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분 + +# 레짐 판단 기준 (BTC 1h 변동률) +REGIME_BULL_CHANGE = 5.0 # +5% 이상 → BULL +REGIME_BEAR_CHANGE = -5.0 # -5% 이하 → BEAR + +WF_WINDOW = 4 +WF_MIN_WIN_RATE = 0.01 +WF_SHADOW_WINS = 2 + +# ── 낙폭 제어 파라미터 ──────────────────────────────────────────────────────── +HARD_STOP_PCT = 0.015 # 진입가 대비 -1.5% 즉시 청산 (하드 손절) +STREAK_TIGHT_N = 2 # 연속 N회 손절 시 타임스탑 강화 +TS_C_TIGHT = 24 # 강화 타임스탑 보유 시간 (4h = 24 × 10분) +TS_MIN_PCT_TIGHT = 0.0 # 강화 타임스탑 최소 수익률 (0%) + + +# ── 레짐 헬퍼 ───────────────────────────────────────────────────────────────── +def build_regime_series(btc_df: pd.DataFrame) -> pd.Series: + """BTC 10분봉으로 1h 변동률 계산 → 레짐 시리즈 반환.""" + close = btc_df["close"] + change = close.pct_change(6) * 100 # 6 × 10분 = 1h + regime = pd.Series("neutral", index=close.index, dtype=object) + regime[change > REGIME_BULL_CHANGE] = "bull" + regime[change < REGIME_BEAR_CHANGE] = "bear" + return regime + + +def _vel_thresh_for(regime_s: pd.Series, ts) -> float: + """타임스탬프 기준 레짐별 velocity 임계값 반환.""" + if regime_s is None: + return VEL_THRESH_NEUTRAL + idx = regime_s.index.searchsorted(ts) + if idx >= len(regime_s): + return VEL_THRESH_NEUTRAL + r = regime_s.iloc[idx] + if r == "bull": + return VEL_THRESH_BULL + elif r == "bear": + return VEL_THRESH_BEAR + return VEL_THRESH_NEUTRAL + + +# ── 헬퍼 ────────────────────────────────────────────────────────────────────── +def calc_atr(df: pd.DataFrame, i: int) -> float: + sub = df.iloc[max(0, i - ATR_C):i] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +def simulate_pos(df: pd.DataFrame, buy_idx: int, + buy_price: float, stop_pct: float, + hard_stop: bool = False, tight_ts: bool = False): + """매수 이후 청산 시점·손익 계산. + + hard_stop : True → 진입가 대비 -HARD_STOP_PCT% 즉시 청산 + tight_ts : True → 강화된 타임스탑 (4h / 0%) 적용 + """ + peak = buy_price + hard_stop_px = buy_price * (1 - HARD_STOP_PCT) if hard_stop else None + ts_c_use = TS_C_TIGHT if tight_ts else TS_C + ts_min_use = TS_MIN_PCT_TIGHT if tight_ts else TS_MIN_PCT + + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + if row["high"] > peak: + peak = row["high"] + + # 1. 하드 손절 (진입가 대비 고정 %) + if hard_stop_px is not None and row["low"] <= hard_stop_px: + pnl = (hard_stop_px * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[i], pnl + + # 2. 트레일링 스탑 (최고가 대비) + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[i], pnl + + # 3. 타임스탑 + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= ts_c_use and pnl_now < ts_min_use: + pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[i], pnl + + last = df.iloc[-1]["close"] + pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, df.index[-1], pnl + + +def _prev_40m_vol(df: pd.DataFrame, i: int) -> float: + """직전 완성 40분봉 거래량 (10분봉 4개 합산).""" + return df.iloc[max(0, i - VOL40_C):i]["volume"].sum() + + +def _local_vol_avg(df: pd.DataFrame, i: int) -> float: + """로컬 5h 평균 (직전 7개 40분봉 각각의 합산 평균).""" + vols = [] + for k in range(1, LOCAL_C + 1): + end = i - VOL40_C * (k - 1) + start = end - VOL40_C + if start < 0: + break + vols.append(df.iloc[start:end]["volume"].sum()) + return sum(vols) / len(vols) if vols else 0 + + +# ── 핵심 전략 루프 ───────────────────────────────────────────────────────────── +def run_strategy(df: pd.DataFrame, ticker: str, + use_velocity: bool = False, + regime_s: pd.Series = None, + dd_control: bool = False) -> list: + """ + Returns list of (is_win, pnl, buy_dt, sell_dt, ticker, entry_type) + entry_type: 'dist' | 'vel' + + dd_control: True → 연속 손절 추적하여 hard_stop + tight_ts 적용 + """ + trades = [] + sig_i = sig_p = None + in_pos = False + buy_idx = buy_price = stop_pct = None + entry_type = "dist" + consec_losses = 0 # 연속 손절 횟수 + + i = MIN_I + while i < len(df): + # ── 포지션 중 → 청산 계산 후 다음 진입 탐색 ────────────────── + if in_pos: + use_hard = dd_control and consec_losses >= STREAK_TIGHT_N + use_tight = dd_control and consec_losses >= STREAK_TIGHT_N + is_win, sdt, pnl = simulate_pos( + df, buy_idx, buy_price, stop_pct, + hard_stop=use_hard, tight_ts=use_tight, + ) + next_i = next( + (j for j in range(i, len(df)) if df.index[j] > sdt), + len(df), + ) + trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, entry_type)) + # 연속 손절 카운터 업데이트 + if is_win: + consec_losses = 0 + else: + consec_losses += 1 + in_pos = False + sig_i = sig_p = None + i = next_i + continue + + close = df.iloc[i]["close"] + + # ── 신호 감지 ───────────────────────────────────────────────── + prev_vol = _prev_40m_vol(df, i) + local_avg = _local_vol_avg(df, i) + vol_r = prev_vol / local_avg if local_avg > 0 else 0 + + close_2h = df.iloc[i - QUIET_C]["close"] + quiet = abs(close - close_2h) / close_2h * 100 < QUIET_PCT + spike = vol_r >= VOL_MULT + + if quiet and spike: + if sig_i is None: + sig_i, sig_p = i, close + else: + if sig_i is not None and close < sig_p: + sig_i = sig_p = None + + # 타임아웃 + if sig_i is not None and (i - sig_i) > TIMEOUT_C: + sig_i = sig_p = None + + # ── 진입 판단 ───────────────────────────────────────────────── + if sig_i is not None: + move_pct = (close - sig_p) / sig_p * 100 + age_min = (i - sig_i) * 10 # 10분봉 × 10분 + + # A. 거리 기반 + if move_pct >= THRESH: + in_pos = True + buy_idx = i + buy_price = close + stop_pct = calc_atr(df, i) + entry_type = "dist" + sig_i = sig_p = None + i += 1 + continue + + # B. 속도 기반 (use_velocity=True 일 때만) + if (use_velocity + and age_min >= VELOCITY_MIN_AGE_M + and move_pct >= VELOCITY_MIN_MOVE): + velocity = move_pct / age_min + vel_thresh = _vel_thresh_for(regime_s, df.index[i]) + if velocity >= vel_thresh: + in_pos = True + buy_idx = i + buy_price = close + stop_pct = calc_atr(df, i) + entry_type = "vel" + sig_i = sig_p = None + i += 1 + continue + + i += 1 + return trades + + +# ── WF 필터 ─────────────────────────────────────────────────────────────────── +def apply_wf(trades: list) -> tuple: + history, shadow_streak, blocked = [], 0, False + accepted, blocked_cnt = [], 0 + for trade in trades: + is_win = int(trade[0]) + if not blocked: + accepted.append(trade) + history.append(is_win) + if len(history) >= WF_WINDOW: + if sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE: + blocked = True + shadow_streak = 0 + else: + blocked_cnt += 1 + shadow_streak = (shadow_streak + 1) if is_win else 0 + if shadow_streak >= WF_SHADOW_WINS: + blocked = False + history = [] + shadow_streak = 0 + return accepted, blocked_cnt + + +# ── MAX_POSITIONS 필터 ──────────────────────────────────────────────────────── +def apply_max_pos(all_trades: list) -> tuple: + open_exits, accepted, skipped = [], [], [] + for trade in sorted(all_trades, key=lambda x: x[2]): + buy_dt, sell_dt = trade[2], trade[3] + open_exits = [s for s in open_exits if s > buy_dt] + if len(open_exits) < MAX_POS: + open_exits.append(sell_dt) + accepted.append(trade) + else: + skipped.append(trade) + return accepted, skipped + + +# ── 복리 시뮬 ───────────────────────────────────────────────────────────────── +def simulate(accepted: list) -> dict: + portfolio = float(BUDGET) + total_krw = 0.0 + monthly = {} + peak_pf = BUDGET + max_dd = 0.0 + win_cnt = 0 + + vel_count = sum(1 for t in accepted if t[5] == "vel") + vel_wins = sum(1 for t in accepted if t[5] == "vel" and t[0]) + vel_pnls = [t[1] for t in accepted if t[5] == "vel"] + dist_pnls = [t[1] for t in accepted if t[5] == "dist"] + + for is_win, pnl, buy_dt, sell_dt, ticker, etype in accepted: + pos_size = max(portfolio, MIN_BUDGET) / MAX_POS + krw_profit = pos_size * pnl / 100 + portfolio = max(portfolio + krw_profit, MIN_BUDGET) + total_krw += krw_profit + peak_pf = max(peak_pf, portfolio) + dd = (peak_pf - portfolio) / peak_pf * 100 + max_dd = max(max_dd, dd) + win_cnt += int(is_win) + + ym = buy_dt.strftime("%Y-%m") + m = monthly.setdefault(ym, {"t": 0, "w": 0, "krw": 0.0}) + m["t"] += 1; m["w"] += int(is_win); m["krw"] += krw_profit + + n = len(accepted) + return { + "portfolio": portfolio, + "total_krw": total_krw, + "roi": (portfolio - BUDGET) / BUDGET * 100, + "n": n, + "wins": win_cnt, + "wr": win_cnt / n * 100 if n else 0, + "max_dd": max_dd, + "monthly": monthly, + "vel_count": vel_count, + "vel_wins": vel_wins, + "vel_wr": vel_wins / vel_count * 100 if vel_count else 0, + "vel_avg_pnl": sum(vel_pnls) / len(vel_pnls) if vel_pnls else 0, + "dist_avg_pnl": sum(dist_pnls) / len(dist_pnls) if dist_pnls else 0, + } + + +# ── 메인 ────────────────────────────────────────────────────────────────────── +def main(): + print("캐시 로드...") + cache = pickle.load(open(CACHE_FILE, "rb")) + top30 = pickle.load(open(TOP_FILE, "rb")) + tickers = [t for t in top30[:TOP_N] if t in cache["10m"]] + print(f"유효 종목: {len(tickers)}개\n") + + # BTC 레짐 시리즈 빌드 + btc_df = cache["10m"].get("KRW-BTC") + regime_s = build_regime_series(btc_df) if btc_df is not None else None + if regime_s is not None: + bull_pct = (regime_s == "bull").mean() * 100 + bear_pct = (regime_s == "bear").mean() * 100 + print(f"레짐 분포: BULL {bull_pct:.1f}% / NEUTRAL {100-bull_pct-bear_pct:.1f}% / BEAR {bear_pct:.1f}%") + print(f"vel threshold: BULL={VEL_THRESH_BULL} / NEUTRAL={VEL_THRESH_NEUTRAL} / BEAR={VEL_THRESH_BEAR}\n") + + all_a, all_b, all_c = [], [], [] + wf_a_total = wf_b_total = wf_c_total = 0 + + for t in tickers: + df = cache["10m"][t] + if len(df) < MIN_I + 50: + continue + + raw_a = run_strategy(df, t, use_velocity=False) + raw_b = run_strategy(df, t, use_velocity=True, regime_s=regime_s) + raw_c = run_strategy(df, t, use_velocity=True, regime_s=regime_s, dd_control=True) + + fa, ba = apply_wf(raw_a) + fb, bb = apply_wf(raw_b) + fc, bc = apply_wf(raw_c) + wf_a_total += ba; wf_b_total += bb; wf_c_total += bc + all_a.extend(fa); all_b.extend(fb); all_c.extend(fc) + + acc_a, skp_a = apply_max_pos(all_a) + acc_b, skp_b = apply_max_pos(all_b) + acc_c, skp_c = apply_max_pos(all_c) + + ra = simulate(acc_a) + rb = simulate(acc_b) + rc = simulate(acc_c) + + # ── 날짜 범위 ───────────────────────────────────────── + def date_range(acc): + if acc: + s = min(t[2] for t in acc).strftime("%Y-%m-%d") + e = max(t[3] for t in acc).strftime("%Y-%m-%d") + return f"{s} ~ {e}" + return "N/A" + + # ── 출력 ───────────────────────────────────────────── + W = 72 + print("=" * W) + print(f" 낙폭 제어 비교 백테스트 | 10분봉 | {len(tickers)}종목") + print(f" 기간: {date_range(acc_a)}") + print(f" hard_stop={HARD_STOP_PCT*100:.1f}% | tight_ts={TS_C_TIGHT*10//60}h+{TS_MIN_PCT_TIGHT:.0f}% " + f"(연속 {STREAK_TIGHT_N}손절 후)") + print("=" * W) + print(f" {'항목':<22} {'A. 기존':>12} {'B. +속도':>12} {'C. +속도+DD제어':>14}") + print(f" {'─'*64}") + + def row3(label, va, vb, vc, fmt="{}"): + sa, sb, sc = fmt.format(va), fmt.format(vb), fmt.format(vc) + try: + dbc = float(str(vc).replace(",","").replace("%","").replace("원","")) \ + - float(str(va).replace(",","").replace("%","").replace("원","")) + dc = f" ({dbc:+.1f})" if abs(dbc) >= 0.01 else "" + except Exception: + dc = "" + print(f" {label:<22} {sa:>12} {sb:>12} {sc:>14}{dc}") + + row3("총 진입", ra["n"], rb["n"], rc["n"], "{:,}건") + row3(" 속도 진입", 0, rb["vel_count"], rc["vel_count"], "{:,}건") + row3("WF 차단", wf_a_total, wf_b_total, wf_c_total, "{:,}건") + row3("MAX_POS 스킵", len(skp_a), len(skp_b), len(skp_c), "{:,}건") + print(f" {'─'*64}") + row3("승률", f"{ra['wr']:.1f}%", f"{rb['wr']:.1f}%", f"{rc['wr']:.1f}%") + row3(" 속도진입 승률","-", f"{rb['vel_wr']:.1f}%", f"{rc['vel_wr']:.1f}%") + print(f" {'─'*64}") + row3("평균 pnl (거리)",f"{ra['dist_avg_pnl']:.2f}%", f"{rb['dist_avg_pnl']:.2f}%", f"{rc['dist_avg_pnl']:.2f}%") + row3("평균 pnl (속도)","-", f"{rb['vel_avg_pnl']:.2f}%", f"{rc['vel_avg_pnl']:.2f}%") + print(f" {'─'*64}") + row3("최종 자산", f"{ra['portfolio']:,.0f}원", f"{rb['portfolio']:,.0f}원", f"{rc['portfolio']:,.0f}원") + row3("총 수익", f"{ra['total_krw']:+,.0f}원", f"{rb['total_krw']:+,.0f}원", f"{rc['total_krw']:+,.0f}원") + row3("수익률", f"{ra['roi']:.2f}%", f"{rb['roi']:.2f}%", f"{rc['roi']:.2f}%") + row3("최대 낙폭", f"{-ra['max_dd']:.2f}%", f"{-rb['max_dd']:.2f}%", f"{-rc['max_dd']:.2f}%") + print("=" * W) + + # ── 월별 ───────────────────────────────────────────── + print(f"\n── 월별 수익 비교 {'─'*50}") + print(f" {'월':^8} │ {'A':>5} {'A%':>4} {'A수익':>10} │ " + f"{'B':>5} {'B%':>4} {'B수익':>10} │ " + f"{'C':>5} {'C%':>4} {'C수익':>10}") + all_months = sorted(set(list(ra["monthly"]) + list(rb["monthly"]) + list(rc["monthly"]))) + for ym in all_months: + ma = ra["monthly"].get(ym, {"t":0,"w":0,"krw":0}) + mb = rb["monthly"].get(ym, {"t":0,"w":0,"krw":0}) + mc = rc["monthly"].get(ym, {"t":0,"w":0,"krw":0}) + wra = ma["w"]/ma["t"]*100 if ma["t"] else 0 + wrb = mb["w"]/mb["t"]*100 if mb["t"] else 0 + wrc = mc["w"]/mc["t"]*100 if mc["t"] else 0 + print(f" {ym:^8} │ {ma['t']:>4}건 {wra:>3.0f}% {ma['krw']:>+9,.0f}원 │ " + f"{mb['t']:>4}건 {wrb:>3.0f}% {mb['krw']:>+9,.0f}원 │ " + f"{mc['t']:>4}건 {wrc:>3.0f}% {mc['krw']:>+9,.0f}원") + print("=" * W) + + +if __name__ == "__main__": + main()