"""거래량 선행(Volume Lead) 진입 전략 시뮬레이션. 3가지 전략 비교: A (현행): 12h 가격 +5% 확인 + 1h 거래량 급증 → 진입 (이미 오른 뒤 추격) B (신규): 가격 횡보 중 거래량 급증(축적) → 그 후 추세 +N% 시작 시 선진입 C (단순): 거래량 급증만 (베이스라인, 노이즈 확인용) """ import os import pickle import time from datetime import datetime from pathlib import Path import pandas as pd from dotenv import load_dotenv load_dotenv() import pyupbit # ── 공통 파라미터 ───────────────────────────────────── STOP_LOSS_PCT = 0.015 # 트레일링 스탑 1.5% TIME_STOP_HOURS = 8 TIME_STOP_MIN_PCT = 3.0 FEE = 0.0005 LOCAL_VOL_HOURS = 5 # 거래량 기준 이전 N시간 VOL_MULT = 2.0 # 거래량 배수 기준 # 현행 전략 파라미터 TREND_HOURS = 12 TREND_MIN_PCT = 5.0 # B 전략 파라미터: 거래량 선행 + 이후 소규모 추세 확인 PRICE_QUIET_PCT = 2.0 # 거래량 급증 시점 가격 횡보 기준 (2h 변동 < N%) TREND_AFTER_VOL = 1.5 # 축적 신호 후 진입 기준 (vol 시점 대비 +N% 상승 시) SIGNAL_TIMEOUT_H = 8 # 축적 신호 후 N시간 내 추세 미발생 시 초기화 FROM_DATE = "2026-01-15 00:00:00" TICKERS = [ 'KRW-DKA', 'KRW-LAYER', 'KRW-SIGN', 'KRW-SOL', 'KRW-ETH', 'KRW-XRP', 'KRW-HOLO', 'KRW-OM', 'KRW-ORBS', ] CACHE_FILE = Path("vol_lead_cache.pkl") # ── 데이터 로드 ─────────────────────────────────────── def fetch_all(ticker: str, from_date: str): """1h봉 전체 로드 (from_date 이후, 페이지 역방향 수집).""" target = datetime.strptime(from_date, "%Y-%m-%d %H:%M:%S") frames = [] to_dt = None for _ in range(15): # 최대 15페이지 = 3000h ≈ 125일 kwargs: dict = dict(ticker=ticker, interval="minute60", count=200) if to_dt: kwargs["to"] = to_dt.strftime("%Y-%m-%d %H:%M:%S") df = pyupbit.get_ohlcv(**kwargs) if df is None or df.empty: break frames.append(df) oldest = df.index[0].to_pydatetime().replace(tzinfo=None) if oldest <= target: break to_dt = oldest time.sleep(0.2) if not frames: return None result = pd.concat(frames).sort_index().drop_duplicates() result.index = result.index.tz_localize(None) return result[result.index >= target] def load_data() -> dict: if CACHE_FILE.exists(): print(f"캐시 로드: {CACHE_FILE}") return pickle.load(open(CACHE_FILE, "rb")) data = {} for ticker in TICKERS: print(f" {ticker} 로딩...", end=" ", flush=True) df = fetch_all(ticker, FROM_DATE) if df is not None: data[ticker] = df print(f"{len(df)}봉 ({df.index[0].strftime('%m-%d')}~{df.index[-1].strftime('%m-%d')})") else: print("실패") time.sleep(0.3) pickle.dump(data, open(CACHE_FILE, "wb")) return data # ── 포지션 시뮬 ─────────────────────────────────────── def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float): """매수 후 청산 시뮬레이션. - 최고가: 각 봉의 high 기준 - 스탑 발동 체크: 각 봉의 low 기준 (intra-candle 포착) - 청산가: peak × (1 - stop_pct) 근사 """ buy_dt = df.index[buy_idx] peak = buy_price for i in range(buy_idx + 1, len(df)): row = df.iloc[i] ts = df.index[i] if row["high"] > peak: peak = row["high"] elapsed_h = (ts - buy_dt).total_seconds() / 3600 stop_price = peak * (1 - STOP_LOSS_PCT) # 트레일링 스탑 (low가 stop_price 이하 진입 시) if row["low"] <= stop_price: sell_price = stop_price pnl = (sell_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, sell_price, ts, f"트레일링({pnl:+.1f}%)", pnl # 타임 스탑 pnl_now = (row["close"] - buy_price) / buy_price * 100 if elapsed_h >= TIME_STOP_HOURS and pnl_now < TIME_STOP_MIN_PCT: pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, row["close"], ts, "타임스탑", pnl last = df.iloc[-1]["close"] pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100 return pnl > 0, last, df.index[-1], "데이터종료", pnl # ── 현행 전략 (추세 확인형) ─────────────────────────── def run_trend(df: pd.DataFrame) -> list: """12h 가격 +5% 확인 + 1h 거래량 급증 + 1h 워치리스트.""" trades = [] watchlist_i = None in_pos = False buy_idx = buy_price = None i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2) while i < len(df): if in_pos: is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, reason)) in_pos = False watchlist_i = None i = next_i continue close = df.iloc[i]["close"] past12 = df.iloc[i - TREND_HOURS]["close"] trend_ok = (close - past12) / past12 * 100 >= TREND_MIN_PCT vol_recent = df.iloc[i - 1]["volume"] vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean() vol_ok = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT if trend_ok and vol_ok: if watchlist_i is None: watchlist_i = i elif i - watchlist_i >= 1: # 1h 확인 in_pos = True buy_idx = i buy_price = close watchlist_i = None else: watchlist_i = None i += 1 return trades # ── B 전략: 거래량 선행 + 소규모 추세 확인 ─────────── def run_vol_lead(df: pd.DataFrame) -> list: """거래량 급증(축적) 감지 후 소규모 추세 확인 시 선진입. 흐름: 1. 직전 1h 거래량 > 이전 5h 평균 × VOL_MULT AND 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적) → 축적 신호 기록 (signal_price = 현재가) 2. 신호 후 현재가가 signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입 (현행 +5% 대신 작은 기준으로 더 일찍 진입) 3. SIGNAL_TIMEOUT_H 시간 내 추세 미발생 → 신호 초기화 """ trades = [] signal_i = None signal_price = None in_pos = False buy_idx = buy_price = None i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2) while i < len(df): if in_pos: is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, reason)) in_pos = False signal_i = None signal_price = None i = next_i continue close = df.iloc[i]["close"] close_2h = df.iloc[i - 2]["close"] quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT vol_recent = df.iloc[i - 1]["volume"] vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean() vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT # 축적 신호 갱신 if quiet and vol_spike: if signal_i is None: signal_i = i signal_price = close else: if signal_i is not None and close < signal_price: # 가격 하락 → 축적 실패, 초기화 signal_i = None signal_price = None # 타임아웃 if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H: signal_i = None signal_price = None # 진입: 축적 신호 후 가격 +TREND_AFTER_VOL% 이상 상승 if signal_i is not None: move = (close - signal_price) / signal_price * 100 if move >= TREND_AFTER_VOL: in_pos = True buy_idx = i buy_price = close signal_i = None signal_price = None i += 1 return trades # ── 결과 출력 ───────────────────────────────────────── def summarize(label: str, trades: list) -> dict: if not trades: print(f" [{label}] 거래 없음") return {"total": 0, "wins": 0, "wr": 0.0, "pnl": 0.0} wins = sum(1 for t in trades if t[0]) total = len(trades) pnl = sum(t[1] for t in trades) wr = wins / total * 100 print(f" [{label}] {total}건 | 승률={wr:.0f}% ({wins}승 {total-wins}패) | 누적={pnl:+.2f}%") for idx, (is_win, p, bdt, sdt, reason) in enumerate(trades, 1): mark = "✅" if is_win else "❌" print(f" #{idx}: {mark} {p:+.2f}% | {reason}" f" ({bdt.strftime('%m-%d %H:%M')}→{sdt.strftime('%m-%d %H:%M')})") return {"total": total, "wins": wins, "wr": wr, "pnl": pnl} def run_vol_lead_thresh(df: pd.DataFrame, thresh: float) -> list: """run_vol_lead의 TREND_AFTER_VOL 파라미터를 동적으로 받는 버전.""" trades = [] signal_i = None signal_price = None in_pos = False buy_idx = buy_price = None i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2) while i < len(df): if in_pos: is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price) next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df)) trades.append((is_win, pnl, df.index[buy_idx], sdt, reason)) in_pos = False signal_i = None signal_price = None i = next_i continue close = df.iloc[i]["close"] close_2h = df.iloc[i - 2]["close"] quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT vol_recent = df.iloc[i - 1]["volume"] vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean() vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT if quiet and vol_spike: if signal_i is None: signal_i = i signal_price = close else: if signal_i is not None and close < signal_price: signal_i = None signal_price = None if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H: signal_i = None signal_price = None if signal_i is not None: move = (close - signal_price) / signal_price * 100 if move >= thresh: in_pos = True buy_idx = i buy_price = close signal_i = None signal_price = None i += 1 return trades def main() -> None: print("데이터 로딩 중...") data = load_data() # ── A 현행 전략 (기준선) ───────────────────────────── print(f"\n{'='*72}") print(f"A(현행 12h+5%+거래량) 기준선 | {FROM_DATE[:10]} ~ 현재") print(f"{'='*72}") agg_a = {"total": 0, "wins": 0, "pnl": 0.0} trend_results = {} for ticker, df in data.items(): t = run_trend(df) trend_results[ticker] = t s = {"total": len(t), "wins": sum(1 for x in t if x[0]), "pnl": sum(x[1] for x in t)} agg_a["total"] += s["total"] agg_a["wins"] += s["wins"] agg_a["pnl"] += s["pnl"] a_wr = agg_a["wins"] / agg_a["total"] * 100 if agg_a["total"] else 0 print(f"A 합계: {agg_a['total']}건 | 승률={a_wr:.0f}% | 누적={agg_a['pnl']:+.2f}%") # ── B 전략: TREND_AFTER_VOL 파라미터 스윕 ─────────── THRESHOLDS = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0] print(f"\n{'='*72}") print(f"B(거래량→+N% 선진입) 파라미터 스윕") print(f"{'─'*72}") print(f"{'임계값':>6} │ {'거래':>5} {'승률':>6} {'누적PnL':>10} │ vs A PnL") print(f"{'─'*72}") best = None for thresh in THRESHOLDS: agg = {"total": 0, "wins": 0, "pnl": 0.0} for ticker, df in data.items(): t = run_vol_lead_thresh(df, thresh) agg["total"] += len(t) agg["wins"] += sum(1 for x in t if x[0]) agg["pnl"] += sum(x[1] for x in t) wr = agg["wins"] / agg["total"] * 100 if agg["total"] else 0 diff = agg["pnl"] - agg_a["pnl"] marker = " ← best" if (best is None or agg["pnl"] > best["pnl"]) else "" if marker: best = {**agg, "thresh": thresh, "wr": wr} print(f"+{thresh:>4.1f}% │ {agg['total']:>5}건 {wr:>5.0f}% {agg['pnl']:>+9.2f}% │ {diff:>+8.2f}%{marker}") print(f"{'─'*72}") print(f"\n★ 최적 임계값: +{best['thresh']}% → " f"{best['total']}건 | 승률={best['wr']:.0f}% | 누적={best['pnl']:+.2f}%") # ── 최적 임계값으로 종목별 상세 출력 ───────────────── best_thresh = best["thresh"] print(f"\n{'='*72}") print(f"★ B(vol→+{best_thresh}%) vs A(12h+5%+vol) 종목별 비교") print(f"{'─'*72}") print(f"{'종목':<14} │ {'A 현행':^24} │ {'B +{:.1f}%'.format(best_thresh):^24}") print(f"{'':14} │ {'거래':>4} {'승률':>5} {'누적':>9} │ {'거래':>4} {'승률':>5} {'누적':>9}") print(f"{'─'*72}") for ticker, df in data.items(): t_a = trend_results[ticker] t_b = run_vol_lead_thresh(df, best_thresh) wa = sum(1 for x in t_a if x[0]) wb = sum(1 for x in t_b if x[0]) pa = sum(x[1] for x in t_a) pb = sum(x[1] for x in t_b) wr_a = wa / len(t_a) * 100 if t_a else 0 wr_b = wb / len(t_b) * 100 if t_b else 0 print(f"{ticker:<14} │ {len(t_a):>4}건 {wr_a:>4.0f}% {pa:>+8.2f}% │" f" {len(t_b):>4}건 {wr_b:>4.0f}% {pb:>+8.2f}%") if __name__ == "__main__": main()