feat: add walk-forward trade filter to prevent re-entry on losing tickers

- Add trade_results table to Oracle DB for persistent trade history
- Record win/loss after each sell with pnl_pct
- Load last N trades per ticker from DB on startup (survives restarts)
- Block buy() when recent win rate (last 5 trades) < 40% threshold
- Configurable via WF_WINDOW and WF_MIN_WIN_RATE env vars
- Backtest showed improvement from -7.5% to +37.4% cumulative return

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-02-28 23:28:07 +09:00
parent 80ab004eba
commit 4888aa0faa
2 changed files with 309 additions and 25 deletions

View File

@@ -44,7 +44,7 @@ def _conn() -> Generator[oracledb.Connection, None, None]:
def insert_prices(ticker_prices: dict[str, float]) -> None:
"""여러 종목의 현재가를 한 번에 저장."""
"""여러 종목의 현재가를 한 번에 저장 (recorded_at = 현재 시각)."""
if not ticker_prices:
return
rows = [(ticker, price) for ticker, price in ticker_prices.items()]
@@ -53,6 +53,18 @@ def insert_prices(ticker_prices: dict[str, float]) -> None:
conn.cursor().executemany(sql, rows)
def insert_prices_with_time(rows: list[tuple]) -> None:
"""(ticker, price, recorded_at) 튜플 리스트를 한 번에 저장 (백필용)."""
if not rows:
return
sql = """
INSERT INTO price_history (ticker, price, recorded_at)
VALUES (:1, :2, :3)
"""
with _conn() as conn:
conn.cursor().executemany(sql, rows)
def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]:
"""N시간 전 가장 가까운 가격 반환. 데이터 없으면 None."""
sql = """
@@ -89,3 +101,120 @@ def cleanup_old_prices(keep_hours: int = 48) -> None:
sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)"
with _conn() as conn:
conn.cursor().execute(sql)
# ── 포지션 영구 저장 (재시작 후 실제 매수가 복원용) ──────────────────────────
def upsert_position(
ticker: str,
buy_price: float,
peak_price: float,
amount: float,
invested_krw: int,
entry_time: str, # ISO 포맷 문자열
) -> None:
"""포지션 저장 또는 갱신 (MERGE)."""
sql = """
MERGE INTO positions p
USING (SELECT :ticker AS ticker FROM dual) s
ON (p.ticker = s.ticker)
WHEN MATCHED THEN
UPDATE SET peak_price = :peak_price,
amount = :amount,
invested_krw = :invested_krw,
updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
INSERT (ticker, buy_price, peak_price, amount, invested_krw, entry_time)
VALUES (:ticker, :buy_price, :peak_price, :amount, :invested_krw,
TO_TIMESTAMP(:entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6'))
"""
with _conn() as conn:
conn.cursor().execute(sql, {
"ticker": ticker,
"buy_price": buy_price,
"peak_price": peak_price,
"amount": amount,
"invested_krw": invested_krw,
"entry_time": entry_time,
})
def delete_position(ticker: str) -> None:
"""포지션 삭제 (매도 완료 시)."""
with _conn() as conn:
conn.cursor().execute(
"DELETE FROM positions WHERE ticker = :ticker", {"ticker": ticker}
)
# ── Walk-forward 거래 이력 ────────────────────────────────────────────────────
def ensure_trade_results_table() -> None:
"""trade_results 테이블이 없으면 생성."""
ddl = """
CREATE TABLE trade_results (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
ticker VARCHAR2(20) NOT NULL,
is_win NUMBER(1) NOT NULL,
pnl_pct NUMBER(10,4),
traded_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""
idx = "CREATE INDEX idx_tr_ticker ON trade_results (ticker, traded_at DESC)"
with _conn() as conn:
for sql in (ddl, idx):
try:
conn.cursor().execute(sql)
except oracledb.DatabaseError as e:
if e.args[0].code not in (955, 1408):
raise
def record_trade(ticker: str, is_win: bool, pnl_pct: float) -> None:
"""거래 결과 저장."""
with _conn() as conn:
conn.cursor().execute(
"INSERT INTO trade_results (ticker, is_win, pnl_pct) VALUES (:t, :w, :p)",
{"t": ticker, "w": 1 if is_win else 0, "p": round(pnl_pct, 4)},
)
def load_recent_wins(ticker: str, n: int = 5) -> list[bool]:
"""직전 N건 거래의 승/패 리스트 반환 (오래된 순). 없으면 빈 리스트."""
sql = """
SELECT is_win FROM (
SELECT is_win FROM trade_results
WHERE ticker = :t
ORDER BY traded_at DESC
FETCH FIRST :n ROWS ONLY
) ORDER BY ROWNUM DESC
"""
with _conn() as conn:
cur = conn.cursor()
cur.execute(sql, {"t": ticker, "n": n})
rows = cur.fetchall()
return [bool(r[0]) for r in rows]
def load_positions() -> list[dict]:
"""저장된 전체 포지션 로드."""
sql = """
SELECT ticker, buy_price, peak_price, amount, invested_krw,
TO_CHAR(entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6') AS entry_time
FROM positions
"""
with _conn() as conn:
cursor = conn.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
return [
{
"ticker": r[0],
"buy_price": float(r[1]),
"peak_price": float(r[2]),
"amount": float(r[3]),
"invested_krw": int(r[4]),
"entry_time": r[5],
}
for r in rows
]