feat: rewrite strategy to 10m vol-lead with undying signal + watch alert

- core/strategy.py: full rewrite to Volume Lead strategy
  - 10m candle direct detection (no 40m resampling)
  - F&G 3-tier vol threshold: <=40->6x, 41-50->5x, >50->blocked
  - Undying signal: price drop does not cancel signal (sig_p fixed)
  - Vol refresh: stronger vol_r updates signal price and timer
  - Watch alert: 4x-6x approaching threshold notifies via Telegram
  - WATCH_VOL_THRESH=4.0, WATCH_COOLDOWN_MIN=30, WATCH_VOL_JUMP=0.5
- daemon/runner.py: remove FNG_MIN_ENTRY block and Bear regime block
  - Only FNG_MAX_ENTRY(>50) blocks scan (greed/extreme greed)
  - Fast-poll loop cleaned of regime check
- core/notify.py: add notify_watch() for near-signal Telegram alerts
  - Shows vol_r, distance to threshold, price, quiet pct
- tests/: add 1y data collection and simulation scripts
  - collect_1y_data.py, refresh_cache.py
  - sim_10m_vol.py, sim_current.py, sim_regime_1y.py
  - sim_regime_sweep.py, sim_vol_override.py

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-04 09:28:13 +09:00
parent 6580cda017
commit cfbacdacbc
11 changed files with 2358 additions and 209 deletions

153
tests/collect_1y_data.py Normal file
View File

@@ -0,0 +1,153 @@
"""1년치 데이터 수집 — 10분봉 OHLCV + F&G 히스토리.
생성 파일:
data/sim1y_cache.pkl — {"10m": {ticker: DataFrame}} (10분봉, 365일)
data/fng_1y.json — {"YYYY-MM-DD": int, ...} (Fear & Greed 1년치)
소요 시간: 약 10~15분 (20종목 × 263 API 호출)
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
import time
import urllib.request
from datetime import datetime, timedelta
from pathlib import Path
import pandas as pd
import pyupbit
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
# ── 설정 ─────────────────────────────────────────────────
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl"
DAYS = 365
TOP_N = 20
# ── 10분봉 수집 ───────────────────────────────────────────
def fetch_10m(ticker: str, days: int) -> "pd.DataFrame | None":
target_start = datetime.now() - timedelta(days=days)
all_dfs, to, prev_oldest = [], None, None
while True:
kwargs = dict(ticker=ticker, interval="minute10", count=200)
if to:
kwargs["to"] = to.strftime("%Y-%m-%d %H:%M:%S")
try:
df = pyupbit.get_ohlcv(**kwargs)
except Exception:
time.sleep(0.5)
break
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
if oldest <= target_start:
break
to = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
return combined[combined.index >= target_start]
# ── F&G 1년치 수집 ────────────────────────────────────────
def fetch_fng(limit: int = 400) -> dict:
url = f"https://api.alternative.me/fng/?limit={limit}&format=json"
with urllib.request.urlopen(url, timeout=15) as r:
data = json.loads(r.read())
result = {}
for e in data["data"]:
dt = datetime.fromtimestamp(int(e["timestamp"]))
result[dt.strftime("%Y-%m-%d")] = int(e["value"])
return result
# ── 메인 ─────────────────────────────────────────────────
def main():
# ── 종목 목록 ─────────────────────────────────────────
try:
from core.market import get_top_tickers
tickers = get_top_tickers()[:TOP_N]
print(f"Top{TOP_N} 종목 API 조회: {tickers}\n")
# top30 파일 갱신
pickle.dump(tickers, open(TOP30_FILE, "wb"))
except Exception as e:
print(f" [경고] 종목 API 실패: {e}")
if TOP30_FILE.exists():
tickers = pickle.load(open(TOP30_FILE, "rb"))[:TOP_N]
print(f" 기존 top30 파일 사용: {tickers}\n")
else:
print(" [오류] 종목 목록 없음. 종료.")
return
# ── F&G 1년치 ─────────────────────────────────────────
print("F&G 1년치 수집...")
try:
fng_map = fetch_fng(limit=400)
sorted_dates = sorted(fng_map.keys())
print(f" 기간: {sorted_dates[0]} ~ {sorted_dates[-1]} ({len(fng_map)}일)")
# 분포
zones = {"극공포(≤25)": 0, "공포(26~40)": 0, "중립(41~55)": 0,
"탐욕(56~75)": 0, "극탐욕(76+)": 0}
for v in fng_map.values():
if v <= 25: zones["극공포(≤25)"] += 1
elif v <= 40: zones["공포(26~40)"] += 1
elif v <= 55: zones["중립(41~55)"] += 1
elif v <= 75: zones["탐욕(56~75)"] += 1
else: zones["극탐욕(76+)"] += 1
total = sum(zones.values())
for name, cnt in zones.items():
print(f" {name:12} {cnt:>3}일 ({cnt/total*100:.1f}%)")
json.dump(fng_map, open(FNG_FILE, "w"))
print(f" 저장: {FNG_FILE}\n")
except Exception as e:
print(f" [오류] F&G 수집 실패: {e}\n")
fng_map = {}
# ── 10분봉 1년치 ──────────────────────────────────────
print(f"10분봉 {DAYS}일치 수집 중 ({len(tickers)}종목)...")
print(f" 예상 소요: {len(tickers) * 265 * 0.12 / 60:.0f}~{len(tickers) * 265 * 0.15 / 60:.0f}\n")
data = {"10m": {}}
for i, ticker in enumerate(tickers, 1):
start_t = time.time()
df = fetch_10m(ticker, DAYS)
elapsed = time.time() - start_t
if df is not None and len(df) > 500:
data["10m"][ticker] = df
candles = len(df)
period = f"{df.index[0].strftime('%Y-%m-%d')}~{df.index[-1].strftime('%Y-%m-%d')}"
print(f" {i:>2}/{len(tickers)} {ticker:<15} {candles:>6}{period} ({elapsed:.0f}s)")
else:
print(f" {i:>2}/{len(tickers)} {ticker:<15} 데이터 부족 ({elapsed:.0f}s)")
time.sleep(0.15)
# ── 저장 ──────────────────────────────────────────────
print(f"\n수집 완료: {len(data['10m'])}종목")
if data["10m"]:
sample = next(iter(data["10m"].values()))
print(f"기간: {sample.index[0].strftime('%Y-%m-%d')} ~ {sample.index[-1].strftime('%Y-%m-%d')}")
print(f"봉 수: {len(sample)}개 (10분봉)")
# 파일 크기 추정
import sys
size_mb = sys.getsizeof(pickle.dumps(data)) / 1024 / 1024
print(f"예상 크기: {size_mb:.1f} MB")
pickle.dump(data, open(CACHE_FILE, "wb"))
print(f"\n캐시 저장: {CACHE_FILE}")
print("완료!")
if __name__ == "__main__":
main()

81
tests/refresh_cache.py Normal file
View File

@@ -0,0 +1,81 @@
"""10분봉 캐시 갱신 스크립트 — 최신 45일 데이터를 Upbit API로 재수집."""
import os, sys, pickle, time
from pathlib import Path
from datetime import datetime, timedelta
import pyupbit
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
sys.path.insert(0, str(Path(__file__).parent.parent))
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl"
TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl"
SIM_DAYS = 45
TOP_N = 20
def fetch_10m(ticker: str, days: int) -> "pd.DataFrame | None":
import pandas as pd
target_start = datetime.now() - timedelta(days=days)
all_dfs, to, prev_oldest = [], None, None
while True:
kwargs = dict(ticker=ticker, interval="minute10", count=200)
if to:
kwargs["to"] = to.strftime("%Y-%m-%d %H:%M:%S")
try:
df = pyupbit.get_ohlcv(**kwargs)
except Exception:
time.sleep(0.5)
break
if df is None or df.empty:
break
all_dfs.append(df)
oldest = df.index[0]
if prev_oldest is not None and oldest >= prev_oldest:
break
prev_oldest = oldest
if oldest <= target_start:
break
to = oldest
time.sleep(0.12)
if not all_dfs:
return None
combined = pd.concat(all_dfs).sort_index()
combined = combined[~combined.index.duplicated(keep="last")]
return combined[combined.index >= target_start]
def main():
# 현재 Top20 종목 가져오기
from core.market import get_top_tickers
print("Top20 종목 조회...")
tickers = get_top_tickers()[:TOP_N]
print(f" {tickers}\n")
data = {"10m": {}}
for i, ticker in enumerate(tickers, 1):
print(f"\r {i:>2}/{len(tickers)} {ticker} ", end="", flush=True)
df = fetch_10m(ticker, SIM_DAYS)
if df is not None and len(df) > 100:
data["10m"][ticker] = df
time.sleep(0.15)
print(f"\n\n종목: {len(data['10m'])}")
if data["10m"]:
sample = next(iter(data["10m"].values()))
print(f"기간: {sample.index[0].strftime('%Y-%m-%d')} ~ {sample.index[-1].strftime('%Y-%m-%d')}")
print(f"레코드: {len(sample)}")
# 저장
pickle.dump(data, open(CACHE_FILE, "wb"))
print(f"\n캐시 저장: {CACHE_FILE}")
# top30 갱신
pickle.dump(tickers, open(TOP30_FILE, "wb"))
print(f"종목 저장: {TOP30_FILE}")
if __name__ == "__main__":
main()

382
tests/sim_10m_vol.py Normal file
View File

@@ -0,0 +1,382 @@
"""10분봉 vol 감지 vs 40분봉 vol 감지 비교 시뮬레이션.
40분봉 집계 시 10분봉 spike가 희석되는 문제를 해결하기 위해
신호 감지를 10분봉 기준으로 실행하고 40분봉 전략과 노이즈/수익 비교.
비교 모드 (각 필터 조합 × 2개 봉 단위):
10분봉 detection:
A. 10m 필터없음
B. 10m F&G≥41 + BEAR차단N5
C. 10m vol≥5x 오버라이드 (F&G+레짐 무시)
40분봉 detection (기준선):
D. 40m 필터없음
E. 40m F&G≥41 + BEAR차단N5
F. 40m vol≥5x 오버라이드
데이터: data/sim1y_cache.pkl (10분봉 1년치)
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
THRESH = 4.8
QUIET_PCT = 2.0
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
FNG_MIN_ENTRY = 41
# ── 40분봉 파라미터 ────────────────────────────────
P40 = dict(
local_vol_n = 7,
quiet_n = 3,
signal_to_n = 12,
atr_n = 7,
ts_n = 12,
time_stop_pct = 3.0,
vol_mult = 2.0,
)
# ── 10분봉 파라미터 (벽시계 기준 동등) ───────────────
# LOCAL_VOL_N: 40m×7=280min → 10min×28
# QUIET_N: 40m×3=120min → 10min×12
# SIGNAL_TO_N: 40m×12=480min → 10min×48
# ATR_N: 40m×7=280min → 10min×28
# TS_N: 40m×12=480min → 10min×48
P10 = dict(
local_vol_n = 28,
quiet_n = 12,
signal_to_n = 48,
atr_n = 28,
ts_n = 48,
time_stop_pct = 3.0,
vol_mult = 2.0,
)
REGIME_N = 5 # 40분봉 기준
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
VOL_OVERRIDE_THRESH = 5.0
# ─────────────────────────────────────────────────────────────────────────────
def resample_40m(df):
return (df.resample("40min")
.agg({"open":"first","high":"max","low":"min",
"close":"last","volume":"sum"})
.dropna(subset=["close"]))
def build_regime_series(dfs40):
weighted = None
for ticker, w in REGIME_WEIGHTS.items():
if ticker not in dfs40: continue
pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
return weighted if weighted is not None else pd.Series(dtype=float)
def regime_to_10m(regime_40m: pd.Series, df_10m: pd.DataFrame) -> pd.Series:
"""40분봉 레짐 시리즈를 10분봉 인덱스에 ffill 매핑."""
combined = regime_40m.reindex(
regime_40m.index.union(df_10m.index)
).ffill()
return combined.reindex(df_10m.index)
def calc_atr(df, buy_idx, atr_n):
sub = df.iloc[max(0, buy_idx - atr_n - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-atr_n:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
def simulate_pos(df, buy_idx, buy_price, stop_pct, ts_n, time_stop_pct):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= ts_n and pnl_now < time_stop_pct:
pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
last = df.iloc[-1]["close"]
pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[-1], pnl
def run_strategy(df, ticker, regime_series, fng_map, p,
use_fng, use_regime, vol_override_thresh):
"""
공통 전략 함수. df = 봉 단위 OHLCV (10분봉 또는 40분봉).
regime_series: df 인덱스와 정렬된 레짐 시리즈.
우선순위:
① 포지션 청산
② 축적 신호 감지 (필터 무관, 항상 실행)
③ 진입: vol_strong → 모든 필터 skip; 아니면 F&G+레짐 체크
"""
local_vol_n = p["local_vol_n"]
quiet_n = p["quiet_n"]
signal_to_n = p["signal_to_n"]
atr_n = p["atr_n"]
ts_n = p["ts_n"]
time_stop_pct = p["time_stop_pct"]
vol_mult = p["vol_mult"]
trades = []
sig_i = sig_p = sig_vr = None
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(local_vol_n + 2, quiet_n + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
# ── ① 포지션 청산 ─────────────────────────────────
if in_pos:
is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct,
ts_n, time_stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker))
in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i
continue
# 신호 만료 체크
if sig_i is not None and (i - sig_i) > signal_to_n:
sig_i = sig_p = sig_vr = None
# ── ② 축적 신호 감지 (항상 실행) ──────────────────────
if sig_i is None:
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-1-local_vol_n:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
close_qh = df.iloc[i-quiet_n]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
if chg_qh < QUIET_PCT and vol_r >= vol_mult:
sig_i = i; sig_p = cur; sig_vr = vol_r
i += 1
continue
# 신호 이후 가격 하락 → 초기화
if cur < sig_p:
sig_i = sig_p = sig_vr = None
i += 1
continue
# ── ③ 진입 체크 ─────────────────────────────────────
vol_strong = (vol_override_thresh > 0
and sig_vr is not None
and sig_vr >= vol_override_thresh)
if not vol_strong:
# F&G 필터
if use_fng and fng_map:
fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50)
if fv < FNG_MIN_ENTRY:
i += 1
continue
# 레짐 BEAR 차단
if use_regime and not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
if not pd.isna(v) and float(v) < BEAR_THRESHOLD:
i += 1
continue
move_pct = (cur - sig_p) / sig_p * 100
if move_pct >= THRESH:
in_pos = True; buy_idx = i; buy_price = cur
stop_pct = calc_atr(df, i, atr_n)
sig_i = sig_p = sig_vr = None
i += 1
return trades
def apply_wf(trades):
history = []; shadow = 0; blocked = False; accepted = []; cnt = 0
for t in trades:
is_win = int(t[0])
if not blocked:
accepted.append(t); history.append(is_win)
if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True; shadow = 0
else:
cnt += 1
if is_win:
shadow += 1
if shadow >= WF_SHADOW_WINS:
blocked = False; history = []; shadow = 0
else:
shadow = 0
return accepted, cnt
def apply_max_pos(trades):
open_exits = []; accepted = []; skipped = []
for t in trades:
buy_dt, sell_dt = t[2], t[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt); accepted.append(t)
else:
skipped.append(t)
return accepted, skipped
def run_compound(accepted):
portfolio = float(BUDGET); total_krw = 0.0
wins = 0; peak = BUDGET; max_dd = 0.0; pf = float(BUDGET)
for is_win, pnl, buy_dt, sell_dt, ticker in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
wins += int(is_win)
pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET)
peak = max(peak, pf)
max_dd = max(max_dd, (peak - pf) / peak * 100)
return {
"portfolio": portfolio, "total_krw": total_krw,
"roi_pct": (portfolio - BUDGET) / BUDGET * 100,
"total": len(accepted), "wins": wins,
"wr": wins / len(accepted) * 100 if accepted else 0,
"max_dd": max_dd,
}
def sim_mode(dfs_per_ticker, regime_map, fng_map, p,
use_fng, use_regime, vol_override_thresh):
"""
dfs_per_ticker: {ticker: DataFrame (10m 또는 40m)}
regime_map: {ticker: regime Series (같은 인덱스로 정렬됨)}
"""
all_trades = []; wf_total = 0
for ticker, df in dfs_per_ticker.items():
rs = regime_map.get(ticker, pd.Series(dtype=float))
raw = run_strategy(df, ticker, rs, fng_map, p,
use_fng, use_regime, vol_override_thresh)
filtered, blocked = apply_wf(raw)
wf_total += blocked
all_trades.extend(filtered)
all_trades.sort(key=lambda x: x[2])
accepted, skipped = apply_max_pos(all_trades)
return run_compound(accepted), wf_total, len(skipped)
def fmt(r, wf, skip):
if r["total"] == 0:
return "진입없음"
return (f"{r['total']:>5}{r['wr']:>4.1f}% "
f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}"
f"-{r['max_dd']:>4.1f}% wf:{wf} skip:{skip}")
def main():
print("캐시 로드 중...")
cache = pickle.load(open(CACHE_FILE, "rb"))
fng_map = json.loads(FNG_FILE.read_text())
tickers = [t for t in list(cache["10m"].keys())[:TOP_N]
if len(cache["10m"][t]) > 500]
print(f" 종목: {len(tickers)}")
dfs10 = {t: cache["10m"][t] for t in tickers}
dfs40 = {t: resample_40m(df) for t, df in dfs10.items()}
# 레짐 (40분봉 기반)
regime_40m = build_regime_series(dfs40)
# 40분봉용 regime map
regime_map_40 = {t: regime_40m for t in tickers}
# 10분봉용 regime map (ffill 매핑)
regime_map_10 = {
t: regime_to_10m(regime_40m, dfs10[t])
for t in tickers
}
sample = next(iter(dfs10.values()))
start_dt = sample.index[0].strftime("%Y-%m-%d")
end_dt = sample.index[-1].strftime("%Y-%m-%d")
print(f"\n{'='*80}")
print(f" 10분봉 vs 40분봉 vol 감지 비교 | {start_dt} ~ {end_dt} | {len(tickers)}종목")
print(f" vol override: ≥{VOL_OVERRIDE_THRESH}x | F&G≥{FNG_MIN_ENTRY} | BEAR차단N{REGIME_N}")
print(f"{'='*80}")
print(f" {'모드':<32}{'진입':>5} {'승률':>5}{'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>6}")
print(f" {''*76}")
# ── 10분봉 모드들 ─────────────────────────────────────────────────────
print(f"\n [10분봉 vol 감지 — local_vol_n={P10['local_vol_n']}봉({P10['local_vol_n']*10}분) quiet_n={P10['quiet_n']}봉({P10['quiet_n']*10}분)]")
modes_10 = [
("A. 10m 필터없음", False, False, 0.0),
("B. 10m F&G+BEAR차단", True, True, 0.0),
(f"C. 10m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH),
]
for label, uf, ur, vt in modes_10:
r, wf, skip = sim_mode(dfs10, regime_map_10, fng_map, P10, uf, ur, vt)
print(f" {label:<32}{fmt(r, wf, skip)}")
# ── 40분봉 모드들 (기준선) ──────────────────────────────────────────
print(f"\n [40분봉 vol 감지 — local_vol_n={P40['local_vol_n']}봉({P40['local_vol_n']*40}분) quiet_n={P40['quiet_n']}봉({P40['quiet_n']*40}분)]")
modes_40 = [
("D. 40m 필터없음", False, False, 0.0),
("E. 40m F&G+BEAR차단", True, True, 0.0),
(f"F. 40m vol≥{VOL_OVERRIDE_THRESH}x 오버라이드", True, True, VOL_OVERRIDE_THRESH),
]
for label, uf, ur, vt in modes_40:
r, wf, skip = sim_mode(dfs40, regime_map_40, fng_map, P40, uf, ur, vt)
print(f" {label:<32}{fmt(r, wf, skip)}")
print(f"\n{'='*80}")
# ── vol≥5x 신호 품질 비교 (10m vs 40m) ─────────────────────────────
print("\n [vol≥5x 오버라이드 신호 품질 비교 — 필터 없음 조건에서 override 효과]")
print(f" {'모드':<32}{'진입':>5} {'승률':>5}{'수익률':>8} {'낙폭':>6}")
for label, dfs, rmap, p in [
("10m vol≥5x (filter none)", dfs10, regime_map_10, P10),
("40m vol≥5x (filter none)", dfs40, regime_map_40, P40),
]:
r, wf, skip = sim_mode(dfs, rmap, fng_map, p, False, False, VOL_OVERRIDE_THRESH)
if r["total"]:
print(f" {label:<32}{r['total']:>5}{r['wr']:>4.1f}% "
f"{r['roi_pct']:>+7.2f}% -{r['max_dd']:>4.1f}%")
print(f"{'='*80}")
if __name__ == "__main__":
main()

View File

@@ -15,8 +15,8 @@ load_dotenv(dotenv_path=Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
# ── 파라미터 ─────────────────────────────────────────────
CACHE_FILE = Path("sim10m_cache.pkl")
TOP30_FILE = Path("top30_tickers.pkl")
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl"
TOP30_FILE = Path(__file__).parent.parent / "data" / "top30_tickers.pkl"
TOP_N = 20
BUDGET = 15_000_000

448
tests/sim_current.py Normal file
View File

@@ -0,0 +1,448 @@
"""현재 전략 기준 45일 복리 시뮬레이션 — 40분봉.
sim_45m40.py의 검증된 코어 로직을 기반으로
현재 전략 추가사항만 반영:
+ F&G 필터 (FNG_MIN_ENTRY=41)
+ 시장 레짐 필터 (BEAR < -0.5% → 차단, BULL ≥ 1.5% → vol_mult 완화)
+ 신호 강도별 진입 임계값 티어 (5x→1%, 3.5x→2%, 2.5x→3%, 기본→5%)
+ 속도 기반 조기 진입 (0.10%/분)
"""
import json
import os
import pickle
import sys
import urllib.request
import datetime
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
sys.path.insert(0, str(Path(__file__).parent.parent))
# ── 파라미터 ─────────────────────────────────────────────
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl"
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
TIME_STOP_MIN_PCT= 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
VOL_MULT_NEUTRAL = 2.0 # NEUTRAL 레짐
VOL_MULT_BULL = 1.5 # BULL 레짐
QUIET_PCT = 2.0
THRESH_BASE = 5.0 # 기본 진입 임계값 (TREND_AFTER_VOL)
# 신호 강도별 임계값 티어
ENTRY_TIERS = [(5.0, 1.0), (3.5, 2.0), (2.5, 3.0)]
# 속도 진입
VELOCITY_THRESHOLD = 0.10 # %/분
VELOCITY_MIN_MOVE = 0.5 # 최소 이동 %
VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분
# F&G
FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41"))
# 레짐
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
# 40분봉 봉수 환산
LOCAL_VOL_N = 7 # 5h
QUIET_N = 3 # 2h
SIGNAL_TO_N = 12 # 8h
ATR_N = 7
TS_N = 12 # 8h (타임스탑)
REGIME_N = 3 # 2h (레짐 추세)
# ── F&G 히스토리 ─────────────────────────────────────────
def load_fng_history() -> dict:
try:
url = "https://api.alternative.me/fng/?limit=90&format=json"
with urllib.request.urlopen(url, timeout=10) as r:
data = json.loads(r.read())
result = {}
for e in data["data"]:
dt = datetime.datetime.fromtimestamp(int(e["timestamp"]))
result[dt.strftime("%Y-%m-%d")] = int(e["value"])
return result
except Exception as ex:
print(f" [경고] F&G 로드 실패: {ex} → 필터 비활성화")
return {}
# ── 리샘플링 ─────────────────────────────────────────────
def resample_40m(df: pd.DataFrame) -> pd.DataFrame:
return (
df.resample("40min")
.agg({"open": "first", "high": "max", "low": "min",
"close": "last", "volume": "sum"})
.dropna(subset=["close"])
)
# ── 레짐 시리즈 ──────────────────────────────────────────
def build_regime_series(dfs: dict) -> pd.Series:
weighted = None
for ticker, w in REGIME_WEIGHTS.items():
if ticker not in dfs:
continue
pct = dfs[ticker]["close"].pct_change(REGIME_N) * 100
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
return weighted if weighted is not None else pd.Series(dtype=float)
# ── 임계값 ───────────────────────────────────────────────
def calc_entry_threshold(vol_ratio: float) -> float:
for min_r, thr in ENTRY_TIERS:
if vol_ratio >= min_r:
return thr
return THRESH_BASE
# ── ATR ──────────────────────────────────────────────────
def calc_atr(df: pd.DataFrame, buy_idx: int) -> float:
sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
# ── 포지션 시뮬 (기존 sim_45m40.py와 동일) ───────────────
def simulate_pos(df: pd.DataFrame, buy_idx: int,
buy_price: float, stop_pct: float):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
ts = df.index[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, ts, pnl, "trailing_stop"
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, ts, pnl, "time_stop"
last = df.iloc[-1]["close"]
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[-1], pnl, "end_of_data"
# ── vol-lead 전략 (현재 전략 파라미터 전체 반영) ──────────
def run_vol_lead(df: pd.DataFrame, ticker: str,
fng_map: dict, regime_series: pd.Series) -> list:
trades = []
sig_i = None # 신호 봉 인덱스
sig_p = None # 신호가
sig_vr = 0.0 # 신호 vol_ratio
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
# ── 포지션 보유 중: 청산 체크 ─────────────────────
if in_pos:
is_win, sdt, pnl, reason = simulate_pos(df, buy_idx, buy_price, stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, reason))
in_pos = False
sig_i = sig_p = None
i = next_i
continue
# ── F&G 필터 ──────────────────────────────────────
date_str = ts.strftime("%Y-%m-%d")
if fng_map:
fv = fng_map.get(date_str, 50)
if fv < FNG_MIN_ENTRY:
sig_i = sig_p = None # 신호 초기화
i += 1
continue
# ── 레짐 필터 ─────────────────────────────────────
score = 0.0
if not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
score = float(v) if not pd.isna(v) else 0.0
if score < BEAR_THRESHOLD:
sig_i = sig_p = None
i += 1
continue
vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_NEUTRAL
# ── 신호 타임아웃 ──────────────────────────────────
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
sig_i = sig_p = None
# ── 신호 있음: 진입 체크 ──────────────────────────
if sig_i is not None:
move_pct = (cur - sig_p) / sig_p * 100
age_min = (i - sig_i) * 40 # 봉수 → 분
entry_thr = calc_entry_threshold(sig_vr)
if cur < sig_p:
# 신호가 이하 하락 → 초기화
sig_i = sig_p = None
elif move_pct >= entry_thr:
# 거리 기반 진입
in_pos = True
buy_idx = i
buy_price = cur
stop_pct = calc_atr(df, i)
sig_i = sig_p = None
elif age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE:
velocity = move_pct / age_min
if velocity >= VELOCITY_THRESHOLD:
# 속도 기반 조기 진입
in_pos = True
buy_idx = i
buy_price = cur
stop_pct = calc_atr(df, i)
sig_i = sig_p = None
i += 1
continue
# ── 신호 없음: 축적 조건 체크 ────────────────────
vol_p = df.iloc[i - 1]["volume"]
vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
close_qh = df.iloc[i - QUIET_N]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
quiet = chg_qh < QUIET_PCT
spike = vol_r >= vol_mult
if quiet and spike:
if sig_i is None:
sig_i = i
sig_p = cur
sig_vr = vol_r
else:
if sig_i is not None and cur < sig_p:
sig_i = sig_p = None
i += 1
return trades
# ── WF 필터 (기존 동일) ──────────────────────────────────
def apply_wf(trades: list) -> tuple:
history = []
shadow_streak = 0
blocked = False
accepted = []
blocked_cnt = 0
for trade in trades:
is_win = int(trade[0])
if not blocked:
accepted.append(trade)
history.append(is_win)
if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True
shadow_streak = 0
else:
blocked_cnt += 1
if is_win:
shadow_streak += 1
if shadow_streak >= WF_SHADOW_WINS:
blocked = False
history = []
shadow_streak = 0
else:
shadow_streak = 0
return accepted, blocked_cnt
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
# ── MAX_POSITIONS (기존 동일) ────────────────────────────
def apply_max_positions(all_trades: list) -> tuple:
open_exits, accepted, skipped = [], [], []
for trade in all_trades:
buy_dt, sell_dt = trade[2], trade[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
# ── 복리 시뮬 (기존 동일) ───────────────────────────────
def simulate(accepted: list) -> dict:
portfolio = float(BUDGET)
total_krw = 0.0
monthly = {}
trade_log = []
reason_cnt = {}
for trade in accepted:
is_win, pnl, buy_dt, sell_dt, ticker, reason = trade
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
reason_cnt[reason] = reason_cnt.get(reason, 0) + 1
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1
monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
trade_log.append({
"buy_dt": buy_dt, "sell_dt": sell_dt, "ticker": ticker,
"is_win": is_win, "pnl_pct": pnl, "reason": reason,
"pos_size": pos_size, "krw_profit": krw_profit,
"portfolio": portfolio,
})
wins = sum(1 for t in accepted if t[0])
return {
"portfolio": portfolio, "total_krw": total_krw,
"roi_pct": (portfolio - BUDGET) / BUDGET * 100,
"total": len(accepted), "wins": wins,
"wr": wins / len(accepted) * 100 if accepted else 0,
"monthly": monthly, "trade_log": trade_log,
"reason_cnt": reason_cnt,
}
# ── 메인 ─────────────────────────────────────────────────
def main():
print("=" * 62)
print("현재 전략 기준 시뮬 (F&G + 레짐 + 티어임계 + 속도진입)")
print("=" * 62)
print("F&G 히스토리 로드...")
fng_map = load_fng_history()
if fng_map:
vals = sorted(fng_map.items())
print(f" {vals[0][0]} ~ {vals[-1][0]} ({len(fng_map)}일)")
else:
print(" F&G 데이터 없음 — 필터 비활성화")
print("캐시 로드...")
cache = pickle.load(open(CACHE_FILE, "rb"))
tickers = [t for t in list(cache["10m"].keys())[:TOP_N]
if len(cache["10m"][t]) > 200]
print(f" 종목: {len(tickers)}\n")
dfs_40m = {t: resample_40m(cache["10m"][t]) for t in tickers}
print("레짐 시리즈 계산...")
regime_series = build_regime_series(dfs_40m)
sample_df = next(iter(dfs_40m.values()))
start_date = sample_df.index[0].strftime("%Y-%m-%d")
end_date = sample_df.index[-1].strftime("%Y-%m-%d")
print(f" 기간: {start_date} ~ {end_date}\n")
# F&G 차단 일수
if fng_map:
period_dates = [d for d in fng_map if start_date <= d <= end_date]
fng_blocked = sum(1 for d in period_dates if fng_map.get(d, 50) < FNG_MIN_ENTRY)
fng_allowed = len(period_dates) - fng_blocked
else:
fng_blocked = fng_allowed = 0
all_trades = []
wf_blocked = 0
for ticker in tickers:
df40 = dfs_40m[ticker]
raw = run_vol_lead(df40, ticker, fng_map, regime_series)
filtered, blocked = apply_wf(raw)
wf_blocked += blocked
all_trades.extend(filtered)
all_trades.sort(key=lambda x: x[2]) # buy_dt 기준 정렬
accepted, skipped = apply_max_positions(all_trades)
result = simulate(accepted)
# 최대 낙폭
peak = BUDGET
max_dd = 0.0
for t in result["trade_log"]:
peak = max(peak, t["portfolio"])
dd = (peak - t["portfolio"]) / peak * 100
max_dd = max(max_dd, dd)
total = result["total"]
wins = result["wins"]
print(f"{'='*62}")
print(f" 기간: {start_date} ~ {end_date} ({len(tickers)}종목 / 40분봉)")
print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 FNG≥{FNG_MIN_ENTRY})")
print(f"{'='*62}")
print(f" 신호 발생: {len(all_trades)+wf_blocked:>4}건 (WF 차단: {wf_blocked}건)")
print(f" 실제 진입: {total:>4}건 ({len(skipped)}건 MAX_POS 스킵)")
print(f" 승 / 패: {wins}{total-wins}패 (승률 {result['wr']:.1f}%)"
if total else " 진입 없음")
print(f" {''*52}")
print(f" 초기 예산: {BUDGET:>15,}")
print(f" 최종 자산: {result['portfolio']:>15,.0f}")
print(f" 순수익: {result['total_krw']:>+15,.0f}")
print(f" 수익률: {result['roi_pct']:>+14.2f}%")
print(f" 최대 낙폭: {-max_dd:>+14.2f}%"
f" ({-max_dd/100*BUDGET:>+,.0f}원)")
monthly_krw = [m["pnl_krw"] for m in result["monthly"].values()]
avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0
print(f" 월평균 수익: {avg_m:>+13,.0f}")
print(f"\n── 청산 사유 {''*44}")
label_map = {"trailing_stop": "트레일링스탑", "time_stop": "타임스탑",
"end_of_data": "데이터종료"}
for r, cnt in sorted(result["reason_cnt"].items(), key=lambda x: -x[1]):
print(f" {label_map.get(r, r):12}: {cnt:>3}")
print(f"\n── 월별 수익 {''*44}")
print(f" {'':^8}{'거래':>4} {'승률':>5}"
f" {'월수익(KRW)':>13} {'누적수익(KRW)':>14}")
cum = 0.0
for ym, m in sorted(result["monthly"].items()):
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
cum += m["pnl_krw"]
print(f" {ym:^8}{m['trades']:>4}{wr:>4.0f}% │"
f" {m['pnl_krw']:>+13,.0f}{cum:>+14,.0f}")
print(f"\n── 파라미터 {''*46}")
print(f" F&G≥{FNG_MIN_ENTRY} 레짐BEAR<{BEAR_THRESHOLD}% BULL≥{BULL_THRESHOLD}%")
print(f" VOL: {VOL_MULT_NEUTRAL}x(중립)/{VOL_MULT_BULL}x(강세) 횡보<{QUIET_PCT}%")
print(f" 임계: 5x→1% / 3.5x→2% / 2.5x→3% / 기본→{THRESH_BASE}%")
print(f" 속도: ≥{VELOCITY_THRESHOLD}%/분 (≥{VELOCITY_MIN_MOVE}% / ≥{VELOCITY_MIN_AGE_M}분)")
print(f" ATR: ×{ATR_MULT} ({ATR_MIN*100:.0f}~{ATR_MAX*100:.0f}%) 타임스탑: 8h/{TIME_STOP_MIN_PCT}%")
print(f"{'='*62}")
if __name__ == "__main__":
main()

474
tests/sim_regime_1y.py Normal file
View File

@@ -0,0 +1,474 @@
"""레짐 기반 1년 시뮬레이션 — BULL 진입 vs Bear차단 vs 필터없음.
sim_45m40.py 검증된 코어 로직 기반.
데이터: data/sim1y_cache.pkl (10분봉 1년치)
data/fng_1y.json (F&G 1년치)
비교 구성:
1. 필터 없음 — 레짐/F&G 무관 진입
2. BEAR 차단 — 레짐 score < -0.5% 이면 차단 (현재 전략)
3. BULL 진입만 — 레짐 score ≥ 1.5% 일 때만 진입 ← 사용자 제안
4. BULL 진입 + F&G≥41 — BULL 조건에 F&G 필터 추가
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
# ── 데이터 경로 ───────────────────────────────────────────
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
TOP_N = 20
# ── 전략 파라미터 (sim_45m40.py 동일) ────────────────────
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
TIME_STOP_MIN_PCT = 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
VOL_MULT_DEFAULT = 2.0 # 기본 (NEUTRAL / 필터없음)
VOL_MULT_BULL = 1.5 # BULL 레짐 완화
QUIET_PCT = 2.0
THRESH = 4.8 # sim_45m40.py 기준값
# 40분봉 봉수 환산
LOCAL_VOL_N = 7 # 5h
QUIET_N = 3 # 2h
SIGNAL_TO_N = 12 # 8h
ATR_N = 7
TS_N = 12 # 8h (타임스탑)
REGIME_N = 3 # 2h (레짐 추세)
# 레짐 임계
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
# 레짐 계산 가중치
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
# WF 파라미터
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
# ── 리샘플링 ─────────────────────────────────────────────
def resample_40m(df: pd.DataFrame) -> pd.DataFrame:
return (
df.resample("40min")
.agg({"open": "first", "high": "max", "low": "min",
"close": "last", "volume": "sum"})
.dropna(subset=["close"])
)
# ── 레짐 시리즈 ──────────────────────────────────────────
def build_regime_series(dfs40: dict) -> pd.Series:
weighted = None
for ticker, w in REGIME_WEIGHTS.items():
if ticker not in dfs40:
continue
pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
return weighted if weighted is not None else pd.Series(dtype=float)
# ── ATR ──────────────────────────────────────────────────
def calc_atr(df: pd.DataFrame, buy_idx: int) -> float:
sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
# ── 포지션 시뮬 ──────────────────────────────────────────
def simulate_pos(df: pd.DataFrame, buy_idx: int,
buy_price: float, stop_pct: float):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
ts = df.index[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, ts, pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, ts, pnl
last = df.iloc[-1]["close"]
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, df.index[-1], pnl
# ── 전략 실행 ─────────────────────────────────────────────
def run_strategy(df: pd.DataFrame, ticker: str,
regime_series: pd.Series, fng_map: dict,
mode: str) -> list:
"""
mode:
'none' — 레짐/F&G 필터 없음
'bear_off' — BEAR 차단만 (score < BEAR_THRESHOLD 시 스킵)
'bull_only'— BULL 진입만 (score >= BULL_THRESHOLD 일 때만)
'bull_fng' — BULL + F&G≥41
"""
trades = []
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
# ── 포지션 보유 중 ────────────────────────────────
if in_pos:
is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker))
in_pos = False
sig_i = sig_p = None
i = next_i
continue
# ── 레짐 스코어 계산 ─────────────────────────────
score = 0.0
if not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
score = float(v) if not pd.isna(v) else 0.0
# ── 모드별 진입 필터 ─────────────────────────────
if mode == "bear_off":
if score < BEAR_THRESHOLD:
sig_i = sig_p = None
i += 1
continue
vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT
elif mode == "bull_only":
if score < BULL_THRESHOLD:
sig_i = sig_p = None
i += 1
continue
vol_mult = VOL_MULT_BULL
elif mode == "bull_fng":
if score < BULL_THRESHOLD:
sig_i = sig_p = None
i += 1
continue
date_str = ts.strftime("%Y-%m-%d")
fv = fng_map.get(date_str, 50) if fng_map else 50
if fv < 41:
sig_i = sig_p = None
i += 1
continue
vol_mult = VOL_MULT_BULL
else: # 'none'
vol_mult = VOL_MULT_DEFAULT
# ── 신호 타임아웃 ─────────────────────────────────
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
sig_i = sig_p = None
# ── 신호 있음: 진입 체크 ──────────────────────────
if sig_i is not None:
move_pct = (cur - sig_p) / sig_p * 100
if cur < sig_p:
sig_i = sig_p = None
elif move_pct >= THRESH:
in_pos = True
buy_idx = i
buy_price = cur
stop_pct = calc_atr(df, i)
sig_i = sig_p = None
i += 1
continue
# ── 신호 없음: 축적 조건 체크 ────────────────────
vol_p = df.iloc[i - 1]["volume"]
vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
close_qh = df.iloc[i - QUIET_N]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
quiet = chg_qh < QUIET_PCT
spike = vol_r >= vol_mult
if quiet and spike:
if sig_i is None:
sig_i = i
sig_p = cur
else:
if sig_i is not None and cur < sig_p:
sig_i = sig_p = None
i += 1
return trades
# ── WF 필터 ──────────────────────────────────────────────
def apply_wf(trades: list) -> tuple:
history = []
shadow_streak = 0
blocked = False
accepted = []
blocked_cnt = 0
for trade in trades:
is_win = int(trade[0])
if not blocked:
accepted.append(trade)
history.append(is_win)
if len(history) >= WF_WINDOW:
wr = sum(history[-WF_WINDOW:]) / WF_WINDOW
if wr < WF_MIN_WIN_RATE:
blocked = True
shadow_streak = 0
else:
blocked_cnt += 1
if is_win:
shadow_streak += 1
if shadow_streak >= WF_SHADOW_WINS:
blocked = False
history = []
shadow_streak = 0
else:
shadow_streak = 0
return accepted, blocked_cnt
# ── MAX_POSITIONS ────────────────────────────────────────
def apply_max_positions(all_trades: list) -> tuple:
open_exits, accepted, skipped = [], [], []
for trade in all_trades:
buy_dt, sell_dt = trade[2], trade[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
# ── 복리 시뮬 ────────────────────────────────────────────
def run_compound(accepted: list) -> dict:
portfolio = float(BUDGET)
total_krw = 0.0
monthly = {}
trade_log = []
for is_win, pnl, buy_dt, sell_dt, ticker in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1
monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
trade_log.append({"buy_dt": buy_dt, "sell_dt": sell_dt,
"ticker": ticker, "is_win": is_win,
"pnl_pct": pnl, "portfolio": portfolio})
wins = sum(1 for t in accepted if t[0])
return {
"portfolio": portfolio,
"total_krw": total_krw,
"roi_pct": (portfolio - BUDGET) / BUDGET * 100,
"total": len(accepted),
"wins": wins,
"wr": wins / len(accepted) * 100 if accepted else 0,
"monthly": monthly,
"trade_log": trade_log,
}
# ── 결과 출력 ────────────────────────────────────────────
def print_result(label: str, result: dict, skipped: int, wf_blocked: int):
r = result
peak = BUDGET
max_dd = 0.0
for t in r["trade_log"]:
peak = max(peak, t["portfolio"])
dd = (peak - t["portfolio"]) / peak * 100
max_dd = max(max_dd, dd)
monthly_krw = [m["pnl_krw"] for m in r["monthly"].values()]
avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0
total = r["total"]
wins = r["wins"]
print(f"\n{''*60}")
print(f" [{label}]")
print(f" 진입: {total}건 (WF차단: {wf_blocked} / MAX_POS스킵: {skipped})")
if total:
print(f" 승패: {wins}{total-wins}패 (승률 {r['wr']:.1f}%)")
print(f" 초기 예산: {BUDGET:>15,}")
print(f" 최종 자산: {r['portfolio']:>15,.0f}")
print(f" 순수익: {r['total_krw']:>+15,.0f}")
print(f" 수익률: {r['roi_pct']:>+14.2f}%")
print(f" 최대 낙폭: {-max_dd:>+14.2f}%")
print(f" 월평균 수익: {avg_m:>+13,.0f}")
def print_monthly(result: dict):
print(f" {'':^8}{'거래':>4} {'승률':>5}{'월수익(KRW)':>13} {'누적수익(KRW)':>14}")
cum = 0.0
for ym, m in sorted(result["monthly"].items()):
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
cum += m["pnl_krw"]
print(f" {ym:^8}{m['trades']:>4}{wr:>4.0f}% │"
f" {m['pnl_krw']:>+13,.0f}{cum:>+14,.0f}")
# ── 메인 ─────────────────────────────────────────────────
def main():
# ── 데이터 로드 ───────────────────────────────────────
if not CACHE_FILE.exists():
print(f"[오류] 캐시 없음: {CACHE_FILE}")
print(" 먼저 tests/collect_1y_data.py 를 실행하세요.")
return
print("캐시 로드 중...")
cache = pickle.load(open(CACHE_FILE, "rb"))
all_tickers = list(cache["10m"].keys())[:TOP_N]
tickers = [t for t in all_tickers if len(cache["10m"][t]) > 500]
print(f" 유효 종목: {len(tickers)}")
# F&G 로드
fng_map: dict = {}
if FNG_FILE.exists():
fng_map = json.load(open(FNG_FILE))
fng_dates = sorted(fng_map.keys())
print(f" F&G: {fng_dates[0]} ~ {fng_dates[-1]} ({len(fng_map)}일)")
else:
print(" [경고] F&G 데이터 없음")
# 리샘플링
dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers}
# 레짐 시리즈
regime_series = build_regime_series(dfs40)
# 기간 정보
sample = next(iter(dfs40.values()))
start_dt = sample.index[0].strftime("%Y-%m-%d")
end_dt = sample.index[-1].strftime("%Y-%m-%d")
# 레짐 분포 계산
if not regime_series.empty:
valid = regime_series.dropna()
bull_cnt = (valid >= BULL_THRESHOLD).sum()
bear_cnt = (valid < BEAR_THRESHOLD).sum()
neut_cnt = len(valid) - bull_cnt - bear_cnt
total_cnt = len(valid)
print(f"\n 레짐 분포 ({total_cnt}봉 기준):")
print(f" BULL (≥{BULL_THRESHOLD}%) : {bull_cnt:>6}봉 ({bull_cnt/total_cnt*100:.1f}%)")
print(f" NEUTRAL : {neut_cnt:>6}봉 ({neut_cnt/total_cnt*100:.1f}%)")
print(f" BEAR (<{BEAR_THRESHOLD}%) : {bear_cnt:>6}봉 ({bear_cnt/total_cnt*100:.1f}%)")
# F&G 분포 (해당 기간)
if fng_map:
period_fng = {k: v for k, v in fng_map.items() if start_dt <= k <= end_dt}
zones = {"극공포(≤25)": 0, "공포(26~40)": 0, "중립+(≥41)": 0}
for v in period_fng.values():
if v <= 25: zones["극공포(≤25)"] += 1
elif v <= 40: zones["공포(26~40)"] += 1
else: zones["중립+(≥41)"] += 1
tot = sum(zones.values())
print(f"\n F&G 분포 (동 기간 {tot}일):")
for name, cnt in zones.items():
print(f" {name:12} {cnt:>3}일 ({cnt/tot*100:.1f}%)")
print(f"\n{'='*60}")
print(f" 레짐 BULL 진입 시뮬 | 1년 | {len(tickers)}종목 | 40분봉")
print(f" 기간: {start_dt} ~ {end_dt}")
print(f"{'='*60}")
# ── 4가지 시뮬 실행 ───────────────────────────────────
CONFIGS = [
("none", "필터 없음"),
("bear_off", "BEAR 차단 (현재)"),
("bull_only","BULL 진입만"),
("bull_fng", "BULL + F&G≥41"),
]
results = {}
for mode, label in CONFIGS:
all_trades = []
wf_total = 0
for ticker in tickers:
df40 = dfs40[ticker]
raw = run_strategy(df40, ticker, regime_series, fng_map, mode)
filtered, blocked = apply_wf(raw)
wf_total += blocked
all_trades.extend(filtered)
all_trades.sort(key=lambda x: x[2])
accepted, skipped = apply_max_positions(all_trades)
result = run_compound(accepted)
results[label] = result
print_result(label, result, len(skipped), wf_total)
# ── 요약 비교 ─────────────────────────────────────────
print(f"\n{'='*60}")
print(f" 요약 비교")
print(f"{'='*60}")
print(f" {'구성':<22} {'진입':>5} {'승률':>6} {'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}")
print(f" {''*58}")
for mode, label in CONFIGS:
r = results[label]
total = r["total"]
if total == 0:
print(f" {label:<22} {'진입없음':>34}")
continue
peak = BUDGET
max_dd = 0.0
for t in r["trade_log"]:
peak = max(peak, t["portfolio"])
dd = (peak - t["portfolio"]) / peak * 100
max_dd = max(max_dd, dd)
print(
f" {label:<22} {total:>5}{r['wr']:>5.1f}% "
f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{max_dd:.1f}%"
)
# ── 월별 상세 (BULL 진입만) ───────────────────────────
print(f"\n{'='*60}")
print(f" 월별 상세 — BULL 진입만")
print(f"{'='*60}")
print_monthly(results["BULL 진입만"])
print(f"\n 월별 상세 — BEAR 차단 (현재)")
print_monthly(results["BEAR 차단 (현재)"])
print(f"{'='*60}")
if __name__ == "__main__":
main()

322
tests/sim_regime_sweep.py Normal file
View File

@@ -0,0 +1,322 @@
"""레짐 REGIME_N 스윕 — BULL 진입 기준 봉수 최적화.
REGIME_N (pct_change 봉수) 를 1~8봉(40분~320분) 으로 변화시키며
BULL 진입만 / BEAR 차단 / 필터없음 비교.
데이터: data/sim1y_cache.pkl (10분봉 1년치)
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
TIME_STOP_MIN_PCT = 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
VOL_MULT_DEFAULT = 2.0
VOL_MULT_BULL = 1.5
QUIET_PCT = 2.0
THRESH = 4.8
LOCAL_VOL_N = 7
QUIET_N = 3
SIGNAL_TO_N = 12
ATR_N = 7
TS_N = 12
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
def resample_40m(df):
return (df.resample("40min")
.agg({"open":"first","high":"max","low":"min",
"close":"last","volume":"sum"})
.dropna(subset=["close"]))
def build_regime_series(dfs40, regime_n):
weighted = None
for ticker, w in REGIME_WEIGHTS.items():
if ticker not in dfs40:
continue
pct = dfs40[ticker]["close"].pct_change(regime_n) * 100
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
return weighted if weighted is not None else pd.Series(dtype=float)
def calc_atr(df, buy_idx):
sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
if len(sub) < 3:
return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
def simulate_pos(df, buy_idx, buy_price, stop_pct):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
if row["high"] > peak:
peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
last = df.iloc[-1]["close"]
pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[-1], pnl
def run_strategy(df, ticker, regime_series, mode):
trades = []
sig_i = sig_p = None
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
if in_pos:
is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker))
in_pos = False; sig_i = sig_p = None; i = next_i
continue
score = 0.0
if not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
score = float(v) if not pd.isna(v) else 0.0
if mode == "bear_off":
if score < BEAR_THRESHOLD:
sig_i = sig_p = None; i += 1; continue
vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_DEFAULT
elif mode == "bull_only":
if score < BULL_THRESHOLD:
sig_i = sig_p = None; i += 1; continue
vol_mult = VOL_MULT_BULL
else:
vol_mult = VOL_MULT_DEFAULT
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
sig_i = sig_p = None
if sig_i is not None:
move_pct = (cur - sig_p) / sig_p * 100
if cur < sig_p:
sig_i = sig_p = None
elif move_pct >= THRESH:
in_pos = True; buy_idx = i; buy_price = cur
stop_pct = calc_atr(df, i); sig_i = sig_p = None
i += 1; continue
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
close_qh = df.iloc[i-QUIET_N]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
if chg_qh < QUIET_PCT and vol_r >= vol_mult:
if sig_i is None:
sig_i = i; sig_p = cur
else:
if sig_i is not None and cur < sig_p:
sig_i = sig_p = None
i += 1
return trades
def apply_wf(trades):
history = []; shadow = 0; blocked = False; accepted = []; cnt = 0
for t in trades:
is_win = int(t[0])
if not blocked:
accepted.append(t); history.append(is_win)
if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True; shadow = 0
else:
cnt += 1
if is_win:
shadow += 1
if shadow >= WF_SHADOW_WINS:
blocked = False; history = []; shadow = 0
else:
shadow = 0
return accepted, cnt
def apply_max_pos(trades):
open_exits = []; accepted = []; skipped = []
for t in trades:
buy_dt, sell_dt = t[2], t[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt); accepted.append(t)
else:
skipped.append(t)
return accepted, skipped
def run_compound(accepted):
portfolio = float(BUDGET); total_krw = 0.0; monthly = {}
for is_win, pnl, buy_dt, sell_dt, ticker in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1
monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
wins = sum(1 for t in accepted if t[0])
peak = BUDGET; max_dd = 0.0
pf = float(BUDGET)
for is_win, pnl, buy_dt, sell_dt, ticker in accepted:
pf = max(pf + max(pf, MIN_BUDGET) / MAX_POS * pnl / 100, MIN_BUDGET)
peak = max(peak, pf); max_dd = max(max_dd, (peak-pf)/peak*100)
return {
"portfolio": portfolio, "total_krw": total_krw,
"roi_pct": (portfolio-BUDGET)/BUDGET*100,
"total": len(accepted), "wins": wins,
"wr": wins/len(accepted)*100 if accepted else 0,
"monthly": monthly, "max_dd": max_dd,
}
def sim_one(dfs40, regime_n, mode):
rs = build_regime_series(dfs40, regime_n)
all_trades = []; wf_total = 0
for ticker, df40 in dfs40.items():
raw = run_strategy(df40, ticker, rs, mode)
filtered, blocked = apply_wf(raw)
wf_total += blocked
all_trades.extend(filtered)
all_trades.sort(key=lambda x: x[2])
accepted, skipped = apply_max_pos(all_trades)
result = run_compound(accepted)
# BULL 비율
if not rs.empty:
valid = rs.dropna()
bull_pct = (valid >= BULL_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0
bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100 if len(valid) else 0
else:
bull_pct = bear_pct = 0
return result, bull_pct, bear_pct, wf_total, len(skipped)
def main():
print("캐시 로드 중...")
cache = pickle.load(open(CACHE_FILE, "rb"))
tickers = [t for t in list(cache["10m"].keys())[:TOP_N]
if len(cache["10m"][t]) > 500]
print(f" 종목: {len(tickers)}\n")
dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers}
sample = next(iter(dfs40.values()))
start_dt = sample.index[0].strftime("%Y-%m-%d")
end_dt = sample.index[-1].strftime("%Y-%m-%d")
SWEEP_N = [1, 2, 3, 4, 5, 6, 8, 10] # 40분 ~ 400분 (6.7h)
# ── BULL 진입만 스윕 ──────────────────────────────────
print(f"{'='*72}")
print(f" REGIME_N 스윕 (40분봉 × N봉 변화율 기준 | BULL≥{BULL_THRESHOLD}%)")
print(f" 기간: {start_dt} ~ {end_dt} / {len(tickers)}종목")
print(f"{'='*72}")
print(f" {'N봉':>4} {'시간':>5}{'BULL%':>6} {'BEAR%':>6}"
f"{'진입':>5} {'승률':>5}{'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}")
print(f" {''*68}")
bull_results = {}
for n in SWEEP_N:
r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bull_only")
bull_results[n] = r
mins = n * 40
h = mins // 60
m = mins % 60
time_label = f"{h}h{m:02d}m" if m else f"{h}h"
if r["total"] == 0:
print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
f"{'진입없음':>34}")
else:
print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
f"{r['total']:>5}{r['wr']:>4.1f}% │ "
f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%")
# ── BEAR 차단 스윕 ────────────────────────────────────
print(f"\n{'='*72}")
print(f" REGIME_N 스윕 (BEAR 차단 모드 | BEAR<{BEAR_THRESHOLD}%)")
print(f"{'='*72}")
print(f" {'N봉':>4} {'시간':>5}{'BULL%':>6} {'BEAR%':>6}"
f"{'진입':>5} {'승률':>5}{'수익률':>8} {'순수익(KRW)':>14} {'낙폭':>7}")
print(f" {''*68}")
bear_results = {}
for n in SWEEP_N:
r, bull_pct, bear_pct, wf_b, skip = sim_one(dfs40, n, "bear_off")
bear_results[n] = r
mins = n * 40
h = mins // 60; m = mins % 60
time_label = f"{h}h{m:02d}m" if m else f"{h}h"
print(f" {n:>4}{time_label:>5}{bull_pct:>5.1f}% {bear_pct:>5.1f}% │ "
f"{r['total']:>5}{r['wr']:>4.1f}% │ "
f"{r['roi_pct']:>+7.2f}% {r['total_krw']:>+13,.0f}원 -{r['max_dd']:>4.1f}%")
# ── 베이스라인 (필터없음) ─────────────────────────────
r_none, _, _, _, _ = sim_one(dfs40, 1, "none")
print(f"\n 베이스라인 (필터없음): {r_none['total']}{r_none['wr']:.1f}% "
f"{r_none['roi_pct']:+.2f}% {r_none['total_krw']:+,.0f}원 -{r_none['max_dd']:.1f}%")
# ── 최적 BULL 구간 ────────────────────────────────────
valid_bull = {n: r for n, r in bull_results.items() if r["total"] >= 5}
if valid_bull:
best_n = max(valid_bull, key=lambda n: valid_bull[n]["roi_pct"])
best_r = valid_bull[best_n]
print(f"\n ★ BULL 진입 최적 N: {best_n}봉({best_n*40}분) "
f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}"
f"승률 {best_r['wr']:.1f}%")
valid_bear = {n: r for n, r in bear_results.items() if r["total"] >= 5}
if valid_bear:
best_n = max(valid_bear, key=lambda n: valid_bear[n]["roi_pct"])
best_r = valid_bear[best_n]
print(f" ★ BEAR 차단 최적 N: {best_n}봉({best_n*40}분) "
f"수익률 {best_r['roi_pct']:+.2f}% 진입 {best_r['total']}"
f"승률 {best_r['wr']:.1f}%")
print(f"{'='*72}")
if __name__ == "__main__":
main()

353
tests/sim_vol_override.py Normal file
View File

@@ -0,0 +1,353 @@
"""볼륨 강도 기반 레짐+F&G 오버라이드 시뮬 — 1년치.
우선순위 로직:
1순위: vol_ratio ≥ VOL_OVERRIDE_THRESH → 레짐/F&G 무관 즉시 진입 허용
2순위: F&G < FNG_MIN_ENTRY → 차단
3순위: 레짐 BEAR → 차단
4순위: 일반 vol-lead 로직
비교 구성:
1. 필터 없음
2. F&G≥41 + BEAR차단N5 (현재 전략 레짐 적용)
3. F&G≥41 + BEAR차단N5 + vol≥5x 오버라이드 (레짐+F&G 동시 오버라이드)
4. F&G≥41 + BEAR차단N5 + vol≥4x 오버라이드
5. F&G≥41 + BEAR차단N5 + vol≥3x 오버라이드
데이터: data/sim1y_cache.pkl / data/fng_1y.json
"""
import os as _os, sys as _sys
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
import json
import pickle
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim1y_cache.pkl"
FNG_FILE = Path(__file__).parent.parent / "data" / "fng_1y.json"
TOP_N = 20
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10
MAX_POS = 3
FEE = 0.0005
TIME_STOP_MIN_PCT = 3.0
ATR_MULT = 1.5
ATR_MIN = 0.010
ATR_MAX = 0.020
VOL_MULT_DEFAULT = 2.0
VOL_MULT_BULL = 1.5
QUIET_PCT = 2.0
THRESH = 4.8
LOCAL_VOL_N = 7
QUIET_N = 3
SIGNAL_TO_N = 12
ATR_N = 7
TS_N = 12
BEAR_THRESHOLD = -0.5
BULL_THRESHOLD = 1.5
REGIME_N = 5
FNG_MIN_ENTRY = 41
WF_WINDOW = 4
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
def resample_40m(df):
return (df.resample("40min")
.agg({"open":"first","high":"max","low":"min",
"close":"last","volume":"sum"})
.dropna(subset=["close"]))
def build_regime_series(dfs40):
weighted = None
for ticker, w in REGIME_WEIGHTS.items():
if ticker not in dfs40: continue
pct = dfs40[ticker]["close"].pct_change(REGIME_N) * 100
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
return weighted if weighted is not None else pd.Series(dtype=float)
def calc_atr(df, buy_idx):
sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
if len(sub) < 3: return ATR_MIN
try:
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean()
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
except Exception:
return ATR_MIN
def simulate_pos(df, buy_idx, buy_price, stop_pct):
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
if row["high"] > peak: peak = row["high"]
if row["low"] <= peak * (1 - stop_pct):
sp = peak * (1 - stop_pct)
pnl = (sp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
pnl_now = (row["close"] - buy_price) / buy_price * 100
if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"]*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[i], pnl
last = df.iloc[-1]["close"]
pnl = (last*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return pnl > 0, df.index[-1], pnl
def run_strategy(df, ticker, regime_series, fng_map,
use_fng, use_regime, vol_override_thresh):
"""
우선순위 구조:
① 포지션 청산 체크
② 볼륨 스파이크 감지 → 신호 기록 (F&G/레짐 무관, 항상 실행)
③ 진입 시점에서:
vol_strong(sig_vr≥thresh) → F&G+레짐 필터 전부 건너뜀
아니면 → F&G≥41 AND 레짐 BEAR 아닐 때만 진입 허용
"""
trades = []
sig_i = sig_p = sig_vr = None
in_pos = False
buy_idx = buy_price = stop_pct = None
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
while i < len(df):
ts = df.index[i]
row = df.iloc[i]
cur = row["close"]
# ── ① 포지션 청산 ────────────────────────────────
if in_pos:
is_win, sdt, pnl = simulate_pos(df, buy_idx, buy_price, stop_pct)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker))
in_pos = False; sig_i = sig_p = sig_vr = None; i = next_i
continue
# 신호 타임아웃
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
sig_i = sig_p = sig_vr = None
# ── ② 신호 없을 때: 축적 감지 (필터 무관, 항상) ──
# F&G=14 극공포여도 vol 스파이크면 신호 기록 → ③에서 override 결정
if sig_i is None:
vol_p = df.iloc[i-1]["volume"]
vol_avg = df.iloc[i-1-LOCAL_VOL_N:i-1]["volume"].mean()
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
close_qh = df.iloc[i-QUIET_N]["close"]
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
if chg_qh < QUIET_PCT and vol_r >= VOL_MULT_DEFAULT:
sig_i = i; sig_p = cur; sig_vr = vol_r
i += 1
continue
# 신호가 이하 하락 → 초기화
if cur < sig_p:
sig_i = sig_p = sig_vr = None
i += 1
continue
# ── ③ 진입 체크 — vol_strong이면 필터 전부 스킵 ──
vol_strong = (vol_override_thresh > 0
and sig_vr is not None
and sig_vr >= vol_override_thresh)
if not vol_strong:
# F&G 필터 (신호 유지, 진입만 보류)
if use_fng and fng_map:
fv = fng_map.get(ts.strftime("%Y-%m-%d"), 50)
if fv < FNG_MIN_ENTRY:
i += 1; continue
# 레짐 필터 (신호 유지, 진입만 보류)
if use_regime and not regime_series.empty and ts in regime_series.index:
v = regime_series.loc[ts]
score = float(v) if not pd.isna(v) else 0.0
if score < BEAR_THRESHOLD:
i += 1; continue
move_pct = (cur - sig_p) / sig_p * 100
if move_pct >= THRESH:
in_pos = True; buy_idx = i; buy_price = cur
stop_pct = calc_atr(df, i); sig_i = sig_p = sig_vr = None
i += 1
return trades
def apply_wf(trades):
history = []; shadow = 0; blocked = False; accepted = []; cnt = 0
for t in trades:
is_win = int(t[0])
if not blocked:
accepted.append(t); history.append(is_win)
if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
blocked = True; shadow = 0
else:
cnt += 1
if is_win:
shadow += 1
if shadow >= WF_SHADOW_WINS: blocked = False; history = []; shadow = 0
else:
shadow = 0
return accepted, cnt
def apply_max_pos(trades):
open_exits = []; accepted = []; skipped = []
for t in trades:
buy_dt, sell_dt = t[2], t[3]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt); accepted.append(t)
else:
skipped.append(t)
return accepted, skipped
def run_compound(accepted):
portfolio = float(BUDGET); total_krw = 0.0; monthly = {}; trade_log = []
for is_win, pnl, buy_dt, sell_dt, ticker in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1; monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
trade_log.append({"portfolio": portfolio})
wins = sum(1 for t in accepted if t[0])
peak = BUDGET; max_dd = 0.0
for t in trade_log:
peak = max(peak, t["portfolio"])
max_dd = max(max_dd, (peak - t["portfolio"]) / peak * 100)
return {"portfolio": portfolio, "total_krw": total_krw,
"roi_pct": (portfolio-BUDGET)/BUDGET*100,
"total": len(accepted), "wins": wins,
"wr": wins/len(accepted)*100 if accepted else 0,
"monthly": monthly, "max_dd": max_dd}
def sim_one(dfs40, regime_series, fng_map, use_fng, use_regime, vol_override):
all_trades = []; wf_total = 0
for ticker, df40 in dfs40.items():
raw = run_strategy(df40, ticker, regime_series, fng_map,
use_fng, use_regime, vol_override)
filtered, blocked = apply_wf(raw)
wf_total += blocked; all_trades.extend(filtered)
all_trades.sort(key=lambda x: x[2])
accepted, skipped = apply_max_pos(all_trades)
return run_compound(accepted), wf_total, len(skipped)
def print_monthly(result, label):
print(f"\n ── 월별 상세: {label}")
print(f" {'':^8}{'거래':>4} {'승률':>5}{'월수익(KRW)':>13} {'누적(KRW)':>14}")
cum = 0.0
for ym, m in sorted(result["monthly"].items()):
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
cum += m["pnl_krw"]
flag = "" if m["pnl_krw"] > 0 else ""
print(f" {ym:^8}{m['trades']:>4}{wr:>4.0f}% │"
f" {m['pnl_krw']:>+13,.0f}{cum:>+13,.0f}{flag}")
def main():
print("캐시 로드 중...")
cache = pickle.load(open(CACHE_FILE, "rb"))
tickers = [t for t in list(cache["10m"].keys())[:TOP_N]
if len(cache["10m"][t]) > 500]
print(f" 종목: {len(tickers)}")
fng_map = {}
if FNG_FILE.exists():
fng_map = json.load(open(FNG_FILE))
dates = sorted(fng_map.keys())
print(f" F&G: {dates[0]} ~ {dates[-1]} ({len(fng_map)}일)")
dfs40 = {t: resample_40m(cache["10m"][t]) for t in tickers}
regime_series = build_regime_series(dfs40)
sample = next(iter(dfs40.values()))
start_dt = sample.index[0].strftime("%Y-%m-%d")
end_dt = sample.index[-1].strftime("%Y-%m-%d")
# 필터 적용 일수 통계
if fng_map:
period_fng = {k: v for k, v in fng_map.items() if start_dt <= k <= end_dt}
fng_blocked = sum(1 for v in period_fng.values() if v < FNG_MIN_ENTRY)
fng_allowed = len(period_fng) - fng_blocked
print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 ≥{FNG_MIN_ENTRY})")
valid = regime_series.dropna()
bear_pct = (valid < BEAR_THRESHOLD).sum() / len(valid) * 100
print(f" 레짐 BEAR: {bear_pct:.1f}%봉 (REGIME_N={REGIME_N}봉={REGIME_N*40}분)\n")
# ── 시뮬 구성 ─────────────────────────────────────────
CONFIGS = [
# (use_fng, use_regime, vol_override, label)
(False, False, 0, "① 필터 없음"),
(True, True, 0, f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"),
(True, True, 5.0, f"③ [1순위:vol≥5x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"),
(True, True, 4.0, f"④ [1순위:vol≥4x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"),
(True, True, 3.0, f"⑤ [1순위:vol≥3x] F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"),
]
print(f"{'='*72}")
print(f" vol 오버라이드 (레짐+F&G 동시) 시뮬 | 1년 | {len(tickers)}종목")
print(f" 기간: {start_dt} ~ {end_dt}")
print(f" 우선순위: vol≥Nx(오버라이드) > F&G필터 > 레짐필터 > vol-lead 로직")
print(f"{'='*72}")
print(f" {'구성':<48} {'진입':>5} {'승률':>5} {'수익률':>8} {'순수익':>12} {'낙폭':>6}")
print(f" {''*70}")
results = {}
for use_fng, use_regime, vol_ov, label in CONFIGS:
r, wf_b, skip = sim_one(dfs40, regime_series, fng_map,
use_fng, use_regime, vol_ov)
results[label] = r
n = r["total"]
print(f" {label:<48} {n:>5}{r['wr']:>4.1f}%"
f" {r['roi_pct']:>+7.2f}% {r['total_krw']:>+11,.0f}원 -{r['max_dd']:.1f}%")
# ── 월별 상세 ─────────────────────────────────────────
print(f"\n{'='*72}")
for use_fng, use_regime, vol_ov, label in CONFIGS:
if label in results:
print_monthly(results[label], label)
# ── 비교 요약 ─────────────────────────────────────────
print(f"\n{'='*72}")
base_label = f"② F&G≥{FNG_MIN_ENTRY} + BEAR차단N{REGIME_N}"
base_r = results.get(base_label)
if base_r:
print(f" 오버라이드 효과 (vs {base_label}):")
for _, _, vol_ov, label in CONFIGS[2:]:
r = results.get(label)
if r and r["total"] > 0:
d_roi = r["roi_pct"] - base_r["roi_pct"]
d_n = r["total"] - base_r["total"]
d_wr = r["wr"] - base_r["wr"]
d_dd = r["max_dd"] - base_r["max_dd"]
print(f" vol≥{vol_ov:.0f}x: 수익률 {d_roi:>+.2f}%p "
f"진입 {d_n:>+d}건 승률 {d_wr:>+.1f}%p 낙폭 {d_dd:>+.1f}%p")
best_label = max(results, key=lambda k: results[k]["roi_pct"])
best = results[best_label]
print(f"\n ★ 최고 수익률: {best_label}")
print(f" 수익률 {best['roi_pct']:+.2f}% / 순수익 {best['total_krw']:+,.0f}"
f"/ 낙폭 -{best['max_dd']:.1f}%")
print(f"{'='*72}")
if __name__ == "__main__":
main()