After sell_market_order, query Upbit /v1/order API to get actual trade fills. If split across multiple fills, compute weighted average price and use actual paid_fee instead of estimate. Falls back to get_current_price if order query fails. Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
423 lines
16 KiB
Python
423 lines
16 KiB
Python
"""매수/매도 실행 및 포지션 관리."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
import os
|
|
import threading
|
|
import time
|
|
import uuid
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import pyupbit
|
|
from dotenv import load_dotenv
|
|
from .notify import notify_buy, notify_sell, notify_error
|
|
from .price_db import (
|
|
delete_position, load_positions, upsert_position,
|
|
ensure_trade_results_table, record_trade, load_recent_wins,
|
|
ensure_sell_prices_table, upsert_sell_price, load_sell_prices,
|
|
)
|
|
|
|
load_dotenv()
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
MAX_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 총 운용 한도
|
|
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
|
|
PER_POSITION = MAX_BUDGET // MAX_POSITIONS # 종목당 투자금
|
|
|
|
# Walk-forward 필터 설정
|
|
WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기
|
|
WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값
|
|
|
|
_lock = threading.Lock()
|
|
_positions: dict = {}
|
|
# 구조: { ticker: { buy_price, peak_price, amount, invested_krw, entry_time } }
|
|
|
|
_last_sell_prices: dict[str, float] = {}
|
|
# 직전 매도가 기록 — 재매수 시 이 가격 이상일 때만 진입 허용
|
|
|
|
_trade_history: dict[str, list[bool]] = {}
|
|
# walk-forward 이력: { ticker: [True/False, ...] } (True=수익)
|
|
|
|
_upbit: Optional[pyupbit.Upbit] = None
|
|
|
|
|
|
def _get_upbit() -> pyupbit.Upbit:
|
|
global _upbit
|
|
if _upbit is None:
|
|
_upbit = pyupbit.Upbit(os.getenv("ACCESS_KEY"), os.getenv("SECRET_KEY"))
|
|
return _upbit
|
|
|
|
|
|
def _get_history(ticker: str) -> list[bool]:
|
|
"""in-memory 이력 반환. 없으면 DB에서 초기 로드."""
|
|
if ticker not in _trade_history:
|
|
try:
|
|
_trade_history[ticker] = load_recent_wins(ticker, WF_WINDOW)
|
|
except Exception:
|
|
_trade_history[ticker] = []
|
|
return _trade_history[ticker]
|
|
|
|
|
|
def _update_history(
|
|
ticker: str, is_win: bool, pnl_pct: float,
|
|
fee_krw: float = 0.0, krw_profit: float = 0.0,
|
|
trade_id: str = "", buy_price: float = 0.0,
|
|
sell_price: float = 0.0, invested_krw: int = 0,
|
|
sell_reason: str = "",
|
|
) -> None:
|
|
"""매도 후 in-memory 이력 갱신 + DB 기록."""
|
|
hist = _trade_history.setdefault(ticker, [])
|
|
hist.append(is_win)
|
|
# 윈도우 초과분 제거 (메모리 절약)
|
|
if len(hist) > WF_WINDOW * 2:
|
|
_trade_history[ticker] = hist[-WF_WINDOW:]
|
|
try:
|
|
record_trade(
|
|
ticker, is_win, pnl_pct, fee_krw, krw_profit,
|
|
trade_id, buy_price, sell_price, invested_krw, sell_reason,
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"거래 이력 저장 실패 {ticker}: {e}")
|
|
|
|
|
|
def _db_upsert(ticker: str, pos: dict) -> None:
|
|
"""포지션을 Oracle DB에 저장 (실패해도 거래는 계속)."""
|
|
try:
|
|
upsert_position(
|
|
ticker=ticker,
|
|
buy_price=pos["buy_price"],
|
|
peak_price=pos["peak_price"],
|
|
amount=pos["amount"],
|
|
invested_krw=pos["invested_krw"],
|
|
entry_time=pos["entry_time"].isoformat(),
|
|
trade_id=pos.get("trade_id", ""),
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"포지션 DB 저장 실패 {ticker}: {e}")
|
|
|
|
|
|
def get_positions() -> dict:
|
|
return _positions
|
|
|
|
|
|
def restore_positions() -> None:
|
|
"""시작 시 Oracle DB + Upbit 잔고를 교차 확인하여 포지션 복원.
|
|
trade_results 테이블도 이 시점에 생성 (없으면).
|
|
|
|
DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다.
|
|
"""
|
|
# trade_results / sell_prices 테이블 초기화
|
|
try:
|
|
ensure_trade_results_table()
|
|
except Exception as e:
|
|
logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}")
|
|
|
|
try:
|
|
ensure_sell_prices_table()
|
|
except Exception as e:
|
|
logger.warning(f"sell_prices 테이블 생성 실패 (무시): {e}")
|
|
|
|
# 직전 매도가 복원 (재매수 차단 기준 유지)
|
|
try:
|
|
loaded = load_sell_prices()
|
|
_last_sell_prices.update(loaded)
|
|
if loaded:
|
|
logger.info(f"[복원] 직전 매도가 {len(loaded)}건 복원: {list(loaded.keys())}")
|
|
except Exception as e:
|
|
logger.warning(f"직전 매도가 복원 실패 (무시): {e}")
|
|
|
|
# DB에서 저장된 포지션 로드
|
|
try:
|
|
saved = {row["ticker"]: row for row in load_positions()}
|
|
except Exception as e:
|
|
logger.error(f"DB 포지션 로드 실패: {e}")
|
|
saved = {}
|
|
|
|
upbit = _get_upbit()
|
|
balances = upbit.get_balances()
|
|
upbit_tickers = set()
|
|
|
|
for b in balances:
|
|
currency = b["currency"]
|
|
if currency == "KRW":
|
|
continue
|
|
amount = float(b["balance"]) + float(b["locked"])
|
|
if amount <= 0:
|
|
continue
|
|
ticker = f"KRW-{currency}"
|
|
current = pyupbit.get_current_price(ticker)
|
|
if not current:
|
|
continue
|
|
invested_krw = int(amount * current)
|
|
if invested_krw < 1_000: # 소액 잔고 무시
|
|
continue
|
|
|
|
upbit_tickers.add(ticker)
|
|
|
|
if ticker in saved:
|
|
# DB에 저장된 실제 매수가 복원
|
|
s = saved[ticker]
|
|
peak = max(s["peak_price"], current) # 재시작 중 올랐을 수 있으므로 높은 쪽
|
|
entry_time = datetime.fromisoformat(s["entry_time"]) if isinstance(s["entry_time"], str) else s["entry_time"]
|
|
with _lock:
|
|
_positions[ticker] = {
|
|
"buy_price": s["buy_price"],
|
|
"peak_price": peak,
|
|
"amount": amount,
|
|
"invested_krw": s["invested_krw"],
|
|
"entry_time": entry_time,
|
|
"trade_id": s.get("trade_id", ""),
|
|
}
|
|
logger.info(
|
|
f"[복원] {ticker} 매수가={s['buy_price']:,.0f}원 | 현재가={current:,.0f}원 "
|
|
f"| 수량={amount} (DB 복원)"
|
|
)
|
|
else:
|
|
# DB에 없음 → 현재가로 초기화 후 DB에 저장
|
|
entry_time = datetime.now()
|
|
with _lock:
|
|
_positions[ticker] = {
|
|
"buy_price": current,
|
|
"peak_price": current,
|
|
"amount": amount,
|
|
"invested_krw": min(invested_krw, PER_POSITION),
|
|
"entry_time": entry_time,
|
|
}
|
|
_db_upsert(ticker, _positions[ticker])
|
|
logger.warning(
|
|
f"[복원] {ticker} 현재가={current:,.0f}원 | 수량={amount} "
|
|
f"(DB 기록 없음 → 현재가로 초기화)"
|
|
)
|
|
|
|
# Upbit 잔고에 없는데 DB에 남아있는 항목 정리
|
|
for ticker in saved:
|
|
if ticker not in upbit_tickers:
|
|
try:
|
|
delete_position(ticker)
|
|
logger.info(f"[정리] {ticker} Upbit 잔고 없음 → DB 포지션 삭제")
|
|
except Exception as e:
|
|
logger.error(f"DB 포지션 삭제 실패 {ticker}: {e}")
|
|
|
|
|
|
def buy(ticker: str) -> bool:
|
|
"""시장가 매수. 예산·포지션 수 확인 후 진입."""
|
|
with _lock:
|
|
if ticker in _positions:
|
|
logger.debug(f"{ticker} 이미 보유 중")
|
|
return False
|
|
|
|
# 직전 매도가 +1% 이상일 때만 재진입 (손절 직후 역방향 재매수 방지)
|
|
# 단, 직전 거래가 수익(승)이었으면 이 필터 스킵 — 다시 상승 시 재진입 허용
|
|
if ticker in _last_sell_prices:
|
|
hist = _get_history(ticker)
|
|
last_was_win = bool(hist[-1]) if hist else False
|
|
if not last_was_win:
|
|
current_check = pyupbit.get_current_price(ticker)
|
|
last_sell = _last_sell_prices[ticker]
|
|
threshold = last_sell * 1.01
|
|
if current_check and current_check < threshold:
|
|
logger.info(
|
|
f"[재매수 차단] {ticker} 현재={current_check:,.2f} < "
|
|
f"직전매도+1%={threshold:,.2f} → 상승 흐름 미확인"
|
|
)
|
|
return False
|
|
|
|
# Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단
|
|
if WF_MIN_WIN_RATE > 0:
|
|
hist = _get_history(ticker)
|
|
if len(hist) >= WF_WINDOW:
|
|
recent_wr = sum(hist[-WF_WINDOW:]) / WF_WINDOW
|
|
if recent_wr < WF_MIN_WIN_RATE:
|
|
logger.info(
|
|
f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%"
|
|
f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단"
|
|
)
|
|
return False
|
|
|
|
if len(_positions) >= MAX_POSITIONS:
|
|
logger.info(f"최대 포지션 도달({MAX_POSITIONS}), {ticker} 패스")
|
|
return False
|
|
|
|
invested = sum(p["invested_krw"] for p in _positions.values())
|
|
available = MAX_BUDGET - invested
|
|
order_krw = min(available, PER_POSITION)
|
|
|
|
if order_krw < 10_000:
|
|
logger.info(f"잔여 예산 부족({order_krw:,}원), {ticker} 패스")
|
|
return False
|
|
|
|
upbit = _get_upbit()
|
|
try:
|
|
result = upbit.buy_market_order(ticker, order_krw)
|
|
if not result or "error" in str(result):
|
|
logger.error(f"매수 실패: {result}")
|
|
return False
|
|
|
|
time.sleep(0.5) # 체결 대기
|
|
currency = ticker.split("-")[1]
|
|
amount = float(upbit.get_balance(currency) or 0)
|
|
|
|
# 실제 체결가 = 투자금 / 수량 (시장가 주문 슬리피지 반영)
|
|
actual_price = order_krw / amount if amount > 0 else pyupbit.get_current_price(ticker)
|
|
|
|
entry_time = datetime.now()
|
|
trade_id = str(uuid.uuid4())
|
|
_positions[ticker] = {
|
|
"buy_price": actual_price,
|
|
"peak_price": actual_price,
|
|
"amount": amount,
|
|
"invested_krw": order_krw,
|
|
"entry_time": entry_time,
|
|
"trade_id": trade_id,
|
|
}
|
|
_db_upsert(ticker, _positions[ticker])
|
|
logger.info(
|
|
f"[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
|
|
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
|
|
)
|
|
notify_buy(ticker, actual_price, amount, order_krw)
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"매수 예외 {ticker}: {e}")
|
|
notify_error(f"매수 실패 {ticker}: {e}")
|
|
return False
|
|
|
|
|
|
def _get_avg_fill_price(
|
|
upbit: pyupbit.Upbit,
|
|
order_uuid: str,
|
|
ticker: str,
|
|
fallback: float,
|
|
) -> tuple[float, float | None]:
|
|
"""주문 UUID로 실제 체결 내역을 조회해 가중평균 체결가와 실수수료를 반환.
|
|
|
|
분할 체결(여러 fills)이면 합산 평균가 계산.
|
|
조회 실패 시 (fallback_price, None) 반환.
|
|
"""
|
|
if not order_uuid:
|
|
return fallback, None
|
|
try:
|
|
import hashlib
|
|
import jwt as _jwt
|
|
import requests as _req
|
|
|
|
query_str = f"uuid={order_uuid}"
|
|
payload = {
|
|
"access_key": upbit.access_key,
|
|
"nonce": str(uuid.uuid4()),
|
|
"query_hash": hashlib.sha512(query_str.encode()).hexdigest(),
|
|
"query_hash_alg": "SHA512",
|
|
}
|
|
token = _jwt.encode(payload, upbit.secret_key, algorithm="HS256")
|
|
resp = _req.get(
|
|
"https://api.upbit.com/v1/order",
|
|
params={"uuid": order_uuid},
|
|
headers={"Authorization": f"Bearer {token}"},
|
|
timeout=5,
|
|
)
|
|
data = resp.json()
|
|
trades = data.get("trades", [])
|
|
if not trades:
|
|
return fallback, None
|
|
|
|
total_vol = sum(float(t["volume"]) for t in trades)
|
|
total_krw = sum(float(t["price"]) * float(t["volume"]) for t in trades)
|
|
avg_price = total_krw / total_vol if total_vol > 0 else fallback
|
|
paid_fee = float(data.get("paid_fee", 0))
|
|
|
|
if len(trades) > 1:
|
|
logger.info(
|
|
f"[분할체결] {ticker} {len(trades)}건 → "
|
|
f"평균={avg_price:,.4f}원 (수수료={paid_fee:,.0f}원)"
|
|
)
|
|
return avg_price, paid_fee
|
|
except Exception as e:
|
|
logger.debug(f"[체결조회 실패] {ticker} uuid={order_uuid[:8]}: {e}")
|
|
return fallback, None
|
|
|
|
|
|
def sell(ticker: str, reason: str = "") -> bool:
|
|
"""시장가 전량 매도."""
|
|
with _lock:
|
|
if ticker not in _positions:
|
|
return False
|
|
|
|
pos = _positions[ticker]
|
|
upbit = _get_upbit()
|
|
try:
|
|
currency = ticker.split("-")[1]
|
|
|
|
# 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비)
|
|
actual_amount = float(upbit.get_balance(currency) or 0)
|
|
if actual_amount < 0.00001:
|
|
logger.warning(f"[매도] {ticker} 실제 잔고 없음 → 포지션 정리 (이미 매도됨)")
|
|
del _positions[ticker]
|
|
return True
|
|
|
|
result = upbit.sell_market_order(ticker, actual_amount)
|
|
if not result or "error" in str(result):
|
|
logger.error(f"매도 실패: {result}")
|
|
# 실패 후에도 잔고 재확인 → 0이면 실제로는 매도됨
|
|
actual_amount2 = float(upbit.get_balance(currency) or 0)
|
|
if actual_amount2 < 0.00001:
|
|
logger.warning(f"[매도] {ticker} 잔고 소진 확인 → 포지션 정리")
|
|
del _positions[ticker]
|
|
return True
|
|
return False
|
|
|
|
time.sleep(0.5) # 체결 완료 대기
|
|
|
|
# 실제 체결 내역으로 가중평균 매도가 계산 (분할 체결 대응)
|
|
order_uuid = result.get("uuid", "") if isinstance(result, dict) else ""
|
|
fallback_price = pyupbit.get_current_price(ticker) or pos["buy_price"]
|
|
actual_sell_price, actual_fee_from_order = _get_avg_fill_price(
|
|
upbit, order_uuid, ticker, fallback_price
|
|
)
|
|
|
|
pnl = (actual_sell_price - pos["buy_price"]) / pos["buy_price"] * 100
|
|
sell_value = actual_sell_price * actual_amount
|
|
# 수수료: 주문 조회 성공 시 실제값, 아니면 추정값 (0.05% 양방향)
|
|
fee = actual_fee_from_order if actual_fee_from_order is not None \
|
|
else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005)
|
|
krw_profit = sell_value - pos["invested_krw"] - fee
|
|
logger.info(
|
|
f"[매도] {ticker} @ {actual_sell_price:,.4f}원 | "
|
|
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}"
|
|
)
|
|
notify_sell(ticker, actual_sell_price, pnl, reason)
|
|
_last_sell_prices[ticker] = actual_sell_price
|
|
try:
|
|
upsert_sell_price(ticker, actual_sell_price)
|
|
except Exception as e:
|
|
logger.error(f"직전 매도가 DB 저장 실패 {ticker}: {e}")
|
|
_update_history(
|
|
ticker, pnl > 0, pnl, fee, krw_profit,
|
|
trade_id=pos.get("trade_id", ""),
|
|
buy_price=pos["buy_price"],
|
|
sell_price=actual_sell_price,
|
|
invested_krw=pos["invested_krw"],
|
|
sell_reason=reason,
|
|
)
|
|
del _positions[ticker]
|
|
try:
|
|
delete_position(ticker)
|
|
except Exception as e:
|
|
logger.error(f"포지션 DB 삭제 실패 {ticker}: {e}")
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"매도 예외 {ticker}: {e}")
|
|
notify_error(f"매도 실패 {ticker}: {e}")
|
|
return False
|
|
|
|
|
|
def update_peak(ticker: str, current_price: float) -> None:
|
|
"""최고가 갱신 (트레일링 스탑 기준선 상향)."""
|
|
with _lock:
|
|
if ticker in _positions:
|
|
if current_price > _positions[ticker]["peak_price"]:
|
|
_positions[ticker]["peak_price"] = current_price
|
|
_db_upsert(ticker, _positions[ticker])
|