Files
upbit-trader/momentum_cmp.py
joungmin 54ce327c50 chore: add WF/shadow/momentum analysis simulation scripts
Scripts used to analyze and validate strategy changes:
- wf_cmp.py: WF window size comparison on 42 real trades
- wf_cmp2.py: WF comparison extended with price_history simulation
- shadow_sim.py: shadow rehabilitation sim without strategy filters
- shadow_sim2.py: post-rehabilitation performance simulation
- shadow_sim3.py: shadow rehabilitation sim with full strategy filters
- momentum_cmp.py: momentum filter A/B comparison
- trend_check.py: 2h price gain distribution analysis per ticker

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 23:58:42 +09:00

187 lines
6.8 KiB
Python

"""모멘텀 필터 유/무 비교 시뮬레이션.
A안: 추세(2h +5%) + 15분 워치리스트 (모멘텀 없음)
B안: 추세(2h +5%) + 모멘텀 + 15분 워치리스트 (현행)
"""
import os, time
from dotenv import load_dotenv
load_dotenv()
import oracledb
import pyupbit
# --- Exit rules (overridable through environment variables) ---
# Fractional drawdown from the post-entry peak that triggers the trailing exit.
STOP_LOSS_PCT = float(os.environ.get("STOP_LOSS_PCT", "1.5")) / 100
# Holding time, in hours, after which the time stop may fire.
TIME_STOP_HOURS = int(os.environ.get("TIME_STOP_HOURS", "8"))
# Fractional gain below which an aged position is closed by the time stop.
TIME_STOP_MIN_PCT = float(os.environ.get("TIME_STOP_MIN_GAIN_PCT", "3")) / 100

# --- Entry filters ---
TREND_MIN_PCT = 5.0  # minimum % gain over the trend lookback window
MA_PERIOD = 20  # number of daily closes averaged for the moving average
LOCAL_VOL_HOURS = 5  # sizes the hourly-candle window used as volume baseline
VOL_MULT = float(os.environ.get("VOLUME_MULTIPLIER", "2.0"))  # min volume ratio
CONFIRM_MINUTES = 15  # how long a signal must persist before an entry is taken
FEE = 0.0005  # fee rate applied to both the buy and the sell price

# --- Memoized candle frames (a value of None marks a failed fetch) ---
_daily_cache = {}  # (ticker, "YYYY-MM-DD") -> daily candles
_hourly_cache = {}  # (ticker, "YYYY-MM-DD HH") -> hourly candles
def get_conn():
    """Open an Oracle connection configured from environment variables.

    Reads ORACLE_USER, ORACLE_PASSWORD, ORACLE_DSN and ORACLE_WALLET; the
    wallet value is handed to the driver as ``config_dir``. Variables that
    are not set are passed through as ``None``.

    Returns:
        Whatever ``oracledb.connect`` returns for those settings.
    """
    env = os.getenv
    settings = {
        "user": env("ORACLE_USER"),
        "password": env("ORACLE_PASSWORD"),
        "dsn": env("ORACLE_DSN"),
        "config_dir": env("ORACLE_WALLET"),
    }
    return oracledb.connect(**settings)
def load_prices(cur, ticker, from_dt):
    """Fetch the price history of *ticker* recorded at or after *from_dt*.

    Args:
        cur: Open database cursor; bind values are passed as keyword args.
        ticker: Value bound to ``:t`` (the ``ticker`` column).
        from_dt: Lower bound (inclusive) bound to ``:f`` on ``recorded_at``.

    Returns:
        All ``(price, recorded_at)`` rows, ordered by ``recorded_at``.
    """
    query = (
        "SELECT price, recorded_at FROM price_history\n"
        "WHERE ticker=:t AND recorded_at>=:f ORDER BY recorded_at"
    )
    cur.execute(query, t=ticker, f=from_dt)
    return cur.fetchall()
def get_ma20(ticker, dt):
    """Return the mean of the last MA_PERIOD daily closes for *ticker*.

    Daily candles are requested up to 09:00:00 on the calendar date of *dt*
    (presumably the exchange's daily-candle boundary -- confirm) and memoized
    per ``(ticker, date)``, so each ticker/day pair costs at most one API call.

    Args:
        ticker: Market code passed to ``pyupbit.get_ohlcv``.
        dt: Datetime whose date selects the candle window.

    Returns:
        The mean close, or ``None`` when the fetch failed or fewer than
        MA_PERIOD candles are available.
    """
    key = (ticker, dt.strftime("%Y-%m-%d"))
    if key not in _daily_cache:
        try:
            df = pyupbit.get_ohlcv(
                ticker,
                interval="day",
                count=MA_PERIOD + 2,
                to=dt.strftime("%Y-%m-%d 09:00:00"),
            )
        except Exception:
            # Best-effort lookup: a failed fetch is cached as None so it is
            # not retried. Catching Exception (not a bare except) keeps
            # Ctrl-C / SystemExit able to stop a long simulation run.
            df = None
        else:
            time.sleep(0.1)  # throttle between successful API calls
        _daily_cache[key] = df
    df = _daily_cache[key]
    if df is None or len(df) < MA_PERIOD:
        return None
    return df["close"].iloc[-MA_PERIOD:].mean()
def get_vol_ratio(ticker, dt):
    """Return the ratio of a recent hourly volume to a local baseline.

    Hourly candles up to *dt* are fetched and memoized per
    ``(ticker, date-hour)``; the first *dt* seen within an hour decides the
    exact ``to`` timestamp used for that cache entry.

    The numerator is the volume of the second-to-last candle. The baseline
    is the mean volume of the candles from position ``-(LOCAL_VOL_HOURS+1)``
    up to but excluding ``-2``, i.e. ``LOCAL_VOL_HOURS - 1`` candles.
    NOTE(review): the window name suggests LOCAL_VOL_HOURS candles were
    intended -- confirm against the live strategy before changing it.

    Args:
        ticker: Market code passed to ``pyupbit.get_ohlcv``.
        dt: Datetime marking the end of the candle window.

    Returns:
        ``recent / baseline``, or ``0.0`` when the fetch failed, too few
        candles came back, or the baseline is not greater than zero.
    """
    key = (ticker, dt.strftime("%Y-%m-%d %H"))
    if key not in _hourly_cache:
        try:
            df = pyupbit.get_ohlcv(
                ticker,
                interval="minute60",
                count=LOCAL_VOL_HOURS + 3,
                to=dt.strftime("%Y-%m-%d %H:%M:%S"),
            )
        except Exception:
            # Best-effort lookup: cache the failure as None instead of
            # retrying. Exception (not a bare except) lets Ctrl-C through.
            df = None
        else:
            time.sleep(0.1)  # throttle between successful API calls
        _hourly_cache[key] = df
    df = _hourly_cache[key]
    if df is None or len(df) < LOCAL_VOL_HOURS + 1:
        return 0.0
    rv = df["volume"].iloc[-2]
    la = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
    return rv / la if la > 0 else 0.0
def check_trend(prices, idx):
    """Tell whether the price at *idx* has risen enough over the lookback.

    Compares ``prices[idx]`` with the tick 12 positions earlier and checks
    that the percentage gain is at least TREND_MIN_PCT.

    Args:
        prices: Sequence of rows whose first element is the price.
        idx: Index of the tick being evaluated.

    Returns:
        ``False`` when fewer than 12 earlier ticks exist or the reference
        price is not positive; otherwise the result of the gain comparison.
    """
    lookback = 12  # 2h = 12 * 10min
    if idx < lookback:
        return False
    now_price = prices[idx][0]
    ref_price = prices[idx - lookback][0]
    if ref_price > 0:
        return (now_price - ref_price) / ref_price * 100 >= TREND_MIN_PCT
    return False
def check_momentum(ticker, price, dt):
    """Apply the momentum filter: price above the MA and a volume surge.

    Args:
        ticker: Market code forwarded to the candle helpers.
        price: Current price compared against the moving average.
        dt: Datetime forwarded to the candle helpers.

    Returns:
        ``False`` when the moving average is unavailable or *price* is not
        above it; otherwise whether the volume ratio reaches VOL_MULT.
    """
    ma20 = get_ma20(ticker, dt)
    if ma20 is None:
        return False
    if price <= ma20:
        return False
    # The volume lookup is only reached once the price test has passed.
    vol_ratio = get_vol_ratio(ticker, dt)
    return vol_ratio >= VOL_MULT
def simulate_pos(prices, buy_idx, buy_price):
    """Replay the ticks after an entry and return the simulated exit.

    Ticks following ``buy_idx`` are walked in order. The position closes on
    the first tick where either
      * the drawdown from the highest price seen since entry reaches
        STOP_LOSS_PCT (trailing exit), or
      * at least TIME_STOP_HOURS have elapsed and the gross gain is still
        below TIME_STOP_MIN_PCT (time stop).
    If neither fires, the position is valued at the last tick in *prices*.

    Args:
        prices: Sequence of ``(price, timestamp)`` rows.
        buy_idx: Index of the entry tick within *prices*.
        buy_price: Entry price.

    Returns:
        Tuple ``(is_win, exit_price, exit_ts, reason, net_pct)`` where
        ``net_pct`` is the percentage return after charging FEE on both the
        buy and the sell side, and ``is_win`` is ``net_pct > 0``.
    """

    def net_pct(exit_price):
        # Fee-adjusted return in percent; shared by all three exit paths.
        return (exit_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100

    buy_dt = prices[buy_idx][1]
    peak = buy_price
    # Index-based walk avoids copying the remaining history on every call.
    for pos in range(buy_idx + 1, len(prices)):
        price, ts = prices[pos]
        if price > peak:
            peak = price
        elapsed_h = (ts - buy_dt).total_seconds() / 3600
        pnl = (price - buy_price) / buy_price  # gross, before fees
        if (peak - price) / peak >= STOP_LOSS_PCT:
            net = net_pct(price)
            return net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net
        if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
            net = net_pct(price)
            return net > 0, price, ts, "타임스탑", net
    # No exit rule fired: mark the position to the final tick.
    lp, lt = prices[-1]
    net = net_pct(lp)
    return net > 0, lp, lt, "데이터종료", net
def run_scenario(prices, ticker, use_momentum, label):
    """Run one entry/exit simulation over *prices* and collect statistics.

    A tick qualifies when the trend check passes and, if *use_momentum* is
    true, the momentum check passes as well. The first qualifying tick
    starts a watch; an entry is taken at the first later qualifying tick
    that is at least CONFIRM_MINUTES after the watch started. Any
    non-qualifying tick in between resets the watch. After an exit,
    scanning resumes at the first tick whose timestamp is later than the
    exit timestamp.

    Args:
        prices: Sequence of ``(price, timestamp)`` rows in time order.
        ticker: Market code forwarded to the momentum check.
        use_momentum: Whether to apply the momentum filter.
        label: Display label stored in the result.

    Returns:
        Dict with keys ``label``, ``total``, ``wins``, ``losses``, ``wr``
        (win rate in percent), ``pnl`` (sum of net percentages) and
        ``trades``, a list of ``(is_win, buy_price, sell_price, net_pct,
        buy_dt, sell_dt, reason)`` tuples.
    """
    wins = losses = 0
    total_pnl = 0.0
    watchlist_dt = None
    in_pos = False
    buy_idx = buy_price = None
    trades = []
    n = len(prices)
    idx = 0
    while idx < n:
        price, dt = prices[idx]
        if in_pos:
            # Reached on the tick right after the entry tick.
            # NOTE: an entry confirmed on the very last tick never gets here
            # (the loop ends first), so such a position is not counted.
            is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price)
            if is_win:
                wins += 1
            else:
                losses += 1
            total_pnl += pnl
            # Record the entry tick's own timestamp; the loop variable `dt`
            # already belongs to the following tick at this point.
            buy_dt = prices[buy_idx][1]
            trades.append((is_win, buy_price, sp, pnl, buy_dt, sdt, reason))
            in_pos = False
            watchlist_dt = None
            # Resume after the exit. Searching forward from the entry (not
            # from index 0) is cheaper and can never move the cursor back.
            idx = next(
                (i for i in range(buy_idx + 1, n) if prices[i][1] > sdt),
                n,
            )
            continue
        trend_ok = check_trend(prices, idx)
        # The momentum lookup (API-backed) is skipped unless it can matter.
        mom_ok = check_momentum(ticker, price, dt) if (use_momentum and trend_ok) else True
        if trend_ok and mom_ok:
            if watchlist_dt is None:
                watchlist_dt = dt
            elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
                in_pos = True
                buy_idx = idx
                buy_price = price
                watchlist_dt = None
        else:
            watchlist_dt = None
        idx += 1
    total = wins + losses
    wr = wins / total * 100 if total else 0
    return {'label': label, 'total': total, 'wins': wins, 'losses': losses,
            'wr': wr, 'pnl': total_pnl, 'trades': trades}
def print_result(r):
    """Print the summary line and per-trade lines for one scenario result.

    Args:
        r: Result dict with keys ``label``, ``total``, ``wr``, ``wins``,
            ``losses``, ``pnl`` and ``trades``; each trade is a tuple
            ``(is_win, buy_price, sell_price, net_pct, buy_dt, sell_dt,
            reason)``.
    """
    print(f"\n [{r['label']}]")
    # Win and loss counts are labelled separately so they cannot be read as
    # one fused number.
    print(f"{r['total']}건 | 승률={r['wr']:.0f}% ({r['wins']}승 {r['losses']}패) | 누적={r['pnl']:+.2f}%")
    for i, (iw, bp, sp, pnl, bdt, sdt, reason) in enumerate(r['trades'], 1):
        # The marker was an empty string in both branches, which made wins
        # and losses indistinguishable in the per-trade lines.
        mark = "승" if iw else "패"
        # Arrows separate buy -> sell price and buy -> sell time.
        print(f" #{i}: {bp:.4f} → {sp:.4f}원 | {mark} {pnl:+.2f}% | {reason}"
              f" ({bdt.strftime('%m-%d %H:%M')} → {sdt.strftime('%m-%d %H:%M')})")
def main():
    """Compare scenario A (no momentum) and B (with momentum) per ticker.

    For each hard-coded ticker the timestamp of its fifth recorded trade is
    taken as the start of the replay window (the output labels it as the
    "WF차단" time -- presumably when the walk-forward block kicked in;
    confirm), prices from that point on are loaded, both scenarios are run
    and printed, and a summary table follows. Tickers with fewer than five
    recorded trades are skipped with a notice.
    """
    rule = "─" * 62
    conn = get_conn()
    try:
        cur = conn.cursor()
        print("=" * 62)
        print("모멘텀 필터 유/무 비교 (WF차단 발동 이후 전 기간)")
        print("A안: 추세+워치리스트만 B안: 추세+모멘텀+워치리스트(현행)")
        print("=" * 62)
        summary = []
        for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
            cur.execute("SELECT traded_at FROM trade_results WHERE ticker=:t ORDER BY traded_at", t=ticker)
            rows = cur.fetchall()
            if len(rows) < 5:
                # Without a fifth trade there is no start point to replay
                # from; skip instead of failing with IndexError.
                print(f"\n[{ticker}] 거래 기록 {len(rows)}건 (5건 미만) - 건너뜀")
                continue
            wf_dt = rows[4][0]  # timestamp of the fifth trade
            prices = load_prices(cur, ticker, wf_dt)
            print(f"\n{rule}")
            print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')} 데이터: {len(prices)}")
            rA = run_scenario(prices, ticker, use_momentum=False, label="A: 추세+워치만")
            rB = run_scenario(prices, ticker, use_momentum=True, label="B: 추세+모멘텀+워치(현행)")
            print_result(rA)
            print_result(rB)
            summary.append((ticker, rA, rB))
        print(f"\n{'='*62}")
        print(f"{'종목':<12} {'A안 거래':>6} {'A안 승률':>8} {'A안 PnL':>10} │"
              f" {'B안 거래':>6} {'B안 승률':>8} {'B안 PnL':>10}")
        print(rule)
        for ticker, rA, rB in summary:
            print(f"{ticker:<12} {rA['total']:>6} {rA['wr']:>6.0f}% {rA['pnl']:>+9.2f}% │"
                  f" {rB['total']:>6} {rB['wr']:>6.0f}% {rB['pnl']:>+9.2f}%")
    finally:
        # Release the connection even when a query or a scenario raises.
        conn.close()
if __name__ == "__main__":
main()