"""Market data utilities.""" from __future__ import annotations import logging import time import pyupbit import requests logger = logging.getLogger(__name__) TOP_N = 20 # 거래량 상위 N개 종목만 스캔 _TICKER_URL = "https://api.upbit.com/v1/ticker" # 티커 목록 캐시 (5분 TTL — API rate limit 방지) _ticker_cache: list[str] = [] _ticker_cache_time: float = 0.0 _TICKER_CACHE_TTL = 300 # 5분 def get_top_tickers() -> list[str]: """24시간 거래대금 상위 KRW 마켓 티커 반환 (5분 캐시).""" global _ticker_cache, _ticker_cache_time if _ticker_cache and (time.time() - _ticker_cache_time) < _TICKER_CACHE_TTL: return _ticker_cache try: all_tickers = pyupbit.get_tickers(fiat="KRW") if not all_tickers: return [] # 100개씩 나눠서 조회 (URL 길이 제한) chunk_size = 100 ticker_data = [] for i in range(0, len(all_tickers), chunk_size): chunk = all_tickers[i:i + chunk_size] params = {"markets": ",".join(chunk)} resp = requests.get(_TICKER_URL, params=params, timeout=5) resp.raise_for_status() ticker_data.extend(resp.json()) # 스테이블코인 제외 EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"} ticker_data = [t for t in ticker_data if t["market"] not in EXCLUDE] # 24h 거래대금 기준 정렬 ticker_data.sort(key=lambda x: x.get("acc_trade_price_24h", 0), reverse=True) top = [t["market"] for t in ticker_data[:TOP_N]] _ticker_cache = top _ticker_cache_time = time.time() logger.debug(f"상위 {TOP_N}개: {top[:5]}...") return top except Exception as e: logger.error(f"get_top_tickers 실패: {e}") return _ticker_cache # 실패 시 이전 캐시 반환 def get_ohlcv(ticker: str, count: int = 21): """일봉 OHLCV 데이터 반환.""" try: return pyupbit.get_ohlcv(ticker, interval="day", count=count) except Exception as e: logger.error(f"get_ohlcv({ticker}) 실패: {e}") return None def get_current_price(ticker: str) -> float | None: """현재가 반환.""" try: return pyupbit.get_current_price(ticker) except Exception as e: logger.error(f"get_current_price({ticker}) 실패: {e}") return None