diff --git a/atr_sweep.py b/atr_sweep.py new file mode 100644 index 0000000..caac239 --- /dev/null +++ b/atr_sweep.py @@ -0,0 +1,214 @@ +"""ATR_MAX_STOP 파라미터 스윕 시뮬레이션. + +실제 봇과 동일하게 ATR을 계산하되, ATR_MAX_STOP 상한만 바꿔가며 성과를 비교한다. + - ATR_MIN_STOP = 1.0% (고정) + - ATR_MULT = 1.5 (고정) + - ATR_CANDLES = 5 (고정) + - ATR_MAX_STOP : [1.5%, 2.0%, 2.5%, 3.0%, 3.5%, 4.0%] 스윕 + +데이터: Oracle ADB ohlcv_hourly (top30_tickers.pkl 상위 20종목) +""" + +import pickle +import sys +from pathlib import Path + +from dotenv import load_dotenv + +load_dotenv(dotenv_path=Path(__file__).parent / ".env") + +sys.path.insert(0, str(Path(__file__).parent)) +from ohlcv_db import load_from_db + +import pandas as pd + +# ── 고정 파라미터 ───────────────────────────────────────── +TIME_STOP_HOURS = 8 +TIME_STOP_MIN_PCT = 3.0 +FEE = 0.0005 +LOCAL_VOL_HOURS = 5 +VOL_MULT = 2.0 +PRICE_QUIET_PCT = 2.0 +SIGNAL_TIMEOUT_H = 8 +THRESH = 4.8 +FROM_DATE = "2025-03-02" + +# ATR 고정값 +ATR_CANDLES = 5 +ATR_MULT = 1.5 +ATR_MIN = 0.010 # 1.0% + +# 스윕 대상: ATR_MAX_STOP +ATR_MAX_CANDIDATES = [0.015, 0.020, 0.025, 0.030, 0.035, 0.040] + +TOP30_FILE = Path("top30_tickers.pkl") + + +# ── 매수 시점 ATR 계산 ──────────────────────────────────── +def calc_atr_stop(df: pd.DataFrame, buy_idx: int, atr_max: float) -> float: + """매수 직전 ATR_CANDLES개 봉으로 스탑 비율 계산. + + 실제 봇(monitor.py)의 _get_adaptive_stop() 로직과 동일. + 계산 실패 시 ATR_MIN 반환. + """ + start = max(0, buy_idx - ATR_CANDLES - 1) + sub = df.iloc[start:buy_idx] + if len(sub) < ATR_CANDLES: + return ATR_MIN + try: + ranges = (sub["high"] - sub["low"]) / sub["low"] + avg_range = ranges.iloc[-ATR_CANDLES:].mean() + return float(max(ATR_MIN, min(atr_max, avg_range * ATR_MULT))) + except Exception: + return ATR_MIN + + +# ── 포지션 시뮬 ─────────────────────────────────────────── +def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float, stop_pct: float): + """매수 후 청산 시뮬레이션 (고정 stop_pct 사용).""" + buy_dt = df.index[buy_idx] + peak = buy_price + + for i in range(buy_idx + 1, len(df)): + row = df.iloc[i] + ts = df.index[i] + + if row["high"] > peak: + peak = row["high"] + + stop_price = peak * (1 - stop_pct) + elapsed_h = (ts - buy_dt).total_seconds() / 3600 + + # 트레일링 스탑 + if row["low"] <= stop_price: + sell_price = stop_price + pnl = (sell_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, sell_price, ts, f"트레일링({pnl:+.1f}%)", pnl + + # 타임 스탑 + pnl_now = (row["close"] - buy_price) / buy_price * 100 + if elapsed_h >= TIME_STOP_HOURS and pnl_now < TIME_STOP_MIN_PCT: + pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, row["close"], ts, "타임스탑", pnl + + last = df.iloc[-1]["close"] + pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 + return pnl > 0, last, df.index[-1], "데이터종료", pnl + + +# ── vol-lead 전략 실행 (ATR_MAX 동적 주입) ──────────────── +def run_vol_lead(df: pd.DataFrame, thresh: float, atr_max: float) -> list: + """vol-lead 신호 → 진입 → ATR 기반 청산 시뮬. + + 진입 시점의 ATR을 계산해 stop_pct를 결정하고 청산 시뮬에 전달. + """ + trades = [] + signal_i = None + signal_price = None + in_pos = False + buy_idx = buy_price = stop_pct = None + i = max(12, LOCAL_VOL_HOURS + 2) + + while i < len(df): + if in_pos: + is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct) + next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) + trades.append((is_win, pnl, df.index[buy_idx], sdt, reason, stop_pct)) + in_pos = False + signal_i = None + signal_price = None + i = next_i + continue + + close = df.iloc[i]["close"] + close_2h = df.iloc[i - 2]["close"] + quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT + + vol_recent = df.iloc[i - 1]["volume"] + vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean() + vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT + + if quiet and vol_spike: + if signal_i is None: + signal_i = i + signal_price = close + else: + if signal_i is not None and close < signal_price: + signal_i = signal_price = None + + if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H: + signal_i = signal_price = None + + if signal_i is not None: + move = (close - signal_price) / signal_price * 100 + if move >= thresh: + in_pos = True + buy_idx = i + buy_price = close + stop_pct = calc_atr_stop(df, i, atr_max) # ← 진입 시점 ATR 계산 + signal_i = signal_price = None + + i += 1 + + return trades + + +# ── 최대 낙폭 계산 ──────────────────────────────────────── +def calc_max_drawdown(trades: list) -> float: + if not trades: + return 0.0 + cum = peak = max_dd = 0.0 + for t in sorted(trades, key=lambda x: x[2]): + cum += t[1] + if cum > peak: + peak = cum + dd = peak - cum + if dd > max_dd: + max_dd = dd + return max_dd + + +# ── 메인 ───────────────────────────────────────────────── +def main() -> None: + top30: list = pickle.load(open(TOP30_FILE, "rb")) + print(f"DB 로드 중... ({len(top30)}종목)") + data = load_from_db(top30, from_date=FROM_DATE) + + valid = [t for t in top30 if t in data and len(data[t]) >= 500] + use20 = valid[:20] + print(f"유효 종목: {len(use20)}개\n") + + print(f"{'='*72}") + print(f"ATR_MAX_STOP 스윕 | ATR×{ATR_MULT} (최소={ATR_MIN:.1%}) | vol-lead +{THRESH}% | {len(use20)}종목") + print(f"{'='*72}") + print(f"{'ATR_MAX':>8} | {'거래수':>6} | {'승률':>6} | {'누적PnL%':>10} | {'최대낙폭%':>10} | {'평균스탑%':>9}") + print(f"{'─'*72}") + + for atr_max in ATR_MAX_CANDIDATES: + all_trades = [] + for ticker in use20: + if ticker not in data: + continue + trades = run_vol_lead(data[ticker], THRESH, atr_max) + all_trades.extend(trades) + + total = len(all_trades) + if total == 0: + print(f"{atr_max*100:>7.1f}% | {'0':>6} | {'N/A':>6} | {'N/A':>10} | {'N/A':>10} | {'N/A':>9}") + continue + + wins = sum(1 for t in all_trades if t[0]) + win_rate = wins / total * 100 + cum_pnl = sum(t[1] for t in all_trades) + max_dd = calc_max_drawdown(all_trades) + avg_stop = sum(t[5] for t in all_trades) / total * 100 # 실제 평균 스탑% + + print(f"{atr_max*100:>7.1f}% | {total:>6}건 | {win_rate:>5.1f}% | " + f"{cum_pnl:>+9.2f}% | {-max_dd:>+9.2f}% | {avg_stop:>8.2f}%") + + print(f"{'='*72}") + print("\n※ 평균스탑% = 실제 거래에서 적용된 ATR 스탑의 평균 (ATR_MAX에 걸렸는지 확인)") + + +if __name__ == "__main__": + main() diff --git a/core/monitor.py b/core/monitor.py index 9c93d4a..0363455 100644 --- a/core/monitor.py +++ b/core/monitor.py @@ -22,7 +22,7 @@ TIME_STOP_MIN_GAIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) ATR_CANDLES = 5 # 최근 N개 1h봉으로 자연 진폭 계산 ATR_MULT = 1.5 # 평균 진폭 × 배수 = 스탑 임계값 ATR_MIN_STOP = 0.010 # 최소 스탑 1.0% (너무 좁아지는 거 방지) -ATR_MAX_STOP = 0.040 # 최대 스탑 4.0% (너무 넓어지는 거 방지) +ATR_MAX_STOP = 0.020 # 최대 스탑 2.0% (너무 넓어지는 거 방지) # ATR 캐시: 종목별 (스탑비율, 계산시각) — 10분마다 갱신 _atr_cache: dict[str, tuple[float, float]] = {} diff --git a/core/price_db.py b/core/price_db.py index d68e36b..07bd3ca 100644 --- a/core/price_db.py +++ b/core/price_db.py @@ -282,6 +282,71 @@ def delete_sell_price(ticker: str) -> None: ) +# ── WF 상태 영구 저장 (재시작 후 shadow 재활 상태 유지) ────────────────────── + +def ensure_wf_state_table() -> None: + """wf_state 테이블이 없으면 생성.""" + ddl = """ + CREATE TABLE wf_state ( + ticker VARCHAR2(20) NOT NULL PRIMARY KEY, + is_blocked NUMBER(1) DEFAULT 0 NOT NULL, + shadow_cons_wins NUMBER DEFAULT 0 NOT NULL, + updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL + ) + """ + with _conn() as conn: + try: + conn.cursor().execute(ddl) + except oracledb.DatabaseError as e: + if e.args[0].code != 955: # ORA-00955: 이미 존재 + raise + + +def upsert_wf_state(ticker: str, is_blocked: bool, shadow_cons_wins: int) -> None: + """WF 차단 상태 저장 또는 갱신.""" + sql = """ + MERGE INTO wf_state w + USING (SELECT :ticker AS ticker FROM dual) d + ON (w.ticker = d.ticker) + WHEN MATCHED THEN + UPDATE SET is_blocked = :is_blocked, + shadow_cons_wins = :shadow_cons_wins, + updated_at = SYSTIMESTAMP + WHEN NOT MATCHED THEN + INSERT (ticker, is_blocked, shadow_cons_wins) + VALUES (:ticker, :is_blocked, :shadow_cons_wins) + """ + with _conn() as conn: + conn.cursor().execute(sql, { + "ticker": ticker, + "is_blocked": 1 if is_blocked else 0, + "shadow_cons_wins": shadow_cons_wins, + }) + + +def load_wf_states() -> dict[str, dict]: + """저장된 WF 상태 전체 로드. + + Returns: + {ticker: {"is_blocked": bool, "shadow_cons_wins": int}} + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute("SELECT ticker, is_blocked, shadow_cons_wins FROM wf_state") + return { + r[0]: {"is_blocked": bool(r[1]), "shadow_cons_wins": int(r[2])} + for r in cur.fetchall() + } + + +def delete_wf_state(ticker: str) -> None: + """WF 상태 삭제 (WF 해제 시).""" + with _conn() as conn: + conn.cursor().execute( + "DELETE FROM wf_state WHERE ticker = :ticker", {"ticker": ticker} + ) + + def load_positions() -> list[dict]: """저장된 전체 포지션 로드.""" sql = """ diff --git a/core/trader.py b/core/trader.py index a51633e..656d0c5 100644 --- a/core/trader.py +++ b/core/trader.py @@ -18,6 +18,7 @@ from .price_db import ( ensure_trade_results_table, record_trade, load_recent_wins, ensure_sell_prices_table, upsert_sell_price, load_sell_prices, get_cumulative_krw_profit, + ensure_wf_state_table, upsert_wf_state, load_wf_states, delete_wf_state, ) load_dotenv() @@ -148,6 +149,10 @@ def _shadow_enter(ticker: str) -> None: } cons = _shadow_cons_wins.get(ticker, 0) + try: + upsert_wf_state(ticker, is_blocked=True, shadow_cons_wins=cons) + except Exception as e: + logger.error(f"WF 상태 DB 저장 실패 {ticker}: {e}") logger.info( f"[Shadow진입] {ticker} @ {price:,.0f}원 " f"(가상 — WF 재활 {cons}/{WF_SHADOW_WINS}연승 필요)" @@ -185,6 +190,15 @@ def close_shadow(ticker: str, sell_price: float, pnl_pct: float, reason: str) -> if do_wf_reset: _shadow_cons_wins.pop(ticker, None) + # shadow 상태 DB 갱신 (_shadow_lock 해제 후) + try: + if do_wf_reset: + delete_wf_state(ticker) + else: + upsert_wf_state(ticker, is_blocked=True, shadow_cons_wins=cons) + except Exception as e: + logger.error(f"WF 상태 DB 갱신 실패 {ticker}: {e}") + mark = "✅" if is_win else "❌" logger.info( f"[Shadow청산] {ticker} {spos['buy_price']:,.0f}→{sell_price:,.0f}원 " @@ -239,7 +253,7 @@ def restore_positions() -> None: DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다. """ - # trade_results / sell_prices 테이블 초기화 + # trade_results / sell_prices / wf_state 테이블 초기화 try: ensure_trade_results_table() except Exception as e: @@ -248,6 +262,21 @@ def restore_positions() -> None: # 시작 시 복리 예산 복원 (이전 세션 수익 반영) _recalc_compound_budget() + # WF 상태 복원 (shadow 연속승 횟수 유지) + try: + ensure_wf_state_table() + wf_states = load_wf_states() + for ticker, state in wf_states.items(): + if state["is_blocked"]: + _shadow_cons_wins[ticker] = state["shadow_cons_wins"] + if wf_states: + logger.info( + f"[복원] WF 차단 상태 {len(wf_states)}건 복원: " + + ", ".join(f"{t}(shadow={s['shadow_cons_wins']})" for t, s in wf_states.items()) + ) + except Exception as e: + logger.warning(f"WF 상태 복원 실패 (무시): {e}") + try: ensure_sell_prices_table() except Exception as e: