From 0b264b304c4d661d1aa737ac8a31debca870ee77 Mon Sep 17 00:00:00 2001 From: joungmin Date: Sat, 28 Feb 2026 23:28:27 +0900 Subject: [PATCH] feat: add backtest module with DB cache and scenario comparison Backtest improvements: - Add backtest.py with Oracle DB-backed OHLCV cache (no repeated API calls) - Add backtest_trades table to cache simulation results by params hash (same params -> instant load, skip re-simulation) - Add walk-forward scenario comparison (--walkforward-cmp) - Add trend ceiling filter (--trend-cmp, max gain threshold) - Add ticker win-rate filter (--ticker-cmp, SQL-based instant analysis) - Precompute daily_features once per data load (not per scenario) Live bot fixes: - monitor: add hard stop-loss from buy price (in addition to trailing) - strategy: fix re-entry condition to require +1% above last sell price - price_collector: add 48h backfill on startup for trend calculation - main: call backfill_prices() at startup Co-Authored-By: Claude Sonnet 4.6 --- backtest.py | 1556 +++++++++++++++++++++++++++++++++++++++ core/monitor.py | 25 +- core/price_collector.py | 35 +- core/strategy.py | 14 +- main.py | 8 +- 5 files changed, 1621 insertions(+), 17 deletions(-) create mode 100644 backtest.py diff --git a/backtest.py b/backtest.py new file mode 100644 index 0000000..7f3181a --- /dev/null +++ b/backtest.py @@ -0,0 +1,1556 @@ +"""Strategy C 백테스트 — 상위 N종목 × 최근 3개월 시뮬레이션. + +실행: + python backtest.py # 손절선 비교 (1% vs 1.5% vs 2%), 15분봉 + python backtest.py --10m # 10분봉, 손절선 비교 + python backtest.py --15m # 15분봉, 손절선 비교 + python backtest.py --20m # 20분봉, 손절선 비교 + python backtest.py --interval-cmp # 봉주기 비교 (10m vs 15m vs 20m), 손절 1.5% 고정 + python backtest.py --btc-cmp # BTC 필터 유무 비교 (없음 / 2h하락 / MA20 / 둘다) + python backtest.py --vol # 변동성 돌파 전략 (K=0.3/0.5/0.7, 일봉 기반) + python backtest.py --hard-stop-cmp # 하드스탑 비교 (트레일링 1.5% 고정, 하드 1.5/3/5/없음) + python backtest.py --top50-cmp # 종목수(20vs50) + 거래대금 급증 비교 (2x/3x/4x) + python backtest.py --trend-cmp # 추세 상한선 비교 (+5/7/10/15% 상한) + python backtest.py --ticker-cmp # 종목 승률 필터 비교 (SQL 즉시 분석) + +캐시: Oracle ADB backtest_ohlcv / backtest_daily / backtest_trades 테이블 + 시뮬레이션 결과는 backtest_trades에 파라미터 해시로 저장 — 동일 파라미터 재실행 시 즉시 반환 +""" +from __future__ import annotations + +import os +import sys +import time +from contextlib import contextmanager +from datetime import datetime, timedelta +from typing import Optional + +import hashlib +import json +import oracledb +import pandas as pd +import numpy as np +import pyupbit +import requests +from dotenv import load_dotenv + +load_dotenv() + +MIN_CANDLES = 100 # DB에 이 수 이상의 봉이 있으면 API 재호출 없이 DB 사용 + +# ── 파라미터 ──────────────────────────────────────────────────────────────── +TOP_N = 20 +MONTHS = 3 +MA_PERIOD = 20 +VOLUME_MULT = 2.0 +STOP_LOSS = 0.05 # 비교 대상 — run_scenario() 에서 오버라이드 +TIME_STOP_H = 8 +TIME_STOP_GAIN = 0.03 +PER_POSITION = 3_333_333 +FEE = 0.0005 + +CANDLES_PER_PAGE = 200 + +EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD", "KRW-USD1", "KRW-USDE"} + +# ── 봉주기별 설정 ───────────────────────────────────────────────────────── +INTERVAL_CONFIG = { + "minute10": {"label": "10분봉", "trend": 0.007, "max_pages": 70}, + "minute15": {"label": "15분봉", "trend": 0.010, "max_pages": 50}, + "minute20": {"label": "20분봉", "trend": 0.013, "max_pages": 35}, + "minute60": {"label": "1시간봉", "trend": 0.030, "max_pages": 30}, +} + +# ── 실행 모드 결정 ──────────────────────────────────────────────────────── +COMPARE_INTERVALS = "--interval-cmp" in sys.argv # 봉주기 비교 모드 +COMPARE_BTC = "--btc-cmp" in sys.argv # BTC 필터 비교 모드 +COMPARE_VOL = "--vol" in sys.argv # 변동성 돌파 비교 모드 +COMPARE_HARDSTOP = "--hard-stop-cmp" in sys.argv # 하드 스탑 비교 모드 +COMPARE_TOP50 = "--top50-cmp" in sys.argv # 종목수 + 거래대금 급증 비교 모드 +COMPARE_TREND = "--trend-cmp" in sys.argv # 추세 상한선 비교 모드 +COMPARE_TICKER = "--ticker-cmp" in sys.argv # 종목 승률 필터 비교 모드 +COMPARE_WF = "--walkforward-cmp" in sys.argv # walk-forward 필터 비교 모드 + +if "--10m" in sys.argv: + DEFAULT_INTERVAL = "minute10" +elif "--15m" in sys.argv: + DEFAULT_INTERVAL = "minute15" +elif "--20m" in sys.argv: + DEFAULT_INTERVAL = "minute20" +else: + DEFAULT_INTERVAL = "minute15" # 기본값: 15분봉 + + +# ── 1. 데이터 수집 ──────────────────────────────────────────────────────────── + +def get_top_tickers(n: int = TOP_N) -> list[str]: + """24시간 거래대금 기준 상위 N개 KRW 종목 (6개월 이상 데이터 있는 것만).""" + all_tickers = pyupbit.get_tickers(fiat="KRW") + resp = requests.get( + "https://api.upbit.com/v1/ticker", + params={"markets": ",".join(all_tickers[:100])}, + timeout=10, + ) + data = [d for d in resp.json() if d["market"] not in EXCLUDE] + data.sort(key=lambda x: x.get("acc_trade_price_24h", 0), reverse=True) + + result = [] + for d in data: + if len(result) >= n: + break + ticker = d["market"] + daily = pyupbit.get_ohlcv(ticker, interval="day", count=200) + if daily is None or len(daily) < 180: + print(f" {ticker} 스킵 (데이터 {0 if daily is None else len(daily)}일)") + time.sleep(0.1) + continue + result.append(ticker) + time.sleep(0.1) + return result + + +def fetch_ohlcv(ticker: str, months: int = MONTHS, + interval: str = DEFAULT_INTERVAL, + max_pages: int = 50) -> pd.DataFrame: + """N개월치 OHLCV 수집 (200개씩 역방향 페이징).""" + start = datetime.now() - timedelta(days=30 * months) + frames: list[pd.DataFrame] = [] + to = datetime.now() + + for _ in range(max_pages): + df = pyupbit.get_ohlcv(ticker, interval=interval, count=CANDLES_PER_PAGE, to=to) + if df is None or df.empty: + break + frames.append(df) + if df.index[0] <= start: + break + to = df.index[0] - timedelta(seconds=1) + time.sleep(0.12) + + if not frames: + return pd.DataFrame() + + result = pd.concat(frames).sort_index() + result = result[~result.index.duplicated(keep="last")] + return result[result.index >= start].copy() + + +def fetch_daily(ticker: str) -> pd.DataFrame: + """일봉 200개 수집 (MA 계산용).""" + df = pyupbit.get_ohlcv(ticker, interval="day", count=200) + return df if df is not None else pd.DataFrame() + + +# ── Oracle DB 캐시 ──────────────────────────────────────────────────────────── + +_db_pool: Optional[oracledb.ConnectionPool] = None + + +def _get_pool() -> oracledb.ConnectionPool: + global _db_pool + if _db_pool is None: + kwargs: dict = dict( + user=os.environ["ORACLE_USER"], + password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"], + min=1, max=3, increment=1, + ) + wallet = os.environ.get("ORACLE_WALLET") + if wallet: + kwargs["config_dir"] = wallet + _db_pool = oracledb.create_pool(**kwargs) + return _db_pool + + +@contextmanager +def _conn(): + pool = _get_pool() + conn = pool.acquire() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + pool.release(conn) + + +def ensure_tables() -> None: + """백테스트 캐시 테이블 없으면 생성.""" + ddl_ohlcv = """ + CREATE TABLE backtest_ohlcv ( + ticker VARCHAR2(20) NOT NULL, + interval_cd VARCHAR2(20) NOT NULL, + ts TIMESTAMP NOT NULL, + open_p NUMBER(20,8), + high_p NUMBER(20,8), + low_p NUMBER(20,8), + close_p NUMBER(20,8), + volume_p NUMBER(30,8), + fetched_at TIMESTAMP DEFAULT SYSTIMESTAMP, + CONSTRAINT pk_bt_ohlcv PRIMARY KEY (ticker, interval_cd, ts) + ) + """ + ddl_daily = """ + CREATE TABLE backtest_daily ( + ticker VARCHAR2(20) NOT NULL, + ts DATE NOT NULL, + open_p NUMBER(20,8), + high_p NUMBER(20,8), + low_p NUMBER(20,8), + close_p NUMBER(20,8), + volume_p NUMBER(30,8), + fetched_at TIMESTAMP DEFAULT SYSTIMESTAMP, + CONSTRAINT pk_bt_daily PRIMARY KEY (ticker, ts) + ) + """ + ddl_trades = """ + CREATE TABLE backtest_trades ( + id VARCHAR2(36) DEFAULT SYS_GUID() PRIMARY KEY, + params_hash VARCHAR2(32) NOT NULL, + ticker VARCHAR2(20) NOT NULL, + interval_cd VARCHAR2(20) NOT NULL, + stop_loss NUMBER(8,5) NOT NULL, + trend_min NUMBER(8,5) NOT NULL, + trend_max NUMBER(8,5) NOT NULL, + volume_mult NUMBER(6,2) NOT NULL, + entry_ts TIMESTAMP NOT NULL, + exit_ts TIMESTAMP NOT NULL, + buy_price NUMBER(20,8) NOT NULL, + sell_price NUMBER(20,8) NOT NULL, + pnl_pct NUMBER(10,4) NOT NULL, + elapsed_h NUMBER(8,2) NOT NULL, + reason VARCHAR2(30) NOT NULL, + created_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL + ) + """ + ddl_trades_idx = """ + CREATE INDEX idx_bt_trades_hash ON backtest_trades (params_hash) + """ + with _conn() as conn: + for ddl in (ddl_ohlcv, ddl_daily, ddl_trades): + try: + conn.cursor().execute(ddl) + except oracledb.DatabaseError as e: + if e.args[0].code != 955: # ORA-00955: 이미 존재 + raise + try: + conn.cursor().execute(ddl_trades_idx) + except oracledb.DatabaseError as e: + if e.args[0].code not in (955, 1408): # 이미 존재 + raise + + +def _is_fresh(ticker: str, interval_cd: str) -> bool: + """DB에 해당 ticker+interval 데이터가 3개월치 충분히 있는지 확인. + + 한 번 수집된 데이터는 API 재호출 없이 DB에서만 사용한다. + """ + start = datetime.now() - timedelta(days=30 * MONTHS) + sql = """ + SELECT COUNT(*) FROM backtest_ohlcv + WHERE ticker = :t AND interval_cd = :i AND ts >= :s + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"t": ticker, "i": interval_cd, "s": start}) + row = cur.fetchone() + return bool(row and row[0] >= MIN_CANDLES) + + +def _is_daily_fresh(ticker: str) -> bool: + """일봉 캐시 확인 (데이터 충분히 있으면 API 재호출 안 함).""" + sql = "SELECT COUNT(*) FROM backtest_daily WHERE ticker = :t" + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"t": ticker}) + row = cur.fetchone() + return bool(row and row[0] >= 30) + + +def load_ohlcv_from_db(ticker: str, interval_cd: str, months: int) -> pd.DataFrame: + """DB에서 OHLCV 로드.""" + start = datetime.now() - timedelta(days=30 * months) + sql = """ + SELECT ts, open_p, high_p, low_p, close_p, volume_p + FROM backtest_ohlcv + WHERE ticker = :t AND interval_cd = :i AND ts >= :s + ORDER BY ts + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"t": ticker, "i": interval_cd, "s": start}) + rows = cur.fetchall() + if not rows: + return pd.DataFrame() + idx = pd.DatetimeIndex([r[0] for r in rows]) + df = pd.DataFrame( + [(float(r[1]), float(r[2]), float(r[3]), float(r[4]), float(r[5])) for r in rows], + columns=["open", "high", "low", "close", "volume"], + index=idx, + ) + df.index.name = None + return df + + +def load_daily_from_db(ticker: str) -> pd.DataFrame: + """DB에서 일봉 로드.""" + sql = """ + SELECT ts, open_p, high_p, low_p, close_p, volume_p + FROM backtest_daily + WHERE ticker = :t + ORDER BY ts + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"t": ticker}) + rows = cur.fetchall() + if not rows: + return pd.DataFrame() + idx = pd.DatetimeIndex([r[0] for r in rows]) + df = pd.DataFrame( + [(float(r[1]), float(r[2]), float(r[3]), float(r[4]), float(r[5])) for r in rows], + columns=["open", "high", "low", "close", "volume"], + index=idx, + ) + df.index.name = None + return df + + +def save_ohlcv_to_db(ticker: str, interval_cd: str, df: pd.DataFrame) -> None: + """OHLCV DataFrame을 DB에 저장 (중복 키 무시 — 기존 데이터 유지).""" + rows = [ + (ticker, interval_cd, ts.to_pydatetime(), + float(row["open"]), float(row["high"]), + float(row["low"]), float(row["close"]), float(row["volume"])) + for ts, row in df.iterrows() + ] + sql_ins = """ + INSERT INTO backtest_ohlcv + (ticker, interval_cd, ts, open_p, high_p, low_p, close_p, volume_p) + VALUES (:1, :2, :3, :4, :5, :6, :7, :8) + """ + with _conn() as conn: + conn.cursor().executemany(sql_ins, rows, batcherrors=True) + + +def save_daily_to_db(ticker: str, df: pd.DataFrame) -> None: + """일봉 DataFrame을 DB에 저장 (중복 키 무시).""" + rows = [ + (ticker, ts.to_pydatetime().date(), + float(row["open"]), float(row["high"]), + float(row["low"]), float(row["close"]), float(row["volume"])) + for ts, row in df.iterrows() + ] + sql_ins = """ + INSERT INTO backtest_daily + (ticker, ts, open_p, high_p, low_p, close_p, volume_p) + VALUES (:1, :2, :3, :4, :5, :6, :7) + """ + with _conn() as conn: + conn.cursor().executemany(sql_ins, rows, batcherrors=True) + + +def fetch_all_data(tickers: list[str], interval: str) -> dict: + """모든 종목의 OHLCV + daily 데이터를 수집. + + DB 캐시(24h 이내)가 있으면 DB에서 로드, 없으면 API → DB 저장. + + Returns: + {ticker: {"ohlcv": DataFrame, "daily": DataFrame}} + """ + cfg = INTERVAL_CONFIG[interval] + max_pages = cfg["max_pages"] + ensure_tables() + result = {} + for i, ticker in enumerate(tickers, 1): + # ── OHLCV ────────────────────────────────────────────────────────── + if _is_fresh(ticker, interval): + ohlcv = load_ohlcv_from_db(ticker, interval, MONTHS) + src_ohlcv = "DB캐시" + else: + ohlcv = fetch_ohlcv(ticker, MONTHS, interval, max_pages) + if not ohlcv.empty: + save_ohlcv_to_db(ticker, interval, ohlcv) + src_ohlcv = "API→DB저장" + time.sleep(0.3) + + # ── Daily ─────────────────────────────────────────────────────────── + if _is_daily_fresh(ticker): + daily = load_daily_from_db(ticker) + src_daily = "DB캐시" + else: + daily = fetch_daily(ticker) + if not daily.empty: + save_daily_to_db(ticker, daily) + src_daily = "API→DB저장" + time.sleep(0.1) + + if ohlcv.empty or daily.empty: + print(f" [{i}/{len(tickers)}] {ticker} → 데이터 없음, 스킵") + continue + + print(f" [{i}/{len(tickers)}] {ticker} " + f"(OHLCV:{src_ohlcv} | Daily:{src_daily} | {len(ohlcv)}봉)") + result[ticker] = {"ohlcv": ohlcv, "daily": daily} + return result + + +# ── 시뮬레이션 결과 DB 캐시 ────────────────────────────────────────────────── + +def _params_hash(params: dict) -> str: + """파라미터 딕셔너리 → SHA256 hex (32자).""" + s = json.dumps(params, sort_keys=True, default=str) + return hashlib.sha256(s.encode()).hexdigest()[:32] + + +def _trades_cached(params_hash: str) -> bool: + sql = "SELECT COUNT(*) FROM backtest_trades WHERE params_hash = :h" + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"h": params_hash}) + return bool(cur.fetchone()[0] > 0) + + +def save_trades_to_db(trades: list[dict], params: dict) -> None: + """시뮬레이션 결과 bulk insert.""" + if not trades: + return + ph = _params_hash(params) + sql = """ + INSERT INTO backtest_trades + (params_hash, ticker, interval_cd, + stop_loss, trend_min, trend_max, volume_mult, + entry_ts, exit_ts, buy_price, sell_price, + pnl_pct, elapsed_h, reason) + VALUES + (:ph, :ticker, :icd, + :sl, :tmin, :tmax, :vmult, + :entry, :exit, :bp, :sp, + :pnl, :el, :reason) + """ + rows = [ + { + "ph": ph, + "ticker": t["ticker"], + "icd": params["interval_cd"], + "sl": params["stop_loss"], + "tmin": params["trend_min_gain"], + "tmax": params["trend_max_gain"], + "vmult": params["volume_mult"], + "entry": t["entry"], + "exit": t["exit"], + "bp": t["buy_price"], + "sp": t["sell_price"], + "pnl": t["pnl_pct"], + "el": t["elapsed_h"], + "reason": t["reason"], + } + for t in trades + ] + with _conn() as conn: + conn.cursor().executemany(sql, rows) + print(f" → DB 저장: {len(rows)}건 (hash={ph[:8]}...)") + + +def load_trades_from_db(params: dict) -> Optional[list[dict]]: + """캐시된 결과 반환. 없으면 None.""" + ph = _params_hash(params) + sql = """ + SELECT ticker, entry_ts, exit_ts, buy_price, sell_price, + pnl_pct, elapsed_h, reason + FROM backtest_trades + WHERE params_hash = :h + ORDER BY entry_ts + """ + with _conn() as conn: + cur = conn.cursor() + cur.execute(sql, {"h": ph}) + rows = cur.fetchall() + if not rows: + return None + return [ + { + "ticker": r[0], + "entry": r[1], + "exit": r[2], + "buy_price": float(r[3]), + "sell_price": float(r[4]), + "pnl_pct": float(r[5]), + "elapsed_h": float(r[6]), + "reason": r[7], + } + for r in rows + ] + + +def build_daily_features(daily: pd.DataFrame) -> pd.DataFrame: + """MA20, 평균 거래량 계산 (look-ahead 방지: shift(1) 사용).""" + d = daily.copy() + d["ma20"] = d["close"].rolling(MA_PERIOD).mean().shift(1) + d["vol_avg"] = d["volume"].rolling(MA_PERIOD).mean().shift(1) + d["prev_vol"] = d["volume"].shift(1) + d.index = pd.Index([dt.date() for dt in d.index]) + return d + + +def fetch_btc_data(interval: str) -> dict: + """BTC OHLCV + 일봉 데이터 로드 (DB 캐시 우선). + + Returns: + {"ohlcv": DataFrame, "daily": DataFrame} + """ + ticker = "KRW-BTC" + cfg = INTERVAL_CONFIG[interval] + ensure_tables() + + if _is_fresh(ticker, interval): + ohlcv = load_ohlcv_from_db(ticker, interval, MONTHS) + print(f" BTC OHLCV: DB캐시 ({len(ohlcv)}봉)") + else: + ohlcv = fetch_ohlcv(ticker, MONTHS, interval, cfg["max_pages"]) + if not ohlcv.empty: + save_ohlcv_to_db(ticker, interval, ohlcv) + print(f" BTC OHLCV: API→DB저장 ({len(ohlcv)}봉)") + time.sleep(0.3) + + if _is_daily_fresh(ticker): + daily = load_daily_from_db(ticker) + print(f" BTC Daily: DB캐시 ({len(daily)}일)") + else: + daily = fetch_daily(ticker) + if not daily.empty: + save_daily_to_db(ticker, daily) + print(f" BTC Daily: API→DB저장 ({len(daily)}일)") + + return {"ohlcv": ohlcv, "daily": daily} + + +def build_btc_context(btc_data: dict, lookback_candles: int = 8) -> dict: + """BTC 필터용 시계열 인덱스 준비. + + Args: + lookback_candles: 단기 하락 판단 기준봉 수 (15분봉 기준 8봉 = 2시간) + + Returns: + { + "close": Series (timestamp → BTC 종가), + "2h_change": Series (timestamp → 2h 전 대비 변화율), + "daily_ma20": dict (date → BTC 일봉 MA20), + } + """ + ohlcv = btc_data["ohlcv"] + daily = btc_data["daily"].copy() + + btc_close = ohlcv["close"] + btc_2h_change = btc_close.pct_change(periods=lookback_candles) + + daily["ma20"] = daily["close"].rolling(20).mean().shift(1) + daily.index = pd.Index([dt.date() for dt in daily.index]) + btc_daily_ma20 = daily["ma20"].to_dict() + + return { + "close": btc_close, + "2h_change": btc_2h_change, + "daily_ma20": btc_daily_ma20, + } + + +# ── 2. 시뮬레이션 ───────────────────────────────────────────────────────────── + +def simulate( + ticker: str, + hourly: pd.DataFrame, + daily_feat: pd.DataFrame, + stop_loss: float, + trend_min_gain: float, + trend_max_gain: float = 999.0, # 추세 상한선 (이 % 초과 급등 시 진입 차단) + reentry_above_sell: bool = False, + btc_ctx: Optional[dict] = None, + btc_drop_threshold: float = -0.015, + btc_use_ma_filter: bool = False, + hard_stop: Optional[float] = None, # 매수가 기준 하드 스탑 (None=트레일링과 동일) + volume_mult: float = VOLUME_MULT, # 거래대금 급증 기준 배수 + wf_min_wr: float = 0.0, # walk-forward: 직전 N건 승률 임계값 (0=비활성) + wf_window: int = 5, # walk-forward: 승률 계산 윈도우 크기 +) -> list[dict]: + """단일 종목 전략 시뮬레이션. + + stop_loss : 트레일링 스탑 (최고가 대비 하락률) + hard_stop : 하드 스탑 (매수가 대비 하락률). None이면 stop_loss 값 사용. + volume_mult : 진입 조건 — 전일 거래량 > N일 평균 × volume_mult + wf_min_wr : walk-forward 필터 — 직전 wf_window건 승률이 이 값 미만이면 진입 차단 + 윈도우가 채워지기 전(워밍업)에는 필터 미적용 + """ + _hard_stop = hard_stop if hard_stop is not None else stop_loss + trades: list[dict] = [] + pos: Optional[dict] = None + last_sell_price: Optional[float] = None + wf_history: list[bool] = [] # True=수익, False=손실 (직전 wf_window건) + + closes = hourly["close"].values + times = hourly.index.to_pydatetime() + + for i in range(1, len(hourly)): + current = closes[i] + t = times[i] + date = t.date() + + # ── 포지션 관리 (매도 우선) ────────────────────────────────────────── + if pos is not None: + if current > pos["peak"]: + pos["peak"] = current + + peak = pos["peak"] + bp = pos["buy_price"] + drop_pk = (peak - current) / peak + drop_buy = (bp - current) / bp + pnl = (current - bp) / bp + elapsed = (t - pos["entry"]).total_seconds() / 3600 + + reason = None + if drop_pk >= stop_loss: + reason = "trailing_stop" + elif _hard_stop < 999 and drop_buy >= _hard_stop: + reason = "hard_stop" + elif elapsed >= TIME_STOP_H and pnl < TIME_STOP_GAIN: + reason = "time_stop" + + if reason: + net_pnl = ( + current * (1 - FEE) - bp * (1 + FEE) + ) / (bp * (1 + FEE)) * 100 + trades.append({ + "ticker": ticker, + "entry": pos["entry"], + "exit": t, + "buy_price": round(bp, 4), + "sell_price": round(current, 4), + "pnl_pct": round(net_pnl, 3), + "elapsed_h": round(elapsed, 1), + "reason": reason, + }) + wf_history.append(net_pnl > 0) # walk-forward 이력 업데이트 + last_sell_price = current + pos = None + continue + + # ── 매수 신호 ──────────────────────────────────────────────────────── + if pos is None: + if reentry_above_sell and last_sell_price is not None: + if current < last_sell_price: + continue + + prev_close = closes[i - 1] + if prev_close <= 0: + continue + gain_pct = (current - prev_close) / prev_close + if gain_pct < trend_min_gain: + continue + if gain_pct > trend_max_gain: + continue + + # ── Walk-forward 승률 필터 ──────────────────────────────────────── + # 워밍업(window 미달) 동안은 필터 미적용, 이후 직전 N건 승률 체크 + if wf_min_wr > 0 and len(wf_history) >= wf_window: + recent_wr = sum(wf_history[-wf_window:]) / wf_window + if recent_wr < wf_min_wr: + continue + + if date not in daily_feat.index: + continue + dr = daily_feat.loc[date] + if pd.isna(dr["ma20"]) or pd.isna(dr["vol_avg"]) or dr["vol_avg"] <= 0: + continue + if not (current > dr["ma20"] and dr["prev_vol"] > dr["vol_avg"] * volume_mult): + continue + + # ── BTC 전체 흐름 필터 ──────────────────────────────────────────── + if btc_ctx is not None: + # 1) BTC 2시간 단기 하락 필터 + btc_2h_chg = btc_ctx["2h_change"].asof(t) + if not pd.isna(btc_2h_chg) and btc_2h_chg < btc_drop_threshold: + continue + + # 2) BTC 일봉 MA20 필터 + if btc_use_ma_filter: + btc_ma20 = btc_ctx["daily_ma20"].get(date) + btc_price = btc_ctx["close"].asof(t) + if btc_ma20 and not pd.isna(btc_ma20) and btc_price < btc_ma20: + continue + + bp = current * (1 + FEE) + pos = {"buy_price": bp, "peak": current, "entry": t} + + # 기간 종료 시 미청산 포지션 강제 종료 + if pos is not None: + current = closes[-1] + elapsed = (times[-1] - pos["entry"]).total_seconds() / 3600 + net_pnl = ( + current * (1 - FEE) - pos["buy_price"] * (1 + FEE) + ) / (pos["buy_price"] * (1 + FEE)) * 100 + trades.append({ + "ticker": ticker, + "entry": pos["entry"], + "exit": times[-1], + "buy_price": round(pos["buy_price"], 4), + "sell_price": round(current, 4), + "pnl_pct": round(net_pnl, 3), + "elapsed_h": round(elapsed, 1), + "reason": "open_at_end", + }) + + return trades + + +# ── 3. 결과 리포트 ──────────────────────────────────────────────────────────── + +def report(all_trades: list[dict], label: str = "") -> None: + if not all_trades: + print(f"\n{label} — 거래 없음") + return + + df = pd.DataFrame(all_trades) + n = len(df) + wins = int((df["pnl_pct"] > 0).sum()) + losses = n - wins + wr = wins / n * 100 + avg_pnl = df["pnl_pct"].mean() + avg_win = df[df["pnl_pct"] > 0]["pnl_pct"].mean() if wins > 0 else 0.0 + avg_loss = df[df["pnl_pct"] < 0]["pnl_pct"].mean() if losses > 0 else 0.0 + rr = abs(avg_win / avg_loss) if avg_loss != 0 else float("inf") + + capital = float(PER_POSITION) + peak_cap = capital + mdd = 0.0 + for pnl in df["pnl_pct"]: + capital *= 1 + pnl / 100 + if capital > peak_cap: + peak_cap = capital + dd = (peak_cap - capital) / peak_cap + if dd > mdd: + mdd = dd + total_return = (capital - PER_POSITION) / PER_POSITION * 100 + + if n > 0: + span_days = (df["exit"].max() - df["entry"].min()).total_seconds() / 86400 + trades_per_month = n / (span_days / 30) if span_days > 0 else 0 + else: + trades_per_month = 0 + + bar = "=" * 60 + print(f"\n{bar}") + print(f" {label} | {MONTHS}개월 | 상위 {TOP_N}종목") + print(bar) + print(f" 총 거래수 : {n}건 ({trades_per_month:.1f}건/월)") + print(f" 승률 : {wr:.1f}% ({wins}승 {losses}패)") + print(f" 평균 수익률 : {avg_pnl:+.2f}%") + print(f" 평균 수익 (win) : {avg_win:+.2f}%") + print(f" 평균 손실 (loss) : {avg_loss:+.2f}%") + print(f" 손익비 : {rr:.2f} (>1 이면 유리)") + print(f" 누적 수익률 : {total_return:+.1f}% " + f"({PER_POSITION:,}원 → {capital:,.0f}원)") + print(f" 최대 낙폭 (MDD) : -{mdd * 100:.1f}%") + print(bar) + + print("\n [청산 사유별]") + for reason, grp in df.groupby("reason"): + w = int((grp["pnl_pct"] > 0).sum()) + print(f" {reason:15s}: {len(grp):3d}건 | " + f"승률 {w/len(grp)*100:.0f}% | 평균 {grp['pnl_pct'].mean():+.2f}%") + + print("\n [종목별]") + for ticker, grp in df.groupby("ticker"): + w = int((grp["pnl_pct"] > 0).sum()) + print(f" {ticker:12s}: {len(grp):3d}건 | " + f"승률 {w/len(grp)*100:.0f}% | 평균 {grp['pnl_pct'].mean():+.2f}%") + + print(f"\n [최근 거래 20건]") + print(f" {'종목':<12} {'진입':<18} {'청산':<18} " + f"{'수익률':>7} {'보유':>6} {'사유'}") + print(f" {'-'*75}") + for _, row in df.tail(20).iterrows(): + entry_str = row["entry"].strftime("%m/%d %H:%M") + exit_str = row["exit"].strftime("%m/%d %H:%M") + sign = "✅" if row["pnl_pct"] > 0 else "❌" + print(f" {row['ticker']:<12} {entry_str:<18} {exit_str:<18} " + f"{row['pnl_pct']:>+6.2f}% {row['elapsed_h']:>5.1f}h " + f"{row['reason']} {sign}") + + print(f"\n{bar}\n") + + +def compare_report(scenarios: list[tuple], title: str = "시나리오 비교") -> None: + """여러 시나리오 핵심 지표 비교. scenarios = [(label, trades), ...]""" + bar = "=" * 72 + + def stats(trades: list[dict]) -> dict: + if not trades: + return {} + df = pd.DataFrame(trades) + n = len(df) + wins = int((df["pnl_pct"] > 0).sum()) + capital = float(PER_POSITION) + peak_cap = capital + mdd = 0.0 + for pnl in df["pnl_pct"]: + capital *= 1 + pnl / 100 + if capital > peak_cap: + peak_cap = capital + dd = (peak_cap - capital) / peak_cap + if dd > mdd: + mdd = dd + total_ret = (capital - PER_POSITION) / PER_POSITION * 100 + df_w = df[df["pnl_pct"] > 0]["pnl_pct"] + df_l = df[df["pnl_pct"] < 0]["pnl_pct"] + return { + "n": n, "wins": wins, + "wr": wins / n * 100, + "avg": df["pnl_pct"].mean(), + "ret": total_ret, + "mdd": mdd * 100, + "avg_w": df_w.mean() if len(df_w) else 0, + "avg_l": df_l.mean() if len(df_l) else 0, + } + + all_stats = [(label, stats(trades)) for label, trades in scenarios] + + header = f"{'항목':<16}" + "".join(f"{label:>16}" for label, _ in all_stats) + print(f"\n{bar}") + print(f" {title} | {MONTHS}개월 | 상위 {TOP_N}종목") + print(f" {header}") + print(bar) + for key, label_k in [("n","총 거래수"), ("wr","승률(%)"), + ("avg","평균 손익(%)"), ("ret","누적 수익(%)"), + ("mdd","MDD(%)"), ("avg_w","평균 수익(win)"), + ("avg_l","평균 손실(loss)")]: + row = f" {label_k:<16}" + for _, s in all_stats: + v = s.get(key, 0) + row += f"{v:>+15.1f}%" if key != "n" else f"{v:>16.0f} " + print(row) + print(f"{bar}\n") + + +# ── 4. 시나리오 실행 ────────────────────────────────────────────────────────── + +def run_scenario( + data: dict, + stop_loss: float, + trend_min_gain: float, + trend_max_gain: float = 999.0, + reentry_above_sell: bool = False, + btc_ctx: Optional[dict] = None, + btc_drop_threshold: float = -0.015, + btc_use_ma_filter: bool = False, + hard_stop: Optional[float] = None, + volume_mult: float = VOLUME_MULT, + interval_cd: str = DEFAULT_INTERVAL, + use_db_cache: bool = False, + daily_features: Optional[dict] = None, # 미리 계산된 daily_feat {ticker: df} + wf_min_wr: float = 0.0, # walk-forward 승률 임계값 + wf_window: int = 5, # walk-forward 윈도우 +) -> list[dict]: + """주어진 파라미터로 전체 종목 시뮬레이션 (데이터는 외부에서 주입). + + use_db_cache=True 이면: + - 동일 파라미터 결과가 DB에 있으면 즉시 반환 (재시뮬레이션 없음) + - 없으면 시뮬레이션 후 DB에 저장 + daily_features 를 주입하면 build_daily_features() 중복 호출 생략. + """ + params = { + "interval_cd": interval_cd, + "stop_loss": stop_loss, + "trend_min_gain": trend_min_gain, + "trend_max_gain": round(trend_max_gain, 4), + "volume_mult": volume_mult, + "reentry": reentry_above_sell, + "wf_min_wr": round(wf_min_wr, 3), + "wf_window": wf_window, + } + + if use_db_cache: + cached = load_trades_from_db(params) + if cached is not None: + print(f" → DB 캐시 히트 ({len(cached)}건, hash={_params_hash(params)[:8]}...)") + return cached + + all_trades: list[dict] = [] + for ticker, frames in data.items(): + df = (daily_features or {}).get(ticker) + if df is None: + df = build_daily_features(frames["daily"]) + trades = simulate( + ticker, frames["ohlcv"], df, + stop_loss=stop_loss, + trend_min_gain=trend_min_gain, + trend_max_gain=trend_max_gain, + reentry_above_sell=reentry_above_sell, + btc_ctx=btc_ctx, + btc_drop_threshold=btc_drop_threshold, + btc_use_ma_filter=btc_use_ma_filter, + hard_stop=hard_stop, + volume_mult=volume_mult, + wf_min_wr=wf_min_wr, + wf_window=wf_window, + ) + all_trades.extend(trades) + + if use_db_cache: + save_trades_to_db(all_trades, params) + + return all_trades + + +# ── 5. 메인 ────────────────────────────────────────────────────────────────── + +def main_stop_loss_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """손절선 비교 모드: 1% vs 1.5% vs 2% (봉주기 고정).""" + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + + print(f"\n{'='*60}") + print(f" 손절선 비교 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" (재진입: 직전 매도가 이상만 허용 | 모멘텀 기준: {trend*100:.1f}%)") + print(f"{'='*60}") + + print("\n▶ 종목 데이터 로드 중 (DB 캐시 우선)...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + data = fetch_all_data(tickers, interval) + print(f" 사용 종목: {list(data.keys())}") + daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()} + + common = dict(reentry_above_sell=True, interval_cd=interval, + use_db_cache=True, daily_features=daily_features) + + print("\n▶ A: 손절 1% ...") + trades_a = run_scenario(data, stop_loss=0.01, trend_min_gain=trend, **common) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: 손절 1.5% ...") + trades_b = run_scenario(data, stop_loss=0.015, trend_min_gain=trend, **common) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: 손절 2% ...") + trades_c = run_scenario(data, stop_loss=0.02, trend_min_gain=trend, **common) + print(f" 완료 ({len(trades_c)}건)") + + compare_report([ + ("손절1%", trades_a), + ("손절1.5%", trades_b), + ("손절2%", trades_c), + ], title=f"손절선 비교 ({label}) — 1% vs 1.5% vs 2%") + + report(trades_a, f"손절 1% | {label}") + report(trades_b, f"손절 1.5% | {label}") + report(trades_c, f"손절 2% | {label}") + + +def main_interval_cmp() -> None: + """봉주기 비교 모드: 10분 vs 15분 vs 20분 (손절 1.5% 고정).""" + intervals = ["minute10", "minute15", "minute20"] + stop_loss = 0.015 + + print(f"\n{'='*60}") + print(f" 봉주기 비교 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목") + print(f" 손절 {stop_loss*100:.1f}% 고정 | (10분봉 vs 15분봉 vs 20분봉)") + print(f"{'='*60}") + + print("\n▶ 종목 목록 조회 중...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + + all_scenario_trades = [] + all_scenario_data = {} + + for interval in intervals: + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + print(f"\n▶ {label} 데이터 로드 중 (DB 캐시 24h 유효)...") + data = fetch_all_data(tickers, interval) + all_scenario_data[interval] = data + print(f" 사용 종목: {list(data.keys())} ({len(data)}개)") + + print(f" 시뮬레이션 중 (손절={stop_loss*100:.1f}%, 모멘텀={trend*100:.1f}%)...") + trades = run_scenario(data, stop_loss=stop_loss, trend_min_gain=trend, reentry_above_sell=True) + all_scenario_trades.append((label, trades)) + print(f" 완료 ({len(trades)}건)") + + compare_report( + all_scenario_trades, + title=f"봉주기 비교 (손절 {stop_loss*100:.1f}% 고정) — 10분 vs 15분 vs 20분" + ) + + for (label, trades), interval in zip(all_scenario_trades, intervals): + cfg = INTERVAL_CONFIG[interval] + report(trades, f"{label} | 손절 {stop_loss*100:.1f}%") + + +# ── 변동성 돌파 전략 ────────────────────────────────────────────────────────── + +def simulate_vol( + ticker: str, + daily: pd.DataFrame, + K: float = 0.5, + ma_period: int = 5, + use_ma_filter: bool = False, +) -> list[dict]: + """변동성 돌파 전략 시뮬레이션 (일봉 기반). + + 진입: 당일 고가 ≥ 전일종가 + K × 전일변동폭 → 목표가에 매수 + 청산: 익일 시가에 전량 매도 + """ + if len(daily) < ma_period + 2: + return [] + + closes = daily["close"].values + highs = daily["high"].values + lows = daily["low"].values + opens = daily["open"].values + times = daily.index.to_pydatetime() + ma = pd.Series(closes).rolling(ma_period).mean().values + + trades: list[dict] = [] + + for i in range(1, len(daily) - 1): # i+1 필요 (익일 시가 매도) + prev_range = highs[i - 1] - lows[i - 1] + if prev_range <= 0: + continue + + target = closes[i - 1] + K * prev_range + + # MA 필터: 전일 종가 > MA(5) 인 상승장에서만 진입 + if use_ma_filter: + if np.isnan(ma[i - 1]) or closes[i - 1] < ma[i - 1]: + continue + + # 당일 목표가 돌파 여부 + if highs[i] < target: + continue + + buy_price = target * (1 + FEE) + sell_price = opens[i + 1] * (1 - FEE) + pnl = (sell_price - buy_price) / buy_price * 100 + + trades.append({ + "ticker": ticker, + "entry": times[i], + "exit": times[i + 1], + "buy_price": round(buy_price, 4), + "sell_price": round(sell_price, 4), + "pnl_pct": round(pnl, 3), + "elapsed_h": 24.0, + "reason": "vol_breakout", + }) + + return trades + + +def run_vol_scenario( + data: dict, + K: float, + use_ma_filter: bool = False, +) -> list[dict]: + """변동성 돌파 전체 종목 시뮬레이션.""" + all_trades: list[dict] = [] + for ticker, frames in data.items(): + trades = simulate_vol(ticker, frames["daily"], K=K, use_ma_filter=use_ma_filter) + all_trades.extend(trades) + return all_trades + + +def main_vol_cmp() -> None: + """변동성 돌파 전략 K값 비교.""" + print(f"\n{'='*60}") + print(f" 변동성 돌파 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목 | 일봉") + print(f" 진입: 목표가 돌파 시 매수 | 청산: 익일 시가") + print(f"{'='*60}") + + # 일봉만 필요하므로 daily 캐시 사용 + print("\n▶ 종목 목록 조회 중...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + + ensure_tables() + data: dict = {} + for i, ticker in enumerate(tickers, 1): + if _is_daily_fresh(ticker): + daily = load_daily_from_db(ticker) + src = "DB캐시" + else: + daily = fetch_daily(ticker) + if not daily.empty: + save_daily_to_db(ticker, daily) + src = "API→DB저장" + time.sleep(0.1) + + if daily.empty: + continue + start = datetime.now() - timedelta(days=30 * MONTHS) + daily = daily[daily.index >= start] + print(f" [{i}/{len(tickers)}] {ticker} ({src} | {len(daily)}일)") + data[ticker] = {"daily": daily} + + print(f"\n 사용 종목: {len(data)}개") + + print("\n▶ A: K=0.3 ...") + trades_a = run_vol_scenario(data, K=0.3) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: K=0.5 ...") + trades_b = run_vol_scenario(data, K=0.5) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: K=0.7 ...") + trades_c = run_vol_scenario(data, K=0.7) + print(f" 완료 ({len(trades_c)}건)") + + print("\n▶ D: K=0.5 + MA5 필터 ...") + trades_d = run_vol_scenario(data, K=0.5, use_ma_filter=True) + print(f" 완료 ({len(trades_d)}건)") + + compare_report([ + ("K=0.3", trades_a), + ("K=0.5", trades_b), + ("K=0.7", trades_c), + ("K=0.5+MA5", trades_d), + ], title="변동성 돌파 비교 (K=0.3/0.5/0.7, 일봉)") + + for lbl, trades in [ + ("K=0.3", trades_a), + ("K=0.5", trades_b), + ("K=0.7", trades_c), + ("K=0.5+MA5", trades_d), + ]: + report(trades, lbl) + + +def main_top50_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """종목수(Top20 vs Top50) + 거래대금 급증 배수 비교.""" + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + stop_loss = 0.015 + + print(f"\n{'='*60}") + print(f" 종목수/거래대금 비교 백테스트 | {MONTHS}개월 | {label}") + print(f" 손절 {stop_loss*100:.1f}% 고정 | Top20 vs Top50, 거래대금 2x/3x/4x") + print(f"{'='*60}") + + common = dict(stop_loss=stop_loss, trend_min_gain=trend, reentry_above_sell=True) + + # ── Top 20 데이터 (DB 캐시 대부분 존재) ────────────────────────────────── + print("\n▶ Top 20 종목 데이터 로드 중...") + tickers_20 = get_top_tickers(20) + print(f" → {tickers_20}") + data_20 = fetch_all_data(tickers_20, interval) + print(f" 사용 종목: {len(data_20)}개") + + # ── Top 50 데이터 (추가 30개는 API 수집 필요할 수 있음) ────────────────── + print("\n▶ Top 50 종목 데이터 로드 중...") + tickers_50 = get_top_tickers(50) + print(f" → {tickers_50}") + data_50 = fetch_all_data(tickers_50, interval) + print(f" 사용 종목: {len(data_50)}개") + + # ── 시나리오 실행 ───────────────────────────────────────────────────────── + print("\n▶ A: Top20 / 거래대금 2배 (현재 설정) ...") + trades_a = run_scenario(data_20, **common, volume_mult=2.0) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: Top50 / 거래대금 2배 ...") + trades_b = run_scenario(data_50, **common, volume_mult=2.0) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: Top50 / 거래대금 3배 ...") + trades_c = run_scenario(data_50, **common, volume_mult=3.0) + print(f" 완료 ({len(trades_c)}건)") + + print("\n▶ D: Top50 / 거래대금 4배 ...") + trades_d = run_scenario(data_50, **common, volume_mult=4.0) + print(f" 완료 ({len(trades_d)}건)") + + compare_report([ + ("Top20/Vol×2", trades_a), + ("Top50/Vol×2", trades_b), + ("Top50/Vol×3", trades_c), + ("Top50/Vol×4", trades_d), + ], title=f"종목수/거래대금 비교 ({label}, 손절 {stop_loss*100:.1f}%)") + + for lbl, trades in [ + ("Top20 / 거래대금 2배 (현재)", trades_a), + ("Top50 / 거래대금 2배", trades_b), + ("Top50 / 거래대금 3배", trades_c), + ("Top50 / 거래대금 4배", trades_d), + ]: + report(trades, lbl) + + +def main_hard_stop_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """하드 스탑 비교 모드 (트레일링 1.5% 고정, 하드 스탑 가변).""" + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + trailing = 0.015 + + print(f"\n{'='*60}") + print(f" 하드스탑 비교 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" 트레일링 {trailing*100:.1f}% 고정 | 하드스탑 가변") + print(f"{'='*60}") + + print("\n▶ 종목 데이터 로드 중...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + data = fetch_all_data(tickers, interval) + print(f" 사용 종목: {len(data)}개") + + common = dict(stop_loss=trailing, trend_min_gain=trend, reentry_above_sell=True) + + print("\n▶ A: 하드스탑 1.5% (트레일링과 동일, 현재 설정) ...") + trades_a = run_scenario(data, **common, hard_stop=0.015) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: 하드스탑 3% ...") + trades_b = run_scenario(data, **common, hard_stop=0.03) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: 하드스탑 5% ...") + trades_c = run_scenario(data, **common, hard_stop=0.05) + print(f" 완료 ({len(trades_c)}건)") + + print("\n▶ D: 하드스탑 없음 (트레일링만) ...") + trades_d = run_scenario(data, **common, hard_stop=999) + print(f" 완료 ({len(trades_d)}건)") + + compare_report([ + ("하드1.5%(현재)", trades_a), + ("하드3%", trades_b), + ("하드5%", trades_c), + ("하드없음", trades_d), + ], title=f"하드스탑 비교 ({label}, 트레일링 {trailing*100:.1f}% 고정)") + + for lbl, trades in [ + ("하드스탑 1.5% (현재)", trades_a), + ("하드스탑 3%", trades_b), + ("하드스탑 5%", trades_c), + ("하드스탑 없음", trades_d), + ]: + report(trades, lbl) + + +def main_btc_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """BTC 전체 흐름 필터 비교 모드.""" + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + stop_loss = 0.015 + + print(f"\n{'='*60}") + print(f" BTC 필터 비교 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" 손절 {stop_loss*100:.1f}% 고정") + print(f"{'='*60}") + + print("\n▶ 종목 데이터 로드 중...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + data = fetch_all_data(tickers, interval) + print(f" 사용 종목: {len(data)}개") + + print("\n▶ BTC 데이터 로드 중...") + btc_raw = fetch_btc_data(interval) + btc_ctx = build_btc_context(btc_raw, lookback_candles=8) # 8봉 = 2시간 (15분봉 기준) + + common = dict(stop_loss=stop_loss, trend_min_gain=trend, reentry_above_sell=True) + + print("\n▶ A: BTC 필터 없음 ...") + trades_a = run_scenario(data, **common) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: BTC 2h 하락 -1.5% 차단 ...") + trades_b = run_scenario(data, **common, btc_ctx=btc_ctx, btc_drop_threshold=-0.015) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: BTC MA20 아래 차단 ...") + trades_c = run_scenario(data, **common, btc_ctx=btc_ctx, + btc_drop_threshold=-9999, # 하락 필터 OFF + btc_use_ma_filter=True) + print(f" 완료 ({len(trades_c)}건)") + + print("\n▶ D: BTC 2h 하락 + MA20 동시 적용 ...") + trades_d = run_scenario(data, **common, btc_ctx=btc_ctx, + btc_drop_threshold=-0.015, btc_use_ma_filter=True) + print(f" 완료 ({len(trades_d)}건)") + + compare_report([ + ("필터없음", trades_a), + ("BTC2h하락", trades_b), + ("BTCMA20", trades_c), + ("2h+MA20", trades_d), + ], title=f"BTC 전체흐름 필터 비교 ({label}, 손절{stop_loss*100:.1f}%)") + + for label_s, trades in [ + ("A: 필터 없음", trades_a), + ("B: BTC 2h 하락 차단", trades_b), + ("C: BTC MA20 차단", trades_c), + ("D: 2h 하락 + MA20", trades_d), + ]: + report(trades, label_s) + + +def main_trend_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """추세 상한선 비교: 상한 없음 / +5% / +7% / +10%.""" + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + stop_loss = 0.015 # 트레일링 스탑 고정 + + print(f"\n{'='*60}") + print(f" 추세 상한선 비교 백테스트 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" (트레일링={stop_loss*100:.1f}%, 하한={trend*100:.1f}%, 재진입차단O)") + print(f"{'='*60}") + + print("\n▶ 종목 데이터 로드 중 (DB 캐시 우선)...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + data = fetch_all_data(tickers, interval) + print(f" 사용 종목: {list(data.keys())}") + daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()} + + common = dict(stop_loss=stop_loss, trend_min_gain=trend, reentry_above_sell=True, + interval_cd=interval, use_db_cache=True, daily_features=daily_features) + + print("\n▶ A: 상한선 없음 (현행) ...") + trades_a = run_scenario(data, **common) + print(f" 완료 ({len(trades_a)}건)") + + print("\n▶ B: 상한 +5% ...") + trades_b = run_scenario(data, **common, trend_max_gain=0.05) + print(f" 완료 ({len(trades_b)}건)") + + print("\n▶ C: 상한 +7% ...") + trades_c = run_scenario(data, **common, trend_max_gain=0.07) + print(f" 완료 ({len(trades_c)}건)") + + print("\n▶ D: 상한 +10% ...") + trades_d = run_scenario(data, **common, trend_max_gain=0.10) + print(f" 완료 ({len(trades_d)}건)") + + print("\n▶ E: 상한 +15% ...") + trades_e = run_scenario(data, **common, trend_max_gain=0.15) + print(f" 완료 ({len(trades_e)}건)") + + compare_report([ + ("상한없음(현행)", trades_a), + ("상한+5%", trades_b), + ("상한+7%", trades_c), + ("상한+10%", trades_d), + ("상한+15%", trades_e), + ], title=f"추세 상한선 비교 ({label}, 손절{stop_loss*100:.1f}%)") + + for label_s, trades in [ + ("A: 상한 없음", trades_a), + ("B: 상한 +5%", trades_b), + ("C: 상한 +7%", trades_c), + ("D: 상한 +10%", trades_d), + ("E: 상한 +15%", trades_e), + ]: + report(trades, label_s) + + +def main_ticker_walkforward_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """Walk-forward 종목 필터 비교. + + 각 거래 시점에서 '직전 N건 승률'만 참조 → look-ahead bias 없음. + 워밍업(N건 미달) 기간에는 필터 미적용. + + 테스트 조합: + A: 필터 없음 (baseline) + B: 직전 5건 승률 ≥ 30% + C: 직전 5건 승률 ≥ 40% + D: 직전 10건 승률 ≥ 30% + E: 직전 10건 승률 ≥ 40% + """ + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + stop_loss = 0.015 + + print(f"\n{'='*60}") + print(f" Walk-Forward 필터 비교 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" (손절={stop_loss*100:.1f}%, 하한={trend*100:.1f}% / look-ahead bias 없음)") + print(f"{'='*60}") + + print("\n▶ 종목 데이터 로드 중 (DB 캐시 우선)...") + tickers = get_top_tickers(TOP_N) + print(f" → {tickers}") + data = fetch_all_data(tickers, interval) + print(f" 사용 종목: {list(data.keys())}") + daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()} + + base = dict(stop_loss=stop_loss, trend_min_gain=trend, reentry_above_sell=True, + interval_cd=interval, use_db_cache=True, daily_features=daily_features) + + scenarios = [ + ("A: 필터없음", dict(**base)), + ("B: W5≥30%", dict(**base, wf_min_wr=0.30, wf_window=5)), + ("C: W5≥40%", dict(**base, wf_min_wr=0.40, wf_window=5)), + ("D: W10≥30%", dict(**base, wf_min_wr=0.30, wf_window=10)), + ("E: W10≥40%", dict(**base, wf_min_wr=0.40, wf_window=10)), + ] + + results = [] + for sc_label, sc_params in scenarios: + print(f"\n▶ {sc_label} ...") + trades = run_scenario(data, **sc_params) + print(f" 완료 ({len(trades)}건)") + results.append((sc_label, trades)) + + compare_report(results, title=f"Walk-Forward 필터 비교 ({label})") + + for sc_label, trades in results: + report(trades, sc_label) + + +def main_ticker_filter_cmp(interval: str = DEFAULT_INTERVAL) -> None: + """종목 승률 필터 비교 — 기존 DB 캐시 기반, 순수 SQL 분석. + + 먼저 기본 시나리오(상한없음, 손절1.5%)를 DB에 캐시해두고, + 이후 SQL로 win_rate 임계값별 누적 수익을 즉시 계산. + ※ 전체 기간 승률로 필터링하므로 look-ahead bias 있음 — 방향성 확인용. + """ + cfg = INTERVAL_CONFIG[interval] + label = cfg["label"] + trend = cfg["trend"] + stop_loss = 0.015 + + params = { + "interval_cd": interval, + "stop_loss": stop_loss, + "trend_min_gain": trend, + "trend_max_gain": round(999.0, 4), + "volume_mult": VOLUME_MULT, + "reentry": True, + } + ph = _params_hash(params) + + print(f"\n{'='*60}") + print(f" 종목 승률 필터 비교 | {MONTHS}개월 | 상위 {TOP_N}종목 | {label}") + print(f" (손절={stop_loss*100:.1f}%, 하한={trend*100:.1f}% / look-ahead 주의)") + print(f"{'='*60}") + + ensure_tables() + + # DB에 결과 없으면 시뮬레이션 + if not _trades_cached(ph): + print("\n▶ DB 캐시 없음 — 시뮬레이션 실행 중...") + tickers = get_top_tickers(TOP_N) + data = fetch_all_data(tickers, interval) + daily_features = {t: build_daily_features(f["daily"]) for t, f in data.items()} + run_scenario(data, stop_loss=stop_loss, trend_min_gain=trend, + reentry_above_sell=True, interval_cd=interval, + use_db_cache=True, daily_features=daily_features) + else: + print(f"\n▶ DB 캐시 히트 (hash={ph[:8]}...) — 시뮬레이션 생략") + + # ── 순수 SQL 분석 ────────────────────────────────────────────────────────── + with _conn() as conn: + cur = conn.cursor() + + # 전체 요약 + cur.execute(""" + SELECT COUNT(*), + ROUND(SUM(CASE WHEN pnl_pct > 0 THEN 1 ELSE 0 END)*100/COUNT(*), 1), + ROUND(AVG(pnl_pct), 3), + ROUND(SUM(pnl_pct), 2) + FROM backtest_trades WHERE params_hash = :h + """, {"h": ph}) + total_n, total_wr, total_avg, total_sum = cur.fetchone() + + # 티커별 성과 (SQL) + cur.execute(""" + SELECT ticker, + COUNT(*) as trades, + ROUND(SUM(CASE WHEN pnl_pct > 0 THEN 1 ELSE 0 END)*100/COUNT(*), 1) as win_rate, + ROUND(AVG(pnl_pct), 3) as avg_pnl, + ROUND(SUM(pnl_pct), 2) as total_pnl + FROM backtest_trades + WHERE params_hash = :h + GROUP BY ticker + ORDER BY avg_pnl DESC + """, {"h": ph}) + ticker_rows = cur.fetchall() + + print(f"\n▶ 티커별 성과 (전체 {total_n}건 | 승률 {total_wr}% | 합산 {total_sum:+.1f}%)") + print(f" {'종목':<12} {'건수':>4} {'승률':>6} {'평균':>7} {'합산':>8}") + print(f" {'-'*45}") + for row in ticker_rows: + flag = "✅" if float(row[3]) > 0 else "❌" + print(f" {row[0]:<12} {row[1]:>4}건 {row[2]:>5.0f}% {row[3]:>+6.2f}% {row[4]:>+7.1f}% {flag}") + + # 승률 임계값별 비교 (순수 SQL — 즉시 결과) + print(f"\n▶ 승률 임계값별 필터 비교 (SQL)") + print(f" {'조건':<20} {'거래수':>5} {'승률':>6} {'누적합산':>9}") + print(f" {'-'*45}") + + for min_wr in [0, 20, 25, 30, 35, 40]: + cur.execute(""" + WITH good AS ( + SELECT ticker + FROM backtest_trades + WHERE params_hash = :h + GROUP BY ticker + HAVING SUM(CASE WHEN pnl_pct > 0 THEN 1 ELSE 0 END)*100/COUNT(*) >= :wr + ) + SELECT COUNT(*), + ROUND(SUM(CASE WHEN t.pnl_pct > 0 THEN 1 ELSE 0 END)*100/COUNT(*), 1), + ROUND(SUM(t.pnl_pct), 2) + FROM backtest_trades t + WHERE t.params_hash = :h AND t.ticker IN (SELECT ticker FROM good) + """, {"h": ph, "wr": min_wr}) + r = cur.fetchone() + n_r, wr_r, sum_r = r[0], r[1], r[2] + label_s = "필터 없음(전체)" if min_wr == 0 else f"승률>{min_wr}% 종목만" + print(f" {label_s:<20} {n_r:>5}건 {wr_r:>5.1f}% {sum_r:>+8.1f}%") + + # 어떤 종목이 포함/제외되는지 (30% 기준) + cur.execute(""" + SELECT ticker, + ROUND(SUM(CASE WHEN pnl_pct > 0 THEN 1 ELSE 0 END)*100/COUNT(*), 1) as wr + FROM backtest_trades + WHERE params_hash = :h + GROUP BY ticker + ORDER BY wr + """, {"h": ph}) + print(f"\n [30% 기준 제외 대상]") + for row in cur.fetchall(): + if float(row[1]) < 30: + print(f" ✗ {row[0]:<12} 승률 {row[1]:.0f}%") + + +def main() -> None: + if COMPARE_TOP50: + main_top50_cmp(DEFAULT_INTERVAL) + elif COMPARE_HARDSTOP: + main_hard_stop_cmp(DEFAULT_INTERVAL) + elif COMPARE_VOL: + main_vol_cmp() + elif COMPARE_BTC: + main_btc_cmp(DEFAULT_INTERVAL) + elif COMPARE_TREND: + main_trend_cmp(DEFAULT_INTERVAL) + elif COMPARE_WF: + main_ticker_walkforward_cmp(DEFAULT_INTERVAL) + elif COMPARE_TICKER: + main_ticker_filter_cmp(DEFAULT_INTERVAL) + elif COMPARE_INTERVALS: + main_interval_cmp() + else: + main_stop_loss_cmp(DEFAULT_INTERVAL) + + +if __name__ == "__main__": + main() diff --git a/core/monitor.py b/core/monitor.py index a4538eb..d7bb22d 100644 --- a/core/monitor.py +++ b/core/monitor.py @@ -19,7 +19,7 @@ TIME_STOP_MIN_GAIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) def _check_trailing_stop(ticker: str, pos: dict, current: float) -> bool: - """트레일링 스탑 체크. 매도 시 True 반환.""" + """트레일링 스탑(최고가 기준) + 고정 스탑(매수가 기준) 체크. 매도 시 True 반환.""" trader.update_peak(ticker, current) pos = trader.get_positions().get(ticker) @@ -27,15 +27,24 @@ def _check_trailing_stop(ticker: str, pos: dict, current: float) -> bool: return False peak = pos["peak_price"] + buy_price = pos["buy_price"] drop_from_peak = (peak - current) / peak + drop_from_buy = (buy_price - current) / buy_price # 구매가 대비 하락률 if drop_from_peak >= STOP_LOSS_PCT: reason = ( f"트레일링스탑 | 최고가={peak:,.0f}원 → " f"현재={current:,.0f}원 ({drop_from_peak:.1%} 하락)" ) - trader.sell(ticker, reason=reason) - return True + return trader.sell(ticker, reason=reason) + + if drop_from_buy >= STOP_LOSS_PCT: + reason = ( + f"스탑로스 | 매수가={buy_price:,.0f}원 → " + f"현재={current:,.0f}원 ({drop_from_buy:.1%} 하락)" + ) + return trader.sell(ticker, reason=reason) + return False @@ -70,15 +79,17 @@ def _check_position(ticker: str, pos: dict) -> None: if current is None: return - pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100 + buy_price = pos["buy_price"] + pnl = (current - buy_price) / buy_price * 100 peak = pos["peak_price"] drop_from_peak = (peak - current) / peak + drop_from_buy = (buy_price - current) / buy_price entry_time = pos.get("entry_time", datetime.now()) elapsed_hours = (datetime.now() - entry_time).total_seconds() / 3600 logger.info( - f"[감시] {ticker} 현재={current:,.0f} | 최고={peak:,.0f} | " - f"하락={drop_from_peak:.1%} | 수익률={pnl:+.1f}% | " + f"[감시] {ticker} 현재={current:,.0f} | 매수가={buy_price:,.0f} | 최고={peak:,.0f} | " + f"수익률={pnl:+.1f}% | peak하락={drop_from_peak:.1%} | buy하락={drop_from_buy:.1%} | " f"보유={elapsed_hours:.1f}h" ) @@ -94,7 +105,7 @@ def run_monitor(interval: int = CHECK_INTERVAL) -> None: """전체 포지션 감시 루프.""" logger.info( f"모니터 시작 | 체크={interval}초 | " - f"트레일링스탑={STOP_LOSS_PCT:.0%} | " + f"트레일링스탑={STOP_LOSS_PCT:.1%} | " f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.0f}%" ) while True: diff --git a/core/price_collector.py b/core/price_collector.py index 425ceed..149544e 100644 --- a/core/price_collector.py +++ b/core/price_collector.py @@ -5,10 +5,11 @@ from __future__ import annotations import logging import time +import pyupbit import requests from .market import get_top_tickers -from .price_db import cleanup_old_prices, insert_prices +from .price_db import cleanup_old_prices, insert_prices, insert_prices_with_time logger = logging.getLogger(__name__) @@ -16,6 +17,38 @@ COLLECT_INTERVAL = 600 # 10분 (초) CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리 +def backfill_prices(hours: int = 48) -> None: + """시작 시 과거 N시간치 1시간봉 종가를 DB에 백필. + + price_history에 데이터가 없으면 추세 판단이 불가능하므로 + 봇 시작 직후 한 번 호출해 과거 데이터를 채운다. + """ + tickers = get_top_tickers() + if not tickers: + logger.warning("[백필] 종목 목록 없음, 스킵") + return + + count = hours + 2 # 여유 있게 요청 + total_rows = 0 + + for ticker in tickers: + try: + df = pyupbit.get_ohlcv(ticker, interval="minute60", count=count) + if df is None or df.empty: + continue + rows = [ + (ticker, float(row["close"]), ts.to_pydatetime()) + for ts, row in df.iterrows() + ] + insert_prices_with_time(rows) + total_rows += len(rows) + time.sleep(0.1) + except Exception as e: + logger.error(f"[백필] {ticker} 오류: {e}") + + logger.info(f"[백필] 완료 — {len(tickers)}개 종목 / {total_rows}개 레코드 저장") + + def run_collector(interval: int = COLLECT_INTERVAL) -> None: """가격 수집 루프.""" logger.info(f"가격 수집기 시작 (주기={interval//60}분)") diff --git a/core/strategy.py b/core/strategy.py index ea6a1cb..7f57b0f 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -1,4 +1,4 @@ -"""Strategy C: 실시간 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호.""" +"""Strategy C: 현재 기준 N시간 전 대비 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호.""" from __future__ import annotations @@ -10,8 +10,8 @@ from .price_db import get_price_n_hours_ago logger = logging.getLogger(__name__) -# 추세 판단: N시간 전 대비 +M% 이상이면 상승 중 -TREND_HOURS = float(os.getenv("TREND_HOURS", "1")) +# 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중 +TREND_HOURS = float(os.getenv("TREND_HOURS", "12")) TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "3")) # 모멘텀: MA 기간, 거래량 급증 배수 @@ -20,10 +20,10 @@ VOLUME_MULTIPLIER = 2.0 def check_trend(ticker: str) -> bool: - """상승 추세 조건: 현재가가 N시간 전 대비 +M% 이상.""" + """상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +M% 이상.""" past_price = get_price_n_hours_ago(ticker, TREND_HOURS) if past_price is None: - logger.debug(f"[추세] {ticker} 과거 가격 없음 (데이터 수집 중)") + logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)") return False current = get_current_price(ticker) @@ -35,8 +35,8 @@ def check_trend(ticker: str) -> bool: if result: logger.info( - f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.0f} " - f"현재={current:,.0f} (+{gain_pct:.1f}%)" + f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} " + f"현재={current:,.2f} (+{gain_pct:.1f}%)" ) else: logger.debug( diff --git a/main.py b/main.py index d2053c5..bb0e335 100644 --- a/main.py +++ b/main.py @@ -19,7 +19,7 @@ logging.basicConfig( from core.monitor import run_monitor from core.notify import notify_error, notify_status -from core.price_collector import run_collector +from core.price_collector import backfill_prices, run_collector from core.trader import get_positions, restore_positions from daemon.runner import run_scanner @@ -45,6 +45,10 @@ def main() -> None: # 재시작 시 기존 잔고 복원 (이중 매수 방지) restore_positions() + # 과거 가격 백필 (추세 판단용 DB 데이터가 없는 경우 채움) + logger.info("과거 가격 백필 시작 (48시간)...") + backfill_prices(hours=48) + # 트레일링 스탑 감시 스레드 (10초 주기) monitor_thread = threading.Thread( target=run_monitor, args=(10,), daemon=True, name="monitor" @@ -57,7 +61,7 @@ def main() -> None: ) status_thread.start() - # 10분 주기 가격 수집 스레드 (추세 판단용 DB 저장) + # 가격 수집 스레드 (10분 주기 → Oracle DB price_history 저장, 추세 판단용) collector_thread = threading.Thread( target=run_collector, daemon=True, name="collector" )