- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리: - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal) - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회) - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건 - type hints, Google docstring, 구체적 예외 타입 적용 - 50줄 초과 함수 분리 (process_signal, restore_positions) - 미사용 파일 58개 archive/ 폴더로 이동 - README.md 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
170 lines
6.1 KiB
Python
170 lines
6.1 KiB
Python
"""1분봉 장기 히스토리 fetch 데몬.
|
|
|
|
주요 5종목(BTC/ETH/XRP/SOL/DOGE)의 1분봉을 2년치까지 소급 수집.
|
|
백그라운드에서 조용히 실행 — API 딜레이 충분히 줘서 다른 작업 방해 안 함.
|
|
재시작 시 DB에 이미 있는 범위는 건너뜀.
|
|
|
|
실행:
|
|
.venv/bin/python3 daemons/fetch_1min_history.py [--tickers BTC ETH] [--days 730]
|
|
로그:
|
|
/tmp/fetch_1min_history.log
|
|
"""
|
|
import sys, os, time, argparse, logging
|
|
from datetime import datetime, timedelta
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from dotenv import load_dotenv
|
|
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))
|
|
|
|
import pyupbit
|
|
import oracledb
|
|
|
|
# ── 설정 ──────────────────────────────────────────────────────────────────────
|
|
DEFAULT_TICKERS = ['KRW-BTC', 'KRW-ETH', 'KRW-XRP', 'KRW-SOL', 'KRW-DOGE']
|
|
BATCH = 200 # API 1회 요청 봉수 (Upbit 최대 200)
|
|
DELAY = 0.4 # API 호출 간격 (초) — 넉넉히 줘서 rate limit 회피
|
|
RETRY_WAIT = 5.0 # 오류 시 대기 (초)
|
|
|
|
# ── 로깅 설정 ─────────────────────────────────────────────────────────────────
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s %(levelname)s %(message)s',
|
|
handlers=[
|
|
logging.FileHandler('/tmp/fetch_1min_history.log'),
|
|
logging.StreamHandler(sys.stdout),
|
|
]
|
|
)
|
|
log = logging.getLogger(__name__)
|
|
|
|
|
|
def _get_conn():
|
|
kwargs = dict(user=os.environ["ORACLE_USER"],
|
|
password=os.environ["ORACLE_PASSWORD"],
|
|
dsn=os.environ["ORACLE_DSN"])
|
|
wallet = os.environ.get("ORACLE_WALLET")
|
|
if wallet:
|
|
kwargs["config_dir"] = wallet
|
|
return oracledb.connect(**kwargs)
|
|
|
|
|
|
def get_oldest_ts(conn, ticker: str):
|
|
"""DB에 있는 해당 ticker 1분봉의 가장 오래된 ts 반환. 없으면 None."""
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"SELECT MIN(ts) FROM backtest_ohlcv "
|
|
"WHERE ticker=:t AND interval_cd='minute1'",
|
|
{"t": ticker}
|
|
)
|
|
row = cur.fetchone()
|
|
return row[0] if row and row[0] else None
|
|
|
|
|
|
def insert_batch(conn, ticker: str, rows: list) -> int:
|
|
"""rows: [(ts, open, high, low, close, volume), ...] — bulk insert, 중복 무시."""
|
|
if not rows:
|
|
return 0
|
|
cur = conn.cursor()
|
|
cur.executemany(
|
|
"INSERT INTO backtest_ohlcv "
|
|
"(ticker,interval_cd,ts,open_p,high_p,low_p,close_p,volume_p) "
|
|
"VALUES (:1,'minute1',:2,:3,:4,:5,:6,:7)",
|
|
[(ticker, ts, o, h, l, c, v) for ts, o, h, l, c, v in rows],
|
|
batcherrors=True,
|
|
)
|
|
errors = cur.getbatcherrors()
|
|
conn.commit()
|
|
return len(rows) - len(errors)
|
|
|
|
|
|
def fetch_ticker(conn, ticker: str, cutoff: datetime) -> int:
|
|
"""ticker의 cutoff까지 1분봉 소급 fetch.
|
|
DB에 이미 있는 범위는 batcherrors로 자동 스킵.
|
|
"""
|
|
oldest_in_db = get_oldest_ts(conn, ticker)
|
|
if oldest_in_db and oldest_in_db <= cutoff:
|
|
log.info(f"{ticker}: DB에 이미 {oldest_in_db.date()} 까지 있음 → 스킵")
|
|
return 0
|
|
|
|
# datetime.now()에서 시작해 cutoff까지 역방향 fetch
|
|
# 중복은 DB unique constraint + batcherrors가 처리
|
|
to_dt = datetime.now()
|
|
total = 0
|
|
batch_n = 0
|
|
|
|
if oldest_in_db:
|
|
log.info(f"{ticker}: DB 최솟값={oldest_in_db.date()}, {cutoff.date()} 까지 소급 시작")
|
|
else:
|
|
log.info(f"{ticker}: DB에 데이터 없음, {cutoff.date()} 까지 전체 fetch 시작")
|
|
|
|
while to_dt > cutoff:
|
|
# pyupbit의 to 파라미터는 UTC로 해석됨 — KST에서 9시간 빼서 전달
|
|
to_utc = to_dt - timedelta(hours=9)
|
|
to_str = to_utc.strftime('%Y-%m-%d %H:%M:%S')
|
|
try:
|
|
df = pyupbit.get_ohlcv(ticker, interval='minute1', count=BATCH, to=to_str)
|
|
time.sleep(DELAY)
|
|
except Exception as e:
|
|
log.warning(f"{ticker} API 오류: {e} → {RETRY_WAIT}s 후 재시도")
|
|
time.sleep(RETRY_WAIT)
|
|
continue
|
|
|
|
if df is None or len(df) == 0:
|
|
log.info(f"{ticker}: API 데이터 소진 ({to_str})")
|
|
break
|
|
|
|
rows = [
|
|
(ts.to_pydatetime(), float(r['open']), float(r['high']),
|
|
float(r['low']), float(r['close']), float(r['volume']))
|
|
for ts, r in df.iterrows()
|
|
]
|
|
n = insert_batch(conn, ticker, rows)
|
|
total += n
|
|
batch_n += 1
|
|
oldest = df.index[0].to_pydatetime()
|
|
|
|
if batch_n % 50 == 0:
|
|
log.info(f" {ticker} 배치{batch_n:04d}: {oldest.date()} | 신규 누적 {total:,}행")
|
|
|
|
to_dt = oldest - timedelta(minutes=1)
|
|
|
|
if oldest <= cutoff:
|
|
break
|
|
|
|
log.info(f"{ticker}: 완료 — 신규 {total:,}행 (배치 {batch_n}회)")
|
|
return total
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
parser.add_argument('--tickers', nargs='+', default=DEFAULT_TICKERS,
|
|
help='수집 티커 (예: KRW-BTC KRW-ETH)')
|
|
parser.add_argument('--days', type=int, default=730,
|
|
help='소급 일수 (기본 730일 = 2년)')
|
|
args = parser.parse_args()
|
|
|
|
cutoff = datetime.now() - timedelta(days=args.days)
|
|
log.info(f"=== 1분봉 히스토리 데몬 시작 ===")
|
|
log.info(f"대상: {args.tickers}")
|
|
log.info(f"목표: {cutoff.date()} ({args.days}일) 까지 소급")
|
|
|
|
conn = _get_conn()
|
|
grand_total = 0
|
|
t_start = time.time()
|
|
|
|
for ticker in args.tickers:
|
|
t0 = time.time()
|
|
try:
|
|
n = fetch_ticker(conn, ticker, cutoff)
|
|
grand_total += n
|
|
elapsed = time.time() - t0
|
|
log.info(f"{ticker}: {n:,}행 저장 ({elapsed/60:.1f}분)")
|
|
except Exception as e:
|
|
log.error(f"{ticker}: 오류 — {e}")
|
|
|
|
conn.close()
|
|
total_min = (time.time() - t_start) / 60
|
|
log.info(f"=== 완료: 총 {grand_total:,}행 / {total_min:.0f}분 ===")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|