Files
upbit-trader/core/price_collector.py
joungmin 0b264b304c feat: add backtest module with DB cache and scenario comparison
Backtest improvements:
- Add backtest.py with Oracle DB-backed OHLCV cache (no repeated API calls)
- Add backtest_trades table to cache simulation results by params hash
  (same params -> instant load, skip re-simulation)
- Add walk-forward scenario comparison (--walkforward-cmp)
- Add trend ceiling filter (--trend-cmp, max gain threshold)
- Add ticker win-rate filter (--ticker-cmp, SQL-based instant analysis)
- Precompute daily_features once per data load (not per scenario)

Live bot fixes:
- monitor: add hard stop-loss from buy price (in addition to trailing)
- strategy: fix re-entry condition to require +1% above last sell price
- price_collector: add 48h backfill on startup for trend calculation
- main: call backfill_prices() at startup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 23:28:27 +09:00

86 lines
2.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""10분마다 상위 종목 현재가를 Oracle DB에 저장하는 수집기."""
from __future__ import annotations
import logging
import time
import pyupbit
import requests
from .market import get_top_tickers
from .price_db import cleanup_old_prices, insert_prices, insert_prices_with_time
logger = logging.getLogger(__name__)
COLLECT_INTERVAL = 600 # 10분 (초)
CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리
def backfill_prices(hours: int = 48) -> None:
"""시작 시 과거 N시간치 1시간봉 종가를 DB에 백필.
price_history에 데이터가 없으면 추세 판단이 불가능하므로
봇 시작 직후 한 번 호출해 과거 데이터를 채운다.
"""
tickers = get_top_tickers()
if not tickers:
logger.warning("[백필] 종목 목록 없음, 스킵")
return
count = hours + 2 # 여유 있게 요청
total_rows = 0
for ticker in tickers:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=count)
if df is None or df.empty:
continue
rows = [
(ticker, float(row["close"]), ts.to_pydatetime())
for ts, row in df.iterrows()
]
insert_prices_with_time(rows)
total_rows += len(rows)
time.sleep(0.1)
except Exception as e:
logger.error(f"[백필] {ticker} 오류: {e}")
logger.info(f"[백필] 완료 — {len(tickers)}개 종목 / {total_rows}개 레코드 저장")
def run_collector(interval: int = COLLECT_INTERVAL) -> None:
"""가격 수집 루프."""
logger.info(f"가격 수집기 시작 (주기={interval//60}분)")
time.sleep(30) # 스캐너와 동시 API 호출 방지
cycle = 0
while True:
try:
tickers = get_top_tickers()
if not tickers:
continue
resp = requests.get(
"https://api.upbit.com/v1/ticker",
params={"markets": ",".join(tickers)},
timeout=5,
)
resp.raise_for_status()
data = resp.json()
valid = {
item["market"]: item["trade_price"]
for item in data
if item.get("trade_price")
}
insert_prices(valid)
logger.info(f"[수집] {len(valid)}개 종목 가격 저장")
cycle += 1
if cycle % CLEANUP_EVERY == 0:
cleanup_old_prices(keep_hours=48)
logger.info("오래된 가격 데이터 정리 완료")
except Exception as e:
logger.error(f"가격 수집 오류: {e}")
time.sleep(interval)