feat: add shadow trading rehabilitation for WF-blocked tickers

When WF filter blocks a ticker, automatically start a virtual (shadow)
position with the same trailing/time stop logic. After WF_SHADOW_WINS
consecutive shadow wins, reset WF history to re-enable real trading.

- trader.py: add _shadow_positions, _shadow_cons_wins state;
  _shadow_enter(), get_shadow_positions(), update_shadow_peak(),
  close_shadow() functions; trigger shadow entry on WF block in buy()
- monitor.py: add _check_shadow_position() with ATR trailing + time stop;
  check shadow positions every 10s in run_monitor()
- Env: WF_SHADOW_WINS=2 (2 consecutive wins to rehabilitate)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-01 23:57:45 +09:00
parent 16b4c932a2
commit 29d48f0fe9
2 changed files with 246 additions and 44 deletions

View File

@@ -112,6 +112,47 @@ def _check_time_stop(ticker: str, pos: dict, current: float) -> bool:
return True return True
def _check_shadow_position(ticker: str, spos: dict) -> None:
"""Shadow 포지션 청산 조건 체크 (트레일링 + 타임 스탑).
실제 포지션과 동일한 로직을 적용하되 주문 없이 결과만 기록.
"""
current = get_current_price(ticker)
if current is None:
return
trader.update_shadow_peak(ticker, current)
# 갱신 후 최신 값 재조회
spos = trader.get_shadow_positions().get(ticker)
if spos is None:
return
buy_price = spos["buy_price"]
peak = spos["peak_price"]
entry_time = spos["entry_time"]
stop_pct = _get_adaptive_stop(ticker)
drop_from_peak = (peak - current) / peak
elapsed_hours = (datetime.now() - entry_time).total_seconds() / 3600
pnl_pct = (current - buy_price) / buy_price * 100
reason = None
if drop_from_peak >= stop_pct:
reason = (
f"트레일링스탑 | 최고={peak:,.2f}→현재={current:,.2f}"
f" ({drop_from_peak:.2%} | 스탑={stop_pct:.2%})"
)
elif elapsed_hours >= TIME_STOP_HOURS and pnl_pct < TIME_STOP_MIN_GAIN_PCT:
reason = (
f"타임스탑 | {elapsed_hours:.1f}h 경과 "
f"수익률={pnl_pct:+.2f}% (기준={TIME_STOP_MIN_GAIN_PCT:+.2f}%)"
)
if reason:
trader.close_shadow(ticker, current, pnl_pct, reason)
def _check_position(ticker: str, pos: dict) -> None: def _check_position(ticker: str, pos: dict) -> None:
"""단일 포지션 전체 체크 (트레일링 스탑 → 타임 스탑 순서).""" """단일 포지션 전체 체크 (트레일링 스탑 → 타임 스탑 순서)."""
current = get_current_price(ticker) current = get_current_price(ticker)
@@ -149,10 +190,20 @@ def run_monitor(interval: int = CHECK_INTERVAL) -> None:
f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.2f}%" f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.2f}%"
) )
while True: while True:
# 실제 포지션 감시
positions_snapshot = dict(trader.get_positions()) positions_snapshot = dict(trader.get_positions())
for ticker, pos in positions_snapshot.items(): for ticker, pos in positions_snapshot.items():
try: try:
_check_position(ticker, pos) _check_position(ticker, pos)
except Exception as e: except Exception as e:
logger.error(f"모니터 오류 {ticker}: {e}") logger.error(f"모니터 오류 {ticker}: {e}")
# Shadow 포지션 감시 (WF차단 종목 재활 추적)
shadow_snapshot = trader.get_shadow_positions()
for ticker, spos in shadow_snapshot.items():
try:
_check_shadow_position(ticker, spos)
except Exception as e:
logger.error(f"Shadow 모니터 오류 {ticker}: {e}")
time.sleep(interval) time.sleep(interval)

View File

@@ -24,6 +24,12 @@ load_dotenv()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SIMULATION_MODE = os.getenv("SIMULATION_MODE", "").lower() in ("true", "1", "yes")
if SIMULATION_MODE:
logging.getLogger(__name__).warning(
"*** SIMULATION MODE ACTIVE — 실제 주문이 실행되지 않습니다 ***"
)
INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정) INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정)
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수 MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
@@ -54,6 +60,7 @@ def _recalc_compound_budget() -> None:
# Walk-forward 필터 설정 # Walk-forward 필터 설정
WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기 WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기
WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값 WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값
WF_SHADOW_WINS = int(os.getenv("WF_SHADOW_WINS", "2")) # shadow N연승 → WF 해제
_lock = threading.Lock() _lock = threading.Lock()
_positions: dict = {} _positions: dict = {}
@@ -65,6 +72,13 @@ _last_sell_prices: dict[str, float] = {}
_trade_history: dict[str, list[bool]] = {} _trade_history: dict[str, list[bool]] = {}
# walk-forward 이력: { ticker: [True/False, ...] } (True=수익) # walk-forward 이력: { ticker: [True/False, ...] } (True=수익)
_shadow_lock = threading.Lock()
_shadow_positions: dict[str, dict] = {}
# WF차단 종목 가상 포지션: { ticker: { buy_price, peak_price, entry_time } }
_shadow_cons_wins: dict[str, int] = {}
# shadow 연속 승 횟수: { ticker: int }
_upbit: Optional[pyupbit.Upbit] = None _upbit: Optional[pyupbit.Upbit] = None
@@ -107,6 +121,88 @@ def _update_history(
logger.error(f"거래 이력 저장 실패 {ticker}: {e}") logger.error(f"거래 이력 저장 실패 {ticker}: {e}")
# ── Shadow 재활 ────────────────────────────────────────────────────────────────
def _shadow_enter(ticker: str) -> None:
"""WF 차단 종목에 shadow(가상) 포지션 진입.
buy() 내부(_lock 보유 중)에서 호출됨.
API 호출 후 _shadow_lock으로만 shadow 상태 보호 (deadlock 방지).
"""
# 이미 shadow 중이면 스킵
if ticker in _shadow_positions:
return
price = pyupbit.get_current_price(ticker)
if not price:
return
with _shadow_lock:
if ticker in _shadow_positions: # double-check
return
_shadow_positions[ticker] = {
"buy_price": price,
"peak_price": price,
"entry_time": datetime.now(),
}
cons = _shadow_cons_wins.get(ticker, 0)
logger.info(
f"[Shadow진입] {ticker} @ {price:,.0f}"
f"(가상 — WF 재활 {cons}/{WF_SHADOW_WINS}연승 필요)"
)
def get_shadow_positions() -> dict:
"""Shadow 포지션 스냅샷 반환 (monitor 에서 조회용)."""
with _shadow_lock:
return {k: dict(v) for k, v in _shadow_positions.items()}
def update_shadow_peak(ticker: str, price: float) -> None:
"""Shadow 포지션 최고가 갱신."""
with _shadow_lock:
if ticker in _shadow_positions:
if price > _shadow_positions[ticker]["peak_price"]:
_shadow_positions[ticker]["peak_price"] = price
def close_shadow(ticker: str, sell_price: float, pnl_pct: float, reason: str) -> None:
"""Shadow 포지션 청산 및 WF 재활 진행.
연속승 갱신 → WF_SHADOW_WINS 달성 시 WF 이력 초기화 + Telegram 알림.
"""
with _shadow_lock:
spos = _shadow_positions.pop(ticker, None)
if spos is None:
return
is_win = pnl_pct > 0
cons = _shadow_cons_wins.get(ticker, 0)
cons = cons + 1 if is_win else 0
_shadow_cons_wins[ticker] = cons
do_wf_reset = cons >= WF_SHADOW_WINS
if do_wf_reset:
_shadow_cons_wins.pop(ticker, None)
mark = "" if is_win else ""
logger.info(
f"[Shadow청산] {ticker} {spos['buy_price']:,.0f}{sell_price:,.0f}"
f"| {mark} {pnl_pct:+.1f}% | {reason} | 연속승={cons}/{WF_SHADOW_WINS}"
)
if do_wf_reset:
with _lock: # _shadow_lock은 이미 해제된 상태 (deadlock 없음)
_trade_history.pop(ticker, None)
logger.warning(
f"[WF해제] {ticker} Shadow {WF_SHADOW_WINS}연승 달성 → "
f"WF 이력 초기화, 실거래 재개"
)
notify_error(
f"🎉 [{ticker}] WF 재활 완료!\n"
f"Shadow {WF_SHADOW_WINS}연승 달성 → 실거래 재개"
)
def _db_upsert(ticker: str, pos: dict) -> None: def _db_upsert(ticker: str, pos: dict) -> None:
"""포지션을 Oracle DB에 저장 (실패해도 거래는 계속).""" """포지션을 Oracle DB에 저장 (실패해도 거래는 계속)."""
try: try:
@@ -172,6 +268,30 @@ def restore_positions() -> None:
logger.error(f"DB 포지션 로드 실패: {e}") logger.error(f"DB 포지션 로드 실패: {e}")
saved = {} saved = {}
if SIMULATION_MODE:
# --- 시뮬레이션: Upbit 잔고 조회 없이 DB 포지션만 복원 ---
logger.info("[SIMULATION] 시뮬레이션 모드 — Upbit 잔고 조회 생략, DB 포지션만 복원")
for ticker, s in saved.items():
current = pyupbit.get_current_price(ticker)
if not current:
continue
peak = max(s["peak_price"], current)
entry_time = datetime.fromisoformat(s["entry_time"]) if isinstance(s["entry_time"], str) else s["entry_time"]
with _lock:
_positions[ticker] = {
"buy_price": s["buy_price"],
"peak_price": peak,
"amount": s.get("amount", 0),
"invested_krw": s["invested_krw"],
"entry_time": entry_time,
"trade_id": s.get("trade_id", ""),
}
logger.info(
f"[SIMULATION][복원] {ticker} 매수가={s['buy_price']:,.0f}원 | "
f"현재가={current:,.0f}원 (DB 복원)"
)
return
upbit = _get_upbit() upbit = _get_upbit()
balances = upbit.get_balances() balances = upbit.get_balances()
upbit_tickers = set() upbit_tickers = set()
@@ -261,7 +381,7 @@ def buy(ticker: str) -> bool:
) )
return False return False
# Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단 # Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단 + shadow 진입
if WF_MIN_WIN_RATE > 0: if WF_MIN_WIN_RATE > 0:
hist = _get_history(ticker) hist = _get_history(ticker)
if len(hist) >= WF_WINDOW: if len(hist) >= WF_WINDOW:
@@ -269,8 +389,9 @@ def buy(ticker: str) -> bool:
if recent_wr < WF_MIN_WIN_RATE: if recent_wr < WF_MIN_WIN_RATE:
logger.info( logger.info(
f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%" f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%"
f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단" f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단 (shadow 재활 시작)"
) )
_shadow_enter(ticker) # 가상 포지션으로 WF 재활 추적
return False return False
if len(_positions) >= MAX_POSITIONS: if len(_positions) >= MAX_POSITIONS:
@@ -285,8 +406,22 @@ def buy(ticker: str) -> bool:
logger.info(f"잔여 예산 부족({order_krw:,}원), {ticker} 패스") logger.info(f"잔여 예산 부족({order_krw:,}원), {ticker} 패스")
return False return False
upbit = _get_upbit()
try: try:
if SIMULATION_MODE:
# --- 시뮬레이션 매수 ---
sim_price = pyupbit.get_current_price(ticker)
if not sim_price:
logger.error(f"[SIMULATION] 현재가 조회 실패: {ticker}")
return False
amount = order_krw / sim_price
actual_price = sim_price
logger.info(
f"[SIMULATION][매수] {ticker} @ {actual_price:,.0f}원 | "
f"수량={amount:.8f} | 투자금={order_krw:,}원 (모의 주문)"
)
else:
# --- 실제 매수 ---
upbit = _get_upbit()
result = upbit.buy_market_order(ticker, order_krw) result = upbit.buy_market_order(ticker, order_krw)
if not result or "error" in str(result): if not result or "error" in str(result):
logger.error(f"매수 실패: {result}") logger.error(f"매수 실패: {result}")
@@ -310,8 +445,9 @@ def buy(ticker: str) -> bool:
"trade_id": trade_id, "trade_id": trade_id,
} }
_db_upsert(ticker, _positions[ticker]) _db_upsert(ticker, _positions[ticker])
prefix = "[SIMULATION]" if SIMULATION_MODE else ""
logger.info( logger.info(
f"[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | " f"{prefix}[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}" f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
) )
notify_buy(ticker, actual_price, amount, order_krw, notify_buy(ticker, actual_price, amount, order_krw,
@@ -383,8 +519,22 @@ def sell(ticker: str, reason: str = "") -> bool:
return False return False
pos = _positions[ticker] pos = _positions[ticker]
upbit = _get_upbit()
try: try:
if SIMULATION_MODE:
# --- 시뮬레이션 매도 ---
actual_amount = pos["amount"]
actual_sell_price = pyupbit.get_current_price(ticker) or pos["buy_price"]
sell_value = actual_sell_price * actual_amount
fee = pos["invested_krw"] * 0.0005 + sell_value * 0.0005
krw_profit = sell_value - pos["invested_krw"] - fee
pnl = (actual_sell_price - pos["buy_price"]) / pos["buy_price"] * 100
logger.info(
f"[SIMULATION][매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (모의 주문) | 사유={reason}"
)
else:
# --- 실제 매도 ---
upbit = _get_upbit()
currency = ticker.split("-")[1] currency = ticker.split("-")[1]
# 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비) # 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비)
@@ -420,8 +570,9 @@ def sell(ticker: str, reason: str = "") -> bool:
fee = actual_fee_from_order if actual_fee_from_order is not None \ fee = actual_fee_from_order if actual_fee_from_order is not None \
else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005) else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005)
krw_profit = sell_value - pos["invested_krw"] - fee krw_profit = sell_value - pos["invested_krw"] - fee
prefix = "[SIMULATION]" if SIMULATION_MODE else ""
logger.info( logger.info(
f"[매도] {ticker} @ {actual_sell_price:,.4f}원 | " f"{prefix}[매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}" f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}"
) )
try: try: