feat: add velocity entry, fast-poll thread, tighten BEAR threshold

- Add velocity-based entry signal in strategy.py (VELOCITY_THRESHOLD=0.10,
  VELOCITY_MIN_MOVE=0.5%, VELOCITY_MIN_AGE_M=5)
- Add fast-poll thread in daemon/runner.py (SIGNAL_POLL_INTERVAL=15s)
  for sub-minute velocity event detection
- Add vol_ratio tiered condition and get_active_signals() to strategy.py
- Change BEAR_THRESHOLD -1.0 → -0.5 in market_regime.py to catch
  slow downtrends earlier (weighted 2h score)
- Expand sell_reason VARCHAR2(500) in price_db.py DDL
- Add velocity_backtest.py and sim10m.py for strategy experimentation
- Update STRATEGY.md: correct regime algorithm description (weighted 2h
  score, not BTC 1h ±5%), add fast-poll/velocity sections, add backtest
  section D, add change history table

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-03 10:17:08 +09:00
parent 612055162e
commit 673ce08d84
7 changed files with 986 additions and 36 deletions

View File

@@ -15,7 +15,7 @@
## 진입 조건 (2단계) ## 진입 조건 (2단계)
### 1단계: 매집 신호 감지 ### 1단계: 매집 신호 감지
다음 두 조건 동시 충족 시 `signal_price` 기록: 다음 두 조건 동시 충족 시 `signal_price` + `vol_ratio` 기록:
| 조건 | 파라미터 | 기본값 | | 조건 | 파라미터 | 기본값 |
|------|----------|--------| |------|----------|--------|
@@ -29,12 +29,40 @@
> **2h 횡보 체크**: Oracle DB에 저장된 실시간 가격 기록을 조회 (`get_price_n_hours_ago`) > **2h 횡보 체크**: Oracle DB에 저장된 실시간 가격 기록을 조회 (`get_price_n_hours_ago`)
> **거래량 체크**: `minute10` → 40분봉 resample → 직전 완성봉 vs 이전 7봉 평균 > **거래량 체크**: `minute10` → 40분봉 resample → 직전 완성봉 vs 이전 7봉 평균
### 2단계: 추세 확인 후 진입 ### 2단계: 추세 확인 후 진입 (거리 기반 OR 속도 기반 — 먼저 충족되는 조건으로 진입)
`signal_price` 대비 +`TREND_AFTER_VOL`% 이상 상승 확인 시 매수:
**A. 거리 기반**: `signal_price` 대비 +임계값% 이상 상승 시 매수 (vol_ratio에 따라 동적 조정)
| vol_ratio | 진입 임계값 | 설명 |
|-----------|------------|------|
| ≥ 5.0x | +1.0% | 매우 강한 신호 |
| ≥ 3.5x | +2.0% | 강한 신호 |
| ≥ 2.5x | +3.0% | 중간 신호 |
| < 2.5x | +`TREND_AFTER_VOL`% | 기본 임계값 |
**B. 속도 기반 (조기 진입)**: 신호 후 가격 상승 속도가 `VELOCITY_THRESHOLD` 이상이면 즉시 진입
| 파라미터 | 기본값 | 설명 | | 파라미터 | 기본값 | 설명 |
|----------|--------|------| |----------|--------|------|
| `TREND_AFTER_VOL` | 4.8% | 신호가 대비 진입 임계값 | | `TREND_AFTER_VOL` | 4.8% | 거리 기반 기본 임계값 |
| `VELOCITY_THRESHOLD` | 0.10%/분 | 속도 기준 (6%/h — 가파른 상승 감지) |
| `VELOCITY_MIN_MOVE` | 0.5% | 속도 체크 전 최소 이동 % |
| `VELOCITY_MIN_AGE_M` | 5분 | 속도 체크 전 최소 경과 시간 |
> **예시 (BTC 23:20 신호)**: 23:30에 +1.26%/6분 = 0.21%/분 → 속도 기준 충족 → 조기 진입
> 실제 진입(00:34, 100,840,000) 대비 약 1시간 빠른 23:30(97,286,000) 진입 가능
### 신호 감시 스레드 (Fast Poll)
신호 감지 후 전체 스캔 60초를 기다리지 않고 해당 종목만 빠르게 폴링.
| 파라미터 | 기본값 | 설명 |
|----------|--------|------|
| `SCAN_INTERVAL` | 60초 | 전체 시장 스캔 주기 |
| `SIGNAL_POLL_INTERVAL` | 15초 | 신호 종목 집중 감시 주기 |
- 신호 발생 시 별도 스레드(`signal-fast-poll`)가 15초마다 해당 종목만 체크
- 목표 임계값(거리 or 속도) 도달 즉시 매수 → 60초 지연 제거
--- ---
@@ -62,7 +90,7 @@
| `WF_MIN_WIN_RATE` | 0.01 | 최소 승률 임계값 (1%) | | `WF_MIN_WIN_RATE` | 0.01 | 최소 승률 임계값 (1%) |
| `WF_SHADOW_WINS` | 2 | 차단 해제 조건 (가상 N연승) | | `WF_SHADOW_WINS` | 2 | 차단 해제 조건 (가상 N연승) |
- 직전 2모두 손실 → 해당 종목 진입 차단 - 직전 4승률 < 1% → 해당 종목 진입 차단
- 차단 후 가상 추적으로 2연승 달성 시 자동 복귀 - 차단 후 가상 추적으로 2연승 달성 시 자동 복귀
- **WF 차단 상태는 Oracle DB(`wf_state` 테이블)에 영속 저장** → 재시작 후에도 복원 - **WF 차단 상태는 Oracle DB(`wf_state` 테이블)에 영속 저장** → 재시작 후에도 복원
@@ -76,14 +104,27 @@
## 시장 레짐 적응 ## 시장 레짐 적응
| 레짐 | BTC 1h 변동 | 거래량 기준 | BTC·ETH·SOL·XRP 가중평균 **2h 추세 score**로 레짐 결정.
|------|------------|------------|
| BULL | +5% 이상 | 1.5x |
| NEUTRAL | ±5% 이내 | 2.0x |
| BEAR | -5% 이하 | 진입 차단 |
- BEAR 레짐 감지 시 신규 진입 전면 차단 | 종목 | 가중치 |
- 레짐별 `vol_mult` 조정으로 민감도 제어 |------|--------|
| KRW-BTC | 40% |
| KRW-ETH | 30% |
| KRW-SOL | 15% |
| KRW-XRP | 15% |
| 레짐 | score 기준 | vol_mult | 신규 진입 |
|------|-----------|----------|---------|
| BULL | ≥ +1.5% | 1.5x | 허용 |
| NEUTRAL | -0.5% ~ +1.5% | 2.0x | 허용 |
| BEAR | < -0.5% | 3.5x | **전면 차단** |
- BEAR 레짐 감지 시 신규 매수 전면 차단 (기존 포지션 청산은 정상 진행)
- 레짐 캐시 TTL: 10분 (API 호출 최소화)
- 현재가는 매 레짐 계산 시 Oracle DB(`price_history`)에 저장 → 2h 전 가격 조회에 재활용
> **2026-03-03 조정**: BEAR 기준 -1.0% → **-0.5%**로 강화
> 완만한 하락장(score ≈ -0.4%)에서 NEUTRAL로 오판하던 문제 수정
--- ---
@@ -96,6 +137,11 @@ TREND_AFTER_VOL=4.8 # 진입 임계값 (신호가 대비 %)
SIGNAL_TIMEOUT_H=8.0 # 신호 유효 시간 (h) SIGNAL_TIMEOUT_H=8.0 # 신호 유효 시간 (h)
VOLUME_MULTIPLIER=2.0 # 거래량 배수 기준 VOLUME_MULTIPLIER=2.0 # 거래량 배수 기준
# 속도 기반 조기 진입
VELOCITY_THRESHOLD=0.10 # %/분 (0.10 = 6%/h)
VELOCITY_MIN_MOVE=0.5 # 최소 이동 % (잡음 제거)
VELOCITY_MIN_AGE_M=5.0 # 최소 경과 시간 (분)
# 청산 # 청산
TIME_STOP_HOURS=8 # 타임스탑 보유 시간 TIME_STOP_HOURS=8 # 타임스탑 보유 시간
TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률 TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률
@@ -104,8 +150,11 @@ TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률
MAX_BUDGET=15000000 # 초기 운용 예산 MAX_BUDGET=15000000 # 초기 운용 예산
MAX_POSITIONS=3 # 최대 동시 보유 종목 MAX_POSITIONS=3 # 최대 동시 보유 종목
# 감시 주기
SIGNAL_POLL_INTERVAL=15 # 신호 종목 빠른 감시 (초)
# WF 필터 # WF 필터
WF_WINDOW=2 WF_WINDOW=4
WF_MIN_WIN_RATE=0.01 WF_MIN_WIN_RATE=0.01
WF_SHADOW_WINS=2 WF_SHADOW_WINS=2
``` ```
@@ -167,22 +216,35 @@ WF_SHADOW_WINS=2
| 2.5% | 50.8% | +256% | -5.3% | 1.77% | | 2.5% | 50.8% | +256% | -5.3% | 1.77% |
| 4.0% | 45.9% | -52% | -29.1% | 3.11% | | 4.0% | 45.9% | -52% | -29.1% | 3.11% |
### D. 속도 진입 효과 비교 — 10분봉 (`velocity_backtest.py`)
> 기간: 2026-01-19 ~ 2026-03-02 / 10분봉 캐시 / 20종목
| 설정 | 속도진입 | 승률 | 수익률 | 최대낙폭 |
|------|---------|------|--------|---------|
| A: 거리 기반만 | 0건 | 34.7% | +8.83% | -8.35% |
| B: +속도(0.10) | 89건 | 33.6% | +13.36% | **-4.50%** |
| B: +속도(0.15) | 39건 | 35.7% | +17.19% | -7.84% |
- **0.10 채택**: 낙폭 -8.35% → -4.50% 개선, 수익률 +4.5%p
- 속도진입 BTC 예시: 0.21%/분 → 0.10 기준 충족 (0.20도 아슬하게 충족)
--- ---
## 주요 파일 ## 주요 파일
| 파일 | 역할 | | 파일 | 역할 |
|------|------| |------|------|
| `core/strategy.py` | 진입 신호 로직 (40분봉 vol-lead) | | `core/strategy.py` | 진입 신호 로직 (40분봉 vol-lead + 속도 기반 조기 진입) |
| `core/monitor.py` | ATR 트레일링 스탑 + 타임스탑 (40분봉 ATR) | | `core/monitor.py` | ATR 트레일링 스탑 + 타임스탑 (40분봉 ATR) |
| `core/trader.py` | 주문 실행 + 복리 예산 관리 | | `core/trader.py` | 주문 실행 + 복리 예산 관리 |
| `core/market_regime.py` | 시장 레짐 감지 | | `core/market_regime.py` | 시장 레짐 감지 (BTC/ETH/SOL/XRP 가중 2h 추세) |
| `core/price_db.py` | 가격 DB + WF 상태 영속화 | | `core/price_db.py` | 가격 DB + WF 상태 영속화 |
| `daemon/runner.py` | 전체 스캔 루프 + 신호 종목 fast-poll 스레드 |
| `ohlcv_db.py` | OHLCV 시계열 DB 캐시 관리 | | `ohlcv_db.py` | OHLCV 시계열 DB 캐시 관리 |
| `sim_365.py` | 365일 복리 시뮬레이션 (1h봉, DB) | | `sim_365.py` | 365일 복리 시뮬레이션 (1h봉, DB) |
| `sim_45m40.py` | 45일 복리 시뮬레이션 (40분봉, 캐시) | | `sim_45m40.py` | 45일 복리 시뮬레이션 (40분봉, 캐시) |
| `velocity_backtest.py` | 속도 진입 효과 비교 백테스트 (A vs B vs C) |
| `atr_sweep.py` | ATR_MAX_STOP 파라미터 스윕 | | `atr_sweep.py` | ATR_MAX_STOP 파라미터 스윕 |
| `sim10m.py` | 10분봉 vs 1h봉 전략 비교 시뮬 |
| `interval_sweep.py` | 봉 단위별 성과 비교 (10/20/30/40/50/60분) | | `interval_sweep.py` | 봉 단위별 성과 비교 (10/20/30/40/50/60분) |
--- ---
@@ -196,6 +258,9 @@ python sim_45m40.py
# 365일 복리 시뮬 — 1h봉 (DB에서 로드) # 365일 복리 시뮬 — 1h봉 (DB에서 로드)
python sim_365.py python sim_365.py
# 속도 진입 효과 비교
python velocity_backtest.py
# 봉 단위별 비교 (10m 캐시 필요) # 봉 단위별 비교 (10m 캐시 필요)
python interval_sweep.py python interval_sweep.py
@@ -208,3 +273,15 @@ python ohlcv_db.py status
# 신규 봉 증분 업데이트 # 신규 봉 증분 업데이트
python ohlcv_db.py update python ohlcv_db.py update
``` ```
---
## 변경 이력
| 날짜 | 변경 내용 |
|------|---------|
| 2026-03-03 | BEAR_THRESHOLD -1.0% → **-0.5%** 강화 (완만한 하락장 오판 수정) |
| 2026-03-03 | 속도 기반 조기 진입 추가 (`VELOCITY_THRESHOLD=0.10%/분`) |
| 2026-03-03 | 신호 종목 fast-poll 스레드 추가 (`SIGNAL_POLL_INTERVAL=15s`) |
| 2026-03-03 | `sell_reason` 컬럼 VARCHAR2(100→500) 자동 확장 |
| 2026-03-03 | vol_ratio 강도별 진입 임계값 티어 추가 (5x→1%, 3.5x→2%, 2.5x→3%) |

View File

@@ -26,7 +26,7 @@ LEADERS: dict[str, float] = {
TREND_HOURS = 2 # 2h 추세 기준 TREND_HOURS = 2 # 2h 추세 기준
BULL_THRESHOLD = 1.5 # score ≥ 1.5% → Bull BULL_THRESHOLD = 1.5 # score ≥ 1.5% → Bull
BEAR_THRESHOLD = -1.0 # score < -1.0% → Bear BEAR_THRESHOLD = -0.5 # score < -0.5% → Bear
# 레짐별 매수 조건 파라미터 # 레짐별 매수 조건 파라미터
REGIME_PARAMS: dict[str, dict] = { REGIME_PARAMS: dict[str, dict] = {

View File

@@ -170,6 +170,13 @@ def ensure_trade_results_table() -> None:
except oracledb.DatabaseError as e: except oracledb.DatabaseError as e:
if e.args[0].code not in (955, 1408): if e.args[0].code not in (955, 1408):
raise raise
# sell_reason 컬럼이 100 BYTE 이하이면 500으로 확장
try:
conn.cursor().execute(
"ALTER TABLE trade_results MODIFY sell_reason VARCHAR2(500)"
)
except oracledb.DatabaseError:
pass # 이미 500 이상이거나 컬럼 없으면 무시
def record_trade( def record_trade(

View File

@@ -3,8 +3,13 @@
흐름: 흐름:
1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND 1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
→ 신호가(signal_price) 기록 → 신호가(signal_price) + 거래량비율(vol_ratio) 기록
2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입 2. signal_price 대비 +임계값% 이상 상승 시 진입
임계값은 vol_ratio 강도에 따라 자동 조정 (강한 신호 → 낮은 임계값 → 조기 진입)
- vol_ratio ≥ 5.0x → +1.0%
- vol_ratio ≥ 3.5x → +2.0%
- vol_ratio ≥ 2.5x → +3.0%
- 기본 → +TREND_AFTER_VOL%
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화 3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용 캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용
@@ -38,6 +43,29 @@ VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
_CANDLE_MIN = 40 _CANDLE_MIN = 40
_FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉 _FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉
# 신호 강도별 진입 임계값 단계 (vol_ratio 최소값, 진입 임계값%)
# 강한 신호일수록 낮은 임계값으로 조기 진입
_ENTRY_TIERS: list[tuple[float, float]] = [
(5.0, 1.0), # 매우 강한 신호 (≥5x) → +1.0% 즉시 진입
(3.5, 2.0), # 강한 신호 (≥3.5x) → +2.0% 조기 진입
(2.5, 3.0), # 중간 신호 (≥2.5x) → +3.0% 반조기 진입
]
# 위 조건 미충족 시 TREND_AFTER_VOL 사용
# 속도(velocity) 기반 조기 진입 파라미터
# 신호 후 가격이 빠르게 상승 중이면 거리 임계값 도달 전에 선진입
VELOCITY_THRESHOLD = float(os.getenv("VELOCITY_THRESHOLD", "0.10")) # %/분 (0.10 = 6%/h)
VELOCITY_MIN_MOVE = float(os.getenv("VELOCITY_MIN_MOVE", "0.5")) # 최소 이동 % (잡음 제거)
VELOCITY_MIN_AGE_M = float(os.getenv("VELOCITY_MIN_AGE_M", "5.0")) # 최소 경과 시간(분)
def _calc_entry_threshold(vol_ratio: float) -> float:
"""거래량 비율에 따른 진입 임계값 반환. 강한 신호일수록 낮은 값."""
for min_ratio, threshold in _ENTRY_TIERS:
if vol_ratio >= min_ratio:
return threshold
return TREND_AFTER_VOL
def _resample_40m(df): def _resample_40m(df):
"""minute10 DataFrame → 40분봉으로 리샘플링.""" """minute10 DataFrame → 40분봉으로 리샘플링."""
@@ -47,10 +75,19 @@ def _resample_40m(df):
.dropna(subset=["close"]) .dropna(subset=["close"])
) )
# 축적 신호 상태: ticker → {"price": float, "time": float(unix)} # 축적 신호 상태: ticker → {"price": float, "time": float(unix), "vol_ratio": float}
_accum_signals: dict[str, dict] = {} _accum_signals: dict[str, dict] = {}
def get_active_signals() -> dict[str, dict]:
"""현재 활성화된 신호 딕셔너리 반환 (fast-poll 루프용).
Returns:
{ticker: {"price": float, "time": float, "vol_ratio": float}}
"""
return dict(_accum_signals)
def _check_vol_spike(ticker: str, vol_mult: float) -> bool: def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
"""직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인.""" """직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인."""
try: try:
@@ -126,27 +163,33 @@ def should_buy(ticker: str) -> bool:
if not _check_vol_spike(ticker, vol_mult): if not _check_vol_spike(ticker, vol_mult):
return False return False
# 축적 신호 기록 # 거래량 비율 계산 후 신호 기록
_accum_signals[ticker] = {"price": current, "time": now} ratio = 0.0
logger.info(
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}"
)
# 거래량 비율 계산 후 알림 전송
try: try:
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M)
df_h = _resample_40m(df10) if df10 is not None else None df_h = _resample_40m(df10) if df10 is not None else None
if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1: if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1:
recent_vol = df_h["volume"].iloc[-2] recent_vol = df_h["volume"].iloc[-2]
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean()
ratio = recent_vol / local_avg if local_avg > 0 else 0 ratio = recent_vol / local_avg if local_avg > 0 else 0.0
notify_signal(ticker, current, ratio)
except Exception: except Exception:
notify_signal(ticker, current, 0.0) pass
entry_thr = _calc_entry_threshold(ratio)
_accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio}
logger.info(
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}"
f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}%)"
)
notify_signal(ticker, current, ratio)
return False # 신호 첫 발생 시는 진입 안 함 return False # 신호 첫 발생 시는 진입 안 함
# ── 신호 있음: 상승 확인 → 진입 ───────────────────────── # ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
signal_price = sig["price"] signal_price = sig["price"]
vol_ratio = sig.get("vol_ratio", 0.0)
entry_thr = _calc_entry_threshold(vol_ratio)
move_pct = (current - signal_price) / signal_price * 100 move_pct = (current - signal_price) / signal_price * 100
age_min = (now - sig["time"]) / 60
if current < signal_price: if current < signal_price:
# 신호가 이하 하락 → 축적 실패 # 신호가 이하 하락 → 축적 실패
@@ -156,16 +199,34 @@ def should_buy(ticker: str) -> bool:
) )
return False return False
if move_pct >= TREND_AFTER_VOL: # ── 거리 기반 진입 ─────────────────────────────────────
if move_pct >= entry_thr:
del _accum_signals[ticker] del _accum_signals[ticker]
logger.info( logger.info(
f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}" f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}"
f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}%)" f"(+{move_pct:.1f}% ≥ {entry_thr:.1f}% | 거래량={vol_ratio:.2f}x)"
) )
return True return True
# ── 속도 기반 조기 진입 ────────────────────────────────
# 신호 후 N분 이상 경과 + 최소 이동 + 분당 상승률 기준 충족 시 선진입
if age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE:
velocity = move_pct / age_min # %/분
if velocity >= VELOCITY_THRESHOLD:
del _accum_signals[ticker]
logger.info(
f"[속도진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}"
f"(+{move_pct:.1f}% | {velocity:.3f}%/분 ≥ {VELOCITY_THRESHOLD}%/분 | "
f"경과={age_min:.1f}분 | 거래량={vol_ratio:.2f}x)"
)
return True
logger.debug( logger.debug(
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} " f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
f"(+{move_pct:.1f}% / 목표={TREND_AFTER_VOL}%)" f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}% | "
f"속도={move_pct/age_min:.3f}%/분 | 경과={age_min:.1f}분)"
if age_min > 0 else
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}%)"
) )
return False return False

View File

@@ -1,21 +1,60 @@
"""매수 기회 스캔 루프 - 60초마다 전체 시장 스캔.""" """매수 기회 스캔 루프 - 60초마다 전체 시장 스캔 + 신호 종목 빠른 폴링."""
import logging import logging
import os
import threading
import time import time
from core import trader from core import trader
from core.market import get_top_tickers from core.market import get_top_tickers
from core.market_regime import get_regime from core.market_regime import get_regime
from core.strategy import should_buy from core.strategy import get_active_signals, should_buy
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SCAN_INTERVAL = 60 # SCAN_INTERVAL = 60 # 전체 시장 스캔 주기 (초)
SIGNAL_POLL_INTERVAL = int(os.getenv("SIGNAL_POLL_INTERVAL", "15")) # 신호 종목 빠른 감시 주기 (초)
def _fast_poll_loop() -> None:
"""활성 신호 종목을 SIGNAL_POLL_INTERVAL 초마다 빠르게 체크.
신호 감지 후 전체 스캔 60초를 기다리지 않고, 신호 종목만 빠르게 감시하여
목표 임계값 도달 시 즉시 매수한다.
"""
logger.info(f"신호 감시 시작 (주기={SIGNAL_POLL_INTERVAL}초)")
while True:
try:
signals = get_active_signals()
if signals:
regime = get_regime()
if regime["name"] != "bear":
positions = trader.get_positions()
for ticker in list(signals):
if ticker in positions:
continue
if len(positions) >= trader.MAX_POSITIONS:
break
try:
if should_buy(ticker):
logger.info(f"[빠른감시] 매수 신호: {ticker}")
trader.buy(ticker)
time.sleep(0.1)
except Exception as e:
logger.error(f"[빠른감시] 오류 {ticker}: {e}")
except Exception as e:
logger.error(f"신호 감시 루프 오류: {e}")
time.sleep(SIGNAL_POLL_INTERVAL)
def run_scanner() -> None: def run_scanner() -> None:
"""메인 스캔 루프.""" """메인 스캔 루프."""
logger.info(f"스캐너 시작 (주기={SCAN_INTERVAL}초)") # 신호 종목 빠른 감시 스레드 시작
t = threading.Thread(target=_fast_poll_loop, daemon=True, name="signal-fast-poll")
t.start()
logger.info(f"스캐너 시작 (스캔={SCAN_INTERVAL}초 | 신호감시={SIGNAL_POLL_INTERVAL}초)")
while True: while True:
try: try:
# 포지션 꽉 찼으면 스캔 스킵 # 포지션 꽉 찼으면 스캔 스킵

308
sim10m.py Normal file
View File

@@ -0,0 +1,308 @@
"""sim10m.py — 10분봉 + 극단적 거래량 즉시 진입 전략 시뮬.
기존 전략 (1h봉 vol-lead) vs 신규 전략 (10분봉 + 100x 이상 거래량 즉시 진입) 비교.
데이터: 최근 45일 Upbit API
"""
import pickle
import time
import sys
from pathlib import Path
from datetime import datetime, timedelta
import pandas as pd
import pyupbit
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
# ── 파라미터 ──────────────────────────────────────────────
SIM_DAYS = 45
TOP30_FILE = Path("top30_tickers.pkl")
CACHE_FILE = Path("sim10m_cache.pkl")
TOP_N = 20
FEE = 0.0005
TIME_STOP_MIN_PCT = 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
# A: 기존 1h봉
A_LOCAL_VOL = 5 # 봉수 (= 5h)
A_VOL_MULT = 2.0
A_QUIET_PCT = 2.0
A_THRESH = 4.8
A_SIGNAL_TO = 8 # 신호 유효 봉수 (= 8h)
A_ATR_CANDLES = 5
A_TIME_STOP = 8 # 타임스탑 봉수 (= 8h)
# B: 신규 10분봉
B_LOCAL_VOL = 30 # 봉수 (5h = 30 × 10min)
B_VOL_MULT = 2.0
B_EXTREME_VOL = 100 # 이 이상 → 횡보 조건 면제, 다음 봉 즉시 진입
B_QUIET_CANDLES = 12 # 2h = 12봉
B_QUIET_PCT = 2.0
B_THRESH = 4.8
B_SIGNAL_TO = 48 # 신호 유효 봉수 (8h = 48봉)
B_ATR_CANDLES = 30 # ATR 봉수 (5h = 30봉)
B_TIME_STOP = 48 # 타임스탑 봉수 (8h = 48봉)
# ── 데이터 로드 ──────────────────────────────────────────
def fetch_ohlcv(ticker, interval, days):
"""OHLCV 과거 데이터 페이징 로드."""
target_start = datetime.now() - timedelta(days=days)
all_dfs, to, prev_oldest = [], None, None
while True:
kwargs = dict(ticker=ticker, interval=interval, count=200)
if to:
kwargs["to"] = to
df = pyupbit.get_ohlcv(**kwargs)
if df is None or df.empty:
break
df.index = df.index.tz_localize(None)
oldest = df.index[0]
# 더 이상 과거로 못 가면 중단 (상장일에 도달)
if prev_oldest is not None and oldest >= prev_oldest:
all_dfs.append(df)
break
all_dfs.append(df)
prev_oldest = oldest
if oldest <= target_start:
break
to = oldest.strftime("%Y-%m-%d %H:%M:%S")
time.sleep(0.15)
if not all_dfs:
return None
result = pd.concat(all_dfs).sort_index().drop_duplicates()
return result[result.index >= target_start]
def load_data(tickers):
if CACHE_FILE.exists():
print(f"캐시 로드: {CACHE_FILE}")
return pickle.load(open(CACHE_FILE, "rb"))
data = {"1h": {}, "10m": {}}
for idx, ticker in enumerate(tickers, 1):
print(f" [{idx}/{len(tickers)}] {ticker}...", end=" ", flush=True)
df1h = fetch_ohlcv(ticker, "minute60", SIM_DAYS)
df10m = fetch_ohlcv(ticker, "minute10", SIM_DAYS)
n1h = len(df1h) if df1h is not None else 0
n10m = len(df10m) if df10m is not None else 0
print(f"1h={n1h}봉 10m={n10m}")
if df1h is not None and n1h >= 50: data["1h"][ticker] = df1h
if df10m is not None and n10m >= 200: data["10m"][ticker] = df10m
pickle.dump(data, open(CACHE_FILE, "wb"))
print(f"캐시 저장: {CACHE_FILE}\n")
return data
# ── ATR 계산 (시뮬용) ─────────────────────────────────────
def calc_atr(df, buy_idx, n):
sub = df.iloc[max(0, buy_idx - n - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-n:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
# ── 포지션 시뮬 ──────────────────────────────────────────
def simulate_pos(df, buy_idx, buy_price, stop_pct, time_stop_candles):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
ts = df.index[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, sp, ts, f"트레일링({pnl:+.1f}%)", pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= time_stop_candles and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"] * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, row["close"], ts, "타임스탑", pnl
last = df.iloc[-1]["close"]
pnl = (last * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, last, df.index[-1], "데이터종료", pnl
# ── 전략 A: 기존 1h봉 vol-lead ──────────────────────────
def run_a(df):
trades = []
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(A_LOCAL_VOL + 2, 3)
while i < len(df):
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, A_TIME_STOP)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, "일반"))
in_pos = False
sig_i = sig_p = None
i = next_i
continue
close = df.iloc[i]["close"]
chg_2h = abs(close - df.iloc[i-2]["close"]) / df.iloc[i-2]["close"] * 100
quiet = chg_2h < A_QUIET_PCT
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-A_LOCAL_VOL-1:i-1]["volume"].mean()
spike = vol_avg > 0 and vol_p >= vol_avg * A_VOL_MULT
if quiet and spike:
if sig_i is None: sig_i, sig_p = i, close
else:
if sig_i is not None and close < sig_p: sig_i = sig_p = None
if sig_i is not None and (i - sig_i) > A_SIGNAL_TO:
sig_i = sig_p = None
if sig_i is not None and (close - sig_p) / sig_p * 100 >= A_THRESH:
in_pos = True; buy_idx = i; buy_price = close
stop_pct = calc_atr(df, i, A_ATR_CANDLES)
sig_i = sig_p = None
i += 1
return trades
# ── 전략 B: 10분봉 + 극단적 거래량 즉시 진입 ────────────
def run_b(df):
# trade tuple: (is_win, pnl, buy_dt, sell_dt, entry_type)
# entry_type: '일반' | '극단'
trades = []
sig_i = sig_p = None
extreme_pending = False
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(B_LOCAL_VOL + 2, B_QUIET_CANDLES + 1)
while i < len(df):
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, B_TIME_STOP)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, entry_type))
in_pos = False
sig_i = sig_p = None
extreme_pending = False
i = next_i
continue
close = df.iloc[i]["close"]
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-B_LOCAL_VOL-1:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
# 극단적 거래량 → 다음 봉 진입 대기 설정
if not extreme_pending and vol_r >= B_EXTREME_VOL:
extreme_pending = True
i += 1
continue
# 극단적 거래량 다음 봉 → 즉시 진입
if extreme_pending:
in_pos = True; buy_idx = i; buy_price = close
stop_pct = calc_atr(df, i, B_ATR_CANDLES)
entry_type = "극단"
extreme_pending = False
sig_i = sig_p = None
i += 1
continue
# 일반 vol-lead
close_2h = df.iloc[i - B_QUIET_CANDLES]["close"]
chg_2h = abs(close - close_2h) / close_2h * 100
quiet = chg_2h < B_QUIET_PCT
spike = vol_r >= B_VOL_MULT
if quiet and spike:
if sig_i is None: sig_i, sig_p = i, close
else:
if sig_i is not None and close < sig_p: sig_i = sig_p = None
if sig_i is not None and (i - sig_i) > B_SIGNAL_TO:
sig_i = sig_p = None
if sig_i is not None and (close - sig_p) / sig_p * 100 >= B_THRESH:
in_pos = True; buy_idx = i; buy_price = close
stop_pct = calc_atr(df, i, B_ATR_CANDLES)
entry_type = "일반"
sig_i = sig_p = None
i += 1
return trades
# ── 통계 계산 ─────────────────────────────────────────────
def calc_stats(trades):
if not trades:
return {"n": 0, "wr": 0.0, "cum": 0.0, "dd": 0.0}
wins = sum(1 for t in trades if t[0])
cum = peak = dd = 0.0
for t in sorted(trades, key=lambda x: x[2]):
cum += t[1]
peak = max(peak, cum)
dd = max(dd, peak - cum)
return {"n": len(trades), "wr": wins / len(trades) * 100, "cum": cum, "dd": dd}
# ── 메인 ─────────────────────────────────────────────────
def main():
top30 = pickle.load(open(TOP30_FILE, "rb"))
tickers = top30[:TOP_N]
print(f"{'='*60}")
print(f"10분봉 전략 시뮬 | 최근 {SIM_DAYS}일 | {TOP_N}종목")
print(f" A: 기존 1h봉 vol-lead")
print(f" B: 10분봉 vol-lead + 극단거래량({B_EXTREME_VOL}x) 즉시 진입")
print(f"{'='*60}\n")
print(f"데이터 로드 중...")
data = load_data(tickers)
v1h = [t for t in tickers if t in data["1h"] and len(data["1h"][t]) >= 50]
v10m = [t for t in tickers if t in data["10m"] and len(data["10m"][t]) >= 200]
print(f"유효: 1h={len(v1h)}종목 / 10m={len(v10m)}종목\n")
# 전략 A
a_all = []
for t in v1h:
a_all.extend(run_a(data["1h"][t]))
# 전략 B
b_all = []
for t in v10m:
b_all.extend(run_b(data["10m"][t]))
b_extreme = [t for t in b_all if t[4] == "극단"]
b_normal = [t for t in b_all if t[4] == "일반"]
sa = calc_stats(a_all)
sb = calc_stats(b_all)
sbe = calc_stats(b_extreme)
sbn = calc_stats(b_normal)
print(f"{'='*65}")
print(f"{'전략':18} {'거래수':>6} {'승률':>6} {'누적PnL%':>10} {'최대낙폭%':>10}")
print(f"{''*65}")
print(f"{'A (1h vol-lead)':18} {sa['n']:>6}{sa['wr']:>5.1f}% {sa['cum']:>+9.2f}% {-sa['dd']:>+9.2f}%")
print(f"{'B 전체 (10m)':18} {sb['n']:>6}{sb['wr']:>5.1f}% {sb['cum']:>+9.2f}% {-sb['dd']:>+9.2f}%")
print(f"{' └ 극단거래량':18} {sbe['n']:>6}{sbe['wr']:>5.1f}% {sbe['cum']:>+9.2f}% {-sbe['dd']:>+9.2f}%")
print(f"{' └ 일반vol-lead':18} {sbn['n']:>6}{sbn['wr']:>5.1f}% {sbn['cum']:>+9.2f}% {-sbn['dd']:>+9.2f}%")
print(f"{'='*65}")
# 극단 거래량 진입 상세
if b_extreme:
print(f"\n── 극단거래량 진입 상세 ({len(b_extreme)}건) ─────────────────────")
print(f" {'매수시각':<18} {'PnL%':>7} {'청산'}")
for t in sorted(b_extreme, key=lambda x: x[2])[:20]:
mark = "" if t[0] else ""
print(f" {str(t[2])[:16]:<18} {t[1]:>+6.1f}% {mark} {t[4]}")
if __name__ == "__main__":
main()

458
velocity_backtest.py Normal file
View File

@@ -0,0 +1,458 @@
"""velocity_backtest.py — 속도 진입 효과 비교 백테스트.
전략 A: 기존 거리 기반 (signal_price 대비 +THRESH% 도달 시 진입)
전략 B: 거리 + 속도 기반 (velocity >= 레짐별 VELOCITY_THRESHOLD 시 조기 진입)
BULL → vel_thresh = 0.10 (공격적)
NEUTRAL → vel_thresh = 0.15 (보수적)
BEAR → vel_thresh = 0.20 (더 높음)
레짐 판단: KRW-BTC 1h 변동률 (캐시 데이터 활용)
10분봉 캐시(sim10m_cache.pkl)를 사용.
신호 감지: 40분봉 vol spike + 2h 횡보 (10분봉 합산/슬라이스로 계산)
진입/청산: 10분봉 단위로 체크 (실제 시스템의 15초 폴링 근사)
"""
import pickle
from pathlib import Path
import pandas as pd
# ── 파라미터 ──────────────────────────────────────────────────────────────────
CACHE_FILE = Path("sim10m_cache.pkl")
TOP_FILE = Path("top30_tickers.pkl")
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
# 전략 파라미터
VOL_MULT = 2.0 # 거래량 배수 기준
QUIET_PCT = 2.0 # 2h 횡보 기준 (%)
THRESH = 4.8 # 거리 기반 진입 임계값 (%)
# 10분봉 기준 캔들 수
QUIET_C = 12 # 2h = 12 × 10분
VOL40_C = 4 # 40분봉 1개 = 4 × 10분봉
LOCAL_C = 7 # 로컬 평균 40분봉 7개 = 28 × 10분봉
TIMEOUT_C = 48 # 신호 타임아웃 8h = 48 × 10분봉
TS_C = 48 # 타임스탑 8h = 48 × 10분봉
ATR_C = 28 # ATR 5h = 7 × 40분 = 28 × 10분봉
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
TS_MIN_PCT = 3.0
MIN_I = LOCAL_C * VOL40_C + VOL40_C + QUIET_C + 2 # = 42
# 속도 기반 진입 파라미터 (레짐별)
VEL_THRESH_BULL = 0.10 # BULL: 0.10%/분 (공격적)
VEL_THRESH_NEUTRAL = 0.15 # NEUTRAL: 0.15%/분 (보수적)
VEL_THRESH_BEAR = 0.20 # BEAR: 0.20%/분 (더 높음)
VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % (잡음 제거)
VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분
# 레짐 판단 기준 (BTC 1h 변동률)
REGIME_BULL_CHANGE = 5.0 # +5% 이상 → BULL
REGIME_BEAR_CHANGE = -5.0 # -5% 이하 → BEAR
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
# ── 낙폭 제어 파라미터 ────────────────────────────────────────────────────────
HARD_STOP_PCT = 0.015 # 진입가 대비 -1.5% 즉시 청산 (하드 손절)
STREAK_TIGHT_N = 2 # 연속 N회 손절 시 타임스탑 강화
TS_C_TIGHT = 24 # 강화 타임스탑 보유 시간 (4h = 24 × 10분)
TS_MIN_PCT_TIGHT = 0.0 # 강화 타임스탑 최소 수익률 (0%)
# ── 레짐 헬퍼 ─────────────────────────────────────────────────────────────────
def build_regime_series(btc_df: pd.DataFrame) -> pd.Series:
"""BTC 10분봉으로 1h 변동률 계산 → 레짐 시리즈 반환."""
close = btc_df["close"]
change = close.pct_change(6) * 100 # 6 × 10분 = 1h
regime = pd.Series("neutral", index=close.index, dtype=object)
regime[change > REGIME_BULL_CHANGE] = "bull"
regime[change < REGIME_BEAR_CHANGE] = "bear"
return regime
def _vel_thresh_for(regime_s: pd.Series, ts) -> float:
"""타임스탬프 기준 레짐별 velocity 임계값 반환."""
if regime_s is None:
return VEL_THRESH_NEUTRAL
idx = regime_s.index.searchsorted(ts)
if idx >= len(regime_s):
return VEL_THRESH_NEUTRAL
r = regime_s.iloc[idx]
if r == "bull":
return VEL_THRESH_BULL
elif r == "bear":
return VEL_THRESH_BEAR
return VEL_THRESH_NEUTRAL
# ── 헬퍼 ──────────────────────────────────────────────────────────────────────
def calc_atr(df: pd.DataFrame, i: int) -> float:
sub = df.iloc[max(0, i - ATR_C):i]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
def simulate_pos(df: pd.DataFrame, buy_idx: int,
buy_price: float, stop_pct: float,
hard_stop: bool = False, tight_ts: bool = False):
"""매수 이후 청산 시점·손익 계산.
hard_stop : True → 진입가 대비 -HARD_STOP_PCT% 즉시 청산
tight_ts : True → 강화된 타임스탑 (4h / 0%) 적용
"""
peak = buy_price
hard_stop_px = buy_price * (1 - HARD_STOP_PCT) if hard_stop else None
ts_c_use = TS_C_TIGHT if tight_ts else TS_C
ts_min_use = TS_MIN_PCT_TIGHT if tight_ts else TS_MIN_PCT
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
if row["high"] > peak:
peak = row["high"]
# 1. 하드 손절 (진입가 대비 고정 %)
if hard_stop_px is not None and row["low"] <= hard_stop_px:
pnl = (hard_stop_px * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
# 2. 트레일링 스탑 (최고가 대비)
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
# 3. 타임스탑
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= ts_c_use and pnl_now < ts_min_use:
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
last = df.iloc[-1]["close"]
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[-1], pnl
def _prev_40m_vol(df: pd.DataFrame, i: int) -> float:
"""직전 완성 40분봉 거래량 (10분봉 4개 합산)."""
return df.iloc[max(0, i - VOL40_C):i]["volume"].sum()
def _local_vol_avg(df: pd.DataFrame, i: int) -> float:
"""로컬 5h 평균 (직전 7개 40분봉 각각의 합산 평균)."""
vols = []
for k in range(1, LOCAL_C + 1):
end = i - VOL40_C * (k - 1)
start = end - VOL40_C
if start < 0:
break
vols.append(df.iloc[start:end]["volume"].sum())
return sum(vols) / len(vols) if vols else 0
# ── 핵심 전략 루프 ─────────────────────────────────────────────────────────────
def run_strategy(df: pd.DataFrame, ticker: str,
use_velocity: bool = False,
regime_s: pd.Series = None,
dd_control: bool = False) -> list:
"""
Returns list of (is_win, pnl, buy_dt, sell_dt, ticker, entry_type)
entry_type: 'dist' | 'vel'
dd_control: True → 연속 손절 추적하여 hard_stop + tight_ts 적용
"""
trades = []
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
entry_type = "dist"
consec_losses = 0 # 연속 손절 횟수
i = MIN_I
while i < len(df):
# ── 포지션 중 → 청산 계산 후 다음 진입 탐색 ──────────────────
if in_pos:
use_hard = dd_control and consec_losses >= STREAK_TIGHT_N
use_tight = dd_control and consec_losses >= STREAK_TIGHT_N
is_win, sdt, pnl = simulate_pos(
df, buy_idx, buy_price, stop_pct,
hard_stop=use_hard, tight_ts=use_tight,
)
next_i = next(
(j for j in range(i, len(df)) if df.index[j] > sdt),
len(df),
)
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, entry_type))
# 연속 손절 카운터 업데이트
if is_win:
consec_losses = 0
else:
consec_losses += 1
in_pos = False
sig_i = sig_p = None
i = next_i
continue
close = df.iloc[i]["close"]
# ── 신호 감지 ─────────────────────────────────────────────────
prev_vol = _prev_40m_vol(df, i)
local_avg = _local_vol_avg(df, i)
vol_r = prev_vol / local_avg if local_avg > 0 else 0
close_2h = df.iloc[i - QUIET_C]["close"]
quiet = abs(close - close_2h) / close_2h * 100 < QUIET_PCT
spike = vol_r >= VOL_MULT
if quiet and spike:
if sig_i is None:
sig_i, sig_p = i, close
else:
if sig_i is not None and close < sig_p:
sig_i = sig_p = None
# 타임아웃
if sig_i is not None and (i - sig_i) > TIMEOUT_C:
sig_i = sig_p = None
# ── 진입 판단 ─────────────────────────────────────────────────
if sig_i is not None:
move_pct = (close - sig_p) / sig_p * 100
age_min = (i - sig_i) * 10 # 10분봉 × 10분
# A. 거리 기반
if move_pct >= THRESH:
in_pos = True
buy_idx = i
buy_price = close
stop_pct = calc_atr(df, i)
entry_type = "dist"
sig_i = sig_p = None
i += 1
continue
# B. 속도 기반 (use_velocity=True 일 때만)
if (use_velocity
and age_min >= VELOCITY_MIN_AGE_M
and move_pct >= VELOCITY_MIN_MOVE):
velocity = move_pct / age_min
vel_thresh = _vel_thresh_for(regime_s, df.index[i])
if velocity >= vel_thresh:
in_pos = True
buy_idx = i
buy_price = close
stop_pct = calc_atr(df, i)
entry_type = "vel"
sig_i = sig_p = None
i += 1
continue
i += 1
return trades
# ── WF 필터 ───────────────────────────────────────────────────────────────────
def apply_wf(trades: list) -> tuple:
history, shadow_streak, blocked = [], 0, False
accepted, blocked_cnt = [], 0
for trade in trades:
is_win = int(trade[0])
if not blocked:
accepted.append(trade)
history.append(is_win)
if len(history) >= WF_WINDOW:
if sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True
shadow_streak = 0
else:
blocked_cnt += 1
shadow_streak = (shadow_streak + 1) if is_win else 0
if shadow_streak >= WF_SHADOW_WINS:
blocked = False
history = []
shadow_streak = 0
return accepted, blocked_cnt
# ── MAX_POSITIONS 필터 ────────────────────────────────────────────────────────
def apply_max_pos(all_trades: list) -> tuple:
open_exits, accepted, skipped = [], [], []
for trade in sorted(all_trades, key=lambda x: x[2]):
buy_dt, sell_dt = trade[2], trade[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
# ── 복리 시뮬 ─────────────────────────────────────────────────────────────────
def simulate(accepted: list) -> dict:
portfolio = float(BUDGET)
total_krw = 0.0
monthly = {}
peak_pf = BUDGET
max_dd = 0.0
win_cnt = 0
vel_count = sum(1 for t in accepted if t[5] == "vel")
vel_wins = sum(1 for t in accepted if t[5] == "vel" and t[0])
vel_pnls = [t[1] for t in accepted if t[5] == "vel"]
dist_pnls = [t[1] for t in accepted if t[5] == "dist"]
for is_win, pnl, buy_dt, sell_dt, ticker, etype in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
peak_pf = max(peak_pf, portfolio)
dd = (peak_pf - portfolio) / peak_pf * 100
max_dd = max(max_dd, dd)
win_cnt += int(is_win)
ym = buy_dt.strftime("%Y-%m")
m = monthly.setdefault(ym, {"t": 0, "w": 0, "krw": 0.0})
m["t"] += 1; m["w"] += int(is_win); m["krw"] += krw_profit
n = len(accepted)
return {
"portfolio": portfolio,
"total_krw": total_krw,
"roi": (portfolio - BUDGET) / BUDGET * 100,
"n": n,
"wins": win_cnt,
"wr": win_cnt / n * 100 if n else 0,
"max_dd": max_dd,
"monthly": monthly,
"vel_count": vel_count,
"vel_wins": vel_wins,
"vel_wr": vel_wins / vel_count * 100 if vel_count else 0,
"vel_avg_pnl": sum(vel_pnls) / len(vel_pnls) if vel_pnls else 0,
"dist_avg_pnl": sum(dist_pnls) / len(dist_pnls) if dist_pnls else 0,
}
# ── 메인 ──────────────────────────────────────────────────────────────────────
def main():
print("캐시 로드...")
cache = pickle.load(open(CACHE_FILE, "rb"))
top30 = pickle.load(open(TOP_FILE, "rb"))
tickers = [t for t in top30[:TOP_N] if t in cache["10m"]]
print(f"유효 종목: {len(tickers)}\n")
# BTC 레짐 시리즈 빌드
btc_df = cache["10m"].get("KRW-BTC")
regime_s = build_regime_series(btc_df) if btc_df is not None else None
if regime_s is not None:
bull_pct = (regime_s == "bull").mean() * 100
bear_pct = (regime_s == "bear").mean() * 100
print(f"레짐 분포: BULL {bull_pct:.1f}% / NEUTRAL {100-bull_pct-bear_pct:.1f}% / BEAR {bear_pct:.1f}%")
print(f"vel threshold: BULL={VEL_THRESH_BULL} / NEUTRAL={VEL_THRESH_NEUTRAL} / BEAR={VEL_THRESH_BEAR}\n")
all_a, all_b, all_c = [], [], []
wf_a_total = wf_b_total = wf_c_total = 0
for t in tickers:
df = cache["10m"][t]
if len(df) < MIN_I + 50:
continue
raw_a = run_strategy(df, t, use_velocity=False)
raw_b = run_strategy(df, t, use_velocity=True, regime_s=regime_s)
raw_c = run_strategy(df, t, use_velocity=True, regime_s=regime_s, dd_control=True)
fa, ba = apply_wf(raw_a)
fb, bb = apply_wf(raw_b)
fc, bc = apply_wf(raw_c)
wf_a_total += ba; wf_b_total += bb; wf_c_total += bc
all_a.extend(fa); all_b.extend(fb); all_c.extend(fc)
acc_a, skp_a = apply_max_pos(all_a)
acc_b, skp_b = apply_max_pos(all_b)
acc_c, skp_c = apply_max_pos(all_c)
ra = simulate(acc_a)
rb = simulate(acc_b)
rc = simulate(acc_c)
# ── 날짜 범위 ─────────────────────────────────────────
def date_range(acc):
if acc:
s = min(t[2] for t in acc).strftime("%Y-%m-%d")
e = max(t[3] for t in acc).strftime("%Y-%m-%d")
return f"{s} ~ {e}"
return "N/A"
# ── 출력 ─────────────────────────────────────────────
W = 72
print("=" * W)
print(f" 낙폭 제어 비교 백테스트 | 10분봉 | {len(tickers)}종목")
print(f" 기간: {date_range(acc_a)}")
print(f" hard_stop={HARD_STOP_PCT*100:.1f}% | tight_ts={TS_C_TIGHT*10//60}h+{TS_MIN_PCT_TIGHT:.0f}% "
f"(연속 {STREAK_TIGHT_N}손절 후)")
print("=" * W)
print(f" {'항목':<22} {'A. 기존':>12} {'B. +속도':>12} {'C. +속도+DD제어':>14}")
print(f" {''*64}")
def row3(label, va, vb, vc, fmt="{}"):
sa, sb, sc = fmt.format(va), fmt.format(vb), fmt.format(vc)
try:
dbc = float(str(vc).replace(",","").replace("%","").replace("","")) \
- float(str(va).replace(",","").replace("%","").replace("",""))
dc = f" ({dbc:+.1f})" if abs(dbc) >= 0.01 else ""
except Exception:
dc = ""
print(f" {label:<22} {sa:>12} {sb:>12} {sc:>14}{dc}")
row3("총 진입", ra["n"], rb["n"], rc["n"], "{:,}")
row3(" 속도 진입", 0, rb["vel_count"], rc["vel_count"], "{:,}")
row3("WF 차단", wf_a_total, wf_b_total, wf_c_total, "{:,}")
row3("MAX_POS 스킵", len(skp_a), len(skp_b), len(skp_c), "{:,}")
print(f" {''*64}")
row3("승률", f"{ra['wr']:.1f}%", f"{rb['wr']:.1f}%", f"{rc['wr']:.1f}%")
row3(" 속도진입 승률","-", f"{rb['vel_wr']:.1f}%", f"{rc['vel_wr']:.1f}%")
print(f" {''*64}")
row3("평균 pnl (거리)",f"{ra['dist_avg_pnl']:.2f}%", f"{rb['dist_avg_pnl']:.2f}%", f"{rc['dist_avg_pnl']:.2f}%")
row3("평균 pnl (속도)","-", f"{rb['vel_avg_pnl']:.2f}%", f"{rc['vel_avg_pnl']:.2f}%")
print(f" {''*64}")
row3("최종 자산", f"{ra['portfolio']:,.0f}", f"{rb['portfolio']:,.0f}", f"{rc['portfolio']:,.0f}")
row3("총 수익", f"{ra['total_krw']:+,.0f}", f"{rb['total_krw']:+,.0f}", f"{rc['total_krw']:+,.0f}")
row3("수익률", f"{ra['roi']:.2f}%", f"{rb['roi']:.2f}%", f"{rc['roi']:.2f}%")
row3("최대 낙폭", f"{-ra['max_dd']:.2f}%", f"{-rb['max_dd']:.2f}%", f"{-rc['max_dd']:.2f}%")
print("=" * W)
# ── 월별 ─────────────────────────────────────────────
print(f"\n── 월별 수익 비교 {''*50}")
print(f" {'':^8}{'A':>5} {'A%':>4} {'A수익':>10}"
f"{'B':>5} {'B%':>4} {'B수익':>10}"
f"{'C':>5} {'C%':>4} {'C수익':>10}")
all_months = sorted(set(list(ra["monthly"]) + list(rb["monthly"]) + list(rc["monthly"])))
for ym in all_months:
ma = ra["monthly"].get(ym, {"t":0,"w":0,"krw":0})
mb = rb["monthly"].get(ym, {"t":0,"w":0,"krw":0})
mc = rc["monthly"].get(ym, {"t":0,"w":0,"krw":0})
wra = ma["w"]/ma["t"]*100 if ma["t"] else 0
wrb = mb["w"]/mb["t"]*100 if mb["t"] else 0
wrc = mc["w"]/mc["t"]*100 if mc["t"] else 0
print(f" {ym:^8}{ma['t']:>4}{wra:>3.0f}% {ma['krw']:>+9,.0f}원 │ "
f"{mb['t']:>4}{wrb:>3.0f}% {mb['krw']:>+9,.0f}원 │ "
f"{mc['t']:>4}{wrc:>3.0f}% {mc['krw']:>+9,.0f}")
print("=" * W)
if __name__ == "__main__":
main()