"""1년치 데이터 수집 — 10분봉 OHLCV + F&G 히스토리. 생성 파일: data/sim1y_cache.pkl — {"10m": {ticker: DataFrame}} (10분봉, 365일) data/fng_1y.json — {"YYYY-MM-DD": int, ...} (Fear & Greed 1년치) 소요 시간: 약 10~15분 (20종목 × 263 API 호출) """ import os as _os, sys as _sys _sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__)))) import json import pickle import time import urllib.request from datetime import datetime, timedelta from pathlib import Path import pandas as pd import pyupbit from dotenv import load_dotenv load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env") # ── 설정 ───────────────────────────────────────────────── CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl" FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json" TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl" DAYS = 365 TOP_N = 20 # ── 10분봉 수집 ─────────────────────────────────────────── def fetch_10m(ticker: str, days: int) -> "pd.DataFrame | None": target_start = datetime.now() - timedelta(days=days) all_dfs, to, prev_oldest = [], None, None while True: kwargs = dict(ticker=ticker, interval="minute10", count=200) if to: kwargs["to"] = to.strftime("%Y-%m-%d %H:%M:%S") try: df = pyupbit.get_ohlcv(**kwargs) except Exception: time.sleep(0.5) break if df is None or df.empty: break all_dfs.append(df) oldest = df.index[0] if prev_oldest is not None and oldest >= prev_oldest: break prev_oldest = oldest if oldest <= target_start: break to = oldest time.sleep(0.12) if not all_dfs: return None combined = pd.concat(all_dfs).sort_index() combined = combined[~combined.index.duplicated(keep="last")] return combined[combined.index >= target_start] # ── F&G 1년치 수집 ──────────────────────────────────────── def fetch_fng(limit: int = 400) -> dict: url = f"https://api.alternative.me/fng/?limit={limit}&format=json" with urllib.request.urlopen(url, timeout=15) as r: data = json.loads(r.read()) result = {} for e in data["data"]: dt = datetime.fromtimestamp(int(e["timestamp"])) result[dt.strftime("%Y-%m-%d")] = int(e["value"]) return result # ── 메인 ───────────────────────────────────────────────── def main(): # ── 종목 목록 ───────────────────────────────────────── try: from core.market import get_top_tickers tickers = get_top_tickers()[:TOP_N] print(f"Top{TOP_N} 종목 API 조회: {tickers}\n") # top30 파일 갱신 pickle.dump(tickers, open(TOP30_FILE, "wb")) except Exception as e: print(f" [경고] 종목 API 실패: {e}") if TOP30_FILE.exists(): tickers = pickle.load(open(TOP30_FILE, "rb"))[:TOP_N] print(f" 기존 top30 파일 사용: {tickers}\n") else: print(" [오류] 종목 목록 없음. 종료.") return # ── F&G 1년치 ───────────────────────────────────────── print("F&G 1년치 수집...") try: fng_map = fetch_fng(limit=400) sorted_dates = sorted(fng_map.keys()) print(f" 기간: {sorted_dates[0]} ~ {sorted_dates[-1]} ({len(fng_map)}일)") # 분포 zones = {"극공포(≤25)": 0, "공포(26~40)": 0, "중립(41~55)": 0, "탐욕(56~75)": 0, "극탐욕(76+)": 0} for v in fng_map.values(): if v <= 25: zones["극공포(≤25)"] += 1 elif v <= 40: zones["공포(26~40)"] += 1 elif v <= 55: zones["중립(41~55)"] += 1 elif v <= 75: zones["탐욕(56~75)"] += 1 else: zones["극탐욕(76+)"] += 1 total = sum(zones.values()) for name, cnt in zones.items(): print(f" {name:12} {cnt:>3}일 ({cnt/total*100:.1f}%)") json.dump(fng_map, open(FNG_FILE, "w")) print(f" 저장: {FNG_FILE}\n") except Exception as e: print(f" [오류] F&G 수집 실패: {e}\n") fng_map = {} # ── 10분봉 1년치 ────────────────────────────────────── print(f"10분봉 {DAYS}일치 수집 중 ({len(tickers)}종목)...") print(f" 예상 소요: {len(tickers) * 265 * 0.12 / 60:.0f}~{len(tickers) * 265 * 0.15 / 60:.0f}분\n") data = {"10m": {}} for i, ticker in enumerate(tickers, 1): start_t = time.time() df = fetch_10m(ticker, DAYS) elapsed = time.time() - start_t if df is not None and len(df) > 500: data["10m"][ticker] = df candles = len(df) period = f"{df.index[0].strftime('%Y-%m-%d')}~{df.index[-1].strftime('%Y-%m-%d')}" print(f" {i:>2}/{len(tickers)} {ticker:<15} {candles:>6}봉 {period} ({elapsed:.0f}s)") else: print(f" {i:>2}/{len(tickers)} {ticker:<15} 데이터 부족 ({elapsed:.0f}s)") time.sleep(0.15) # ── 저장 ────────────────────────────────────────────── print(f"\n수집 완료: {len(data['10m'])}종목") if data["10m"]: sample = next(iter(data["10m"].values())) print(f"기간: {sample.index[0].strftime('%Y-%m-%d')} ~ {sample.index[-1].strftime('%Y-%m-%d')}") print(f"봉 수: {len(sample)}개 (10분봉)") # 파일 크기 추정 import sys size_mb = sys.getsizeof(pickle.dumps(data)) / 1024 / 1024 print(f"예상 크기: {size_mb:.1f} MB") pickle.dump(data, open(CACHE_FILE, "wb")) print(f"\n캐시 저장: {CACHE_FILE}") print("완료!") if __name__ == "__main__": main()