- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리: - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal) - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회) - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건 - type hints, Google docstring, 구체적 예외 타입 적용 - 50줄 초과 함수 분리 (process_signal, restore_positions) - 미사용 파일 58개 archive/ 폴더로 이동 - README.md 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
132 lines
4.4 KiB
Python
132 lines
4.4 KiB
Python
"""1년치 1분봉 OHLCV → BACKTEST_OHLCV (수동 페이지네이션).
|
||
|
||
pyupbit이 count>5000에서 실패하므로 to 파라미터로 직접 페이지네이션.
|
||
실행: .venv/bin/python3 tests/fetch_1y_minute1.py
|
||
예상 소요: 20종목 × ~15분 = ~5시간 (overnight 실행 권장)
|
||
"""
|
||
import sys, os, time
|
||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||
from dotenv import load_dotenv
|
||
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))
|
||
|
||
import pyupbit
|
||
import oracledb
|
||
from datetime import datetime, timedelta
|
||
import pandas as pd
|
||
|
||
ALL_TICKERS = [
|
||
'KRW-XRP','KRW-BTC','KRW-ETH','KRW-SOL','KRW-DOGE', # 그룹 1
|
||
'KRW-ADA','KRW-SUI','KRW-NEAR','KRW-KAVA','KRW-SXP', # 그룹 2
|
||
'KRW-AKT','KRW-SONIC','KRW-IP','KRW-ORBS','KRW-VIRTUAL', # 그룹 3
|
||
'KRW-BARD','KRW-XPL','KRW-KITE','KRW-ENSO','KRW-0G', # 그룹 4
|
||
]
|
||
# 실행: python fetch_1y_minute1.py [그룹번호 1-4]
|
||
import argparse
|
||
parser = argparse.ArgumentParser()
|
||
parser.add_argument('group', nargs='?', type=int, default=0,
|
||
help='1-4: 해당 그룹만 실행, 0: 전체')
|
||
args, _ = parser.parse_known_args()
|
||
if args.group in (1,2,3,4):
|
||
TICKERS = ALL_TICKERS[(args.group-1)*5 : args.group*5]
|
||
print(f"그룹 {args.group} 실행: {TICKERS}")
|
||
else:
|
||
TICKERS = ALL_TICKERS
|
||
BATCH = 4000 # 한 번에 요청할 봉 수 (4000 = ~2.8일)
|
||
DELAY = 0.15 # API 딜레이 (초)
|
||
TARGET_DAYS = 365 # 목표 기간
|
||
|
||
def _get_conn():
|
||
kwargs = dict(user=os.environ["ORACLE_USER"],
|
||
password=os.environ["ORACLE_PASSWORD"],
|
||
dsn=os.environ["ORACLE_DSN"])
|
||
wallet = os.environ.get("ORACLE_WALLET")
|
||
if wallet:
|
||
kwargs["config_dir"] = wallet
|
||
return oracledb.connect(**kwargs)
|
||
|
||
|
||
def insert_batch(conn, ticker, df) -> int:
|
||
"""DB에 없는 행만 삽입. 반환: 신규 건수."""
|
||
cur = conn.cursor()
|
||
min_ts = df.index.min().to_pydatetime()
|
||
max_ts = df.index.max().to_pydatetime()
|
||
cur.execute(
|
||
"SELECT ts FROM backtest_ohlcv WHERE ticker=:t AND interval_cd='minute1' "
|
||
"AND ts BETWEEN :s AND :e",
|
||
{"t": ticker, "s": min_ts, "e": max_ts}
|
||
)
|
||
existing = {r[0] for r in cur.fetchall()}
|
||
new_rows = [
|
||
(ticker, 'minute1', ts.to_pydatetime(),
|
||
float(r["open"]), float(r["high"]), float(r["low"]),
|
||
float(r["close"]), float(r["volume"]))
|
||
for ts, r in df.iterrows()
|
||
if ts.to_pydatetime() not in existing
|
||
]
|
||
if not new_rows:
|
||
return 0
|
||
cur.executemany(
|
||
"INSERT INTO backtest_ohlcv (ticker,interval_cd,ts,open_p,high_p,low_p,close_p,volume_p) "
|
||
"VALUES (:1,:2,:3,:4,:5,:6,:7,:8)",
|
||
new_rows
|
||
)
|
||
conn.commit()
|
||
return len(new_rows)
|
||
|
||
|
||
def fetch_ticker(conn, ticker) -> int:
|
||
"""ticker 1년치 1분봉 fetch → DB 저장. 반환: 총 신규 건수."""
|
||
cutoff = datetime.now() - timedelta(days=TARGET_DAYS)
|
||
to_dt = datetime.now()
|
||
total = 0
|
||
batch_no = 0
|
||
|
||
while to_dt > cutoff:
|
||
to_str = to_dt.strftime('%Y-%m-%d %H:%M:%S')
|
||
try:
|
||
df = pyupbit.get_ohlcv(ticker, interval='minute1', count=BATCH, to=to_str)
|
||
time.sleep(DELAY)
|
||
except Exception as e:
|
||
print(f" API 오류: {e} → 재시도")
|
||
time.sleep(2.0)
|
||
continue
|
||
|
||
if df is None or len(df) == 0:
|
||
break
|
||
|
||
n = insert_batch(conn, ticker, df)
|
||
total += n
|
||
batch_no += 1
|
||
oldest = df.index[0]
|
||
|
||
print(f" 배치{batch_no:03d}: {oldest.date()} ~ {df.index[-1].strftime('%m-%d')} "
|
||
f"({len(df)}봉, 신규 {n}) | 누적 {total:,}", flush=True)
|
||
|
||
# 다음 페이지: 이 배치에서 가장 오래된 봉 이전부터
|
||
to_dt = oldest - timedelta(minutes=1)
|
||
|
||
if oldest <= cutoff:
|
||
break
|
||
|
||
return total
|
||
|
||
|
||
conn = _get_conn()
|
||
grand_total = 0
|
||
start_time = time.time()
|
||
|
||
for idx, tk in enumerate(TICKERS, 1):
|
||
t0 = time.time()
|
||
print(f"\n[{idx:02d}/{len(TICKERS)}] {tk} 시작...", flush=True)
|
||
try:
|
||
n = fetch_ticker(conn, tk)
|
||
elapsed = time.time() - t0
|
||
grand_total += n
|
||
print(f" → 완료: 신규 {n:,}행 ({elapsed/60:.1f}분) | 전체 누적 {grand_total:,}행", flush=True)
|
||
except Exception as e:
|
||
print(f" → 오류: {e}")
|
||
|
||
conn.close()
|
||
elapsed_total = time.time() - start_time
|
||
print(f"\n전체 완료: {grand_total:,}행 저장 ({elapsed_total/60:.0f}분 소요)")
|