Files
upbit-trader/core/market.py
joungmin 80ab004eba feat: replace volatility breakout with DB-backed real-time trend check
- price_history table on Oracle ADB stores prices every 10 minutes
- check_trend(): current price vs N hours ago (default 1h, +3% threshold)
- check_momentum(): unchanged (MA20 + 2x volume still applies)
- Ticker list cached 5 minutes to avoid 429 rate limits
- Collector starts 30s after boot to avoid simultaneous API calls
- Configurable: TREND_HOURS, TREND_MIN_GAIN_PCT in .env

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 11:26:26 +09:00

74 lines
2.3 KiB
Python

"""Market data utilities."""
from __future__ import annotations
import logging
import time
import pyupbit
import requests
logger = logging.getLogger(__name__)
TOP_N = 20 # number of top-ranked tickers that get_top_tickers() keeps for scanning
# REST endpoint queried by get_top_tickers() for per-market ticker snapshots.
_TICKER_URL = "https://api.upbit.com/v1/ticker"
# Ticker list cache (5-minute TTL — avoids hitting the API rate limit).
# _ticker_cache holds the most recent successful result of get_top_tickers();
# _ticker_cache_time is the time.time() value recorded when it was stored.
_ticker_cache: list[str] = []
_ticker_cache_time: float = 0.0
_TICKER_CACHE_TTL = 300 # cache lifetime in seconds (5 minutes)
def get_top_tickers() -> list[str]:
    """Return the KRW-market tickers with the highest 24-hour traded value.

    The result is cached at module level for ``_TICKER_CACHE_TTL`` seconds so
    that repeated calls do not hit the API (rate-limit protection).

    Returns:
        Up to ``TOP_N`` market codes sorted by ``acc_trade_price_24h`` in
        descending order, with stablecoin markets excluded. If the refresh
        fails (an exception is raised, or the ticker listing comes back
        empty), the previously cached list is returned instead; that list is
        empty when no refresh has succeeded yet. A fresh list object is
        returned on every call so callers cannot mutate the cache.
    """
    global _ticker_cache, _ticker_cache_time

    # Serve from the cache while it is non-empty and still within its TTL.
    if _ticker_cache and (time.time() - _ticker_cache_time) < _TICKER_CACHE_TTL:
        return list(_ticker_cache)

    try:
        all_tickers = pyupbit.get_tickers(fiat="KRW")
        if not all_tickers:
            # An empty (or None) listing is treated as a failed refresh: fall
            # back to the stale cache, exactly like the exception path below,
            # instead of handing the caller an empty list.
            logger.warning("get_top_tickers: 티커 목록이 비어 있음 — 이전 캐시 반환")
            return list(_ticker_cache)

        # Query 100 markets per request to keep the URL length bounded.
        chunk_size = 100
        ticker_data = []
        for start in range(0, len(all_tickers), chunk_size):
            chunk = all_tickers[start:start + chunk_size]
            resp = requests.get(
                _TICKER_URL,
                params={"markets": ",".join(chunk)},
                timeout=5,
            )
            resp.raise_for_status()
            ticker_data.extend(resp.json())

        # Drop stablecoin markets.
        exclude = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"}
        ticker_data = [t for t in ticker_data if t["market"] not in exclude]

        # Rank by 24h traded value; a missing or null value counts as 0 so a
        # single incomplete entry cannot make the sort raise TypeError.
        ticker_data.sort(
            key=lambda t: t.get("acc_trade_price_24h") or 0,
            reverse=True,
        )
        top = [t["market"] for t in ticker_data[:TOP_N]]

        _ticker_cache = top
        _ticker_cache_time = time.time()
        logger.debug(f"상위 {TOP_N}개: {top[:5]}...")
        return list(top)
    except Exception as e:
        # Deliberate best-effort boundary: on any failure keep serving the
        # previous cache rather than propagating the error to the caller.
        logger.error(f"get_top_tickers 실패: {e}")
        return list(_ticker_cache)
def get_ohlcv(ticker: str, count: int = 21):
    """Fetch daily OHLCV candles for *ticker*.

    Args:
        ticker: Market code forwarded to ``pyupbit.get_ohlcv``.
        count: Number of daily candles to request (default 21).

    Returns:
        Whatever ``pyupbit.get_ohlcv`` returns for the "day" interval, or
        ``None`` if the call raises (the error is logged).
    """
    try:
        candles = pyupbit.get_ohlcv(ticker, interval="day", count=count)
    except Exception as err:
        logger.error(f"get_ohlcv({ticker}) 실패: {err}")
        candles = None
    return candles
def get_current_price(ticker: str) -> float | None:
    """Look up the current price of *ticker*.

    Args:
        ticker: Market code forwarded to ``pyupbit.get_current_price``.

    Returns:
        The value returned by ``pyupbit.get_current_price``, or ``None`` if
        the call raises (the error is logged).
    """
    try:
        price = pyupbit.get_current_price(ticker)
    except Exception as err:
        logger.error(f"get_current_price({ticker}) 실패: {err}")
        return None
    else:
        return price