Files
upbit-trader/archive/tests/sim_regime_sweep.py
joungmin 6e0c4508fa refactor: MVC 구조 분리 + 미사용 파일 archive 정리
- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리:
  - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal)
  - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회)
  - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건
- type hints, Google docstring, 구체적 예외 타입 적용
- 50줄 초과 함수 분리 (process_signal, restore_positions)
- 미사용 파일 58개 archive/ 폴더로 이동
- README.md 추가

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-06 20:46:47 +09:00

323 lines
12 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
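"""Regime REGIME_N sweep: tune the look-back (in bars) used for BULL entry.

Varies REGIME_N (the number of 40-minute bars passed to pct_change) over
1-10 bars (40-400 minutes) and compares three modes: BULL-only entry,
BEAR blocking, and no regime filter.

Data: data/sim1y_cache.pkl (one year of 10-minute candles).
"""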
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
# Load environment variables from the .env file located in the parent of this
# script's directory.
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
# Pickle cache read by main(); main() indexes it with the "10m" key.
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
# NOTE(review): not referenced anywhere in the visible part of this file.
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
# main() only considers the first TOP_N tickers found in the cache.
TOP_N = 20
# Starting portfolio value used by run_compound().
BUDGET = 15_000_000
# Floor (30% of BUDGET) below which run_compound() never lets the portfolio drop.
MIN_BUDGET = BUDGET * 3 // 10
# Maximum number of simultaneously open positions (apply_max_pos) and the
# divisor used to size each position (run_compound).
MAX_POS = 3
# Fee rate applied to both the buy and the sell price in simulate_pos().
FEE = 0.0005
# Time stop: once TS_N bars have passed, simulate_pos() exits when the
# unrealized gain (in percent) is below this value.
TIME_STOP_MIN_PCT = 3.0
# calc_atr(): mean relative bar range is multiplied by ATR_MULT and clamped
# to [ATR_MIN, ATR_MAX] to obtain the trailing-stop fraction.
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
# Minimum volume ratio required for a signal; run_strategy() uses the BULL
# value in "bull_only" mode and, in "bear_off" mode, when the regime score is
# at or above BULL_THRESHOLD.
VOL_MULT_DEFAULT = 2.0
VOL_MULT_BULL = 1.5
# A signal requires the absolute price change over QUIET_N bars to stay below
# QUIET_PCT percent.
QUIET_PCT = 2.0
# Percent rise from the signal price that triggers an entry.
THRESH = 4.8
# Number of bars averaged for the volume baseline.
LOCAL_VOL_N = 7
# Look-back (in bars) for the quiet-price check.
QUIET_N = 3
# A pending signal is discarded once more than this many bars have passed.
SIGNAL_TO_N = 12
# Number of most recent bars averaged by calc_atr().
ATR_N = 7
# Number of bars held before the time stop in simulate_pos() can fire.
TS_N = 12
# Regime score thresholds (percent): below BEAR_THRESHOLD counts as bear,
# at or above BULL_THRESHOLD counts as bull.
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
# apply_wf(): trailing window size, minimum win rate over that window, and the
# number of consecutive winning blocked trades required to resume trading.
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
# Per-ticker weights used by build_regime_series() for the composite score.
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
def resample_40m(df):
    """Aggregate OHLCV candles into 40-minute bars.

    Args:
        df: DataFrame with a datetime-like index and ``open``, ``high``,
            ``low``, ``close`` and ``volume`` columns.

    Returns:
        DataFrame of 40-minute candles. Bins whose ``close`` is missing
        are dropped.
    """
    ohlcv_rules = {
        "open": "first",
        "high": "max",
        "low": "min",
        "close": "last",
        "volume": "sum",
    }
    bars = df.resample("40min").agg(ohlcv_rules)
    return bars.dropna(subset=["close"])
def build_regime_series(dfs40, regime_n):
    """Build the weighted composite regime score series.

    For every ticker listed in ``REGIME_WEIGHTS`` that is present in
    ``dfs40``, the ``regime_n``-bar percent change of ``close`` is scaled by
    the ticker's weight and the contributions are summed.

    Args:
        dfs40: Mapping of ticker to a DataFrame with a ``close`` column.
        regime_n: Number of bars passed to ``pct_change``.

    Returns:
        The summed weighted percent-change Series, or an empty float Series
        when none of the weighted tickers is available.
    """
    composite = None
    for symbol, weight in REGIME_WEIGHTS.items():
        if symbol in dfs40:
            contribution = dfs40[symbol]["close"].pct_change(regime_n) * 100 * weight
            if composite is None:
                composite = contribution
            else:
                # A timestamp missing on one side counts as 0 for that side.
                composite = composite.add(contribution, fill_value=0.0)
    if composite is None:
        return pd.Series(dtype=float)
    return composite
def calc_atr(df, buy_idx):
    """Return the trailing-stop fraction derived from recent bar ranges.

    The relative range ``(high - low) / low`` of the bars preceding
    ``buy_idx`` is averaged over the last ``ATR_N`` of them, multiplied by
    ``ATR_MULT`` and clamped to ``[ATR_MIN, ATR_MAX]``.

    Args:
        df: DataFrame with ``high`` and ``low`` columns.
        buy_idx: Positional index of the entry bar; only bars strictly
            before it are used.

    Returns:
        The stop distance as a float fraction. ``ATR_MIN`` is returned when
        fewer than three bars are available, when the range cannot be
        computed, or when the average is NaN.
    """
    window = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
    if len(window) < 3:
        return ATR_MIN
    try:
        rel_range = (window["high"] - window["low"]) / window["low"]
        avg = rel_range.iloc[-ATR_N:].mean()
    except (KeyError, TypeError, ValueError, ZeroDivisionError):
        # Best effort: missing columns or non-numeric data fall back to the
        # minimum stop instead of aborting the simulation.
        return ATR_MIN
    if pd.isna(avg):
        # NaN compares False against everything, so min()/max() would let it
        # slip through as ATR_MAX; use the same fallback as the other
        # "no usable data" paths.
        return ATR_MIN
    return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
def simulate_pos(df, buy_idx, buy_price, stop_pct):
    """Walk forward from an entry and find where the position is closed.

    Bars after ``buy_idx`` are scanned in order. On each bar the running
    peak is raised to the bar's ``high`` first, then:

    * if the bar's ``low`` touches ``peak * (1 - stop_pct)`` the position is
      closed at that stop level;
    * otherwise, once at least ``TS_N`` bars have elapsed and the unrealized
      gain on ``close`` is below ``TIME_STOP_MIN_PCT`` percent, the position
      is closed at ``close``.

    If neither happens the position is closed at the final bar's ``close``.

    Args:
        df: DataFrame with ``high``, ``low`` and ``close`` columns.
        buy_idx: Positional index of the entry bar.
        buy_price: Entry price.
        stop_pct: Trailing-stop distance as a fraction of the peak.

    Returns:
        Tuple ``(is_win, exit_index_label, pnl_pct)`` where ``pnl_pct`` is
        the percent return net of ``FEE`` on both sides.
    """
    entry_cost = buy_price * (1 + FEE)

    def net_pnl(exit_price):
        # Percent return after charging FEE on both the buy and the sell leg.
        return (exit_price * (1 - FEE) - entry_cost) / entry_cost * 100

    high_water = buy_price
    for pos in range(buy_idx + 1, len(df)):
        bar = df.iloc[pos]
        high_water = max(high_water, bar["high"])
        stop_level = high_water * (1 - stop_pct)
        if bar["low"] <= stop_level:
            result = net_pnl(stop_level)
            return result > 0, df.index[pos], result
        bars_held = pos - buy_idx
        unrealized = (bar["close"] - buy_price) / buy_price * 100
        if bars_held >= TS_N and unrealized < TIME_STOP_MIN_PCT:
            result = net_pnl(bar["close"])
            return result > 0, df.index[pos], result

    # Data ran out before any exit rule fired: close at the last bar.
    result = net_pnl(df.iloc[-1]["close"])
    return result > 0, df.index[-1], result
def run_strategy(df, ticker, regime_series, mode):
# Scan one ticker's bars in order and collect simulated trades.
#
# Args:
#   df: bar DataFrame; the code reads the "close" and "volume" columns here
#       and passes df on to calc_atr() and simulate_pos().
#   ticker: label stored in every trade tuple.
#   regime_series: Series of regime scores looked up by the bar's index label.
#   mode: "bear_off" skips bars whose score is below BEAR_THRESHOLD,
#         "bull_only" skips bars whose score is below BULL_THRESHOLD,
#         any other value applies no regime filter.
#
# Returns:
#   List of (is_win, pnl, buy index label, sell index label, ticker) tuples.
#
# NOTE(review): leading indentation is not visible in this view of the file,
# so the exact nesting of some statements (in particular the
# `i += 1; continue` that follows the entry branch) cannot be confirmed here;
# verify against the original file before restructuring.
trades = []
# sig_i / sig_p: bar position and close price of the pending signal, if any.
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
# Start far enough in that the look-backs below (i-1-LOCAL_VOL_N and
# i-QUIET_N) never go negative.
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
# A position was opened on an earlier bar: let simulate_pos() find the exit,
# record the trade, and resume scanning at the first bar whose index label is
# later than the exit (or stop if there is none).
if in_pos:
is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker))
in_pos = False; sig_i = sig_p = None; i = next_i
continue
# Regime score for this bar; 0.0 when the label is missing or the value is NaN.
score = 0.0
if not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
score = float(v) if not pd.isna(v) else 0.0
# Regime filter: a filtered-out bar also clears any pending signal.
if mode == "bear_off":
if score < BEAR_THRESHOLD:
sig_i = sig_p = None; i += 1; continue
vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT
elif mode == "bull_only":
if score < BULL_THRESHOLD:
sig_i = sig_p = None; i += 1; continue
vol_mult = VOL_MULT_BULL
else:
vol_mult = VOL_MULT_DEFAULT
# Expire a pending signal that is older than SIGNAL_TO_N bars.
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
sig_i = sig_p = None
# Pending signal: drop it if price fell below the signal price, or open a
# position at the current close once price has risen by THRESH percent.
if sig_i is not None:
move_pct = (cur - sig_p) / sig_p * 100
if cur < sig_p:
sig_i = sig_p = None
elif move_pct >= THRESH:
in_pos = True; buy_idx = i; buy_price = cur
stop_pct = calc_atr(df, i); sig_i = sig_p = None
i += 1; continue
# Volume ratio: previous bar's volume versus the mean of the LOCAL_VOL_N bars
# before it (0 when that mean is not positive).
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
# Absolute percent change versus the close QUIET_N bars ago; 999 acts as a
# "not quiet" sentinel when that close is not positive.
close_qh = df.iloc[i-QUIET_N]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
# New signal: price has been quiet while volume spiked.
if chg_qh < QUIET_PCT and vol_r >= vol_mult:
if sig_i is None:
sig_i = i; sig_p = cur
else:
if sig_i is not None and cur < sig_p:
sig_i = sig_p = None
i += 1
return trades
def apply_wf(trades):
    """Filter a trade sequence through the win-rate gate (``WF_*`` settings).

    While the gate is open every trade is accepted. After each accepted
    trade, if at least ``WF_WINDOW`` trades have been accepted since the gate
    last opened and the win rate of the most recent ``WF_WINDOW`` is below
    ``WF_MIN_WIN_RATE``, the gate closes. While it is closed trades are only
    counted; ``WF_SHADOW_WINS`` consecutive winning blocked trades reopen the
    gate and clear the history.

    Args:
        trades: Iterable of trade tuples whose first element is the win flag.

    Returns:
        Tuple ``(accepted_trades, blocked_count)``.
    """
    kept = []
    recent = []
    streak = 0
    n_blocked = 0
    gate_closed = False

    for trade in trades:
        won = int(trade[0])

        if gate_closed:
            n_blocked += 1
            if not won:
                # A losing blocked trade breaks the consecutive-win streak.
                streak = 0
                continue
            streak += 1
            if streak >= WF_SHADOW_WINS:
                gate_closed = False
                recent = []
                streak = 0
            continue

        kept.append(trade)
        recent.append(won)
        if len(recent) < WF_WINDOW:
            continue
        if sum(recent[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
            gate_closed = True
            streak = 0

    return kept, n_blocked
def apply_max_pos(trades):
    """Enforce the ``MAX_POS`` limit on simultaneously open positions.

    Trades are processed in the given order. Before each trade, positions
    whose exit is not later than the trade's entry are released; the trade is
    accepted only if fewer than ``MAX_POS`` positions remain open.

    Args:
        trades: Iterable of trade tuples with the entry time at index 2 and
            the exit time at index 3.

    Returns:
        Tuple ``(accepted_trades, skipped_trades)``.
    """
    taken, rejected = [], []
    live_exits = []
    for trade in trades:
        entry_dt, exit_dt = trade[2], trade[3]
        # Keep only positions that are still open at this entry time.
        live_exits = [dt for dt in live_exits if dt > entry_dt]
        if len(live_exits) >= MAX_POS:
            rejected.append(trade)
            continue
        live_exits.append(exit_dt)
        taken.append(trade)
    return taken, rejected
def run_compound(accepted):
    """Compound the accepted trades into portfolio statistics.

    Each trade is sized at ``max(portfolio, MIN_BUDGET) / MAX_POS`` and its
    percent PnL is applied to that stake. The portfolio never drops below
    ``MIN_BUDGET``.

    Args:
        accepted: List of ``(is_win, pnl_pct, buy_dt, sell_dt, ticker)``
            tuples; ``buy_dt`` must support ``strftime``.

    Returns:
        Dict with ``portfolio``, ``total_krw``, ``roi_pct``, ``total``,
        ``wins``, ``wr`` (win rate in percent), ``monthly`` (per ``YYYY-MM``
        trade count, wins and PnL keyed by entry month) and ``max_dd``
        (largest peak-to-equity drop in percent).
    """
    equity = float(BUDGET)
    net_krw = 0.0
    by_month = {}
    win_count = 0
    high_water = BUDGET
    worst_dd = 0.0

    for is_win, pnl, buy_dt, _sell_dt, _ticker in accepted:
        stake = max(equity, MIN_BUDGET) / MAX_POS
        gain = stake * pnl / 100
        equity = max(equity + gain, MIN_BUDGET)
        net_krw += gain

        bucket = by_month.setdefault(
            buy_dt.strftime("%Y-%m"),
            {"trades": 0, "wins": 0, "pnl_krw": 0.0},
        )
        bucket["trades"] += 1
        bucket["wins"] += int(is_win)
        bucket["pnl_krw"] += gain

        if is_win:
            win_count += 1

        # Drawdown is measured against the highest equity seen so far.
        high_water = max(high_water, equity)
        worst_dd = max(worst_dd, (high_water - equity) / high_water * 100)

    n_trades = len(accepted)
    return {
        "portfolio": equity, "total_krw": net_krw,
        "roi_pct": (equity - BUDGET) / BUDGET * 100,
        "total": n_trades, "wins": win_count,
        "wr": win_count / n_trades * 100 if accepted else 0,
        "monthly": by_month, "max_dd": worst_dd,
    }
def sim_one(dfs40, regime_n, mode):
    """Run one full simulation for a given regime look-back and mode.

    Builds the regime series, runs the strategy and the win-rate gate per
    ticker, merges the surviving trades in entry order, applies the
    position limit and compounds the result.

    Args:
        dfs40: Mapping of ticker to its bar DataFrame.
        regime_n: Look-back in bars for the regime score.
        mode: Regime filter mode forwarded to ``run_strategy``.

    Returns:
        Tuple ``(result, bull_pct, bear_pct, wf_blocked, n_skipped)`` where
        ``result`` is the ``run_compound`` dict, ``bull_pct`` / ``bear_pct``
        are the shares (in percent) of non-NaN regime values at or above
        ``BULL_THRESHOLD`` / below ``BEAR_THRESHOLD``, ``wf_blocked`` is the
        total blocked count from ``apply_wf`` and ``n_skipped`` the number of
        trades rejected by ``apply_max_pos``.
    """
    regime = build_regime_series(dfs40, regime_n)

    pooled = []
    wf_blocked_total = 0
    for symbol, frame in dfs40.items():
        kept, n_blocked = apply_wf(run_strategy(frame, symbol, regime, mode))
        pooled.extend(kept)
        wf_blocked_total += n_blocked

    # Order trades by entry time before applying the position limit.
    pooled.sort(key=lambda trade: trade[2])
    accepted, skipped = apply_max_pos(pooled)
    summary = run_compound(accepted)

    # Share of bars classified as BULL / BEAR.
    bull_pct = bear_pct = 0
    if not regime.empty:
        observed = regime.dropna()
        n_obs = len(observed)
        if n_obs:
            bull_pct = (observed >= BULL_THRESHOLD).sum() / n_obs * 100
            bear_pct = (observed < BEAR_THRESHOLD).sum() / n_obs * 100

    return summary, bull_pct, bear_pct, wf_blocked_total, len(skipped)
def _format_span(n_bars):
    """Return the duration of ``n_bars`` 40-minute bars as ``XhYYm`` or ``Xh``."""
    hours, minutes = divmod(n_bars * 40, 60)
    return f"{hours}h{minutes:02d}m" if minutes else f"{hours}h"


def main():
    """Load the candle cache, run the REGIME_N sweeps and print the report.

    Runs ``sim_one`` for every look-back in the sweep list in "bull_only" and
    "bear_off" modes, runs one unfiltered baseline, and prints the look-back
    with the highest ROI among those with at least five trades for each mode.
    """
    print("캐시 로드 중...")
    # SECURITY NOTE: unpickling can execute arbitrary code, so CACHE_FILE must
    # be a trusted, locally generated file.
    with open(CACHE_FILE, "rb") as fh:
        cache = pickle.load(fh)
    candles_10m = cache["10m"]
    tickers = [t for t in list(candles_10m.keys())[:TOP_N]
               if len(candles_10m[t]) > 500]
    print(f" 종목: {len(tickers)}\n")
    if not tickers:
        # Nothing to simulate; avoid StopIteration on the empty mapping below.
        print(" 분석할 종목이 없습니다.")
        return
    dfs40 = {t: resample_40m(candles_10m[t]) for t in tickers}
    sample = next(iter(dfs40.values()))
    start_dt = sample.index[0].strftime("%Y-%m-%d")
    end_dt = sample.index[-1].strftime("%Y-%m-%d")
    SWEEP_N = [1, 2, 3, 4, 5, 6, 8, 10]  # 40 min to 400 min (6.7 h)

    banner = "=" * 72
    header = (f" {'N봉':>4} {'시간':>5}{'BULL%':>6} {'BEAR%':>6}"
              f"{'진입':>5} {'승률':>5}{'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}")
    # The original multiplied an empty string here, so no rule was printed.
    rule = f" {'-'*68}"

    # -- BULL-only entry sweep ------------------------------------------
    print(banner)
    print(f" REGIME_N 스윕 (40분봉 × N봉 변화율 기준 | BULL≥{BULL_THRESHOLD}%)")
    print(f" 기간: {start_dt} ~ {end_dt} / {len(tickers)}종목")
    print(banner)
    print(header)
    print(rule)
    bull_results = {}
    for n in SWEEP_N:
        r, bull_pct, bear_pct, _, _ = sim_one(dfs40, n, "bull_only")
        bull_results[n] = r
        time_label = _format_span(n)
        if r["total"] == 0:
            print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
                  f"{'진입없음':>34}")
        else:
            print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
                  f"{r['total']:>5}{r['wr']:>4.1f}% │ "
                  f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%")

    # -- BEAR-blocking sweep --------------------------------------------
    print(f"\n{banner}")
    print(f" REGIME_N 스윕 (BEAR 차단 모드 | BEAR<{BEAR_THRESHOLD}%)")
    print(banner)
    print(header)
    print(rule)
    bear_results = {}
    for n in SWEEP_N:
        r, bull_pct, bear_pct, _, _ = sim_one(dfs40, n, "bear_off")
        bear_results[n] = r
        time_label = _format_span(n)
        print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
              f"{r['total']:>5}{r['wr']:>4.1f}% │ "
              f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%")

    # -- Baseline (no regime filter) ------------------------------------
    r_none, _, _, _, _ = sim_one(dfs40, 1, "none")
    print(f"\n 베이스라인 (필터없음): {r_none['total']}{r_none['wr']:.1f}% "
          f"{r_none['roi_pct']:+.2f}% {r_none['total_krw']:+,.0f}원 -{r_none['max_dd']:.1f}%")

    # -- Best look-back per mode (at least 5 trades required) -----------
    valid_bull = {n: r for n, r in bull_results.items() if r["total"] >= 5}
    if valid_bull:
        best_n = max(valid_bull, key=lambda n: valid_bull[n]["roi_pct"])
        best_r = valid_bull[best_n]
        print(f"\n ★ BULL 진입 최적 N: {best_n}봉({best_n*40}분) "
              f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}"
              f"승률 {best_r['wr']:.1f}%")
    valid_bear = {n: r for n, r in bear_results.items() if r["total"] >= 5}
    if valid_bear:
        best_n = max(valid_bear, key=lambda n: valid_bear[n]["roi_pct"])
        best_r = valid_bear[best_n]
        print(f" ★ BEAR 차단 최적 N: {best_n}봉({best_n*40}분) "
              f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}"
              f"승률 {best_r['wr']:.1f}%")
    print(banner)


if __name__ == "__main__":
    main()