diff --git a/core/price_db.py b/core/price_db.py index b762124..7159854 100644 --- a/core/price_db.py +++ b/core/price_db.py @@ -44,7 +44,7 @@ def _conn() -> Generator[oracledb.Connection, None, None]: def insert_prices(ticker_prices: dict[str, float]) -> None: - """여러 종목의 현재가를 한 번에 저장.""" + """여러 종목의 현재가를 한 번에 저장 (recorded_at = 현재 시각).""" if not ticker_prices: return rows = [(ticker, price) for ticker, price in ticker_prices.items()] @@ -53,6 +53,18 @@ def insert_prices(ticker_prices: dict[str, float]) -> None: conn.cursor().executemany(sql, rows) +def insert_prices_with_time(rows: list[tuple]) -> None: + """(ticker, price, recorded_at) 튜플 리스트를 한 번에 저장 (백필용).""" + if not rows: + return + sql = """ + INSERT INTO price_history (ticker, price, recorded_at) + VALUES (:1, :2, :3) + """ + with _conn() as conn: + conn.cursor().executemany(sql, rows) + + def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]: """N시간 전 가장 가까운 가격 반환. 데이터 없으면 None.""" sql = """ @@ -89,3 +101,120 @@ def cleanup_old_prices(keep_hours: int = 48) -> None: sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)" with _conn() as conn: conn.cursor().execute(sql) + + +# ── 포지션 영구 저장 (재시작 후 실제 매수가 복원용) ────────────────────────── + +def upsert_position( + ticker: str, + buy_price: float, + peak_price: float, + amount: float, + invested_krw: int, + entry_time: str, # ISO 포맷 문자열 +) -> None: + """포지션 저장 또는 갱신 (MERGE).""" + sql = """ + MERGE INTO positions p + USING (SELECT :ticker AS ticker FROM dual) s + ON (p.ticker = s.ticker) + WHEN MATCHED THEN + UPDATE SET peak_price = :peak_price, + amount = :amount, + invested_krw = :invested_krw, + updated_at = SYSTIMESTAMP + WHEN NOT MATCHED THEN + INSERT (ticker, buy_price, peak_price, amount, invested_krw, entry_time) + VALUES (:ticker, :buy_price, :peak_price, :amount, :invested_krw, + TO_TIMESTAMP(:entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6')) + """ + with _conn() as conn: + conn.cursor().execute(sql, { + "ticker": ticker, + "buy_price": buy_price, + "peak_price": peak_price, + "amount": amount, + "invested_krw": invested_krw, + "entry_time": entry_time, + }) + + +def delete_position(ticker: str) -> None: + """포지션 삭제 (매도 완료 시).""" + with _conn() as conn: + conn.cursor().execute( + "DELETE FROM positions WHERE ticker = :ticker", {"ticker": ticker} + ) + + +# ── Walk-forward 거래 이력 ──────────────────────────────────────────────────── + +def ensure_trade_results_table() -> None: + """trade_results 테이블이 없으면 생성.""" + ddl = """ + CREATE TABLE trade_results ( + id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + ticker VARCHAR2(20) NOT NULL, + is_win NUMBER(1) NOT NULL, + pnl_pct NUMBER(10,4), + traded_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL + ) + """ + idx = "CREATE INDEX idx_tr_ticker ON trade_results (ticker, traded_at DESC)" + with _conn() as conn: + for sql in (ddl, idx): + try: + conn.cursor().execute(sql) + except oracledb.DatabaseError as e: + if e.args[0].code not in (955, 1408): + raise + + +def record_trade(ticker: str, is_win: bool, pnl_pct: float) -> None: + """거래 결과 저장.""" + with _conn() as conn: + conn.cursor().execute( + "INSERT INTO trade_results (ticker, is_win, pnl_pct) VALUES (:t, :w, :p)", + {"t": ticker, "w": 1 if is_win else 0, "p": round(pnl_pct, 4)}, + ) + + +def load_recent_wins(ticker: str, n: int = 5) -> list[bool]: + """직전 N건 거래의 승/패 리스트 반환 (오래된 순). 없으면 빈 리스트.""" + sql = """ + SELECT is_win FROM ( + SELECT is_win FROM trade_results + WHERE ticker = :t + ORDER BY traded_at DESC + FETCH FIRST :n ROWS ONLY + ) ORDER BY ROWNUM DESC + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"t": ticker, "n": n}) + rows = cur.fetchall() + return [bool(r[0]) for r in rows] + + +def load_positions() -> list[dict]: + """저장된 전체 포지션 로드.""" + sql = """ + SELECT ticker, buy_price, peak_price, amount, invested_krw, + TO_CHAR(entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6') AS entry_time + FROM positions + """ + with _conn() as conn: + cursor = conn.cursor() + cursor.execute(sql) + rows = cursor.fetchall() + return [ + { + "ticker": r[0], + "buy_price": float(r[1]), + "peak_price": float(r[2]), + "amount": float(r[3]), + "invested_krw": int(r[4]), + "entry_time": r[5], + } + for r in rows + ] diff --git a/core/trader.py b/core/trader.py index 32d7818..45f7c3c 100644 --- a/core/trader.py +++ b/core/trader.py @@ -12,19 +12,33 @@ from typing import Optional import pyupbit from dotenv import load_dotenv from .notify import notify_buy, notify_sell, notify_error +from .price_db import ( + delete_position, load_positions, upsert_position, + ensure_trade_results_table, record_trade, load_recent_wins, +) load_dotenv() logger = logging.getLogger(__name__) -MAX_BUDGET = 1_000_000 # 총 운용 한도: 100만원 -MAX_POSITIONS = 3 # 최대 동시 보유 종목 수 -PER_POSITION = MAX_BUDGET // MAX_POSITIONS # 종목당 33만3천원 +MAX_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 총 운용 한도 +MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수 +PER_POSITION = MAX_BUDGET // MAX_POSITIONS # 종목당 투자금 + +# Walk-forward 필터 설정 +WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기 +WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값 _lock = threading.Lock() _positions: dict = {} # 구조: { ticker: { buy_price, peak_price, amount, invested_krw, entry_time } } +_last_sell_prices: dict[str, float] = {} +# 직전 매도가 기록 — 재매수 시 이 가격 이상일 때만 진입 허용 + +_trade_history: dict[str, list[bool]] = {} +# walk-forward 이력: { ticker: [True/False, ...] } (True=수익) + _upbit: Optional[pyupbit.Upbit] = None @@ -35,14 +49,71 @@ def _get_upbit() -> pyupbit.Upbit: return _upbit +def _get_history(ticker: str) -> list[bool]: + """in-memory 이력 반환. 없으면 DB에서 초기 로드.""" + if ticker not in _trade_history: + try: + _trade_history[ticker] = load_recent_wins(ticker, WF_WINDOW) + except Exception: + _trade_history[ticker] = [] + return _trade_history[ticker] + + +def _update_history(ticker: str, is_win: bool, pnl_pct: float) -> None: + """매도 후 in-memory 이력 갱신 + DB 기록.""" + hist = _trade_history.setdefault(ticker, []) + hist.append(is_win) + # 윈도우 초과분 제거 (메모리 절약) + if len(hist) > WF_WINDOW * 2: + _trade_history[ticker] = hist[-WF_WINDOW:] + try: + record_trade(ticker, is_win, pnl_pct) + except Exception as e: + logger.error(f"거래 이력 저장 실패 {ticker}: {e}") + + +def _db_upsert(ticker: str, pos: dict) -> None: + """포지션을 Oracle DB에 저장 (실패해도 거래는 계속).""" + try: + upsert_position( + ticker=ticker, + buy_price=pos["buy_price"], + peak_price=pos["peak_price"], + amount=pos["amount"], + invested_krw=pos["invested_krw"], + entry_time=pos["entry_time"].isoformat(), + ) + except Exception as e: + logger.error(f"포지션 DB 저장 실패 {ticker}: {e}") + + def get_positions() -> dict: return _positions def restore_positions() -> None: - """시작 시 Upbit 실제 잔고를 읽어 포지션 복원 (재시작 이중 매수 방지).""" + """시작 시 Oracle DB + Upbit 잔고를 교차 확인하여 포지션 복원. + trade_results 테이블도 이 시점에 생성 (없으면). + + DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다. + """ + # trade_results 테이블 초기화 + try: + ensure_trade_results_table() + except Exception as e: + logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}") + + # DB에서 저장된 포지션 로드 + try: + saved = {row["ticker"]: row for row in load_positions()} + except Exception as e: + logger.error(f"DB 포지션 로드 실패: {e}") + saved = {} + upbit = _get_upbit() balances = upbit.get_balances() + upbit_tickers = set() + for b in balances: currency = b["currency"] if currency == "KRW": @@ -57,18 +128,51 @@ def restore_positions() -> None: invested_krw = int(amount * current) if invested_krw < 1_000: # 소액 잔고 무시 continue - with _lock: - _positions[ticker] = { - "buy_price": current, # 정확한 매수가 불명 → 현재가로 초기화 - "peak_price": current, - "amount": amount, - "invested_krw": min(invested_krw, PER_POSITION), - "entry_time": datetime.now(), - } - logger.info( - f"[복원] {ticker} 수량={amount} | 현재가={current:,.0f}원 " - f"(재시작 시 복원, 매수가 불명으로 현재가 기준)" - ) + + upbit_tickers.add(ticker) + + if ticker in saved: + # DB에 저장된 실제 매수가 복원 + s = saved[ticker] + peak = max(s["peak_price"], current) # 재시작 중 올랐을 수 있으므로 높은 쪽 + entry_time = datetime.fromisoformat(s["entry_time"]) if isinstance(s["entry_time"], str) else s["entry_time"] + with _lock: + _positions[ticker] = { + "buy_price": s["buy_price"], + "peak_price": peak, + "amount": amount, + "invested_krw": s["invested_krw"], + "entry_time": entry_time, + } + logger.info( + f"[복원] {ticker} 매수가={s['buy_price']:,.0f}원 | 현재가={current:,.0f}원 " + f"| 수량={amount} (DB 복원)" + ) + else: + # DB에 없음 → 현재가로 초기화 후 DB에 저장 + entry_time = datetime.now() + with _lock: + _positions[ticker] = { + "buy_price": current, + "peak_price": current, + "amount": amount, + "invested_krw": min(invested_krw, PER_POSITION), + "entry_time": entry_time, + } + _db_upsert(ticker, _positions[ticker]) + logger.warning( + f"[복원] {ticker} 현재가={current:,.0f}원 | 수량={amount} " + f"(DB 기록 없음 → 현재가로 초기화)" + ) + + # Upbit 잔고에 없는데 DB에 남아있는 항목 정리 + for ticker in saved: + if ticker not in upbit_tickers: + try: + delete_position(ticker) + logger.info(f"[정리] {ticker} Upbit 잔고 없음 → DB 포지션 삭제") + except Exception as e: + logger.error(f"DB 포지션 삭제 실패 {ticker}: {e}") def buy(ticker: str) -> bool: @@ -78,6 +182,30 @@ def buy(ticker: str) -> bool: logger.debug(f"{ticker} 이미 보유 중") return False + # 직전 매도가 +1% 이상일 때만 재진입 (손절 직후 역방향 재매수 방지) + if ticker in _last_sell_prices: + current_check = pyupbit.get_current_price(ticker) + last_sell = _last_sell_prices[ticker] + threshold = last_sell * 1.01 + if current_check and current_check < threshold: + logger.info( + f"[재매수 차단] {ticker} 현재={current_check:,.2f} < " + f"직전매도+1%={threshold:,.2f} → 상승 흐름 미확인" + ) + return False + + # Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단 + if WF_MIN_WIN_RATE > 0: + hist = _get_history(ticker) + if len(hist) >= WF_WINDOW: + recent_wr = sum(hist[-WF_WINDOW:]) / WF_WINDOW + if recent_wr < WF_MIN_WIN_RATE: + logger.info( + f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%" + f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단" + ) + return False + if len(_positions) >= MAX_POSITIONS: logger.info(f"최대 포지션 도달({MAX_POSITIONS}), {ticker} 패스") return False @@ -92,7 +220,6 @@ def buy(ticker: str) -> bool: upbit = _get_upbit() try: - current = pyupbit.get_current_price(ticker) result = upbit.buy_market_order(ticker, order_krw) if not result or "error" in str(result): logger.error(f"매수 실패: {result}") @@ -102,18 +229,23 @@ def buy(ticker: str) -> bool: currency = ticker.split("-")[1] amount = float(upbit.get_balance(currency) or 0) + # 실제 체결가 = 투자금 / 수량 (시장가 주문 슬리피지 반영) + actual_price = order_krw / amount if amount > 0 else pyupbit.get_current_price(ticker) + + entry_time = datetime.now() _positions[ticker] = { - "buy_price": current, - "peak_price": current, + "buy_price": actual_price, + "peak_price": actual_price, "amount": amount, "invested_krw": order_krw, - "entry_time": datetime.now(), + "entry_time": entry_time, } + _db_upsert(ticker, _positions[ticker]) logger.info( - f"[매수] {ticker} @ {current:,.0f}원 | " + f"[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | " f"수량={amount} | 투자금={order_krw:,}원" ) - notify_buy(ticker, current, amount, order_krw) + notify_buy(ticker, actual_price, amount, order_krw) return True except Exception as e: logger.error(f"매수 예외 {ticker}: {e}") @@ -130,19 +262,41 @@ def sell(ticker: str, reason: str = "") -> bool: pos = _positions[ticker] upbit = _get_upbit() try: - result = upbit.sell_market_order(ticker, pos["amount"]) + currency = ticker.split("-")[1] + + # 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비) + actual_amount = float(upbit.get_balance(currency) or 0) + if actual_amount < 0.00001: + logger.warning(f"[매도] {ticker} 실제 잔고 없음 → 포지션 정리 (이미 매도됨)") + del _positions[ticker] + return True + + result = upbit.sell_market_order(ticker, actual_amount) if not result or "error" in str(result): logger.error(f"매도 실패: {result}") + # 실패 후에도 잔고 재확인 → 0이면 실제로는 매도됨 + actual_amount2 = float(upbit.get_balance(currency) or 0) + if actual_amount2 < 0.00001: + logger.warning(f"[매도] {ticker} 잔고 소진 확인 → 포지션 정리") + del _positions[ticker] + return True return False current = pyupbit.get_current_price(ticker) - pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100 + pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100 if current else 0.0 logger.info( f"[매도] {ticker} @ {current:,.0f}원 | " f"수익률={pnl:+.1f}% | 사유={reason}" ) notify_sell(ticker, current, pnl, reason) + if current: + _last_sell_prices[ticker] = current # 재매수 기준가 기록 + _update_history(ticker, pnl > 0, pnl) # walk-forward 이력 갱신 del _positions[ticker] + try: + delete_position(ticker) + except Exception as e: + logger.error(f"포지션 DB 삭제 실패 {ticker}: {e}") return True except Exception as e: logger.error(f"매도 예외 {ticker}: {e}") @@ -156,3 +310,4 @@ def update_peak(ticker: str, current_price: float) -> None: if ticker in _positions: if current_price > _positions[ticker]["peak_price"]: _positions[ticker]["peak_price"] = current_price + _db_upsert(ticker, _positions[ticker])