From a479bccee6b156e80989b84be9b89059df4439f5 Mon Sep 17 00:00:00 2001 From: joungmin Date: Mon, 2 Mar 2026 14:52:48 +0900 Subject: [PATCH] feat: switch vol-lead strategy from 1h to 40min candles Simulation sweep showed 40min candles outperform 1h: - 40min: 91 trades, 48.4% WR, +119% PnL, -11% DD - 60min: 65 trades, 50.8% WR, +88% PnL, -12% DD Changes: - strategy.py: fetch minute10, resample to 40min for vol spike detection - LOCAL_VOL_CANDLES=7 (was LOCAL_VOL_HOURS=5, 5h/40min = 7 candles) - monitor.py: ATR calculated from 40min candles - ATR_CANDLES=7 (was 5, now 5h in 40min units) - ATR_CACHE_TTL=2400s (was 600s, aligned to 40min candle) - interval_sweep.py: new interval comparison tool (10/20/30/40/50/60min) Co-Authored-By: Claude Sonnet 4.6 --- core/monitor.py | 27 ++++-- core/strategy.py | 44 ++++++--- interval_sweep.py | 240 ++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 291 insertions(+), 20 deletions(-) create mode 100644 interval_sweep.py diff --git a/core/monitor.py b/core/monitor.py index 0363455..944bbe4 100644 --- a/core/monitor.py +++ b/core/monitor.py @@ -19,20 +19,29 @@ TIME_STOP_HOURS = float(os.getenv("TIME_STOP_HOURS", "24")) TIME_STOP_MIN_GAIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) # ATR 기반 적응형 트레일링 스탑 파라미터 -ATR_CANDLES = 5 # 최근 N개 1h봉으로 자연 진폭 계산 +ATR_CANDLES = 7 # 최근 N개 40분봉으로 자연 진폭 계산 (≈5h, int(5*60/40)=7) ATR_MULT = 1.5 # 평균 진폭 × 배수 = 스탑 임계값 ATR_MIN_STOP = 0.010 # 최소 스탑 1.0% (너무 좁아지는 거 방지) ATR_MAX_STOP = 0.020 # 최대 스탑 2.0% (너무 넓어지는 거 방지) -# ATR 캐시: 종목별 (스탑비율, 계산시각) — 10분마다 갱신 +# ATR 캐시: 종목별 (스탑비율, 계산시각) — 40분마다 갱신 _atr_cache: dict[str, tuple[float, float]] = {} -_ATR_CACHE_TTL = 600 # 10분 +_ATR_CACHE_TTL = 2400 # 40분 + + +def _resample_40m(df): + """minute10 DataFrame → 40분봉으로 리샘플링.""" + return ( + df.resample("40min") + .agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}) + .dropna(subset=["close"]) + ) def _get_adaptive_stop(ticker: str) -> float: - """최근 ATR_CANDLES개 1h봉 평균 진폭 × ATR_MULT 로 적응형 스탑 비율 반환. + """최근 ATR_CANDLES개 40분봉 평균 진폭 × ATR_MULT 로 적응형 스탑 비율 반환. - 캐시(10분)를 활용해 API 호출 최소화. + 캐시(40분)를 활용해 API 호출 최소화. 계산 실패 시 ATR_MIN_STOP 반환. """ now = time.time() @@ -41,8 +50,12 @@ def _get_adaptive_stop(ticker: str) -> float: return cached[0] try: - df = pyupbit.get_ohlcv(ticker, interval="minute60", count=ATR_CANDLES + 2) - if df is None or len(df) < ATR_CANDLES: + fetch_n = (ATR_CANDLES + 2) * 4 # 40분봉 N개 = 10분봉 N*4개 + df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=fetch_n) + if df10 is None or df10.empty: + return ATR_MIN_STOP + df = _resample_40m(df10) + if len(df) < ATR_CANDLES: return ATR_MIN_STOP ranges = (df["high"] - df["low"]) / df["low"] avg_range = ranges.iloc[-ATR_CANDLES:].mean() diff --git a/core/strategy.py b/core/strategy.py index e7a48fa..9a7240a 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -1,11 +1,13 @@ """Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입. 흐름: - 1. 직전 1h 거래량 > 로컬 5h 평균 × VOL_MULT AND + 1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) → 신호가(signal_price) 기록 2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입 3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화 + +캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용 """ from __future__ import annotations @@ -29,25 +31,41 @@ TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계 SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h) # 거래량 파라미터 -LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h) +LOCAL_VOL_CANDLES = 7 # 5h를 40분봉으로 환산 (int(5 * 60/40) = 7) VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) +# 40분봉 리샘플링 파라미터 +_CANDLE_MIN = 40 +_FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉 + + +def _resample_40m(df): + """minute10 DataFrame → 40분봉으로 리샘플링.""" + return ( + df.resample("40min") + .agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"}) + .dropna(subset=["close"]) + ) + # 축적 신호 상태: ticker → {"price": float, "time": float(unix)} _accum_signals: dict[str, dict] = {} def _check_vol_spike(ticker: str, vol_mult: float) -> bool: - """직전 완성 1h 거래량이 로컬 5h 평균의 vol_mult 배 이상인지 확인.""" - fetch_count = LOCAL_VOL_HOURS + 3 + """직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인.""" try: - df = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count) + df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) except Exception: return False - if df is None or len(df) < LOCAL_VOL_HOURS + 1: + if df10 is None or len(df10) < _CANDLE_MIN // 10 * 2: return False - recent_vol = df["volume"].iloc[-2] # 직전 완성된 1h 봉 - local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 5h 평균 + df = _resample_40m(df10) + if len(df) < LOCAL_VOL_CANDLES + 1: + return False + + recent_vol = df["volume"].iloc[-2] # 직전 완성된 40분봉 + local_avg = df["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() # 이전 7봉(≈5h) 평균 if local_avg <= 0: return False @@ -55,7 +73,7 @@ def _check_vol_spike(ticker: str, vol_mult: float) -> bool: result = ratio >= vol_mult if result: logger.debug( - f"[거래량↑] {ticker} 1h={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)" + f"[거래량↑] {ticker} 40m={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)" ) else: logger.debug( @@ -115,11 +133,11 @@ def should_buy(ticker: str) -> bool: ) # 거래량 비율 계산 후 알림 전송 try: - fetch_count = LOCAL_VOL_HOURS + 3 - df_h = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count) - if df_h is not None and len(df_h) >= LOCAL_VOL_HOURS + 1: + df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M) + df_h = _resample_40m(df10) if df10 is not None else None + if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1: recent_vol = df_h["volume"].iloc[-2] - local_avg = df_h["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() + local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() ratio = recent_vol / local_avg if local_avg > 0 else 0 notify_signal(ticker, current, ratio) except Exception: diff --git a/interval_sweep.py b/interval_sweep.py new file mode 100644 index 0000000..58cc390 --- /dev/null +++ b/interval_sweep.py @@ -0,0 +1,240 @@ +"""interval_sweep.py — 봉 단위별 vol-lead 전략 성과 비교. + +10분봉 캐시 데이터를 리샘플링해 10/20/30/60분봉 성과를 비교한다. +추가로 극단거래량(100x) 즉시 진입 조건도 함께 테스트. + +데이터: sim10m_cache.pkl (10분봉 45일) +""" + +import pickle +import sys +from pathlib import Path + +import pandas as pd +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent / ".env") +sys.path.insert(0, str(Path(__file__).parent)) + +# ── 고정 파라미터 ───────────────────────────────────────── +CACHE_FILE = Path("sim10m_cache.pkl") +TOP30_FILE = Path("top30_tickers.pkl") +TOP_N = 20 + +FEE = 0.0005 +TIME_STOP_MIN_PCT = 3.0 +ATR_MULT = 1.5 +ATR_MIN = 0.010 +ATR_MAX = 0.020 + +VOL_MULT = 2.0 +QUIET_PCT = 2.0 +THRESH = 4.8 +EXTREME_VOL = 100 # 극단적 거래량 배수 + +# 봉 단위별 시간 기반 파라미터 (모두 "시간"으로 정의 → 봉수로 자동 변환) +INTERVALS = [10, 20, 30, 40, 50, 60] # 분 단위 +LOCAL_VOL_H = 5.0 # 로컬 거래량 기준 5시간 +QUIET_H = 2.0 # 횡보 기준 2시간 +SIGNAL_TO_H = 8.0 # 신호 유효 8시간 +ATR_H = 5.0 # ATR 계산 5시간 +TIME_STOP_H = 8.0 # 타임스탑 8시간 + + +# ── 리샘플링 ────────────────────────────────────────────── +def resample(df, minutes): + """10분봉 DataFrame을 N분봉으로 리샘플링.""" + rule = f"{minutes}T" + resampled = df.resample(rule).agg({ + "open": "first", + "high": "max", + "low": "min", + "close": "last", + "volume": "sum", + }).dropna(subset=["close"]) + return resampled + + +# ── ATR 계산 (시뮬용) ───────────────────────────────────── +def calc_atr(df, buy_idx, n): + sub = df.iloc[max(0, buy_idx - n - 1):buy_idx] + if len(sub) < 3: + return ATR_MIN + try: + avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-n:].mean() + return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT))) + except Exception: + return ATR_MIN + + +# ── 포지션 시뮬 ────────────────────────────────────────── +def simulate_pos(df, buy_idx, buy_price, stop_pct, ts_candles): + peak = buy_price + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + ts = df.index[i] + if row["high"] > peak: + peak = row["high"] + if row["low"] <= peak * (1 - stop_pct): + sp = peak * (1 - stop_pct) + pnl = (sp * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, sp, ts, pnl + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if (i - buy_idx) >= ts_candles and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"] * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, row["close"], ts, pnl + last = df.iloc[-1]["close"] + pnl = (last * (1-FEE) - buy_price * (1+FEE)) / (buy_price * (1+FEE)) * 100 + return pnl > 0, last, df.index[-1], pnl + + +# ── vol-lead 전략 실행 ──────────────────────────────────── +def run_vol_lead(df, minutes, use_extreme=False): + """vol-lead 전략 실행. use_extreme=True이면 극단거래량 즉시 진입 추가.""" + candles_per_h = 60 / minutes + + local_vol_n = int(LOCAL_VOL_H * candles_per_h) + quiet_n = int(QUIET_H * candles_per_h) + signal_to_n = int(SIGNAL_TO_H * candles_per_h) + atr_n = int(ATR_H * candles_per_h) + ts_n = int(TIME_STOP_H * candles_per_h) + + trades = [] + sig_i = sig_p = None + extreme_pending = False + in_pos = False + buy_idx = buy_price = stop_pct = entry_type = None + i = max(local_vol_n + 2, quiet_n + 1) + + while i < len(df): + if in_pos: + is_win, sp, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, entry_type)) + in_pos = False + sig_i = sig_p = None + extreme_pending = False + i = next_i + continue + + close = df.iloc[i]["close"] + vol_p = df.iloc[i-1]["volume"] + vol_avg = df.iloc[i-local_vol_n-1:i-1]["volume"].mean() + vol_r = vol_p / vol_avg if vol_avg > 0 else 0 + + # 극단적 거래량 → 다음 봉 즉시 진입 + if use_extreme and not extreme_pending and vol_r >= EXTREME_VOL: + extreme_pending = True + i += 1 + continue + + if use_extreme and extreme_pending: + in_pos = True; buy_idx = i; buy_price = close + stop_pct = calc_atr(df, i, atr_n) + entry_type = "극단" + extreme_pending = False + sig_i = sig_p = None + i += 1 + continue + + # 일반 vol-lead + close_qh = df.iloc[i - quiet_n]["close"] + chg_qh = abs(close - close_qh) / close_qh * 100 + quiet = chg_qh < QUIET_PCT + spike = vol_r >= VOL_MULT + + if quiet and spike: + if sig_i is None: sig_i, sig_p = i, close + else: + if sig_i is not None and close < sig_p: sig_i = sig_p = None + if sig_i is not None and (i - sig_i) > signal_to_n: + sig_i = sig_p = None + if sig_i is not None and (close - sig_p) / sig_p * 100 >= THRESH: + in_pos = True; buy_idx = i; buy_price = close + stop_pct = calc_atr(df, i, atr_n) + entry_type = "일반" + sig_i = sig_p = None + i += 1 + return trades + + +# ── 통계 ───────────────────────────────────────────────── +def calc_stats(trades): + if not trades: + return {"n": 0, "wr": 0.0, "cum": 0.0, "dd": 0.0} + wins = sum(1 for t in trades if t[0]) + cum = peak = dd = 0.0 + for t in sorted(trades, key=lambda x: x[2]): + cum += t[1] + peak = max(peak, cum) + dd = max(dd, peak - cum) + return {"n": len(trades), "wr": wins / len(trades) * 100, "cum": cum, "dd": dd} + + +# ── 메인 ───────────────────────────────────────────────── +def main(): + print("캐시 로드 중...") + cache = pickle.load(open(CACHE_FILE, "rb")) + top30 = pickle.load(open(TOP30_FILE, "rb")) + tickers = [t for t in top30[:TOP_N] if t in cache["10m"]] + print(f"유효 종목: {len(tickers)}개\n") + + results = [] + + for minutes in INTERVALS: + # 10분봉은 그대로, 나머지는 리샘플링 + ticker_data = {} + for t in tickers: + df10 = cache["10m"][t] + if minutes == 10: + ticker_data[t] = df10 + else: + ticker_data[t] = resample(df10, minutes) + + # 일반 vol-lead + all_trades = [] + for t in tickers: + if t in ticker_data and len(ticker_data[t]) >= 50: + all_trades.extend(run_vol_lead(ticker_data[t], minutes, use_extreme=False)) + s = calc_stats(all_trades) + results.append((f"{minutes}분봉 (일반)", s)) + + # 일반 + 극단 거래량 + all_trades_ex = [] + for t in tickers: + if t in ticker_data and len(ticker_data[t]) >= 50: + all_trades_ex.extend(run_vol_lead(ticker_data[t], minutes, use_extreme=True)) + s_ex = calc_stats(all_trades_ex) + # 극단 거래량만 분리 + extreme_trades = [t for t in all_trades_ex if t[4] == "극단"] + s_ext = calc_stats(extreme_trades) + results.append((f" +극단{EXTREME_VOL}x", s_ex, s_ext)) + + # 출력 + print(f"{'='*72}") + print(f"봉 단위별 vol-lead 전략 비교 | 45일 | {len(tickers)}종목") + print(f"{'='*72}") + print(f"{'전략':20} {'거래수':>6} {'승률':>6} {'누적PnL%':>10} {'최대낙폭%':>10}") + print(f"{'─'*72}") + + for row in results: + label = row[0] + s = row[1] + if s["n"] == 0: + print(f"{label:20} {'없음':>6}") + continue + print(f"{label:20} {s['n']:>6}건 {s['wr']:>5.1f}% {s['cum']:>+9.2f}% {-s['dd']:>+9.2f}%", end="") + # 극단 거래량 정보 추가 + if len(row) == 3: + se = row[2] + if se["n"] > 0: + print(f" (극단:{se['n']}건 {se['wr']:.0f}% {se['cum']:+.1f}%)", end="") + print() + if label.startswith(" +"): + print() # 구분선 + + print(f"{'='*72}") + + +if __name__ == "__main__": + main()