"""Strategy C: 현재 기준 N시간 전 대비 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호. 추가 필터: 1. 6h 추세 확인 (단기 급등 아닌 지속 추세) 2. 15분 확인 워치리스트 (신호 첫 발생 후 15분 내 재확인 시 진입) """ from __future__ import annotations import logging import os import time import pyupbit from .market import get_current_price, get_ohlcv from .market_regime import get_regime from .price_db import get_price_n_hours_ago logger = logging.getLogger(__name__) # 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중 TREND_HOURS = float(os.getenv("TREND_HOURS", "12")) TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # 레짐이 없을 때 기본값 # 6h 단기 추세 최소 상승률 (추세 지속형 필터) TREND_6H_MIN_PCT = float(os.getenv("TREND_6H_MIN_PCT", "1.0")) # 모멘텀: MA 기간, 거래량 급증 배수 MA_PERIOD = 20 VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) # 레짐이 없을 때 기본값 LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h) # 15분 확인 워치리스트: 신호 첫 발생 시각(unix ts) 기록 CONFIRM_SECONDS = int(os.getenv("CONFIRM_SECONDS", "900")) # 기본 15분 _watchlist: dict[str, float] = {} # ticker → first_signal_time (unix timestamp) def check_trend(ticker: str, min_gain_pct: float) -> bool: """상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +min_gain_pct% 이상.""" past_price = get_price_n_hours_ago(ticker, TREND_HOURS) if past_price is None: logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)") return False current = get_current_price(ticker) if not current: return False gain_pct = (current - past_price) / past_price * 100 result = gain_pct >= min_gain_pct if result: logger.info( f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} " f"현재={current:,.2f} (+{gain_pct:.1f}% ≥ {min_gain_pct}%)" ) else: logger.debug( f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={min_gain_pct:+.0f}%)" ) return result def check_momentum(ticker: str, vol_mult: float) -> bool: """모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × vol_mult. 23h 평균은 낮 시간대 고거래량이 포함돼 새벽에 항상 미달하므로, 로컬 5h 평균(같은 시간대 컨텍스트)과 비교한다. """ # MA20: 일봉 기준 df_daily = get_ohlcv(ticker, count=MA_PERIOD + 1) if df_daily is None or len(df_daily) < MA_PERIOD + 1: return False ma = df_daily["close"].iloc[-MA_PERIOD:].mean() current = get_current_price(ticker) if current is None: return False price_ok = current > ma if not price_ok: logger.debug(f"[모멘텀✗] {ticker} 현재={current:,.0f} < MA20={ma:,.0f} (가격 기준 미달)") return False # 거래량: 60분봉 기준 (최근 1h vs 이전 LOCAL_VOL_HOURS h 로컬 평균) fetch_count = LOCAL_VOL_HOURS + 3 # 여유 있게 fetch try: df_hour = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count) except Exception: return False if df_hour is None or len(df_hour) < LOCAL_VOL_HOURS + 1: return False recent_vol = df_hour["volume"].iloc[-2] # 직전 완성된 1h 봉 local_avg = df_hour["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 LOCAL_VOL_HOURS h 평균 vol_ok = local_avg > 0 and recent_vol >= local_avg * vol_mult ratio = recent_vol / local_avg if local_avg > 0 else 0 if vol_ok: logger.info( f"[모멘텀↑] {ticker} 현재={current:,.0f} MA20={ma:,.0f} " f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)" ) else: logger.debug( f"[모멘텀✗] {ticker} 1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} " f"({ratio:.2f}x < {vol_mult}x)" ) return vol_ok def check_trend_6h(ticker: str) -> bool: """6h 추세 지속 확인: 6h 전 대비 +TREND_6H_MIN_PCT% 이상 상승 중이어야 진입 허용.""" past = get_price_n_hours_ago(ticker, 6) if past is None: logger.debug(f"[6h추세] {ticker} 6h 전 가격 없음 (수집 중)") return True # 데이터 없으면 필터 패스 (수집 초기) current = get_current_price(ticker) if not current: return False gain_pct = (current - past) / past * 100 result = gain_pct >= TREND_6H_MIN_PCT if result: logger.debug( f"[6h추세↑] {ticker} 6h전={past:,.2f} 현재={current:,.2f} " f"(+{gain_pct:.1f}% ≥ {TREND_6H_MIN_PCT}%)" ) else: logger.debug( f"[6h추세✗] {ticker} 6h {gain_pct:+.1f}% (기준={TREND_6H_MIN_PCT:+.1f}%)" ) return result def should_buy(ticker: str) -> bool: """Strategy C + 시장 레짐 + 추세 지속형 진입 (6h 추세 + 15분 확인 워치리스트). 진입 조건: 1. 12h 추세 ≥ 레짐별 임계값 (bull 3% / neutral 5% / bear 8%) 2. 6h 추세 ≥ 1% (단기 급등이 아닌 지속 추세) 3. 모멘텀 (MA20 초과 + 1h 거래량 급증) 4. 위 조건 최초 충족 후 15분 경과 시 실제 진입 (확인 필터) """ regime = get_regime() trend_pct = regime["trend_pct"] vol_mult = regime["vol_mult"] # 조건 평가 (순서: 가장 빠른 필터 먼저) if not check_trend(ticker, trend_pct): _watchlist.pop(ticker, None) # 조건 깨지면 워치리스트 초기화 return False if not check_trend_6h(ticker): _watchlist.pop(ticker, None) return False if not check_momentum(ticker, vol_mult): _watchlist.pop(ticker, None) return False # 모든 조건 충족 — 15분 확인 워치리스트 처리 now = time.time() if ticker not in _watchlist: _watchlist[ticker] = now logger.info( f"[워치] {ticker} 신호 첫 발생 → {CONFIRM_SECONDS//60}분 후 진입 예정" ) return False elapsed = now - _watchlist[ticker] if elapsed < CONFIRM_SECONDS: logger.debug( f"[워치] {ticker} 확인 대기 중 ({elapsed/60:.1f}분 / {CONFIRM_SECONDS//60}분)" ) return False # 15분 경과 → 진입 확정 _watchlist.pop(ticker, None) logger.info( f"[워치확인] {ticker} {elapsed/60:.1f}분 경과 → 진입 확정" ) return True