"""Parameter sweep for the 1-minute "consecutive rising volume spike" strategy.

Signal conditions:
  bar[n-1]: vol_ratio >= VOL_MIN and bullish (close > open)
  bar[n]  : vol_ratio >= VOL_MIN, bullish, and close > bar[n-1].close
            (the rise continues)

Entry: at the close of the bar following bar[n] (bar[n+1]).
Tracking: 1-minute trailing stop (ATR based) plus a time stop.

DB side: indicators, signals, entries and running_peak are all computed in
Oracle SQL, executed in monthly batches.
"""
import sys
import os
import itertools
import math
from datetime import datetime, timedelta

sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from dotenv import load_dotenv
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))

import pandas as pd
import oracledb
import time as _time

# ── Fixed parameters ─────────────────────────────────────────────────────────
VOL_LOOKBACK  = 61     # vol_ratio baseline: window is rows 61..2 preceding (60 bars)
ATR_LOOKBACK  = 28     # number of bars used for the ATR average
TS_N          = 240    # time-stop bar count (240 one-minute bars = 4 hours)
TIME_STOP_PCT = 0.0 / 100
FEE           = 0.0005
BUDGET        = 15_000_000
MAX_POS       = 3
PER_POS       = BUDGET // MAX_POS

# ── Sweep parameters ─────────────────────────────────────────────────────────
SWEEP = {
    "VOL":      [15.0, 20.0, 25.0, 30.0, 40.0, 50.0],  # signal volume multiple (upper range)
    "ATR_MULT": [1.5, 2.0, 2.5, 3.0],
    "ATR_MIN":  [0.005, 0.010, 0.015],
    "ATR_MAX":  [0.020, 0.025, 0.030],
}
VOL_MIN = min(SWEEP["VOL"])   # SQL pre-filter: only signals at or above the lowest swept multiple are loaded

# ── Simulation range ─────────────────────────────────────────────────────────
SIM_START   = datetime(2025, 8, 1)
SIM_END     = datetime(2026, 3, 4)
WARMUP_MINS = 120

TICKERS = [
    'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE',
    'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP',
    'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL',
    'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G',
]
# Ticker list rendered as a quoted SQL IN-list (constants defined above, not user input).
_TK = ",".join(f"'{t}'" for t in TICKERS)


def _months(start: datetime, end: datetime):
    """Yield half-open ``(window_start, window_end)`` pairs covering [start, end).

    Windows are cut at calendar-month boundaries (midnight on the 1st).  The
    first window begins exactly at ``start`` and the last one ends exactly at
    ``end``, so nothing outside the requested range is produced even when
    ``start`` is not the first day of a month.
    """
    lo = start
    while lo < end:
        month_floor = lo.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
        # The 1st of a month plus 32 days always lands in the following month.
        next_first = (month_floor + timedelta(days=32)).replace(day=1)
        hi = min(next_first, end)
        yield lo, hi
        lo = hi


def _get_conn():
    """Open an Oracle connection from ORACLE_USER / ORACLE_PASSWORD / ORACLE_DSN.

    If ORACLE_WALLET is set and non-empty it is passed as ``config_dir``.
    Raises ``KeyError`` when one of the three required variables is missing.
    """
    kwargs = dict(
        user=os.environ["ORACLE_USER"],
        password=os.environ["ORACLE_PASSWORD"],
        dsn=os.environ["ORACLE_DSN"],
    )
    wallet = os.environ.get("ORACLE_WALLET")
    if wallet:
        kwargs["config_dir"] = wallet
    return oracledb.connect(**kwargs)


# ── Core SQL ─────────────────────────────────────────────────────────────────
# Two consecutive volume-spike bars with a rising close, then entry on the
# very next bar.  The post-entry window is expressed with NUMTODSINTERVAL on
# the :ts_n bind: Oracle interval literals default to a 2-digit leading
# precision, so INTERVAL '240' MINUTE would be rejected (ORA-01873), and the
# bind keeps the window in step with the bar_n filter below.
TRADE_SQL = f"""
WITH
-- 1) 1분봉 + TR + 이전 봉 정보
base AS (
    SELECT ticker, ts, open_p, close_p, high_p, low_p, volume_p,
           LAG(close_p, 1) OVER (PARTITION BY ticker ORDER BY ts) prev_close,
           LAG(open_p, 1) OVER (PARTITION BY ticker ORDER BY ts) prev_open,
           LAG(volume_p, 1) OVER (PARTITION BY ticker ORDER BY ts) prev_volume,
           GREATEST(
               high_p - low_p,
               ABS(high_p - LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts)),
               ABS(low_p - LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts))
           ) tr
    FROM backtest_ohlcv
    WHERE interval_cd = 'minute1'
      AND ts >= TO_TIMESTAMP(:load_since, 'YYYY-MM-DD HH24:MI:SS')
      AND ts < TO_TIMESTAMP(:sim_end, 'YYYY-MM-DD HH24:MI:SS')
      AND ticker IN ({_TK})
),
-- 2) 지표: vol_ratio (현재봉), prev_vol_ratio (이전봉), atr_raw
indicators AS (
    SELECT ticker, ts, open_p, close_p, prev_close, prev_open,
           -- 현재 봉 vol_ratio
           volume_p / NULLIF(
               AVG(volume_p) OVER (PARTITION BY ticker ORDER BY ts
                   ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING), 0
           ) vol_ratio,
           -- 이전 봉 vol_ratio (LAG로 한 봉 앞)
           prev_volume / NULLIF(
               AVG(volume_p) OVER (PARTITION BY ticker ORDER BY ts
                   ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING), 0
           ) prev_vol_ratio,
           -- ATR
           AVG(tr) OVER (PARTITION BY ticker ORDER BY ts
               ROWS BETWEEN {ATR_LOOKBACK} PRECEDING AND 1 PRECEDING)
           / NULLIF(prev_close, 0) atr_raw
    FROM base
),
-- 3) 연속 2봉 조건:
--    봉[n-1]: prev_vol_ratio >= min_vol, 이전 봉 양봉
--    봉[n]  : vol_ratio >= min_vol, 양봉, close > prev_close (상승 지속)
signals AS (
    SELECT ticker, ts sig_ts, close_p sig_price, vol_ratio, atr_raw
    FROM indicators
    WHERE ts >= TO_TIMESTAMP(:sim_start, 'YYYY-MM-DD HH24:MI:SS')
      AND ts < TO_TIMESTAMP(:sim_end, 'YYYY-MM-DD HH24:MI:SS')
      -- 현재 봉 조건
      AND vol_ratio >= :min_vol
      AND close_p > open_p
      -- 이전 봉 조건
      AND prev_vol_ratio >= :min_vol
      AND prev_close > prev_open
      -- 연속 상승
      AND close_p > prev_close
),
-- 4) 진입: 시그널 다음 1분봉 즉시
entry_cands AS (
    SELECT s.ticker, s.sig_ts, s.sig_price, s.vol_ratio, s.atr_raw,
           e.ts entry_ts, e.close_p entry_price,
           ROW_NUMBER() OVER (PARTITION BY s.ticker, s.sig_ts ORDER BY e.ts) rn
    FROM signals s
    JOIN backtest_ohlcv e
      ON e.ticker = s.ticker
     AND e.interval_cd = 'minute1'
     AND e.ts > s.sig_ts
     AND e.ts <= s.sig_ts + INTERVAL '3' MINUTE
),
-- 5) 첫 봉만
entries AS (
    SELECT ticker, sig_ts, sig_price, vol_ratio, atr_raw, entry_ts, entry_price
    FROM entry_cands
    WHERE rn = 1
),
-- 6) 진입 후 TS_N분 1분봉 + 롤링 피크
post_entry AS (
    SELECT e.ticker, e.sig_ts, e.entry_ts, e.entry_price, e.vol_ratio, e.atr_raw,
           b.close_p bar_price,
           ROW_NUMBER() OVER (PARTITION BY e.ticker, e.entry_ts ORDER BY b.ts) bar_n,
           MAX(b.close_p) OVER (PARTITION BY e.ticker, e.entry_ts ORDER BY b.ts
               ROWS UNBOUNDED PRECEDING) running_peak
    FROM entries e
    JOIN backtest_ohlcv b
      ON b.ticker = e.ticker
     AND b.interval_cd = 'minute1'
     AND b.ts >= e.entry_ts
     AND b.ts <= e.entry_ts + NUMTODSINTERVAL(:ts_n, 'MINUTE')
)
SELECT ticker, sig_ts, entry_ts, entry_price, vol_ratio, atr_raw,
       bar_n, bar_price, running_peak
FROM post_entry
WHERE bar_n <= :ts_n + 1
ORDER BY ticker, entry_ts, bar_n
"""

# ── Monthly data load ────────────────────────────────────────────────────────
print(f"연속 2봉 vol spike 전략 (VOL>={VOL_MIN}x, 연속 상승 후 즉시 진입)\n", flush=True)
print("월별 DB 로드...\n", flush=True)

# (ticker, entry_ts) -> entry_price, vol_ratio, atr_raw and the ordered list of
# (bar_price, running_peak) tuples that follow the entry.
ENTRIES: dict = {}

conn = _get_conn()
try:
    cur = conn.cursor()
    cur.arraysize = 100_000
    t_load = _time.time()

    for m_start, m_end in _months(SIM_START, SIM_END):
        # Load extra bars before the window so the rolling indicators are
        # already populated at the first simulated bar.
        load_since = (m_start - timedelta(minutes=WARMUP_MINS)).strftime('%Y-%m-%d %H:%M:%S')
        sim_start = m_start.strftime('%Y-%m-%d %H:%M:%S')
        sim_end = m_end.strftime('%Y-%m-%d %H:%M:%S')

        t0 = _time.time()
        cur.execute(TRADE_SQL, {
            "load_since": load_since,
            "sim_start": sim_start,
            "sim_end": sim_end,
            "min_vol": VOL_MIN,
            "ts_n": TS_N,
        })
        rows = cur.fetchall()
        t1 = _time.time()

        n_new = 0
        for row in rows:
            (ticker, sig_ts, entry_ts, entry_price,
             vol_ratio, atr_raw, bar_n, bar_price, running_peak) = row
            key = (ticker, entry_ts)
            if key not in ENTRIES:
                ENTRIES[key] = {
                    'entry_price': float(entry_price),
                    'vol_ratio': float(vol_ratio),
                    # A NULL ATR is stored as NaN; the sweep falls back to ATR_MIN for it.
                    'atr_raw': float(atr_raw) if atr_raw is not None else float('nan'),
                    'bars': [],
                }
                n_new += 1
            # Rows arrive ordered by bar_n, so appending keeps bars chronological.
            ENTRIES[key]['bars'].append((float(bar_price), float(running_peak)))

        print(f" {sim_start[:7]}: {len(rows):>8,}행 ({t1-t0:.1f}s) | 진입 {n_new:>5}건", flush=True)
finally:
    # Release the connection even when a monthly query fails.
    conn.close()

print(f"\n총 진입 이벤트: {len(ENTRIES):,}건 | 로드 {_time.time()-t_load:.1f}s\n", flush=True)


# ── Exit search ──────────────────────────────────────────────────────────────
def find_exit(entry_price: float, atr_stop: float, bars: list) -> float:
    """Return the realised PnL in percent for a single entry.

    Args:
        entry_price: Fill price of the entry.
        atr_stop: Trailing-stop distance as a fraction of the running peak.
        bars: Chronological ``(bar_price, running_peak)`` tuples.

    Returns:
        PnL percent at the first bar where the drawdown from the running peak
        reaches ``atr_stop``, or at the first bar with index >= ``TS_N - 1``
        whose PnL is below ``TIME_STOP_PCT``.  If neither fires, the PnL at
        the last bar; ``0.0`` when ``bars`` is empty.
    """
    for n, (bp, pk) in enumerate(bars):
        drop = (pk - bp) / pk if pk > 0 else 0.0
        pnl = (bp - entry_price) / entry_price
        if drop >= atr_stop:
            return pnl * 100
        if n + 1 >= TS_N and pnl < TIME_STOP_PCT:
            return pnl * 100
    return (bars[-1][0] - entry_price) / entry_price * 100 if bars else 0.0


# ── Sweep ────────────────────────────────────────────────────────────────────
ENTRY_LIST = list(ENTRIES.values())
keys = list(SWEEP.keys())
combos = list(itertools.product(*SWEEP.values()))
print(f"총 {len(combos)}가지 조합 스윕...\n", flush=True)

t_sweep = _time.time()
results = []

for combo in combos:
    params = dict(zip(keys, combo))
    # A clamp range needs ATR_MIN strictly below ATR_MAX.
    if params['ATR_MIN'] >= params['ATR_MAX']:
        continue

    vol_thr = params['VOL']
    atr_mult = params['ATR_MULT']
    atr_min = params['ATR_MIN']
    atr_max = params['ATR_MAX']

    trades = []
    for e in ENTRY_LIST:
        if e['vol_ratio'] < vol_thr:
            continue
        ar = e['atr_raw']
        # Stop distance = ATR * multiplier clamped to [atr_min, atr_max];
        # a missing ATR (NaN) uses atr_min.
        atr_s = atr_min if math.isnan(ar) else max(atr_min, min(atr_max, ar * atr_mult))
        pnl_pct = find_exit(e['entry_price'], atr_s, e['bars'])
        # Fee is charged twice (entry and exit) on the per-position budget.
        krw = PER_POS * (pnl_pct / 100) - PER_POS * FEE * 2
        trades.append((pnl_pct, krw))

    if not trades:
        results.append({**params, 'trades': 0, 'wins': 0,
                        'win_rate': 0.0, 'avg_pnl': 0.0, 'total_krw': 0.0})
        continue

    wins = sum(1 for p, _ in trades if p > 0)
    results.append({
        **params,
        'trades': len(trades),
        'wins': wins,
        'win_rate': wins / len(trades) * 100,
        'avg_pnl': sum(p for p, _ in trades) / len(trades),
        'total_krw': sum(k for _, k in trades),
    })

print(f"스윕 완료 ({_time.time()-t_sweep:.1f}s)\n")

# ── Result output ────────────────────────────────────────────────────────────
df_r = pd.DataFrame(results)
df_r = df_r[df_r['trades'] > 0].sort_values('total_krw', ascending=False)

print("=" * 100)
print(f"{'순위':>4} {'VOL':>5} {'ATR_M':>6} {'ATR_N':>6} {'ATR_X':>6} "
      f"{'건수':>5} {'승률':>6} {'평균PNL':>8} {'총손익':>14}")
print("=" * 100)
for rank, (_, row) in enumerate(df_r.head(20).iterrows(), 1):
    print(f"{rank:>4} {row['VOL']:>4.0f}x {row['ATR_MULT']:>6.1f} "
          f"{row['ATR_MIN']*100:>5.1f}% {row['ATR_MAX']*100:>5.1f}% "
          f"{int(row['trades']):>5}건 {row['win_rate']:>5.0f}% "
          f"{row['avg_pnl']:>+7.2f}% {row['total_krw']:>+14,.0f}원")

# Best combination per VOL threshold (df_r is already sorted by total_krw).
print("\n" + "─" * 75)
print(f" {'VOL':>5} {'건수':>5} {'승률':>5} {'평균PNL':>8} {'총손익':>14} (최적 ATR)")
print("─" * 75)
for vol in SWEEP["VOL"]:
    sub = df_r[df_r['VOL'] == vol]
    if sub.empty:
        continue
    best = sub.iloc[0]
    print(f" {vol:>4.0f}x {int(best['trades']):>5}건 {best['win_rate']:>4.0f}% "
          f"{best['avg_pnl']:>+7.2f}% {best['total_krw']:>+14,.0f}원 "
          f"(M={best['ATR_MULT']:.1f} N={best['ATR_MIN']*100:.1f}% X={best['ATR_MAX']*100:.1f}%)")