- core/strategy.py: full rewrite to Volume Lead strategy - 10m candle direct detection (no 40m resampling) - F&G 3-tier vol threshold: <=40->6x, 41-50->5x, >50->blocked - Undying signal: price drop does not cancel signal (sig_p fixed) - Vol refresh: stronger vol_r updates signal price and timer - Watch alert: 4x-6x approaching threshold notifies via Telegram - WATCH_VOL_THRESH=4.0, WATCH_COOLDOWN_MIN=30, WATCH_VOL_JUMP=0.5 - daemon/runner.py: remove FNG_MIN_ENTRY block and Bear regime block - Only FNG_MAX_ENTRY(>50) blocks scan (greed/extreme greed) - Fast-poll loop cleaned of regime check - core/notify.py: add notify_watch() for near-signal Telegram alerts - Shows vol_r, distance to threshold, price, quiet pct - tests/: add 1y data collection and simulation scripts - collect_1y_data.py, refresh_cache.py - sim_10m_vol.py, sim_current.py, sim_regime_1y.py - sim_regime_sweep.py, sim_vol_override.py Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
449 lines
17 KiB
Python
449 lines
17 KiB
Python
"""현재 전략 기준 45일 복리 시뮬레이션 — 40분봉.
|
||
|
||
sim_45m40.py의 검증된 코어 로직을 기반으로
|
||
현재 전략 추가사항만 반영:
|
||
+ F&G 필터 (FNG_MIN_ENTRY=41)
|
||
+ 시장 레짐 필터 (BEAR < -0.5% → 차단, BULL ≥ 1.5% → vol_mult 완화)
|
||
+ 신호 강도별 진입 임계값 티어 (5x→1%, 3.5x→2%, 2.5x→3%, 기본→5%)
|
||
+ 속도 기반 조기 진입 (0.10%/분)
|
||
"""
|
||
|
||
import json
|
||
import os
|
||
import pickle
|
||
import sys
|
||
import urllib.request
|
||
import datetime
|
||
from pathlib import Path
|
||
|
||
import pandas as pd
|
||
from dotenv import load_dotenv
|
||
|
||
load_dotenv(dotenv_path=Path(__file__).parent.parent / ".env")
|
||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||
|
||
# ── 파라미터 ─────────────────────────────────────────────
|
||
CACHE_FILE = Path(__file__).parent.parent / "data" / "sim10m_cache.pkl"
|
||
TOP_N = 20
|
||
|
||
BUDGET = 15_000_000
|
||
MIN_BUDGET = BUDGET * 3 // 10
|
||
MAX_POS = 3
|
||
|
||
FEE = 0.0005
|
||
TIME_STOP_MIN_PCT= 3.0
|
||
ATR_MULT = 1.5
|
||
ATR_MIN = 0.010
|
||
ATR_MAX = 0.020
|
||
|
||
VOL_MULT_NEUTRAL = 2.0 # NEUTRAL 레짐
|
||
VOL_MULT_BULL = 1.5 # BULL 레짐
|
||
QUIET_PCT = 2.0
|
||
THRESH_BASE = 5.0 # 기본 진입 임계값 (TREND_AFTER_VOL)
|
||
|
||
# 신호 강도별 임계값 티어
|
||
ENTRY_TIERS = [(5.0, 1.0), (3.5, 2.0), (2.5, 3.0)]
|
||
|
||
# 속도 진입
|
||
VELOCITY_THRESHOLD = 0.10 # %/분
|
||
VELOCITY_MIN_MOVE = 0.5 # 최소 이동 %
|
||
VELOCITY_MIN_AGE_M = 5.0 # 최소 경과 분
|
||
|
||
# F&G
|
||
FNG_MIN_ENTRY = int(os.getenv("FNG_MIN_ENTRY", "41"))
|
||
|
||
# 레짐
|
||
BEAR_THRESHOLD = -0.5
|
||
BULL_THRESHOLD = 1.5
|
||
REGIME_WEIGHTS = {"KRW-BTC": 0.40, "KRW-ETH": 0.30,
|
||
"KRW-SOL": 0.15, "KRW-XRP": 0.15}
|
||
|
||
# 40분봉 봉수 환산
|
||
LOCAL_VOL_N = 7 # 5h
|
||
QUIET_N = 3 # 2h
|
||
SIGNAL_TO_N = 12 # 8h
|
||
ATR_N = 7
|
||
TS_N = 12 # 8h (타임스탑)
|
||
REGIME_N = 3 # 2h (레짐 추세)
|
||
|
||
|
||
# ── F&G 히스토리 ─────────────────────────────────────────
|
||
def load_fng_history() -> dict:
|
||
try:
|
||
url = "https://api.alternative.me/fng/?limit=90&format=json"
|
||
with urllib.request.urlopen(url, timeout=10) as r:
|
||
data = json.loads(r.read())
|
||
result = {}
|
||
for e in data["data"]:
|
||
dt = datetime.datetime.fromtimestamp(int(e["timestamp"]))
|
||
result[dt.strftime("%Y-%m-%d")] = int(e["value"])
|
||
return result
|
||
except Exception as ex:
|
||
print(f" [경고] F&G 로드 실패: {ex} → 필터 비활성화")
|
||
return {}
|
||
|
||
|
||
# ── 리샘플링 ─────────────────────────────────────────────
|
||
def resample_40m(df: pd.DataFrame) -> pd.DataFrame:
|
||
return (
|
||
df.resample("40min")
|
||
.agg({"open": "first", "high": "max", "low": "min",
|
||
"close": "last", "volume": "sum"})
|
||
.dropna(subset=["close"])
|
||
)
|
||
|
||
|
||
# ── 레짐 시리즈 ──────────────────────────────────────────
|
||
def build_regime_series(dfs: dict) -> pd.Series:
|
||
weighted = None
|
||
for ticker, w in REGIME_WEIGHTS.items():
|
||
if ticker not in dfs:
|
||
continue
|
||
pct = dfs[ticker]["close"].pct_change(REGIME_N) * 100
|
||
weighted = pct * w if weighted is None else weighted.add(pct * w, fill_value=0.0)
|
||
return weighted if weighted is not None else pd.Series(dtype=float)
|
||
|
||
|
||
# ── 임계값 ───────────────────────────────────────────────
|
||
def calc_entry_threshold(vol_ratio: float) -> float:
|
||
for min_r, thr in ENTRY_TIERS:
|
||
if vol_ratio >= min_r:
|
||
return thr
|
||
return THRESH_BASE
|
||
|
||
|
||
# ── ATR ──────────────────────────────────────────────────
|
||
def calc_atr(df: pd.DataFrame, buy_idx: int) -> float:
|
||
sub = df.iloc[max(0, buy_idx - ATR_N - 1):buy_idx]
|
||
if len(sub) < 3:
|
||
return ATR_MIN
|
||
try:
|
||
avg = ((sub["high"] - sub["low"]) / sub["low"]).iloc[-ATR_N:].mean()
|
||
return float(max(ATR_MIN, min(ATR_MAX, avg * ATR_MULT)))
|
||
except Exception:
|
||
return ATR_MIN
|
||
|
||
|
||
# ── 포지션 시뮬 (기존 sim_45m40.py와 동일) ───────────────
|
||
def simulate_pos(df: pd.DataFrame, buy_idx: int,
|
||
buy_price: float, stop_pct: float):
|
||
peak = buy_price
|
||
for i in range(buy_idx + 1, len(df)):
|
||
row = df.iloc[i]
|
||
ts = df.index[i]
|
||
if row["high"] > peak:
|
||
peak = row["high"]
|
||
if row["low"] <= peak * (1 - stop_pct):
|
||
sp = peak * (1 - stop_pct)
|
||
pnl = (sp * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||
return pnl > 0, ts, pnl, "trailing_stop"
|
||
pnl_now = (row["close"] - buy_price) / buy_price * 100
|
||
if (i - buy_idx) >= TS_N and pnl_now < TIME_STOP_MIN_PCT:
|
||
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||
return pnl > 0, ts, pnl, "time_stop"
|
||
last = df.iloc[-1]["close"]
|
||
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||
return pnl > 0, df.index[-1], pnl, "end_of_data"
|
||
|
||
|
||
# ── vol-lead 전략 (현재 전략 파라미터 전체 반영) ──────────
|
||
def run_vol_lead(df: pd.DataFrame, ticker: str,
|
||
fng_map: dict, regime_series: pd.Series) -> list:
|
||
trades = []
|
||
sig_i = None # 신호 봉 인덱스
|
||
sig_p = None # 신호가
|
||
sig_vr = 0.0 # 신호 vol_ratio
|
||
in_pos = False
|
||
buy_idx = buy_price = stop_pct = None
|
||
i = max(LOCAL_VOL_N + 2, QUIET_N + 1)
|
||
|
||
while i < len(df):
|
||
ts = df.index[i]
|
||
row = df.iloc[i]
|
||
cur = row["close"]
|
||
|
||
# ── 포지션 보유 중: 청산 체크 ─────────────────────
|
||
if in_pos:
|
||
is_win, sdt, pnl, reason = simulate_pos(df, buy_idx, buy_price, stop_pct)
|
||
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
|
||
trades.append((is_win, pnl, df.index[buy_idx], sdt, ticker, reason))
|
||
in_pos = False
|
||
sig_i = sig_p = None
|
||
i = next_i
|
||
continue
|
||
|
||
# ── F&G 필터 ──────────────────────────────────────
|
||
date_str = ts.strftime("%Y-%m-%d")
|
||
if fng_map:
|
||
fv = fng_map.get(date_str, 50)
|
||
if fv < FNG_MIN_ENTRY:
|
||
sig_i = sig_p = None # 신호 초기화
|
||
i += 1
|
||
continue
|
||
|
||
# ── 레짐 필터 ─────────────────────────────────────
|
||
score = 0.0
|
||
if not regime_series.empty and ts in regime_series.index:
|
||
v = regime_series.loc[ts]
|
||
score = float(v) if not pd.isna(v) else 0.0
|
||
|
||
if score < BEAR_THRESHOLD:
|
||
sig_i = sig_p = None
|
||
i += 1
|
||
continue
|
||
|
||
vol_mult = VOL_MULT_BULL if score >= BULL_THRESHOLD else VOL_MULT_NEUTRAL
|
||
|
||
# ── 신호 타임아웃 ──────────────────────────────────
|
||
if sig_i is not None and (i - sig_i) > SIGNAL_TO_N:
|
||
sig_i = sig_p = None
|
||
|
||
# ── 신호 있음: 진입 체크 ──────────────────────────
|
||
if sig_i is not None:
|
||
move_pct = (cur - sig_p) / sig_p * 100
|
||
age_min = (i - sig_i) * 40 # 봉수 → 분
|
||
entry_thr = calc_entry_threshold(sig_vr)
|
||
|
||
if cur < sig_p:
|
||
# 신호가 이하 하락 → 초기화
|
||
sig_i = sig_p = None
|
||
elif move_pct >= entry_thr:
|
||
# 거리 기반 진입
|
||
in_pos = True
|
||
buy_idx = i
|
||
buy_price = cur
|
||
stop_pct = calc_atr(df, i)
|
||
sig_i = sig_p = None
|
||
elif age_min >= VELOCITY_MIN_AGE_M and move_pct >= VELOCITY_MIN_MOVE:
|
||
velocity = move_pct / age_min
|
||
if velocity >= VELOCITY_THRESHOLD:
|
||
# 속도 기반 조기 진입
|
||
in_pos = True
|
||
buy_idx = i
|
||
buy_price = cur
|
||
stop_pct = calc_atr(df, i)
|
||
sig_i = sig_p = None
|
||
i += 1
|
||
continue
|
||
|
||
# ── 신호 없음: 축적 조건 체크 ────────────────────
|
||
vol_p = df.iloc[i - 1]["volume"]
|
||
vol_avg = df.iloc[i - 1 - LOCAL_VOL_N:i - 1]["volume"].mean()
|
||
vol_r = vol_p / vol_avg if vol_avg > 0 else 0
|
||
|
||
close_qh = df.iloc[i - QUIET_N]["close"]
|
||
chg_qh = abs(cur - close_qh) / close_qh * 100 if close_qh > 0 else 999
|
||
quiet = chg_qh < QUIET_PCT
|
||
spike = vol_r >= vol_mult
|
||
|
||
if quiet and spike:
|
||
if sig_i is None:
|
||
sig_i = i
|
||
sig_p = cur
|
||
sig_vr = vol_r
|
||
else:
|
||
if sig_i is not None and cur < sig_p:
|
||
sig_i = sig_p = None
|
||
|
||
i += 1
|
||
return trades
|
||
|
||
|
||
# ── WF 필터 (기존 동일) ──────────────────────────────────
|
||
def apply_wf(trades: list) -> tuple:
|
||
history = []
|
||
shadow_streak = 0
|
||
blocked = False
|
||
accepted = []
|
||
blocked_cnt = 0
|
||
|
||
for trade in trades:
|
||
is_win = int(trade[0])
|
||
if not blocked:
|
||
accepted.append(trade)
|
||
history.append(is_win)
|
||
if len(history) >= WF_WINDOW and sum(history[-WF_WINDOW:]) / WF_WINDOW < WF_MIN_WIN_RATE:
|
||
blocked = True
|
||
shadow_streak = 0
|
||
else:
|
||
blocked_cnt += 1
|
||
if is_win:
|
||
shadow_streak += 1
|
||
if shadow_streak >= WF_SHADOW_WINS:
|
||
blocked = False
|
||
history = []
|
||
shadow_streak = 0
|
||
else:
|
||
shadow_streak = 0
|
||
return accepted, blocked_cnt
|
||
|
||
|
||
WF_WINDOW = 4
|
||
WF_MIN_WIN_RATE = 0.01
|
||
WF_SHADOW_WINS = 2
|
||
|
||
|
||
# ── MAX_POSITIONS (기존 동일) ────────────────────────────
|
||
def apply_max_positions(all_trades: list) -> tuple:
|
||
open_exits, accepted, skipped = [], [], []
|
||
for trade in all_trades:
|
||
buy_dt, sell_dt = trade[2], trade[3]
|
||
open_exits = [s for s in open_exits if s > buy_dt]
|
||
if len(open_exits) < MAX_POS:
|
||
open_exits.append(sell_dt)
|
||
accepted.append(trade)
|
||
else:
|
||
skipped.append(trade)
|
||
return accepted, skipped
|
||
|
||
|
||
# ── 복리 시뮬 (기존 동일) ───────────────────────────────
|
||
def simulate(accepted: list) -> dict:
|
||
portfolio = float(BUDGET)
|
||
total_krw = 0.0
|
||
monthly = {}
|
||
trade_log = []
|
||
reason_cnt = {}
|
||
|
||
for trade in accepted:
|
||
is_win, pnl, buy_dt, sell_dt, ticker, reason = trade
|
||
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
|
||
krw_profit = pos_size * pnl / 100
|
||
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
|
||
total_krw += krw_profit
|
||
reason_cnt[reason] = reason_cnt.get(reason, 0) + 1
|
||
|
||
ym = buy_dt.strftime("%Y-%m")
|
||
if ym not in monthly:
|
||
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
|
||
monthly[ym]["trades"] += 1
|
||
monthly[ym]["wins"] += int(is_win)
|
||
monthly[ym]["pnl_krw"] += krw_profit
|
||
|
||
trade_log.append({
|
||
"buy_dt": buy_dt, "sell_dt": sell_dt, "ticker": ticker,
|
||
"is_win": is_win, "pnl_pct": pnl, "reason": reason,
|
||
"pos_size": pos_size, "krw_profit": krw_profit,
|
||
"portfolio": portfolio,
|
||
})
|
||
|
||
wins = sum(1 for t in accepted if t[0])
|
||
return {
|
||
"portfolio": portfolio, "total_krw": total_krw,
|
||
"roi_pct": (portfolio - BUDGET) / BUDGET * 100,
|
||
"total": len(accepted), "wins": wins,
|
||
"wr": wins / len(accepted) * 100 if accepted else 0,
|
||
"monthly": monthly, "trade_log": trade_log,
|
||
"reason_cnt": reason_cnt,
|
||
}
|
||
|
||
|
||
# ── 메인 ─────────────────────────────────────────────────
|
||
def main():
|
||
print("=" * 62)
|
||
print("현재 전략 기준 시뮬 (F&G + 레짐 + 티어임계 + 속도진입)")
|
||
print("=" * 62)
|
||
|
||
print("F&G 히스토리 로드...")
|
||
fng_map = load_fng_history()
|
||
if fng_map:
|
||
vals = sorted(fng_map.items())
|
||
print(f" {vals[0][0]} ~ {vals[-1][0]} ({len(fng_map)}일)")
|
||
else:
|
||
print(" F&G 데이터 없음 — 필터 비활성화")
|
||
|
||
print("캐시 로드...")
|
||
cache = pickle.load(open(CACHE_FILE, "rb"))
|
||
tickers = [t for t in list(cache["10m"].keys())[:TOP_N]
|
||
if len(cache["10m"][t]) > 200]
|
||
print(f" 종목: {len(tickers)}개\n")
|
||
|
||
dfs_40m = {t: resample_40m(cache["10m"][t]) for t in tickers}
|
||
|
||
print("레짐 시리즈 계산...")
|
||
regime_series = build_regime_series(dfs_40m)
|
||
|
||
sample_df = next(iter(dfs_40m.values()))
|
||
start_date = sample_df.index[0].strftime("%Y-%m-%d")
|
||
end_date = sample_df.index[-1].strftime("%Y-%m-%d")
|
||
print(f" 기간: {start_date} ~ {end_date}\n")
|
||
|
||
# F&G 차단 일수
|
||
if fng_map:
|
||
period_dates = [d for d in fng_map if start_date <= d <= end_date]
|
||
fng_blocked = sum(1 for d in period_dates if fng_map.get(d, 50) < FNG_MIN_ENTRY)
|
||
fng_allowed = len(period_dates) - fng_blocked
|
||
else:
|
||
fng_blocked = fng_allowed = 0
|
||
|
||
all_trades = []
|
||
wf_blocked = 0
|
||
for ticker in tickers:
|
||
df40 = dfs_40m[ticker]
|
||
raw = run_vol_lead(df40, ticker, fng_map, regime_series)
|
||
filtered, blocked = apply_wf(raw)
|
||
wf_blocked += blocked
|
||
all_trades.extend(filtered)
|
||
|
||
all_trades.sort(key=lambda x: x[2]) # buy_dt 기준 정렬
|
||
accepted, skipped = apply_max_positions(all_trades)
|
||
result = simulate(accepted)
|
||
|
||
# 최대 낙폭
|
||
peak = BUDGET
|
||
max_dd = 0.0
|
||
for t in result["trade_log"]:
|
||
peak = max(peak, t["portfolio"])
|
||
dd = (peak - t["portfolio"]) / peak * 100
|
||
max_dd = max(max_dd, dd)
|
||
|
||
total = result["total"]
|
||
wins = result["wins"]
|
||
|
||
print(f"{'='*62}")
|
||
print(f" 기간: {start_date} ~ {end_date} ({len(tickers)}종목 / 40분봉)")
|
||
print(f" F&G 차단: {fng_blocked}일 / 허용: {fng_allowed}일 (기준 FNG≥{FNG_MIN_ENTRY})")
|
||
print(f"{'='*62}")
|
||
print(f" 신호 발생: {len(all_trades)+wf_blocked:>4}건 (WF 차단: {wf_blocked}건)")
|
||
print(f" 실제 진입: {total:>4}건 ({len(skipped)}건 MAX_POS 스킵)")
|
||
print(f" 승 / 패: {wins}승 {total-wins}패 (승률 {result['wr']:.1f}%)"
|
||
if total else " 진입 없음")
|
||
print(f" {'─'*52}")
|
||
print(f" 초기 예산: {BUDGET:>15,}원")
|
||
print(f" 최종 자산: {result['portfolio']:>15,.0f}원")
|
||
print(f" 순수익: {result['total_krw']:>+15,.0f}원")
|
||
print(f" 수익률: {result['roi_pct']:>+14.2f}%")
|
||
print(f" 최대 낙폭: {-max_dd:>+14.2f}%"
|
||
f" ({-max_dd/100*BUDGET:>+,.0f}원)")
|
||
monthly_krw = [m["pnl_krw"] for m in result["monthly"].values()]
|
||
avg_m = sum(monthly_krw) / len(monthly_krw) if monthly_krw else 0
|
||
print(f" 월평균 수익: {avg_m:>+13,.0f}원")
|
||
|
||
print(f"\n── 청산 사유 {'─'*44}")
|
||
label_map = {"trailing_stop": "트레일링스탑", "time_stop": "타임스탑",
|
||
"end_of_data": "데이터종료"}
|
||
for r, cnt in sorted(result["reason_cnt"].items(), key=lambda x: -x[1]):
|
||
print(f" {label_map.get(r, r):12}: {cnt:>3}건")
|
||
|
||
print(f"\n── 월별 수익 {'─'*44}")
|
||
print(f" {'월':^8} │ {'거래':>4} {'승률':>5} │"
|
||
f" {'월수익(KRW)':>13} {'누적수익(KRW)':>14}")
|
||
cum = 0.0
|
||
for ym, m in sorted(result["monthly"].items()):
|
||
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
|
||
cum += m["pnl_krw"]
|
||
print(f" {ym:^8} │ {m['trades']:>4}건 {wr:>4.0f}% │"
|
||
f" {m['pnl_krw']:>+13,.0f}원 {cum:>+14,.0f}원")
|
||
|
||
print(f"\n── 파라미터 {'─'*46}")
|
||
print(f" F&G≥{FNG_MIN_ENTRY} 레짐BEAR<{BEAR_THRESHOLD}% BULL≥{BULL_THRESHOLD}%")
|
||
print(f" VOL: {VOL_MULT_NEUTRAL}x(중립)/{VOL_MULT_BULL}x(강세) 횡보<{QUIET_PCT}%")
|
||
print(f" 임계: 5x→1% / 3.5x→2% / 2.5x→3% / 기본→{THRESH_BASE}%")
|
||
print(f" 속도: ≥{VELOCITY_THRESHOLD}%/분 (≥{VELOCITY_MIN_MOVE}% / ≥{VELOCITY_MIN_AGE_M}분)")
|
||
print(f" ATR: ×{ATR_MULT} ({ATR_MIN*100:.0f}~{ATR_MAX*100:.0f}%) 타임스탑: 8h/{TIME_STOP_MIN_PCT}%")
|
||
print(f"{'='*62}")
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|