- price_db: add sell_prices table (ensure/upsert/load/delete) - trader: restore _last_sell_prices from DB on startup so re-entry block survives restarts; persist each sell price immediately - market: retry chunk requests up to 3 times with backoff on 429 Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
271 lines
8.9 KiB
Python
271 lines
8.9 KiB
Python
"""Oracle ADB price_history CRUD."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import os
|
|
from contextlib import contextmanager
|
|
from typing import Generator, Optional
|
|
|
|
import oracledb
|
|
|
|
_pool: Optional[oracledb.ConnectionPool] = None
|
|
|
|
|
|
def _get_pool() -> oracledb.ConnectionPool:
|
|
global _pool
|
|
if _pool is None:
|
|
kwargs: dict = dict(
|
|
user=os.environ["ORACLE_USER"],
|
|
password=os.environ["ORACLE_PASSWORD"],
|
|
dsn=os.environ["ORACLE_DSN"],
|
|
min=1,
|
|
max=3,
|
|
increment=1,
|
|
)
|
|
wallet = os.environ.get("ORACLE_WALLET")
|
|
if wallet:
|
|
kwargs["config_dir"] = wallet
|
|
_pool = oracledb.create_pool(**kwargs)
|
|
return _pool
|
|
|
|
|
|
@contextmanager
|
|
def _conn() -> Generator[oracledb.Connection, None, None]:
|
|
pool = _get_pool()
|
|
conn = pool.acquire()
|
|
try:
|
|
yield conn
|
|
conn.commit()
|
|
except Exception:
|
|
conn.rollback()
|
|
raise
|
|
finally:
|
|
pool.release(conn)
|
|
|
|
|
|
def insert_prices(ticker_prices: dict[str, float]) -> None:
|
|
"""여러 종목의 현재가를 한 번에 저장 (recorded_at = 현재 시각)."""
|
|
if not ticker_prices:
|
|
return
|
|
rows = [(ticker, price) for ticker, price in ticker_prices.items()]
|
|
sql = "INSERT INTO price_history (ticker, price) VALUES (:1, :2)"
|
|
with _conn() as conn:
|
|
conn.cursor().executemany(sql, rows)
|
|
|
|
|
|
def insert_prices_with_time(rows: list[tuple]) -> None:
|
|
"""(ticker, price, recorded_at) 튜플 리스트를 한 번에 저장 (백필용)."""
|
|
if not rows:
|
|
return
|
|
sql = """
|
|
INSERT INTO price_history (ticker, price, recorded_at)
|
|
VALUES (:1, :2, :3)
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().executemany(sql, rows)
|
|
|
|
|
|
def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]:
|
|
"""N시간 전 가장 가까운 가격 반환. 데이터 없으면 None."""
|
|
sql = """
|
|
SELECT price FROM price_history
|
|
WHERE ticker = :ticker
|
|
AND recorded_at BETWEEN
|
|
SYSTIMESTAMP - INTERVAL ':h' HOUR - INTERVAL '10' MINUTE
|
|
AND SYSTIMESTAMP - INTERVAL ':h' HOUR + INTERVAL '10' MINUTE
|
|
ORDER BY ABS(CAST(recorded_at AS DATE) -
|
|
CAST(SYSTIMESTAMP - INTERVAL ':h' HOUR AS DATE))
|
|
FETCH FIRST 1 ROWS ONLY
|
|
"""
|
|
# Oracle INTERVAL bind param 미지원으로 직접 포맷
|
|
h = int(hours)
|
|
sql = f"""
|
|
SELECT price FROM price_history
|
|
WHERE ticker = :ticker
|
|
AND recorded_at BETWEEN
|
|
SYSTIMESTAMP - ({h}/24) - (10/1440)
|
|
AND SYSTIMESTAMP - ({h}/24) + (10/1440)
|
|
ORDER BY ABS(CAST(recorded_at AS DATE) -
|
|
CAST(SYSTIMESTAMP - ({h}/24) AS DATE))
|
|
FETCH FIRST 1 ROWS ONLY
|
|
"""
|
|
with _conn() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql, {"ticker": ticker})
|
|
row = cursor.fetchone()
|
|
return float(row[0]) if row else None
|
|
|
|
|
|
def cleanup_old_prices(keep_hours: int = 48) -> None:
|
|
"""N시간 이상 오래된 데이터 삭제 (DB 용량 관리)."""
|
|
sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)"
|
|
with _conn() as conn:
|
|
conn.cursor().execute(sql)
|
|
|
|
|
|
# ── 포지션 영구 저장 (재시작 후 실제 매수가 복원용) ──────────────────────────
|
|
|
|
def upsert_position(
|
|
ticker: str,
|
|
buy_price: float,
|
|
peak_price: float,
|
|
amount: float,
|
|
invested_krw: int,
|
|
entry_time: str, # ISO 포맷 문자열
|
|
) -> None:
|
|
"""포지션 저장 또는 갱신 (MERGE)."""
|
|
sql = """
|
|
MERGE INTO positions p
|
|
USING (SELECT :ticker AS ticker FROM dual) s
|
|
ON (p.ticker = s.ticker)
|
|
WHEN MATCHED THEN
|
|
UPDATE SET peak_price = :peak_price,
|
|
amount = :amount,
|
|
invested_krw = :invested_krw,
|
|
updated_at = SYSTIMESTAMP
|
|
WHEN NOT MATCHED THEN
|
|
INSERT (ticker, buy_price, peak_price, amount, invested_krw, entry_time)
|
|
VALUES (:ticker, :buy_price, :peak_price, :amount, :invested_krw,
|
|
TO_TIMESTAMP(:entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6'))
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(sql, {
|
|
"ticker": ticker,
|
|
"buy_price": buy_price,
|
|
"peak_price": peak_price,
|
|
"amount": amount,
|
|
"invested_krw": invested_krw,
|
|
"entry_time": entry_time,
|
|
})
|
|
|
|
|
|
def delete_position(ticker: str) -> None:
|
|
"""포지션 삭제 (매도 완료 시)."""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(
|
|
"DELETE FROM positions WHERE ticker = :ticker", {"ticker": ticker}
|
|
)
|
|
|
|
|
|
# ── Walk-forward 거래 이력 ────────────────────────────────────────────────────
|
|
|
|
def ensure_trade_results_table() -> None:
|
|
"""trade_results 테이블이 없으면 생성."""
|
|
ddl = """
|
|
CREATE TABLE trade_results (
|
|
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
|
|
ticker VARCHAR2(20) NOT NULL,
|
|
is_win NUMBER(1) NOT NULL,
|
|
pnl_pct NUMBER(10,4),
|
|
traded_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
|
|
)
|
|
"""
|
|
idx = "CREATE INDEX idx_tr_ticker ON trade_results (ticker, traded_at DESC)"
|
|
with _conn() as conn:
|
|
for sql in (ddl, idx):
|
|
try:
|
|
conn.cursor().execute(sql)
|
|
except oracledb.DatabaseError as e:
|
|
if e.args[0].code not in (955, 1408):
|
|
raise
|
|
|
|
|
|
def record_trade(ticker: str, is_win: bool, pnl_pct: float) -> None:
|
|
"""거래 결과 저장."""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(
|
|
"INSERT INTO trade_results (ticker, is_win, pnl_pct) VALUES (:t, :w, :p)",
|
|
{"t": ticker, "w": 1 if is_win else 0, "p": round(pnl_pct, 4)},
|
|
)
|
|
|
|
|
|
def load_recent_wins(ticker: str, n: int = 5) -> list[bool]:
|
|
"""직전 N건 거래의 승/패 리스트 반환 (오래된 순). 없으면 빈 리스트."""
|
|
sql = """
|
|
SELECT is_win FROM (
|
|
SELECT is_win FROM trade_results
|
|
WHERE ticker = :t
|
|
ORDER BY traded_at DESC
|
|
FETCH FIRST :n ROWS ONLY
|
|
) ORDER BY ROWNUM DESC
|
|
"""
|
|
with _conn() as conn:
|
|
cur = conn.cursor()
|
|
cur.execute(sql, {"t": ticker, "n": n})
|
|
rows = cur.fetchall()
|
|
return [bool(r[0]) for r in rows]
|
|
|
|
|
|
# ── 직전 매도가 영구 저장 (재시작 후 재매수 차단 유지용) ──────────────────────
|
|
|
|
def ensure_sell_prices_table() -> None:
|
|
"""sell_prices 테이블이 없으면 생성."""
|
|
ddl = """
|
|
CREATE TABLE sell_prices (
|
|
ticker VARCHAR2(20) NOT NULL PRIMARY KEY,
|
|
price NUMBER(20,8) NOT NULL,
|
|
updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
|
|
)
|
|
"""
|
|
with _conn() as conn:
|
|
try:
|
|
conn.cursor().execute(ddl)
|
|
except oracledb.DatabaseError as e:
|
|
if e.args[0].code != 955: # ORA-00955: 이미 존재
|
|
raise
|
|
|
|
|
|
def upsert_sell_price(ticker: str, price: float) -> None:
|
|
"""직전 매도가 저장 또는 갱신."""
|
|
sql = """
|
|
MERGE INTO sell_prices s
|
|
USING (SELECT :ticker AS ticker FROM dual) d
|
|
ON (s.ticker = d.ticker)
|
|
WHEN MATCHED THEN
|
|
UPDATE SET price = :price, updated_at = SYSTIMESTAMP
|
|
WHEN NOT MATCHED THEN
|
|
INSERT (ticker, price) VALUES (:ticker, :price)
|
|
"""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(sql, {"ticker": ticker, "price": price})
|
|
|
|
|
|
def load_sell_prices() -> dict[str, float]:
|
|
"""저장된 직전 매도가 전체 로드."""
|
|
with _conn() as conn:
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT ticker, price FROM sell_prices")
|
|
return {r[0]: float(r[1]) for r in cur.fetchall()}
|
|
|
|
|
|
def delete_sell_price(ticker: str) -> None:
|
|
"""매도가 기록 삭제 (더 이상 필요 없을 때)."""
|
|
with _conn() as conn:
|
|
conn.cursor().execute(
|
|
"DELETE FROM sell_prices WHERE ticker = :ticker", {"ticker": ticker}
|
|
)
|
|
|
|
|
|
def load_positions() -> list[dict]:
|
|
"""저장된 전체 포지션 로드."""
|
|
sql = """
|
|
SELECT ticker, buy_price, peak_price, amount, invested_krw,
|
|
TO_CHAR(entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6') AS entry_time
|
|
FROM positions
|
|
"""
|
|
with _conn() as conn:
|
|
cursor = conn.cursor()
|
|
cursor.execute(sql)
|
|
rows = cursor.fetchall()
|
|
return [
|
|
{
|
|
"ticker": r[0],
|
|
"buy_price": float(r[1]),
|
|
"peak_price": float(r[2]),
|
|
"amount": float(r[3]),
|
|
"invested_krw": int(r[4]),
|
|
"entry_time": r[5],
|
|
}
|
|
for r in rows
|
|
]
|