"""10분마다 상위 종목 현재가를 Oracle DB에 저장하는 수집기.""" from __future__ import annotations import logging import time import pyupbit import requests from .market import get_top_tickers from .price_db import cleanup_old_prices, insert_prices, insert_prices_with_time logger = logging.getLogger(__name__) COLLECT_INTERVAL = 600 # 10분 (초) CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리 def backfill_prices(hours: int = 48) -> None: """시작 시 과거 N시간치 1시간봉 종가를 DB에 백필. price_history에 데이터가 없으면 추세 판단이 불가능하므로 봇 시작 직후 한 번 호출해 과거 데이터를 채운다. """ tickers = get_top_tickers() if not tickers: logger.warning("[백필] 종목 목록 없음, 스킵") return count = hours + 2 # 여유 있게 요청 total_rows = 0 for ticker in tickers: try: df = pyupbit.get_ohlcv(ticker, interval="minute60", count=count) if df is None or df.empty: continue rows = [ (ticker, float(row["close"]), ts.to_pydatetime()) for ts, row in df.iterrows() ] insert_prices_with_time(rows) total_rows += len(rows) time.sleep(0.1) except Exception as e: logger.error(f"[백필] {ticker} 오류: {e}") logger.info(f"[백필] 완료 — {len(tickers)}개 종목 / {total_rows}개 레코드 저장") def run_collector(interval: int = COLLECT_INTERVAL) -> None: """가격 수집 루프.""" logger.info(f"가격 수집기 시작 (주기={interval//60}분)") time.sleep(30) # 스캐너와 동시 API 호출 방지 cycle = 0 while True: try: tickers = get_top_tickers() if not tickers: continue resp = requests.get( "https://api.upbit.com/v1/ticker", params={"markets": ",".join(tickers)}, timeout=5, ) resp.raise_for_status() data = resp.json() valid = { item["market"]: item["trade_price"] for item in data if item.get("trade_price") } insert_prices(valid) logger.info(f"[수집] {len(valid)}개 종목 가격 저장") cycle += 1 if cycle % CLEANUP_EVERY == 0: cleanup_old_prices(keep_hours=48) logger.info("오래된 가격 데이터 정리 완료") except Exception as e: logger.error(f"가격 수집 오류: {e}") time.sleep(interval)