refactor: reorganize project structure into tests/, data/, logs/
- Move all backtest/simulation scripts to tests/ - Add sys.path.insert to each script for correct import resolution - Move pkl cache files to data/ (git-ignored) - Move log files to logs/ (git-ignored) - Update main.py: trading.log path → logs/trading.log - Add ecosystem.config.js: pm2 log paths → logs/pm2*.log - Update .gitignore: ignore data/ and logs/ instead of *.pkl/*.log - core/fng.py: increase cache TTL 3600→86400s (API updates daily at KST 09:00) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
216
tests/ohlcv_db.py
Normal file
216
tests/ohlcv_db.py
Normal file
@@ -0,0 +1,216 @@
|
||||
"""OHLCV 시계열 캐시 — Oracle ADB ohlcv_hourly 테이블.
|
||||
|
||||
기능:
|
||||
- 테이블 생성 (없으면)
|
||||
- pkl → DB 최초 적재
|
||||
- DB → DataFrame dict 로드 (시뮬용)
|
||||
- 증분 업데이트 (신규 봉만 API 페치)
|
||||
"""
|
||||
|
||||
import os as _os, sys as _sys
|
||||
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
import pyupbit
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
|
||||
|
||||
from core.price_db import _conn
|
||||
|
||||
# ── DDL ───────────────────────────────────────────────
# NUMBER(20,8) for prices, NUMBER(30,8) for volume; (ticker, candle_time)
# is the natural key, so duplicates are rejected at the DB level too.
_DDL = """
CREATE TABLE ohlcv_hourly (
    ticker VARCHAR2(20) NOT NULL,
    candle_time TIMESTAMP NOT NULL,
    open_price NUMBER(20,8) NOT NULL,
    high_price NUMBER(20,8) NOT NULL,
    low_price NUMBER(20,8) NOT NULL,
    close_price NUMBER(20,8) NOT NULL,
    volume NUMBER(30,8) NOT NULL,
    CONSTRAINT pk_ohlcv PRIMARY KEY (ticker, candle_time)
)
"""


def ensure_table() -> None:
    """Create the ``ohlcv_hourly`` table if it does not already exist.

    Oracle has no ``CREATE TABLE IF NOT EXISTS``, so existence is checked
    against ``user_tables`` first.
    """
    with _conn() as conn:
        cur = conn.cursor()
        cur.execute("SELECT COUNT(*) FROM user_tables WHERE table_name='OHLCV_HOURLY'")
        if cur.fetchone()[0] == 0:
            # Reuse the existing cursor — the original opened a second,
            # never-closed cursor just for the DDL statement.
            cur.execute(_DDL)
            print("ohlcv_hourly 테이블 생성 완료")
        else:
            print("ohlcv_hourly 테이블 이미 존재")
||||
|
||||
|
||||
# ── 적재 ──────────────────────────────────────────────
def insert_df(ticker: str, df: pd.DataFrame, batch: int = 500) -> int:
    """Batch-insert a DataFrame into ``ohlcv_hourly``, skipping known candles.

    Existing ``candle_time`` values for the ticker are fetched once and the
    deduplication is done in Python, avoiding per-row MERGE round-trips.

    Args:
        ticker: market symbol, e.g. ``"KRW-BTC"``.
        df: OHLCV frame indexed by candle timestamp with columns
            open/high/low/close/volume (pyupbit layout — TODO confirm).
        batch: ``executemany`` chunk size.

    Returns:
        Number of rows actually inserted.
    """
    sql_existing = """
        SELECT candle_time FROM ohlcv_hourly
        WHERE ticker = :1
    """
    sql_insert = """
        INSERT INTO ohlcv_hourly
        (ticker, candle_time, open_price, high_price, low_price, close_price, volume)
        VALUES (:1, :2, :3, :4, :5, :6, :7)
    """

    with _conn() as conn:
        cur = conn.cursor()
        # Fetch existing candle_times once; normalize to naive datetimes so
        # they compare equal to the pandas index values below.
        cur.execute(sql_existing, [ticker])
        existing = {r[0].replace(tzinfo=None) for r in cur.fetchall()}

        # Build parameter tuples only for candles not yet stored.
        # itertuples is markedly faster than iterrows, and the original
        # converted every row before discarding the duplicates.
        new_rows = [
            (
                ticker,
                ts,
                float(row.open),
                float(row.high),
                float(row.low),
                float(row.close),
                float(row.volume),
            )
            for row in df.itertuples()
            if (ts := row.Index.to_pydatetime().replace(tzinfo=None)) not in existing
        ]

        if not new_rows:
            return 0

        for i in range(0, len(new_rows), batch):
            cur.executemany(sql_insert, new_rows[i : i + batch])

        # NOTE(review): no explicit commit here — relies on _conn() committing
        # on context exit; confirm against core.price_db.
        return len(new_rows)
|
||||
|
||||
|
||||
def load_from_pkl(pkl_path: str | Path) -> None:
    """Load every ticker in a pkl cache file into the DB.

    The pkl is expected to hold ``{ticker: DataFrame}`` (see ``insert_df``).
    Prints a per-ticker count and a grand total.
    """
    pkl_path = Path(pkl_path)
    # Context manager so the file handle is closed deterministically —
    # the original `pickle.load(open(...))` left it to the GC.
    with pkl_path.open("rb") as fh:
        data = pickle.load(fh)
    ensure_table()
    total = 0
    for ticker, df in data.items():
        n = insert_df(ticker, df)
        total += n
        print(f" {ticker}: {n}건 적재")
    print(f"\n총 {total:,}건 적재 완료")
|
||||
|
||||
|
||||
# ── 로드 ──────────────────────────────────────────────
def load_from_db(tickers: list[str], from_date: str = "2025-03-02") -> dict:
    """Load candles from the DB as ``{ticker: DataFrame}`` (simulation input).

    Args:
        tickers: markets to load.
        from_date: inclusive lower bound, ``YYYY-MM-DD``.

    Returns:
        dict mapping ticker -> DataFrame indexed by ``candle_time`` with
        open/high/low/close/volume columns; tickers with no rows are omitted.
    """
    from_dt = datetime.strptime(from_date, "%Y-%m-%d")
    data: dict = {}
    sql = """
        SELECT candle_time, open_price, high_price, low_price, close_price, volume
        FROM ohlcv_hourly
        WHERE ticker = :1 AND candle_time >= :2
        ORDER BY candle_time
    """
    with _conn() as conn:
        # One cursor for all tickers — the original opened a new, never-closed
        # cursor on every loop iteration.
        cur = conn.cursor()
        for ticker in tickers:
            cur.execute(sql, [ticker, from_dt])
            rows = cur.fetchall()
            if not rows:
                continue
            df = pd.DataFrame(
                rows,
                columns=["candle_time", "open", "high", "low", "close", "volume"],
            )
            df.set_index("candle_time", inplace=True)
            df.index = pd.to_datetime(df.index)
            data[ticker] = df
    return data
|
||||
|
||||
|
||||
# ── 증분 업데이트 ──────────────────────────────────────
def update_incremental(tickers: list[str]) -> None:
    """Fetch candles newer than each ticker's latest stored candle and insert.

    NOTE(review): only the most recent 200 hourly candles are fetched, so a
    gap longer than ~200 hours would not be fully backfilled — confirm this
    matches the intended update cadence.
    """
    sql_max = "SELECT MAX(candle_time) FROM ohlcv_hourly WHERE ticker = :1"

    for ticker in tickers:
        with _conn() as conn:
            cur = conn.cursor()
            cur.execute(sql_max, [ticker])
            row = cur.fetchone()

        latest = row[0] if row and row[0] else None

        if latest:
            # (Removed the original's unused `to_dt` local and the pointless
            # kwargs-dict indirection around this call.)
            df = pyupbit.get_ohlcv(ticker=ticker, interval="minute60", count=200)
            if df is None or df.empty:
                continue
            # Drop tz info so the index compares against naive DB timestamps.
            df.index = df.index.tz_localize(None)
            # Insert only candles strictly after the latest stored one.
            new_df = df[df.index > latest.replace(tzinfo=None)]
            if new_df.empty:
                print(f" {ticker}: 신규 봉 없음")
                continue
            n = insert_df(ticker, new_df)
            print(f" {ticker}: +{n}봉 추가")
        else:
            print(f" {ticker}: DB에 없음, 전체 로드 필요")

        # Throttle API calls between tickers.
        time.sleep(0.2)
|
||||
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────
# Usage: ohlcv_db.py [init [pkl_path] | update | status]
if __name__ == "__main__":
    import sys

    cmd = sys.argv[1] if len(sys.argv) > 1 else "status"

    if cmd == "init":
        # Initial load: pkl -> DB
        pkl = sys.argv[2] if len(sys.argv) > 2 else "vol_lead_cache_365.pkl"
        print(f"pkl 적재: {pkl}")
        load_from_pkl(pkl)

    elif cmd == "update":
        # Incremental update. `pickle` is already imported at module level —
        # the original redundantly re-imported it as `_pk` — and the file
        # handle is now closed via a context manager instead of leaked.
        with open("top30_tickers.pkl", "rb") as fh:
            top30 = pickle.load(fh)
        print("증분 업데이트...")
        update_incremental(top30)

    elif cmd == "status":
        # Per-ticker record counts and date ranges.
        with _conn() as conn:
            cur = conn.cursor()
            try:
                cur.execute("""
                    SELECT ticker, COUNT(*), MIN(candle_time), MAX(candle_time)
                    FROM ohlcv_hourly
                    GROUP BY ticker
                    ORDER BY ticker
                """)
                rows = cur.fetchall()
                if rows:
                    print(f"{'종목':<16} {'봉수':>6} {'시작':^12} {'종료':^12}")
                    print("-" * 52)
                    for r in rows:
                        print(f"{r[0]:<16} {r[1]:>6}봉 "
                              f"{r[2].strftime('%y-%m-%d'):^12} "
                              f"{r[3].strftime('%y-%m-%d'):^12}")
                    print(f"\n총 {sum(r[1] for r in rows):,}봉 / {len(rows)}종목")
                else:
                    print("ohlcv_hourly 테이블이 비어 있거나 없음")
            except Exception as e:
                # Broad catch is deliberate at this top-level CLI boundary:
                # report the error instead of a traceback.
                print(f"오류: {e}")
|
||||
Reference in New Issue
Block a user