fix: persist sell prices to DB and add WF filter bootstrap

- price_db: add sell_prices table (ensure/upsert/load/delete)
- trader: restore _last_sell_prices from DB on startup so re-entry
  block survives restarts; persist each sell price immediately
- market: retry chunk requests up to 3 times with backoff on 429

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-01 05:19:22 +09:00
parent 0b264b304c
commit d2a5c3ae9e
3 changed files with 86 additions and 5 deletions

View File

@@ -29,15 +29,27 @@ def get_top_tickers() -> list[str]:
if not all_tickers: if not all_tickers:
return [] return []
# 100개씩 나눠서 조회 (URL 길이 제한) # 100개씩 나눠서 조회 (URL 길이 제한, 429 재시도 포함)
chunk_size = 100 chunk_size = 100
ticker_data = [] ticker_data = []
for i in range(0, len(all_tickers), chunk_size): for i in range(0, len(all_tickers), chunk_size):
chunk = all_tickers[i:i + chunk_size] chunk = all_tickers[i:i + chunk_size]
params = {"markets": ",".join(chunk)} params = {"markets": ",".join(chunk)}
for attempt in range(3):
try:
resp = requests.get(_TICKER_URL, params=params, timeout=5) resp = requests.get(_TICKER_URL, params=params, timeout=5)
if resp.status_code == 429:
wait = 2 ** attempt # 1s → 2s → 4s
logger.warning(f"429 Rate Limit, {wait}s 대기 후 재시도 ({attempt+1}/3)")
time.sleep(wait)
continue
resp.raise_for_status() resp.raise_for_status()
ticker_data.extend(resp.json()) ticker_data.extend(resp.json())
break
except Exception as e:
if attempt == 2:
raise
time.sleep(1)
# 스테이블코인 제외 # 스테이블코인 제외
EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"} EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"}

View File

@@ -196,6 +196,56 @@ def load_recent_wins(ticker: str, n: int = 5) -> list[bool]:
return [bool(r[0]) for r in rows] return [bool(r[0]) for r in rows]
# ── 직전 매도가 영구 저장 (재시작 후 재매수 차단 유지용) ──────────────────────
def ensure_sell_prices_table() -> None:
"""sell_prices 테이블이 없으면 생성."""
ddl = """
CREATE TABLE sell_prices (
ticker VARCHAR2(20) NOT NULL PRIMARY KEY,
price NUMBER(20,8) NOT NULL,
updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""
with _conn() as conn:
try:
conn.cursor().execute(ddl)
except oracledb.DatabaseError as e:
if e.args[0].code != 955: # ORA-00955: 이미 존재
raise
def upsert_sell_price(ticker: str, price: float) -> None:
"""직전 매도가 저장 또는 갱신."""
sql = """
MERGE INTO sell_prices s
USING (SELECT :ticker AS ticker FROM dual) d
ON (s.ticker = d.ticker)
WHEN MATCHED THEN
UPDATE SET price = :price, updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
INSERT (ticker, price) VALUES (:ticker, :price)
"""
with _conn() as conn:
conn.cursor().execute(sql, {"ticker": ticker, "price": price})
def load_sell_prices() -> dict[str, float]:
"""저장된 직전 매도가 전체 로드."""
with _conn() as conn:
cur = conn.cursor()
cur.execute("SELECT ticker, price FROM sell_prices")
return {r[0]: float(r[1]) for r in cur.fetchall()}
def delete_sell_price(ticker: str) -> None:
"""매도가 기록 삭제 (더 이상 필요 없을 때)."""
with _conn() as conn:
conn.cursor().execute(
"DELETE FROM sell_prices WHERE ticker = :ticker", {"ticker": ticker}
)
def load_positions() -> list[dict]: def load_positions() -> list[dict]:
"""저장된 전체 포지션 로드.""" """저장된 전체 포지션 로드."""
sql = """ sql = """

View File

@@ -15,6 +15,7 @@ from .notify import notify_buy, notify_sell, notify_error
from .price_db import ( from .price_db import (
delete_position, load_positions, upsert_position, delete_position, load_positions, upsert_position,
ensure_trade_results_table, record_trade, load_recent_wins, ensure_trade_results_table, record_trade, load_recent_wins,
ensure_sell_prices_table, upsert_sell_price, load_sell_prices,
) )
load_dotenv() load_dotenv()
@@ -97,12 +98,26 @@ def restore_positions() -> None:
DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다. DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다.
""" """
# trade_results 테이블 초기화 # trade_results / sell_prices 테이블 초기화
try: try:
ensure_trade_results_table() ensure_trade_results_table()
except Exception as e: except Exception as e:
logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}") logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}")
try:
ensure_sell_prices_table()
except Exception as e:
logger.warning(f"sell_prices 테이블 생성 실패 (무시): {e}")
# 직전 매도가 복원 (재매수 차단 기준 유지)
try:
loaded = load_sell_prices()
_last_sell_prices.update(loaded)
if loaded:
logger.info(f"[복원] 직전 매도가 {len(loaded)}건 복원: {list(loaded.keys())}")
except Exception as e:
logger.warning(f"직전 매도가 복원 실패 (무시): {e}")
# DB에서 저장된 포지션 로드 # DB에서 저장된 포지션 로드
try: try:
saved = {row["ticker"]: row for row in load_positions()} saved = {row["ticker"]: row for row in load_positions()}
@@ -291,6 +306,10 @@ def sell(ticker: str, reason: str = "") -> bool:
notify_sell(ticker, current, pnl, reason) notify_sell(ticker, current, pnl, reason)
if current: if current:
_last_sell_prices[ticker] = current # 재매수 기준가 기록 _last_sell_prices[ticker] = current # 재매수 기준가 기록
try:
upsert_sell_price(ticker, current) # DB 영구 저장
except Exception as e:
logger.error(f"직전 매도가 DB 저장 실패 {ticker}: {e}")
_update_history(ticker, pnl > 0, pnl) # walk-forward 이력 갱신 _update_history(ticker, pnl > 0, pnl) # walk-forward 이력 갱신
del _positions[ticker] del _positions[ticker]
try: try: