Files
upbit-trader/core/strategy.py
joungmin 324d69dde0 feat: volume-lead strategy with compounding, WF filter, and DB-backed simulation
- core/strategy.py: replace trend strategy with volume-lead accumulation
  (vol spike + 2h quiet → signal, +4.8% rise → entry)
- core/trader.py: compound budget adjusts on both profit and loss (floor 30%)
- core/notify.py: add accumulation signal telegram notification
- ohlcv_db.py: Oracle ADB OHLCV cache (insert, load, incremental update)
- sim_365.py: 365-day compounding simulation loading from DB
- krw_sim.py: KRW-based simulation with MAX_POSITIONS constraint
- ticker_sim.py: ticker count expansion comparison
- STRATEGY.md: full strategy documentation
- .gitignore: exclude *.pkl cache files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 01:46:03 +09:00

154 lines
5.6 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입.
흐름:
1. 직전 1h 거래량 > 로컬 5h 평균 × VOL_MULT AND
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
→ 신호가(signal_price) 기록
2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
"""
from __future__ import annotations
import logging
import os
import time
import pyupbit
from .market import get_current_price
from .market_regime import get_regime
from .notify import notify_signal
from .price_db import get_price_n_hours_ago
logger = logging.getLogger(__name__)
# 축적 감지 파라미터
PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 2h 횡보 기준 (%)
TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계값 (신호가 대비 %)
SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h)
# 거래량 파라미터
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
# 축적 신호 상태: ticker → {"price": float, "time": float(unix)}
_accum_signals: dict[str, dict] = {}
def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
"""직전 완성 1h 거래량이 로컬 5h 평균의 vol_mult 배 이상인지 확인."""
fetch_count = LOCAL_VOL_HOURS + 3
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
except Exception:
return False
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
return False
recent_vol = df["volume"].iloc[-2] # 직전 완성된 1h 봉
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 5h 평균
if local_avg <= 0:
return False
ratio = recent_vol / local_avg
result = ratio >= vol_mult
if result:
logger.debug(
f"[거래량↑] {ticker} 1h={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
)
else:
logger.debug(
f"[거래량✗] {ticker} {ratio:.2f}x < {vol_mult}x"
)
return result
def should_buy(ticker: str) -> bool:
"""Volume Lead 전략.
1단계: 거래량 급증 + 2h 횡보 → 신호가 기록
2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
"""
regime = get_regime()
vol_mult = regime["vol_mult"]
current = get_current_price(ticker)
if not current:
return False
now = time.time()
# ── 기존 신호 유효성 확인 ────────────────────────────────
sig = _accum_signals.get(ticker)
if sig is not None:
age_h = (now - sig["time"]) / 3600
if age_h > SIGNAL_TIMEOUT_H:
del _accum_signals[ticker]
sig = None
logger.debug(f"[축적타임아웃] {ticker} {age_h:.1f}h 경과 → 신호 초기화")
# ── 신호 없음: 축적 조건 체크 ────────────────────────────
if sig is None:
# 2h 가격 횡보 확인 (DB 가격 활용)
price_2h = get_price_n_hours_ago(ticker, 2)
if price_2h is None:
return False
quiet = abs(current - price_2h) / price_2h * 100 < PRICE_QUIET_PCT
if not quiet:
logger.debug(
f"[횡보✗] {ticker} 2h변동={(current - price_2h) / price_2h * 100:+.1f}% "
f"(기준={PRICE_QUIET_PCT}%)"
)
return False
# 거래량 급증 확인
if not _check_vol_spike(ticker, vol_mult):
return False
# 축적 신호 기록
_accum_signals[ticker] = {"price": current, "time": now}
logger.info(
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}"
)
# 거래량 비율 계산 후 알림 전송
try:
fetch_count = LOCAL_VOL_HOURS + 3
df_h = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
if df_h is not None and len(df_h) >= LOCAL_VOL_HOURS + 1:
recent_vol = df_h["volume"].iloc[-2]
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
ratio = recent_vol / local_avg if local_avg > 0 else 0
notify_signal(ticker, current, ratio)
except Exception:
notify_signal(ticker, current, 0.0)
return False # 신호 첫 발생 시는 진입 안 함
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
signal_price = sig["price"]
move_pct = (current - signal_price) / signal_price * 100
if current < signal_price:
# 신호가 이하 하락 → 축적 실패
del _accum_signals[ticker]
logger.debug(
f"[축적실패] {ticker} 신호가={signal_price:,.2f} → 현재={current:,.2f} (하락) → 초기화"
)
return False
if move_pct >= TREND_AFTER_VOL:
del _accum_signals[ticker]
logger.info(
f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}"
f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}%)"
)
return True
logger.debug(
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
f"(+{move_pct:.1f}% / 목표={TREND_AFTER_VOL}%)"
)
return False