- core/fng.py: F&G API wrapper with 1h cache (alternative.me) - FNG_MIN_ENTRY=41 (env-configurable), blocks entry below threshold - core/strategy.py: call is_entry_allowed() before volume/regime checks - daemon/runner.py: log F&G status on every scan cycle - core/notify.py: include F&G value in buy/signal/status notifications - core/trader.py: pass current F&G value to notify_buy Backtest evidence (1y / 18 tickers / 1h candles): - No filter: 820 trades, 32.7% WR, avg +0.012%, KRW +95k - F&G >= 41: 372 trades, 39.5% WR, avg +0.462%, KRW +1.72M - Blocked 452 trades (avg -0.372%, saved ~1.68M KRW loss) Also add: - backtest_db.py: Oracle DB storage for backtest runs/results/trades - fng_1y_backtest.py, fng_adaptive_backtest.py, fng_sim_comparison.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
241 lines
9.7 KiB
Python
241 lines
9.7 KiB
Python
"""Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입.
|
||
|
||
흐름:
|
||
1. 직전 40분봉 거래량 > 로컬 5h(7봉) 평균 × VOL_MULT AND
|
||
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
|
||
→ 신호가(signal_price) + 거래량비율(vol_ratio) 기록
|
||
2. signal_price 대비 +임계값% 이상 상승 시 진입
|
||
임계값은 vol_ratio 강도에 따라 자동 조정 (강한 신호 → 낮은 임계값 → 조기 진입)
|
||
- vol_ratio ≥ 5.0x → +1.0%
|
||
- vol_ratio ≥ 3.5x → +2.0%
|
||
- vol_ratio ≥ 2.5x → +3.0%
|
||
- 기본 → +TREND_AFTER_VOL%
|
||
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
|
||
|
||
캔들: minute10 데이터를 40분봉으로 리샘플링하여 사용
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import logging
|
||
import os
|
||
import time
|
||
|
||
import pyupbit
|
||
|
||
from .fng import FNG_MIN_ENTRY, is_entry_allowed
|
||
from .market import get_current_price
|
||
from .market_regime import get_regime
|
||
from .notify import notify_signal
|
||
from .price_db import get_price_n_hours_ago
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# 축적 감지 파라미터
|
||
PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 2h 횡보 기준 (%)
|
||
TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계값 (신호가 대비 %)
|
||
SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h)
|
||
|
||
# 거래량 파라미터
|
||
LOCAL_VOL_CANDLES = 7 # 5h를 40분봉으로 환산 (int(5 * 60/40) = 7)
|
||
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
|
||
|
||
# 40분봉 리샘플링 파라미터
|
||
_CANDLE_MIN = 40
|
||
_FETCH_10M = (LOCAL_VOL_CANDLES + 3) * (_CANDLE_MIN // 10) # 40 개의 10분봉
|
||
|
||
# 신호 강도별 진입 임계값 단계 (vol_ratio 최소값, 진입 임계값%)
|
||
# 강한 신호일수록 낮은 임계값으로 조기 진입
|
||
_ENTRY_TIERS: list[tuple[float, float]] = [
|
||
(5.0, 1.0), # 매우 강한 신호 (≥5x) → +1.0% 즉시 진입
|
||
(3.5, 2.0), # 강한 신호 (≥3.5x) → +2.0% 조기 진입
|
||
(2.5, 3.0), # 중간 신호 (≥2.5x) → +3.0% 반조기 진입
|
||
]
|
||
# 위 조건 미충족 시 TREND_AFTER_VOL 사용
|
||
|
||
# 속도(velocity) 기반 조기 진입 파라미터
|
||
# 신호 후 가격이 빠르게 상승 중이면 거리 임계값 도달 전에 선진입
|
||
VELOCITY_THRESHOLD = float(os.getenv("VELOCITY_THRESHOLD", "0.10")) # %/분 (0.10 = 6%/h)
|
||
VELOCITY_MIN_MOVE = float(os.getenv("VELOCITY_MIN_MOVE", "0.5")) # 최소 이동 % (잡음 제거)
|
||
VELOCITY_MIN_AGE_M = float(os.getenv("VELOCITY_MIN_AGE_M", "5.0")) # 최소 경과 시간(분)
|
||
|
||
|
||
def _calc_entry_threshold(vol_ratio: float) -> float:
|
||
"""거래량 비율에 따른 진입 임계값 반환. 강한 신호일수록 낮은 값."""
|
||
for min_ratio, threshold in _ENTRY_TIERS:
|
||
if vol_ratio >= min_ratio:
|
||
return threshold
|
||
return TREND_AFTER_VOL
|
||
|
||
|
||
def _resample_40m(df):
|
||
"""minute10 DataFrame → 40분봉으로 리샘플링."""
|
||
return (
|
||
df.resample("40min")
|
||
.agg({"open": "first", "high": "max", "low": "min", "close": "last", "volume": "sum"})
|
||
.dropna(subset=["close"])
|
||
)
|
||
|
||
# 축적 신호 상태: ticker → {"price": float, "time": float(unix), "vol_ratio": float}
|
||
_accum_signals: dict[str, dict] = {}
|
||
|
||
|
||
def get_active_signals() -> dict[str, dict]:
|
||
"""현재 활성화된 신호 딕셔너리 반환 (fast-poll 루프용).
|
||
|
||
Returns:
|
||
{ticker: {"price": float, "time": float, "vol_ratio": float}}
|
||
"""
|
||
return dict(_accum_signals)
|
||
|
||
|
||
def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
|
||
"""직전 완성 40분봉 거래량이 로컬 5h(7봉) 평균의 vol_mult 배 이상인지 확인."""
|
||
try:
|
||
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M)
|
||
except Exception:
|
||
return False
|
||
if df10 is None or len(df10) < _CANDLE_MIN // 10 * 2:
|
||
return False
|
||
|
||
df = _resample_40m(df10)
|
||
if len(df) < LOCAL_VOL_CANDLES + 1:
|
||
return False
|
||
|
||
recent_vol = df["volume"].iloc[-2] # 직전 완성된 40분봉
|
||
local_avg = df["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean() # 이전 7봉(≈5h) 평균
|
||
if local_avg <= 0:
|
||
return False
|
||
|
||
ratio = recent_vol / local_avg
|
||
result = ratio >= vol_mult
|
||
if result:
|
||
logger.debug(
|
||
f"[거래량↑] {ticker} 40m={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
|
||
)
|
||
else:
|
||
logger.debug(
|
||
f"[거래량✗] {ticker} {ratio:.2f}x < {vol_mult}x"
|
||
)
|
||
return result
|
||
|
||
|
||
def should_buy(ticker: str) -> bool:
|
||
"""Volume Lead 전략.
|
||
|
||
1단계: F&G 필터 — 공포탐욕지수 < FNG_MIN_ENTRY(41)이면 즉시 차단
|
||
2단계: 거래량 급증 + 2h 횡보 → 신호가 기록
|
||
3단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
|
||
"""
|
||
# ── F&G 진입 필터 ─────────────────────────────────────
|
||
if not is_entry_allowed():
|
||
return False
|
||
|
||
regime = get_regime()
|
||
vol_mult = regime["vol_mult"]
|
||
|
||
current = get_current_price(ticker)
|
||
if not current:
|
||
return False
|
||
|
||
now = time.time()
|
||
|
||
# ── 기존 신호 유효성 확인 ────────────────────────────────
|
||
sig = _accum_signals.get(ticker)
|
||
if sig is not None:
|
||
age_h = (now - sig["time"]) / 3600
|
||
if age_h > SIGNAL_TIMEOUT_H:
|
||
del _accum_signals[ticker]
|
||
sig = None
|
||
logger.debug(f"[축적타임아웃] {ticker} {age_h:.1f}h 경과 → 신호 초기화")
|
||
|
||
# ── 신호 없음: 축적 조건 체크 ────────────────────────────
|
||
if sig is None:
|
||
# 2h 가격 횡보 확인 (DB 가격 활용)
|
||
price_2h = get_price_n_hours_ago(ticker, 2)
|
||
if price_2h is None:
|
||
return False
|
||
|
||
quiet = abs(current - price_2h) / price_2h * 100 < PRICE_QUIET_PCT
|
||
|
||
if not quiet:
|
||
logger.debug(
|
||
f"[횡보✗] {ticker} 2h변동={(current - price_2h) / price_2h * 100:+.1f}% "
|
||
f"(기준={PRICE_QUIET_PCT}%)"
|
||
)
|
||
return False
|
||
|
||
# 거래량 급증 확인
|
||
if not _check_vol_spike(ticker, vol_mult):
|
||
return False
|
||
|
||
# 거래량 비율 계산 후 신호 기록
|
||
ratio = 0.0
|
||
try:
|
||
df10 = pyupbit.get_ohlcv(ticker, interval="minute10", count=_FETCH_10M)
|
||
df_h = _resample_40m(df10) if df10 is not None else None
|
||
if df_h is not None and len(df_h) >= LOCAL_VOL_CANDLES + 1:
|
||
recent_vol = df_h["volume"].iloc[-2]
|
||
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_CANDLES + 1):-2].mean()
|
||
ratio = recent_vol / local_avg if local_avg > 0 else 0.0
|
||
except Exception:
|
||
pass
|
||
|
||
entry_thr = _calc_entry_threshold(ratio)
|
||
_accum_signals[ticker] = {"price": current, "time": now, "vol_ratio": ratio}
|
||
from .fng import get_fng
|
||
fng_now = get_fng()
|
||
logger.info(
|
||
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원 "
|
||
f"(거래량 {ratio:.2f}x → 진입임계={entry_thr:.1f}% | F&G={fng_now})"
|
||
)
|
||
notify_signal(ticker, current, ratio, fng=fng_now)
|
||
return False # 신호 첫 발생 시는 진입 안 함
|
||
|
||
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
|
||
signal_price = sig["price"]
|
||
vol_ratio = sig.get("vol_ratio", 0.0)
|
||
entry_thr = _calc_entry_threshold(vol_ratio)
|
||
move_pct = (current - signal_price) / signal_price * 100
|
||
age_min = (now - sig["time"]) / 60
|
||
|
||
if current < signal_price:
|
||
# 신호가 이하 하락 → 축적 실패
|
||
del _accum_signals[ticker]
|
||
logger.debug(
|
||
f"[축적실패] {ticker} 신호가={signal_price:,.2f} → 현재={current:,.2f} (하락) → 초기화"
|
||
)
|
||
return False
|
||
|
||
# ── 거리 기반 진입 ─────────────────────────────────────
|
||
if move_pct >= entry_thr:
|
||
del _accum_signals[ticker]
|
||
logger.info(
|
||
f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 "
|
||
f"(+{move_pct:.1f}% ≥ {entry_thr:.1f}% | 거래량={vol_ratio:.2f}x)"
|
||
)
|
||
return True
|
||
|
||
# ── 속도 기반 조기 진입 ────────────────────────────────
|
||
# 신호 후 N분 이상 경과 + 최소 이동 + 분당 상승률 기준 충족 시 선진입
|
||
if age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE:
|
||
velocity = move_pct / age_min # %/분
|
||
if velocity >= VELOCITY_THRESHOLD:
|
||
del _accum_signals[ticker]
|
||
logger.info(
|
||
f"[속도진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 "
|
||
f"(+{move_pct:.1f}% | {velocity:.3f}%/분 ≥ {VELOCITY_THRESHOLD}%/분 | "
|
||
f"경과={age_min:.1f}분 | 거래량={vol_ratio:.2f}x)"
|
||
)
|
||
return True
|
||
|
||
logger.debug(
|
||
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
|
||
f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}% | "
|
||
f"속도={move_pct/age_min:.3f}%/분 | 경과={age_min:.1f}분)"
|
||
if age_min > 0 else
|
||
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
|
||
f"(+{move_pct:.1f}% / 목표={entry_thr:.1f}%)"
|
||
)
|
||
return False
|