feat: switch vol-lead strategy from 1h to 40min candles

Simulation sweep showed 40min candles outperform 1h:
- 40min: 91 trades, 48.4% WR, +119% PnL, -11% DD
- 60min: 65 trades, 50.8% WR, +88% PnL, -12% DD

Changes:
- strategy.py: fetch minute10, resample to 40min for vol spike detection
  - LOCAL_VOL_CANDLES=7 (was LOCAL_VOL_HOURS=5, 5h/40min = 7 candles)
- monitor.py: ATR calculated from 40min candles
  - ATR_CANDLES=7 (was 5, now 5h in 40min units)
  - ATR_CACHE_TTL=2400s (was 600s, aligned to 40min candle)
- interval_sweep.py: new interval comparison tool (10/20/30/40/50/60min)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-02 14:52:48 +09:00
parent 4b6cb8ca0e
commit a479bccee6
3 changed files with 291 additions and 20 deletions

View File

@@ -19,20 +19,29 @@ TIME_STOP_HOURS = float(os.getenv("TIME_STOP_HOURS", "24"))
TIME_STOP_MIN_GAIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3"))
# ATR 기반 적응형 트레일링 스탑 파라미터
ATR_CANDLES = 5 # 최근 N개 1h봉으로 자연 진폭 계산
ATR_CANDLES = 7 # 최근 N개 40분봉으로 자연 진폭 계산 (≈5h, int(5*60/40)=7)
ATR_MULT = 1.5 # 평균 진폭 × 배수 = 스탑 임계값
ATR_MIN_STOP = 0.010 # 최소 스탑 1.0% (너무 좁아지는 거 방지)
ATR_MAX_STOP = 0.020 # 최대 스탑 2.0% (너무 넓어지는 거 방지)
# ATR 캐시: 종목별 (스탑비율, 계산시각) — 10분마다 갱신
# ATR 캐시: 종목별 (스탑비율, 계산시각) — 40분마다 갱신
_atr_cache: dict[str, tuple[float, float]] = {}
_ATR_CACHE_TTL = 600 # 10분
_ATR_CACHE_TTL = 2400 # 40분
def _resample_40m(df):
"""minute10 DataFrame → 40분봉으로 리샘플링."""
return (
df.resample("40min")
.agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"})
.dropna(subset=["close"])
)
def _get_adaptive_stop(ticker: str) -> float:
"""최근 ATR_CANDLES개 1h봉 평균 진폭 × ATR_MULT 로 적응형 스탑 비율 반환.
"""최근 ATR_CANDLES개 40분봉 평균 진폭 × ATR_MULT 로 적응형 스탑 비율 반환.
캐시(10분)를 활용해 API 호출 최소화.
캐시(40분)를 활용해 API 호출 최소화.
계산 실패 시 ATR_MIN_STOP 반환.
"""
now = time.time()
@@ -41,8 +50,12 @@ def _get_adaptive_stop(ticker: str) -> float:
return cached[0]
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=ATR_CANDLES + 2)
if df is None or len(df) < ATR_CANDLES:
fetch_n = (ATR_CANDLES + 2) * 4 # 40분봉 N개 = 10분봉 N*4개
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=fetch_n)
if df10 is None or df10.empty:
return ATR_MIN_STOP
df = _resample_40m(df10)
if len(df) < ATR_CANDLES:
return ATR_MIN_STOP
ranges = (df["high"] - df["low"]) / df["low"]
avg_range = ranges.iloc[-ATR_CANDLES:].mean()

View File

@@ -1,11 +1,13 @@
"""Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입.
흐름:
1. 직전 1h 거래량 > 로컬 5h 평균 × VOL_MULT AND
1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
→ 신호가(signal_price) 기록
2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용
"""
from __future__ import annotations
@@ -29,25 +31,41 @@ TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계
SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h)
# 거래량 파라미터
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
LOCAL_VOL_CANDLES = 7 # 5h를 40분봉으로 환산 (int(5 * 60/40) = 7)
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
# 40분봉 리샘플링 파라미터
_CANDLE_MIN = 40
_FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉
def _resample_40m(df):
"""minute10 DataFrame → 40분봉으로 리샘플링."""
return (
df.resample("40min")
.agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"})
.dropna(subset=["close"])
)
# 축적 신호 상태: ticker → {"price": float, "time": float(unix)}
_accum_signals: dict[str, dict] = {}
def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
"""직전 완성 1h 거래량이 로컬 5h 평균의 vol_mult 배 이상인지 확인."""
fetch_count = LOCAL_VOL_HOURS + 3
"""직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인."""
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M)
except Exception:
return False
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
if df10 is None or len(df10) < _CANDLE_MIN // 10 * 2:
return False
recent_vol = df["volume"].iloc[-2] # 직전 완성된 1h 봉
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 5h 평균
df = _resample_40m(df10)
if len(df) < LOCAL_VOL_CANDLES + 1:
return False
recent_vol = df["volume"].iloc[-2] # 직전 완성된 40분봉
local_avg = df["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() # 이전 7봉(≈5h) 평균
if local_avg <= 0:
return False
@@ -55,7 +73,7 @@ def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
result = ratio >= vol_mult
if result:
logger.debug(
f"[거래량↑] {ticker} 1h={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
f"[거래량↑] {ticker} 40m={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
)
else:
logger.debug(
@@ -115,11 +133,11 @@ def should_buy(ticker: str) -> bool:
)
# 거래량 비율 계산 후 알림 전송
try:
fetch_count = LOCAL_VOL_HOURS + 3
df_h = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
if df_h is not None and len(df_h) >= LOCAL_VOL_HOURS + 1:
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M)
df_h = _resample_40m(df10) if df10 is not None else None
if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1:
recent_vol = df_h["volume"].iloc[-2]
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean()
ratio = recent_vol / local_avg if local_avg > 0 else 0
notify_signal(ticker, current, ratio)
except Exception:

240
interval_sweep.py Normal file
View File

@@ -0,0 +1,240 @@
"""interval_sweep.py — 봉 단위별 vol-lead 전략 성과 비교.
10분봉 캐시 데이터를 리샘플링해 10/20/30/60분봉 성과를 비교한다.
추가로 극단거래량(100x) 즉시 진입 조건도 함께 테스트.
데이터: sim10m_cache.pkl (10분봉 45일)
"""
import pickle
import sys
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
# ── 고정 파라미터 ─────────────────────────────────────────
CACHE_FILE = Path("sim10m_cache.pkl")
TOP30_FILE = Path("top30_tickers.pkl")
TOP_N = 20
FEE = 0.0005
TIME_STOP_MIN_PCT = 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
VOL_MULT = 2.0
QUIET_PCT = 2.0
THRESH = 4.8
EXTREME_VOL = 100 # 극단적 거래량 배수
# 봉 단위별 시간 기반 파라미터 (모두 "시간"으로 정의 → 봉수로 자동 변환)
INTERVALS = [10, 20, 30, 40, 50, 60] # 분 단위
LOCAL_VOL_H = 5.0 # 로컬 거래량 기준 5시간
QUIET_H = 2.0 # 횡보 기준 2시간
SIGNAL_TO_H = 8.0 # 신호 유효 8시간
ATR_H = 5.0 # ATR 계산 5시간
TIME_STOP_H = 8.0 # 타임스탑 8시간
# ── 리샘플링 ──────────────────────────────────────────────
def resample(df, minutes):
"""10분봉 DataFrame을 N분봉으로 리샘플링."""
rule = f"{minutes}T"
resampled = df.resample(rule).agg({
"open": "first",
"high": "max",
"low": "min",
"close": "last",
"volume": "sum",
}).dropna(subset=["close"])
return resampled
# ── ATR 계산 (시뮬용) ─────────────────────────────────────
def calc_atr(df, buy_idx, n):
sub = df.iloc[max(0, buy_idx - n - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-n:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
# ── 포지션 시뮬 ──────────────────────────────────────────
def simulate_pos(df, buy_idx, buy_price, stop_pct, ts_candles):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
ts = df.index[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, sp, ts, pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= ts_candles and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"] * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, row["close"], ts, pnl
last = df.iloc[-1]["close"]
pnl = (last * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100
return pnl > 0, last, df.index[-1], pnl
# ── vol-lead 전략 실행 ────────────────────────────────────
def run_vol_lead(df, minutes, use_extreme=False):
"""vol-lead 전략 실행. use_extreme=True이면 극단거래량 즉시 진입 추가."""
candles_per_h = 60 / minutes
local_vol_n = int(LOCAL_VOL_H * candles_per_h)
quiet_n = int(QUIET_H * candles_per_h)
signal_to_n = int(SIGNAL_TO_H * candles_per_h)
atr_n = int(ATR_H * candles_per_h)
ts_n = int(TIME_STOP_H * candles_per_h)
trades = []
sig_i = sig_p = None
extreme_pending = False
in_pos = False
buy_idx = buy_price = stop_pct = entry_type = None
i = max(local_vol_n + 2, quiet_n + 1)
while i < len(df):
if in_pos:
is_win, sp, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, entry_type))
in_pos = False
sig_i = sig_p = None
extreme_pending = False
i = next_i
continue
close = df.iloc[i]["close"]
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-local_vol_n-1:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
# 극단적 거래량 → 다음 봉 즉시 진입
if use_extreme and not extreme_pending and vol_r >= EXTREME_VOL:
extreme_pending = True
i += 1
continue
if use_extreme and extreme_pending:
in_pos = True; buy_idx = i; buy_price = close
stop_pct = calc_atr(df, i, atr_n)
entry_type = "극단"
extreme_pending = False
sig_i = sig_p = None
i += 1
continue
# 일반 vol-lead
close_qh = df.iloc[i - quiet_n]["close"]
chg_qh = abs(close - close_qh) / close_qh * 100
quiet = chg_qh < QUIET_PCT
spike = vol_r >= VOL_MULT
if quiet and spike:
if sig_i is None: sig_i, sig_p = i, close
else:
if sig_i is not None and close < sig_p: sig_i = sig_p = None
if sig_i is not None and (i - sig_i) > signal_to_n:
sig_i = sig_p = None
if sig_i is not None and (close - sig_p) / sig_p * 100 >= THRESH:
in_pos = True; buy_idx = i; buy_price = close
stop_pct = calc_atr(df, i, atr_n)
entry_type = "일반"
sig_i = sig_p = None
i += 1
return trades
# ── 통계 ─────────────────────────────────────────────────
def calc_stats(trades):
if not trades:
return {"n": 0, "wr": 0.0, "cum": 0.0, "dd": 0.0}
wins = sum(1 for t in trades if t[0])
cum = peak = dd = 0.0
for t in sorted(trades, key=lambda x: x[2]):
cum += t[1]
peak = max(peak, cum)
dd = max(dd, peak - cum)
return {"n": len(trades), "wr": wins / len(trades) * 100, "cum": cum, "dd": dd}
# ── 메인 ─────────────────────────────────────────────────
def main():
print("캐시 로드 중...")
cache = pickle.load(open(CACHE_FILE, "rb"))
top30 = pickle.load(open(TOP30_FILE, "rb"))
tickers = [t for t in top30[:TOP_N] if t in cache["10m"]]
print(f"유효 종목: {len(tickers)}\n")
results = []
for minutes in INTERVALS:
# 10분봉은 그대로, 나머지는 리샘플링
ticker_data = {}
for t in tickers:
df10 = cache["10m"][t]
if minutes == 10:
ticker_data[t] = df10
else:
ticker_data[t] = resample(df10, minutes)
# 일반 vol-lead
all_trades = []
for t in tickers:
if t in ticker_data and len(ticker_data[t]) >= 50:
all_trades.extend(run_vol_lead(ticker_data[t], minutes, use_extreme=False))
s = calc_stats(all_trades)
results.append((f"{minutes}분봉 (일반)", s))
# 일반 + 극단 거래량
all_trades_ex = []
for t in tickers:
if t in ticker_data and len(ticker_data[t]) >= 50:
all_trades_ex.extend(run_vol_lead(ticker_data[t], minutes, use_extreme=True))
s_ex = calc_stats(all_trades_ex)
# 극단 거래량만 분리
extreme_trades = [t for t in all_trades_ex if t[4] == "극단"]
s_ext = calc_stats(extreme_trades)
results.append((f" +극단{EXTREME_VOL}x", s_ex, s_ext))
# 출력
print(f"{'='*72}")
print(f"봉 단위별 vol-lead 전략 비교 | 45일 | {len(tickers)}종목")
print(f"{'='*72}")
print(f"{'전략':20} {'거래수':>6} {'승률':>6} {'누적PnL%':>10} {'최대낙폭%':>10}")
print(f"{''*72}")
for row in results:
label = row[0]
s = row[1]
if s["n"] == 0:
print(f"{label:20} {'없음':>6}")
continue
print(f"{label:20} {s['n']:>6}{s['wr']:>5.1f}% {s['cum']:>+9.2f}% {-s['dd']:>+9.2f}%", end="")
# 극단 거래량 정보 추가
if len(row) == 3:
se = row[2]
if se["n"] > 0:
print(f" (극단:{se['n']}{se['wr']:.0f}% {se['cum']:+.1f}%)", end="")
print()
if label.startswith(" +"):
print() # 구분선
print(f"{'='*72}")
if __name__ == "__main__":
main()