"""3봉 vol가속 시그널 + 다양한 청산 전략 비교 시뮬 (30일).""" import sys, os from datetime import datetime, timedelta sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) import oracledb LOOKBACK_DAYS = 30 VOL_LOOKBACK = 61 ATR_LOOKBACK = 28 VOL_MIN = 8.0 ATR_MULT = 1.0 ATR_MIN_R = 0.030 ATR_MAX_R = 0.050 MAX_TRAIL_BARS = 240 BUDGET = 15_000_000 MAX_POS = 3 PER_POS = BUDGET // MAX_POS FEE = 0.0005 TICKERS = [ 'KRW-XRP','KRW-BTC','KRW-ETH','KRW-SOL','KRW-DOGE', 'KRW-ADA','KRW-SUI','KRW-NEAR','KRW-KAVA','KRW-SXP', 'KRW-AKT','KRW-SONIC','KRW-IP','KRW-ORBS','KRW-VIRTUAL', 'KRW-BARD','KRW-XPL','KRW-KITE','KRW-ENSO','KRW-0G', ] _TK = ",".join(f"'{t}'" for t in TICKERS) def get_conn(): kwargs = dict(user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], dsn=os.environ["ORACLE_DSN"]) if w := os.environ.get("ORACLE_WALLET"): kwargs["config_dir"] = w return oracledb.connect(**kwargs) # ── 3봉 시그널 SQL ───────────────────────────────────────────────────────────── SIGNAL_SQL_3BAR = f""" WITH base AS ( SELECT ticker, ts, open_p, close_p, high_p, low_p, volume_p, LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts) pc1, LAG(open_p,1) OVER (PARTITION BY ticker ORDER BY ts) po1, LAG(close_p,2) OVER (PARTITION BY ticker ORDER BY ts) pc2, LAG(open_p,2) OVER (PARTITION BY ticker ORDER BY ts) po2, GREATEST(high_p-low_p, ABS(high_p-LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts)), ABS(low_p -LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts))) tr FROM backtest_ohlcv WHERE interval_cd='minute1' AND ts >= TO_TIMESTAMP(:ws,'YYYY-MM-DD HH24:MI:SS') AND ticker IN ({_TK}) ), ind AS ( SELECT ticker, ts, open_p, close_p, high_p, low_p, volume_p / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr0, LAG(volume_p,1) OVER (PARTITION BY ticker ORDER BY ts) / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr1, LAG(volume_p,2) OVER (PARTITION BY ticker ORDER BY ts) / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr2, pc1,po1,pc2,po2, AVG(tr) OVER (PARTITION BY ticker ORDER BY ts ROWS BETWEEN {ATR_LOOKBACK} PRECEDING AND 1 PRECEDING) / NULLIF(pc1,0) atr_raw FROM base ) SELECT ticker, ts, vr0, vr1, vr2, atr_raw FROM ind WHERE ts >= TO_TIMESTAMP(:cs,'YYYY-MM-DD HH24:MI:SS') AND vr0 >= {VOL_MIN} -- 3봉 연속 양봉 AND close_p>open_p AND pc1>po1 AND pc2>po2 -- 3봉 연속 가격 가속 AND close_p>pc1 AND pc1>pc2 -- 3봉 연속 볼륨 가속 AND vr0>vr1 AND vr1>vr2 ORDER BY ticker, ts """ # ── 4봉 시그널 SQL (비교용) ──────────────────────────────────────────────────── SIGNAL_SQL_4BAR = f""" WITH base AS ( SELECT ticker, ts, open_p, close_p, high_p, low_p, volume_p, LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts) pc1, LAG(open_p,1) OVER (PARTITION BY ticker ORDER BY ts) po1, LAG(close_p,2) OVER (PARTITION BY ticker ORDER BY ts) pc2, LAG(open_p,2) OVER (PARTITION BY ticker ORDER BY ts) po2, LAG(close_p,3) OVER (PARTITION BY ticker ORDER BY ts) pc3, LAG(open_p,3) OVER (PARTITION BY ticker ORDER BY ts) po3, GREATEST(high_p-low_p, ABS(high_p-LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts)), ABS(low_p -LAG(close_p,1) OVER (PARTITION BY ticker ORDER BY ts))) tr FROM backtest_ohlcv WHERE interval_cd='minute1' AND ts >= TO_TIMESTAMP(:ws,'YYYY-MM-DD HH24:MI:SS') AND ticker IN ({_TK}) ), ind AS ( SELECT ticker, ts, open_p, close_p, high_p, low_p, volume_p / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr0, LAG(volume_p,1) OVER (PARTITION BY ticker ORDER BY ts) / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr1, LAG(volume_p,2) OVER (PARTITION BY ticker ORDER BY ts) / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr2, LAG(volume_p,3) OVER (PARTITION BY ticker ORDER BY ts) / NULLIF(AVG(volume_p) OVER ( PARTITION BY ticker ORDER BY ts ROWS BETWEEN {VOL_LOOKBACK} PRECEDING AND 2 PRECEDING),0) vr3, pc1,po1,pc2,po2,pc3,po3, AVG(tr) OVER (PARTITION BY ticker ORDER BY ts ROWS BETWEEN {ATR_LOOKBACK} PRECEDING AND 1 PRECEDING) / NULLIF(pc1,0) atr_raw FROM base ) SELECT ticker, ts, vr0, vr1, vr2, atr_raw FROM ind WHERE ts >= TO_TIMESTAMP(:cs,'YYYY-MM-DD HH24:MI:SS') AND vr0 >= {VOL_MIN} AND close_p>open_p AND pc1>po1 AND pc2>po2 AND pc3>po3 AND close_p>pc1 AND pc1>pc2 AND pc2>pc3 AND vr0>vr1 AND vr1>vr2 AND vr2>vr3 ORDER BY ticker, ts """ def fetch_signals(cur, sql, warmup_since, check_since): cur.execute(sql, {'ws': warmup_since, 'cs': check_since}) rows = cur.fetchall() signals = [] for row in rows: ticker, sig_ts, vr0, vr1, vr2, atr_raw = row[:6] cur.execute( """SELECT close_p, ts FROM backtest_ohlcv WHERE ticker=:t AND interval_cd='minute1' AND ts > :sig AND ts <= :sig + INTERVAL '3' MINUTE ORDER BY ts FETCH FIRST 1 ROWS ONLY""", {'t': ticker, 'sig': sig_ts} ) er = cur.fetchone() if not er: continue ep, ets = float(er[0]), er[1] cur.execute( """SELECT ts, close_p, high_p, low_p FROM backtest_ohlcv WHERE ticker=:t AND interval_cd='minute1' AND ts >= :entry ORDER BY ts FETCH FIRST :n ROWS ONLY""", {'t': ticker, 'entry': ets, 'n': MAX_TRAIL_BARS + 1} ) bars = [(r[0], float(r[1]), float(r[2]), float(r[3])) for r in cur.fetchall()] if not bars: continue signals.append({ 'ticker': ticker, 'entry_ts': ets, 'entry_price': ep, 'atr_raw': float(atr_raw) if atr_raw else 0.0, 'bars': bars, }) signals.sort(key=lambda x: x['entry_ts']) return signals # ── 청산 전략 함수들 ─────────────────────────────────────────────────────────── def sim_trail(bars, ep, ar): stop = max(ATR_MIN_R, min(ATR_MAX_R, ar * ATR_MULT)) if ar > 0 else ATR_MAX_R peak = ep for i, (ts, cp, hp, lp) in enumerate(bars): peak = max(peak, cp) if (peak - cp) / peak >= stop: return dict(status='트레일손절', exit_ts=ts, exit_price=cp, pnl=(cp - ep) / ep * 100, held=i + 1) lts, lcp = bars[-1][0], bars[-1][1] return dict(status='타임아웃' if len(bars) >= MAX_TRAIL_BARS else '진행중', exit_ts=lts, exit_price=lcp, pnl=(lcp - ep) / ep * 100, held=len(bars)) def sim_tp_trail(bars, ep, ar, tp_r): stop = max(ATR_MIN_R, min(ATR_MAX_R, ar * ATR_MULT)) if ar > 0 else ATR_MAX_R tp = ep * (1 + tp_r) peak = ep for i, (ts, cp, hp, lp) in enumerate(bars): if hp >= tp: return dict(status=f'익절+{tp_r*100:.0f}%', exit_ts=ts, exit_price=tp, pnl=tp_r * 100, held=i + 1) peak = max(peak, cp) if (peak - cp) / peak >= stop: return dict(status='트레일손절', exit_ts=ts, exit_price=cp, pnl=(cp - ep) / ep * 100, held=i + 1) lts, lcp = bars[-1][0], bars[-1][1] return dict(status='타임아웃' if len(bars) >= MAX_TRAIL_BARS else '진행중', exit_ts=lts, exit_price=lcp, pnl=(lcp - ep) / ep * 100, held=len(bars)) def sim_tp_sl(bars, ep, tp_r, sl_r): tp = ep * (1 + tp_r) sl = ep * (1 - sl_r) for i, (ts, cp, hp, lp) in enumerate(bars): if lp <= sl: return dict(status=f'손절-{sl_r*100:.0f}%', exit_ts=ts, exit_price=sl, pnl=-sl_r * 100, held=i + 1) if hp >= tp: return dict(status=f'익절+{tp_r*100:.0f}%', exit_ts=ts, exit_price=tp, pnl=tp_r * 100, held=i + 1) lts, lcp = bars[-1][0], bars[-1][1] return dict(status='타임아웃' if len(bars) >= MAX_TRAIL_BARS else '진행중', exit_ts=lts, exit_price=lcp, pnl=(lcp - ep) / ep * 100, held=len(bars)) def pos_limit(sim): opens, taken, skipped = [], [], [] for r in sim: opens = [ex for ex in opens if ex > r['entry_ts']] if len(opens) < MAX_POS: opens.append(r['exit_ts']) taken.append(r) else: skipped.append(r) return taken, skipped def run_strategies(signals, strategies): results = {} for label, mode, tp_r, sl_r in strategies: sim = [] for s in signals: if mode == 'trail': r = sim_trail(s['bars'], s['entry_price'], s['atr_raw']) elif mode == 'tp_trail': r = sim_tp_trail(s['bars'], s['entry_price'], s['atr_raw'], tp_r) else: r = sim_tp_sl(s['bars'], s['entry_price'], tp_r, sl_r) sim.append({**s, **r}) taken, _ = pos_limit(sim) results[label] = taken return results def print_table(title, results, strategies): print(f"\n{'━'*105}") print(f" {title}") print(f"{'━'*105}") print(f" {'전략':32s} {'거래':>3s} {'승률':>4s} {'합산손익':>12s} {'수익률':>5s} {'평균보유':>5s} {'손익비':>5s}") print(f"{'─'*105}") for label, _, _, _ in strategies: taken = results.get(label, []) n = len(taken) if n == 0: print(f" {label:32s} 거래없음") continue wins = sum(1 for r in taken if r['pnl'] > 0) losses = sum(1 for r in taken if r['pnl'] < 0) total = sum(PER_POS * (r['pnl'] / 100) - PER_POS * FEE * 2 for r in taken) avg_h = sum(r['held'] for r in taken) / n ret = total / BUDGET * 100 avg_w = sum(r['pnl'] for r in taken if r['pnl'] > 0) / wins if wins else 0 avg_l = abs(sum(r['pnl'] for r in taken if r['pnl'] < 0) / losses) if losses else 1 rr = avg_w / avg_l print(f" {label:32s} {n:>3d}건 {wins/n*100:>4.0f}% {total:>+12,.0f}원 " f"{ret:>+5.2f}% {avg_h:>5.0f}봉 {rr:>4.1f}:1") print(f"{'━'*105}") def print_detail(label, taken): print(f"\n[{label} 건별 상세]") print(f"{'─'*105}") for i, r in enumerate(taken, 1): krw = PER_POS * (r['pnl'] / 100) - PER_POS * FEE * 2 sign = '▲' if r['pnl'] > 0 else '▼' print(f" #{i:02d} {r['ticker']:12s}[{sign}] {str(r['entry_ts'])[:16]} " f"→ {r['status']:14s} {r['held']:3d}봉 {r['pnl']:>+.2f}% ({krw:>+,.0f}원)") def main(): now = datetime.now() check_since = (now - timedelta(days=LOOKBACK_DAYS)).strftime('%Y-%m-%d 00:00:00') warmup_since = (now - timedelta(days=LOOKBACK_DAYS + 1)).strftime('%Y-%m-%d 00:00:00') conn = get_conn() cur = conn.cursor() cur.arraysize = 10000 print(f"=== 3봉 vs 4봉 진입 비교 시뮬 ===") print(f"기간: {check_since[:10]} ~ {now.strftime('%Y-%m-%d')} (30일)") print(f"VOL≥{VOL_MIN}x | 자본 {BUDGET//10000}만원 / 포지션 {PER_POS//10000}만원 / 동시 {MAX_POS}개\n") # 시그널 수집 print("3봉 시그널 수집 중...", flush=True) sigs_3 = fetch_signals(cur, SIGNAL_SQL_3BAR, warmup_since, check_since) print("4봉 시그널 수집 중...", flush=True) sigs_4 = fetch_signals(cur, SIGNAL_SQL_4BAR, warmup_since, check_since) print(f" → 3봉: {len(sigs_3)}건 / 4봉: {len(sigs_4)}건\n") strategies = [ ('TP 3% + Trail Stop [3~5%]', 'tp_trail', 0.03, None ), ('TP 2% + Trail Stop [3~5%]', 'tp_trail', 0.02, None ), ('TP 2% + SL 2%', 'tp_sl', 0.02, 0.02 ), ('TP 2% + SL 3%', 'tp_sl', 0.02, 0.03 ), ('TP 3% + SL 2%', 'tp_sl', 0.03, 0.02 ), ('TP 3% + SL 3%', 'tp_sl', 0.03, 0.03 ), ] res_3 = run_strategies(sigs_3, strategies) res_4 = run_strategies(sigs_4, strategies) print_table(f"【3봉 진입】 시그널 {len(sigs_3)}건", res_3, strategies) print_table(f"【4봉 진입】 시그널 {len(sigs_4)}건", res_4, strategies) # 3봉에서 가장 나은 전략 상세 best_label = max(res_3, key=lambda k: sum( PER_POS * (r['pnl'] / 100) - PER_POS * FEE * 2 for r in res_3[k])) print_detail(f"3봉 최적: {best_label}", res_3[best_label]) conn.close() if __name__ == '__main__': main()