"""1분봉 장기 히스토리 fetch 데몬. 주요 5종목(BTC/ETH/XRP/SOL/DOGE)의 1분봉을 2년치까지 소급 수집. 백그라운드에서 조용히 실행 — API 딜레이 충분히 줘서 다른 작업 방해 안 함. 재시작 시 DB에 이미 있는 범위는 건너뜀. 실행: .venv/bin/python3 daemons/fetch_1min_history.py [--tickers BTC ETH] [--days 730] 로그: /tmp/fetch_1min_history.log """ import sys, os, time, argparse, logging from datetime import datetime, timedelta sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) import pyupbit import oracledb # ── 설정 ────────────────────────────────────────────────────────────────────── DEFAULT_TICKERS = ['KRW-BTC', 'KRW-ETH', 'KRW-XRP', 'KRW-SOL', 'KRW-DOGE'] BATCH = 200 # API 1회 요청 봉수 (Upbit 최대 200) DELAY = 0.4 # API 호출 간격 (초) — 넉넉히 줘서 rate limit 회피 RETRY_WAIT = 5.0 # 오류 시 대기 (초) # ── 로깅 설정 ───────────────────────────────────────────────────────────────── logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', handlers=[ logging.FileHandler('/tmp/fetch_1min_history.log'), logging.StreamHandler(sys.stdout), ] ) log = logging.getLogger(__name__) def _get_conn(): kwargs = dict(user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], dsn=os.environ["ORACLE_DSN"]) wallet = os.environ.get("ORACLE_WALLET") if wallet: kwargs["config_dir"] = wallet return oracledb.connect(**kwargs) def get_oldest_ts(conn, ticker: str): """DB에 있는 해당 ticker 1분봉의 가장 오래된 ts 반환. 없으면 None.""" cur = conn.cursor() cur.execute( "SELECT MIN(ts) FROM backtest_ohlcv " "WHERE ticker=:t AND interval_cd='minute1'", {"t": ticker} ) row = cur.fetchone() return row[0] if row and row[0] else None def insert_batch(conn, ticker: str, rows: list) -> int: """rows: [(ts, open, high, low, close, volume), ...] — bulk insert, 중복 무시.""" if not rows: return 0 cur = conn.cursor() cur.executemany( "INSERT INTO backtest_ohlcv " "(ticker,interval_cd,ts,open_p,high_p,low_p,close_p,volume_p) " "VALUES (:1,'minute1',:2,:3,:4,:5,:6,:7)", [(ticker, ts, o, h, l, c, v) for ts, o, h, l, c, v in rows], batcherrors=True, ) errors = cur.getbatcherrors() conn.commit() return len(rows) - len(errors) def fetch_ticker(conn, ticker: str, cutoff: datetime) -> int: """ticker의 cutoff까지 1분봉 소급 fetch. DB에 이미 있는 범위는 batcherrors로 자동 스킵. """ oldest_in_db = get_oldest_ts(conn, ticker) if oldest_in_db and oldest_in_db <= cutoff: log.info(f"{ticker}: DB에 이미 {oldest_in_db.date()} 까지 있음 → 스킵") return 0 # datetime.now()에서 시작해 cutoff까지 역방향 fetch # 중복은 DB unique constraint + batcherrors가 처리 to_dt = datetime.now() total = 0 batch_n = 0 if oldest_in_db: log.info(f"{ticker}: DB 최솟값={oldest_in_db.date()}, {cutoff.date()} 까지 소급 시작") else: log.info(f"{ticker}: DB에 데이터 없음, {cutoff.date()} 까지 전체 fetch 시작") while to_dt > cutoff: # pyupbit의 to 파라미터는 UTC로 해석됨 — KST에서 9시간 빼서 전달 to_utc = to_dt - timedelta(hours=9) to_str = to_utc.strftime('%Y-%m-%d %H:%M:%S') try: df = pyupbit.get_ohlcv(ticker, interval='minute1', count=BATCH, to=to_str) time.sleep(DELAY) except Exception as e: log.warning(f"{ticker} API 오류: {e} → {RETRY_WAIT}s 후 재시도") time.sleep(RETRY_WAIT) continue if df is None or len(df) == 0: log.info(f"{ticker}: API 데이터 소진 ({to_str})") break rows = [ (ts.to_pydatetime(), float(r['open']), float(r['high']), float(r['low']), float(r['close']), float(r['volume'])) for ts, r in df.iterrows() ] n = insert_batch(conn, ticker, rows) total += n batch_n += 1 oldest = df.index[0].to_pydatetime() if batch_n % 50 == 0: log.info(f" {ticker} 배치{batch_n:04d}: {oldest.date()} | 신규 누적 {total:,}행") to_dt = oldest - timedelta(minutes=1) if oldest <= cutoff: break log.info(f"{ticker}: 완료 — 신규 {total:,}행 (배치 {batch_n}회)") return total def main(): parser = argparse.ArgumentParser() parser.add_argument('--tickers', nargs='+', default=DEFAULT_TICKERS, help='수집 티커 (예: KRW-BTC KRW-ETH)') parser.add_argument('--days', type=int, default=730, help='소급 일수 (기본 730일 = 2년)') args = parser.parse_args() cutoff = datetime.now() - timedelta(days=args.days) log.info(f"=== 1분봉 히스토리 데몬 시작 ===") log.info(f"대상: {args.tickers}") log.info(f"목표: {cutoff.date()} ({args.days}일) 까지 소급") conn = _get_conn() grand_total = 0 t_start = time.time() for ticker in args.tickers: t0 = time.time() try: n = fetch_ticker(conn, ticker, cutoff) grand_total += n elapsed = time.time() - t0 log.info(f"{ticker}: {n:,}행 저장 ({elapsed/60:.1f}분)") except Exception as e: log.error(f"{ticker}: 오류 — {e}") conn.close() total_min = (time.time() - t_start) / 60 log.info(f"=== 완료: 총 {grand_total:,}행 / {total_min:.0f}분 ===") if __name__ == '__main__': main()