Files
upbit-trader/archive/tests/velocity_backtest.py
joungmin 6e0c4508fa refactor: MVC 구조 분리 + 미사용 파일 archive 정리
- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리:
  - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal)
  - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회)
  - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건
- type hints, Google docstring, 구체적 예외 타입 적용
- 50줄 초과 함수 분리 (process_signal, restore_positions)
- 미사용 파일 58개 archive/ 폴더로 이동
- README.md 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 20:46:47 +09:00

462 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""velocity_backtest.py — 속도 진입 효과 비교 백테스트.
전략 A: 기존 거리 기반 (signal_price 대비 +THRESH% 도달 시 진입)
전략 B: 거리 + 속도 기반 (velocity >= 레짐별 VELOCITY_THRESHOLD 시 조기 진입)
BULL → vel_thresh = 0.10 (공격적)
NEUTRAL → vel_thresh = 0.15 (보수적)
BEAR → vel_thresh = 0.20 (더 높음)
레짐 판단: KRW-BTC 1h 변동률 (캐시 데이터 활용)
10분봉 캐시(sim10m_cache.pkl)를 사용.
신호 감지: 40분봉 vol spike + 2h 횡보 (10분봉 합산/슬라이스로 계산)
진입/청산: 10분봉 단위로 체크 (실제 시스템의 15초 폴링 근사)
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import pickle
from pathlib import Path
import pandas as pd
# ── 파라미터 ──────────────────────────────────────────────────────────────────
CACHE_FILE = Path("sim10m_cache.pkl")
TOP_FILE = Path("top30_tickers.pkl")
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
# 전략 파라미터
VOL_MULT = 2.0 # 거래량 배수 기준
QUIET_PCT = 2.0 # 2h 횡보 기준 (%)
THRESH = 4.8 # 거리 기반 진입 임계값 (%)
# 10분봉 기준 캔들 수
QUIET_C = 12 # 2h = 12 × 10분
VOL40_C = 4 # 40분봉 1개 = 4 × 10분봉
LOCAL_C = 7 # 로컬 평균 40분봉 7개 = 28 × 10분봉
TIMEOUT_C = 48 # 신호 타임아웃 8h = 48 × 10분봉
TS_C = 48 # 타임스탑 8h = 48 × 10분봉
ATR_C = 28 # ATR 5h = 7 × 40분 = 28 × 10분봉
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
TS_MIN_PCT = 3.0
MIN_I = LOCAL_C * VOL40_C + VOL40_C + QUIET_C + 2 # = 42
# 속도 기반 진입 파라미터 (레짐별)
VEL_THRESH_BULL = 0.10 # BULL: 0.10%/분 (공격적)
VEL_THRESH_NEUTRAL = 0.15 # NEUTRAL: 0.15%/분 (보수적)
VEL_THRESH_BEAR = 0.20 # BEAR: 0.20%/분 (더 높음)
VELOCITY_MIN_MOVE = 0.5 # 최소 이동 % (잡음 제거)
VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분
# 레짐 판단 기준 (BTC 1h 변동률)
REGIME_BULL_CHANGE = 5.0 # +5% 이상 → BULL
REGIME_BEAR_CHANGE = -5.0 # -5% 이하 → BEAR
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
# ── 낙폭 제어 파라미터 ────────────────────────────────────────────────────────
HARD_STOP_PCT = 0.015 # 진입가 대비 -1.5% 즉시 청산 (하드 손절)
STREAK_TIGHT_N = 2 # 연속 N회 손절 시 타임스탑 강화
TS_C_TIGHT = 24 # 강화 타임스탑 보유 시간 (4h = 24 × 10분)
TS_MIN_PCT_TIGHT = 0.0 # 강화 타임스탑 최소 수익률 (0%)
# ── 레짐 헬퍼 ─────────────────────────────────────────────────────────────────
def build_regime_series(btc_df: pd.DataFrame) -> pd.Series:
"""BTC 10분봉으로 1h 변동률 계산 → 레짐 시리즈 반환."""
close = btc_df["close"]
change = close.pct_change(6) * 100 # 6 × 10분 = 1h
regime = pd.Series("neutral", index=close.index, dtype=object)
regime[change > REGIME_BULL_CHANGE] = "bull"
regime[change < REGIME_BEAR_CHANGE] = "bear"
return regime
def _vel_thresh_for(regime_s: pd.Series, ts) -> float:
"""타임스탬프 기준 레짐별 velocity 임계값 반환."""
if regime_s is None:
return VEL_THRESH_NEUTRAL
idx = regime_s.index.searchsorted(ts)
if idx >= len(regime_s):
return VEL_THRESH_NEUTRAL
r = regime_s.iloc[idx]
if r == "bull":
return VEL_THRESH_BULL
elif r == "bear":
return VEL_THRESH_BEAR
return VEL_THRESH_NEUTRAL
# ── 헬퍼 ──────────────────────────────────────────────────────────────────────
def calc_atr(df: pd.DataFrame, i: int) -> float:
sub = df.iloc[max(0, i - ATR_C):i]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
def simulate_pos(df: pd.DataFrame, buy_idx: int,
buy_price: float, stop_pct: float,
hard_stop: bool = False, tight_ts: bool = False):
"""매수 이후 청산 시점·손익 계산.
hard_stop : True → 진입가 대비 -HARD_STOP_PCT% 즉시 청산
tight_ts : True → 강화된 타임스탑 (4h / 0%) 적용
"""
peak = buy_price
hard_stop_px = buy_price * (1 - HARD_STOP_PCT) if hard_stop else None
ts_c_use = TS_C_TIGHT if tight_ts else TS_C
ts_min_use = TS_MIN_PCT_TIGHT if tight_ts else TS_MIN_PCT
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
if row["high"] > peak:
peak = row["high"]
# 1. 하드 손절 (진입가 대비 고정 %)
if hard_stop_px is not None and row["low"] <= hard_stop_px:
pnl = (hard_stop_px * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
# 2. 트레일링 스탑 (최고가 대비)
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
# 3. 타임스탑
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= ts_c_use and pnl_now < ts_min_use:
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[i], pnl
last = df.iloc[-1]["close"]
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[-1], pnl
def _prev_40m_vol(df: pd.DataFrame, i: int) -> float:
"""직전 완성 40분봉 거래량 (10분봉 4개 합산)."""
return df.iloc[max(0, i - VOL40_C):i]["volume"].sum()
def _local_vol_avg(df: pd.DataFrame, i: int) -> float:
"""로컬 5h 평균 (직전 7개 40분봉 각각의 합산 평균)."""
vols = []
for k in range(1, LOCAL_C + 1):
end = i - VOL40_C * (k - 1)
start = end - VOL40_C
if start < 0:
break
vols.append(df.iloc[start:end]["volume"].sum())
return sum(vols) / len(vols) if vols else 0
# ── 핵심 전략 루프 ─────────────────────────────────────────────────────────────
def run_strategy(df: pd.DataFrame, ticker: str,
use_velocity: bool = False,
regime_s: pd.Series = None,
dd_control: bool = False) -> list:
"""
Returns list of (is_win, pnl, buy_dt, sell_dt, ticker, entry_type)
entry_type: 'dist' | 'vel'
dd_control: True → 연속 손절 추적하여 hard_stop + tight_ts 적용
"""
trades = []
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
entry_type = "dist"
consec_losses = 0 # 연속 손절 횟수
i = MIN_I
while i < len(df):
# ── 포지션 중 → 청산 계산 후 다음 진입 탐색 ──────────────────
if in_pos:
use_hard = dd_control and consec_losses >= STREAK_TIGHT_N
use_tight = dd_control and consec_losses >= STREAK_TIGHT_N
is_win, sdt, pnl = simulate_pos(
df, buy_idx, buy_price, stop_pct,
hard_stop=use_hard, tight_ts=use_tight,
)
next_i = next(
(j for j in range(i, len(df)) if df.index[j] > sdt),
len(df),
)
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, entry_type))
# 연속 손절 카운터 업데이트
if is_win:
consec_losses = 0
else:
consec_losses += 1
in_pos = False
sig_i = sig_p = None
i = next_i
continue
close = df.iloc[i]["close"]
# ── 신호 감지 ─────────────────────────────────────────────────
prev_vol = _prev_40m_vol(df, i)
local_avg = _local_vol_avg(df, i)
vol_r = prev_vol / local_avg if local_avg > 0 else 0
close_2h = df.iloc[i - QUIET_C]["close"]
quiet = abs(close - close_2h) / close_2h * 100 < QUIET_PCT
spike = vol_r >= VOL_MULT
if quiet and spike:
if sig_i is None:
sig_i, sig_p = i, close
else:
if sig_i is not None and close < sig_p:
sig_i = sig_p = None
# 타임아웃
if sig_i is not None and (i - sig_i) > TIMEOUT_C:
sig_i = sig_p = None
# ── 진입 판단 ─────────────────────────────────────────────────
if sig_i is not None:
move_pct = (close - sig_p) / sig_p * 100
age_min = (i - sig_i) * 10 # 10분봉 × 10분
# A. 거리 기반
if move_pct >= THRESH:
in_pos = True
buy_idx = i
buy_price = close
stop_pct = calc_atr(df, i)
entry_type = "dist"
sig_i = sig_p = None
i += 1
continue
# B. 속도 기반 (use_velocity=True 일 때만)
if (use_velocity
and age_min >= VELOCITY_MIN_AGE_M
and move_pct >= VELOCITY_MIN_MOVE):
velocity = move_pct / age_min
vel_thresh = _vel_thresh_for(regime_s, df.index[i])
if velocity >= vel_thresh:
in_pos = True
buy_idx = i
buy_price = close
stop_pct = calc_atr(df, i)
entry_type = "vel"
sig_i = sig_p = None
i += 1
continue
i += 1
return trades
# ── WF 필터 ───────────────────────────────────────────────────────────────────
def apply_wf(trades: list) -> tuple:
history, shadow_streak, blocked = [], 0, False
accepted, blocked_cnt = [], 0
for trade in trades:
is_win = int(trade[0])
if not blocked:
accepted.append(trade)
history.append(is_win)
if len(history) >= WF_WINDOW:
if sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True
shadow_streak = 0
else:
blocked_cnt += 1
shadow_streak = (shadow_streak + 1) if is_win else 0
if shadow_streak >= WF_SHADOW_WINS:
blocked = False
history = []
shadow_streak = 0
return accepted, blocked_cnt
# ── MAX_POSITIONS 필터 ────────────────────────────────────────────────────────
def apply_max_pos(all_trades: list) -> tuple:
open_exits, accepted, skipped = [], [], []
for trade in sorted(all_trades, key=lambda x: x[2]):
buy_dt, sell_dt = trade[2], trade[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
# ── 복리 시뮬 ─────────────────────────────────────────────────────────────────
def simulate(accepted: list) -> dict:
portfolio = float(BUDGET)
total_krw = 0.0
monthly = {}
peak_pf = BUDGET
max_dd = 0.0
win_cnt = 0
vel_count = sum(1 for t in accepted if t[5] == "vel")
vel_wins = sum(1 for t in accepted if t[5] == "vel" and t[0])
vel_pnls = [t[1] for t in accepted if t[5] == "vel"]
dist_pnls = [t[1] for t in accepted if t[5] == "dist"]
for is_win, pnl, buy_dt, sell_dt, ticker, etype in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
peak_pf = max(peak_pf, portfolio)
dd = (peak_pf - portfolio) / peak_pf * 100
max_dd = max(max_dd, dd)
win_cnt += int(is_win)
ym = buy_dt.strftime("%Y-%m")
m = monthly.setdefault(ym, {"t": 0, "w": 0, "krw": 0.0})
m["t"] += 1; m["w"] += int(is_win); m["krw"] += krw_profit
n = len(accepted)
return {
"portfolio": portfolio,
"total_krw": total_krw,
"roi": (portfolio - BUDGET) / BUDGET * 100,
"n": n,
"wins": win_cnt,
"wr": win_cnt / n * 100 if n else 0,
"max_dd": max_dd,
"monthly": monthly,
"vel_count": vel_count,
"vel_wins": vel_wins,
"vel_wr": vel_wins / vel_count * 100 if vel_count else 0,
"vel_avg_pnl": sum(vel_pnls) / len(vel_pnls) if vel_pnls else 0,
"dist_avg_pnl": sum(dist_pnls) / len(dist_pnls) if dist_pnls else 0,
}
# ── 메인 ──────────────────────────────────────────────────────────────────────
def main():
print("캐시 로드...")
cache = pickle.load(open(CACHE_FILE, "rb"))
top30 = pickle.load(open(TOP_FILE, "rb"))
tickers = [t for t in top30[:TOP_N] if t in cache["10m"]]
print(f"유효 종목: {len(tickers)}\n")
# BTC 레짐 시리즈 빌드
btc_df = cache["10m"].get("KRW-BTC")
regime_s = build_regime_series(btc_df) if btc_df is not None else None
if regime_s is not None:
bull_pct = (regime_s == "bull").mean() * 100
bear_pct = (regime_s == "bear").mean() * 100
print(f"레짐 분포: BULL {bull_pct:.1f}% / NEUTRAL {100-bull_pct-bear_pct:.1f}% / BEAR {bear_pct:.1f}%")
print(f"vel threshold: BULL={VEL_THRESH_BULL} / NEUTRAL={VEL_THRESH_NEUTRAL} / BEAR={VEL_THRESH_BEAR}\n")
all_a, all_b, all_c = [], [], []
wf_a_total = wf_b_total = wf_c_total = 0
for t in tickers:
df = cache["10m"][t]
if len(df) < MIN_I + 50:
continue
raw_a = run_strategy(df, t, use_velocity=False)
raw_b = run_strategy(df, t, use_velocity=True, regime_s=regime_s)
raw_c = run_strategy(df, t, use_velocity=True, regime_s=regime_s, dd_control=True)
fa, ba = apply_wf(raw_a)
fb, bb = apply_wf(raw_b)
fc, bc = apply_wf(raw_c)
wf_a_total += ba; wf_b_total += bb; wf_c_total += bc
all_a.extend(fa); all_b.extend(fb); all_c.extend(fc)
acc_a, skp_a = apply_max_pos(all_a)
acc_b, skp_b = apply_max_pos(all_b)
acc_c, skp_c = apply_max_pos(all_c)
ra = simulate(acc_a)
rb = simulate(acc_b)
rc = simulate(acc_c)
# ── 날짜 범위 ─────────────────────────────────────────
def date_range(acc):
if acc:
s = min(t[2] for t in acc).strftime("%Y-%m-%d")
e = max(t[3] for t in acc).strftime("%Y-%m-%d")
return f"{s} ~ {e}"
return "N/A"
# ── 출력 ─────────────────────────────────────────────
W = 72
print("=" * W)
print(f" 낙폭 제어 비교 백테스트 | 10분봉 | {len(tickers)}종목")
print(f" 기간: {date_range(acc_a)}")
print(f" hard_stop={HARD_STOP_PCT*100:.1f}% | tight_ts={TS_C_TIGHT*10//60}h+{TS_MIN_PCT_TIGHT:.0f}% "
f"(연속 {STREAK_TIGHT_N}손절 후)")
print("=" * W)
print(f" {'항목':<22} {'A. 기존':>12} {'B. +속도':>12} {'C. +속도+DD제어':>14}")
print(f" {''*64}")
def row3(label, va, vb, vc, fmt="{}"):
sa, sb, sc = fmt.format(va), fmt.format(vb), fmt.format(vc)
try:
dbc = float(str(vc).replace(",","").replace("%","").replace("","")) \
- float(str(va).replace(",","").replace("%","").replace("",""))
dc = f" ({dbc:+.1f})" if abs(dbc) >= 0.01 else ""
except Exception:
dc = ""
print(f" {label:<22} {sa:>12} {sb:>12} {sc:>14}{dc}")
row3("총 진입", ra["n"], rb["n"], rc["n"], "{:,}")
row3(" 속도 진입", 0, rb["vel_count"], rc["vel_count"], "{:,}")
row3("WF 차단", wf_a_total, wf_b_total, wf_c_total, "{:,}")
row3("MAX_POS 스킵", len(skp_a), len(skp_b), len(skp_c), "{:,}")
print(f" {''*64}")
row3("승률", f"{ra['wr']:.1f}%", f"{rb['wr']:.1f}%", f"{rc['wr']:.1f}%")
row3(" 속도진입 승률","-", f"{rb['vel_wr']:.1f}%", f"{rc['vel_wr']:.1f}%")
print(f" {''*64}")
row3("평균 pnl (거리)",f"{ra['dist_avg_pnl']:.2f}%", f"{rb['dist_avg_pnl']:.2f}%", f"{rc['dist_avg_pnl']:.2f}%")
row3("평균 pnl (속도)","-", f"{rb['vel_avg_pnl']:.2f}%", f"{rc['vel_avg_pnl']:.2f}%")
print(f" {''*64}")
row3("최종 자산", f"{ra['portfolio']:,.0f}", f"{rb['portfolio']:,.0f}", f"{rc['portfolio']:,.0f}")
row3("총 수익", f"{ra['total_krw']:+,.0f}", f"{rb['total_krw']:+,.0f}", f"{rc['total_krw']:+,.0f}")
row3("수익률", f"{ra['roi']:.2f}%", f"{rb['roi']:.2f}%", f"{rc['roi']:.2f}%")
row3("최대 낙폭", f"{-ra['max_dd']:.2f}%", f"{-rb['max_dd']:.2f}%", f"{-rc['max_dd']:.2f}%")
print("=" * W)
# ── 월별 ─────────────────────────────────────────────
print(f"\n── 월별 수익 비교 {''*50}")
print(f" {'':^8}{'A':>5} {'A%':>4} {'A수익':>10}"
f"{'B':>5} {'B%':>4} {'B수익':>10}"
f"{'C':>5} {'C%':>4} {'C수익':>10}")
all_months = sorted(set(list(ra["monthly"]) + list(rb["monthly"]) + list(rc["monthly"])))
for ym in all_months:
ma = ra["monthly"].get(ym, {"t":0,"w":0,"krw":0})
mb = rb["monthly"].get(ym, {"t":0,"w":0,"krw":0})
mc = rc["monthly"].get(ym, {"t":0,"w":0,"krw":0})
wra = ma["w"]/ma["t"]*100 if ma["t"] else 0
wrb = mb["w"]/mb["t"]*100 if mb["t"] else 0
wrc = mc["w"]/mc["t"]*100 if mc["t"] else 0
print(f" {ym:^8}{ma['t']:>4}{wra:>3.0f}% {ma['krw']:>+9,.0f}원 │ "
f"{mb['t']:>4}{wrb:>3.0f}% {mb['krw']:>+9,.0f}원 │ "
f"{mc['t']:>4}{wrc:>3.0f}% {mc['krw']:>+9,.0f}")
print("=" * W)
if __name__ == "__main__":
main()