chore: add WF/shadow/momentum analysis simulation scripts

Scripts used to analyze and validate strategy changes:
- wf_cmp.py: WF window size comparison on 42 real trades
- wf_cmp2.py: WF comparison extended with price_history simulation
- shadow_sim.py: shadow rehabilitation sim without strategy filters
- shadow_sim2.py: post-rehabilitation performance simulation
- shadow_sim3.py: shadow rehabilitation sim with full strategy filters
- momentum_cmp.py: momentum filter A/B comparison
- trend_check.py: 2h price gain distribution analysis per ticker

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-03-01 23:58:42 +09:00
parent 29d48f0fe9
commit 54ce327c50
7 changed files with 1210 additions and 0 deletions

186
momentum_cmp.py Normal file
View File

@@ -0,0 +1,186 @@
"""모멘텀 필터 유/무 비교 시뮬레이션.
A안: 추세(2h +5%) + 15분 워치리스트 (모멘텀 없음)
B안: 추세(2h +5%) + 모멘텀 + 15분 워치리스트 (현행)
"""
import os, time
from dotenv import load_dotenv
load_dotenv()
import oracledb
import pyupbit
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_MIN_PCT = 5.0
MA_PERIOD = 20
LOCAL_VOL_HOURS = 5
VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
CONFIRM_MINUTES = 15
FEE = 0.0005
_daily_cache = {}
_hourly_cache = {}
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
def load_prices(cur, ticker, from_dt):
cur.execute("""SELECT price, recorded_at FROM price_history
WHERE ticker=:t AND recorded_at>=:f ORDER BY recorded_at""", t=ticker, f=from_dt)
return cur.fetchall()
def get_ma20(ticker, dt):
key = (ticker, dt.strftime("%Y-%m-%d"))
if key not in _daily_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD+2,
to=dt.strftime("%Y-%m-%d 09:00:00"))
_daily_cache[key] = df
time.sleep(0.1)
except:
_daily_cache[key] = None
df = _daily_cache[key]
if df is None or len(df) < MA_PERIOD:
return None
return df["close"].iloc[-MA_PERIOD:].mean()
def get_vol_ratio(ticker, dt):
key = (ticker, dt.strftime("%Y-%m-%d %H"))
if key not in _hourly_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS+3,
to=dt.strftime("%Y-%m-%d %H:%M:%S"))
_hourly_cache[key] = df
time.sleep(0.1)
except:
_hourly_cache[key] = None
df = _hourly_cache[key]
if df is None or len(df) < LOCAL_VOL_HOURS+1:
return 0.0
rv = df["volume"].iloc[-2]
la = df["volume"].iloc[-(LOCAL_VOL_HOURS+1):-2].mean()
return rv/la if la > 0 else 0.0
def check_trend(prices, idx):
lb = 12 # 2h = 12 * 10min
if idx < lb: return False
curr, past = prices[idx][0], prices[idx-lb][0]
return past > 0 and (curr-past)/past*100 >= TREND_MIN_PCT
def check_momentum(ticker, price, dt):
ma = get_ma20(ticker, dt)
if ma is None or price <= ma: return False
return get_vol_ratio(ticker, dt) >= VOL_MULT
def simulate_pos(prices, buy_idx, buy_price):
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx+1:]:
if price > peak: peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
if (peak - price) / peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, price, ts, f"타임스탑", net
lp, lt = prices[-1]
net = (lp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, lp, lt, "데이터종료", net
def run_scenario(prices, ticker, use_momentum, label):
wins = losses = 0
total_pnl = 0.0
watchlist_dt = None
in_pos = False
buy_idx = buy_price = None
idx = 0
trades = []
while idx < len(prices):
price, dt = prices[idx]
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price)
next_idx = next((i for i,(_, ts) in enumerate(prices) if ts > sdt), len(prices))
if is_win: wins += 1
else: losses += 1
total_pnl += pnl
trades.append((is_win, buy_price, sp, pnl, dt, sdt, reason))
in_pos = False
watchlist_dt = None
idx = next_idx
continue
trend_ok = check_trend(prices, idx)
mom_ok = check_momentum(ticker, price, dt) if (use_momentum and trend_ok) else True
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = dt
elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_pos = True
buy_idx = idx
buy_price = price
watchlist_dt = None
else:
watchlist_dt = None
idx += 1
total = wins + losses
wr = wins/total*100 if total else 0
return {'label': label, 'total': total, 'wins': wins, 'losses': losses,
'wr': wr, 'pnl': total_pnl, 'trades': trades}
def print_result(r):
print(f"\n [{r['label']}]")
print(f"{r['total']}건 | 승률={r['wr']:.0f}% ({r['wins']}{r['losses']}패) | 누적={r['pnl']:+.2f}%")
for i, (iw, bp, sp, pnl, bdt, sdt, reason) in enumerate(r['trades'], 1):
mark = "" if iw else ""
print(f" #{i}: {bp:.4f}{sp:.4f}원 | {mark} {pnl:+.2f}% | {reason}"
f" ({bdt.strftime('%m-%d %H:%M')}{sdt.strftime('%m-%d %H:%M')})")
def main():
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT MAX(recorded_at) FROM price_history")
end_dt = cur.fetchone()[0]
print("=" * 62)
print("모멘텀 필터 유/무 비교 (WF차단 발동 이후 전 기간)")
print("A안: 추세+워치리스트만 B안: 추세+모멘텀+워치리스트(현행)")
print("=" * 62)
summary = []
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("SELECT traded_at FROM trade_results WHERE ticker=:t ORDER BY traded_at", t=ticker)
rows = cur.fetchall()
wf_dt = rows[4][0]
prices = load_prices(cur, ticker, wf_dt)
print(f"\n{''*62}")
print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')} 데이터: {len(prices)}")
rA = run_scenario(prices, ticker, use_momentum=False, label="A: 추세+워치만")
rB = run_scenario(prices, ticker, use_momentum=True, label="B: 추세+모멘텀+워치(현행)")
print_result(rA)
print_result(rB)
summary.append((ticker, rA, rB))
print(f"\n{'='*62}")
print(f"{'종목':<12} {'A안 거래':>6} {'A안 승률':>8} {'A안 PnL':>10}{'B안 거래':>6} {'B안 승률':>8} {'B안 PnL':>10}")
print(f"{''*62}")
for ticker, rA, rB in summary:
print(f"{ticker:<12} {rA['total']:>6}{rA['wr']:>6.0f}% {rA['pnl']:>+9.2f}% │"
f" {rB['total']:>6}{rB['wr']:>6.0f}% {rB['pnl']:>+9.2f}%")
conn.close()
if __name__ == "__main__":
main()

156
shadow_sim.py Normal file
View File

@@ -0,0 +1,156 @@
"""Shadow Trading 재활 시뮬레이션.
WF차단 종목들에 대해 shadow 포지션을 시뮬레이션하여
몇 번의 shadow 승리 후 WF차단이 해제될 수 있었는지 분석.
"""
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
FEE = 0.0005
REHABILITATE_WINS = 2 # shadow N승 → WF 해제
def get_price_series(cur, ticker, from_dt):
"""WF차단 이후 가격 시계열 조회."""
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= :dt
ORDER BY recorded_at
""", t=ticker, dt=from_dt)
return cur.fetchall()
def simulate_shadow(prices, buy_price, buy_dt):
"""
단일 shadow 포지션 시뮬레이션.
Returns: (is_win, sell_price, sell_dt, reason, pnl_pct)
"""
peak = buy_price
for price, ts in prices:
# 고점 갱신
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
# 트레일링 스탑 (최고가 대비 -STOP_LOSS_PCT)
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
sell_pnl = (price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, price, ts, f"트레일링스탑(peak={peak:.4f})", sell_pnl)
# 타임 스탑
if elapsed_h >= TIME_STOP_HOURS:
if pnl < TIME_STOP_MIN_PCT:
sell_pnl = (price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, price, ts, f"타임스탑({elapsed_h:.1f}h,{pnl*100:+.1f}%)", sell_pnl)
# 데이터 끝까지 보유 중
last_price, last_ts = prices[-1]
sell_pnl = (last_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, last_price, last_ts, "데이터종료(보유중)", sell_pnl)
def run():
conn = oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
cur = conn.cursor()
# WF차단 종목별 5번째 패배 시점 조회
blocked_tickers = {}
for ticker in ['KRW-DKA', 'KRW-SAHARA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at, pnl_pct, sell_price FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
if len(rows) >= 5:
wf_trigger_dt = rows[4][0] # 5번째 거래 완료 시점
last_sell_price = rows[4][2]
blocked_tickers[ticker] = {
'wf_trigger_dt': wf_trigger_dt,
'last_sell_price': last_sell_price,
'trades': rows
}
print("=" * 60)
print("Shadow Trading 재활 시뮬레이션")
print("=" * 60)
for ticker, info in blocked_tickers.items():
wf_dt = info['wf_trigger_dt']
print(f"\n{''*60}")
print(f"[{ticker}] WF차단 발동: {wf_dt.strftime('%m-%d %H:%M')}")
print(f" 직전 5건: {[f'{r[1]:+.2f}%' for r in info['trades']]}")
# WF차단 이후 가격 시계열
prices = get_price_series(cur, ticker, wf_dt)
if not prices:
print(" → 이후 가격 데이터 없음")
continue
print(f" 이후 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})")
print()
# Shadow 포지션 시뮬레이션
shadow_wins = 0
pos_idx = 0
cursor_idx = 0
rehabilitated = False
while cursor_idx < len(prices) and not rehabilitated:
buy_price, buy_dt = prices[cursor_idx]
remaining = prices[cursor_idx + 1:]
if not remaining:
break
pos_idx += 1
is_win, sell_price, sell_dt, reason, pnl = simulate_shadow(remaining, buy_price, buy_dt)
win_mark = "✅ WIN" if is_win else "❌ LOSS"
if is_win:
shadow_wins += 1
else:
shadow_wins = 0 # 패배 시 카운터 리셋
print(f" Shadow #{pos_idx}: 진입={buy_price:.4f}원 ({buy_dt.strftime('%m-%d %H:%M')})")
print(f" 청산={sell_price:.4f}원 ({sell_dt.strftime('%m-%d %H:%M')}) | {reason}")
print(f" {win_mark} {pnl:+.2f}% | 연속승={shadow_wins}/{REHABILITATE_WINS}")
if shadow_wins >= REHABILITATE_WINS:
print(f"\n 🎉 {REHABILITATE_WINS}연승 달성 → WF 차단 해제! ({sell_dt.strftime('%m-%d %H:%M')})")
rehabilitated = True
break
# 다음 진입: 청산 시점 이후 첫 번째 가격
next_idx = next(
(i for i, (_, ts) in enumerate(prices) if ts > sell_dt),
None
)
if next_idx is None:
break
cursor_idx = next_idx
if not rehabilitated:
print(f"\n ⛔ 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})")
conn.close()
if __name__ == "__main__":
run()

155
shadow_sim2.py Normal file
View File

@@ -0,0 +1,155 @@
"""Shadow 재활 이후 실제 진입 성과 시뮬레이션."""
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
FEE = 0.0005
REHABILITATE_WINS = 2
def get_price_series(cur, ticker, from_dt):
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= :dt
ORDER BY recorded_at
""", t=ticker, dt=from_dt)
return cur.fetchall()
def simulate_one(prices, buy_price, buy_dt):
"""단일 포지션 시뮬레이션. Returns (is_win, sell_price, sell_dt, reason, pnl_pct)"""
peak = buy_price
for price, ts in prices:
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
sell_pnl = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", sell_pnl)
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
sell_pnl = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", sell_pnl)
last_price, last_ts = prices[-1]
sell_pnl = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, last_price, last_ts, "데이터종료(보유중)", sell_pnl)
def run_shadow_then_real(cur, ticker, wf_trigger_dt):
"""shadow로 재활 후, 재활 시점 이후 실제 거래 성과 시뮬레이션."""
prices = get_price_series(cur, ticker, wf_trigger_dt)
if not prices:
return None
# 1단계: shadow로 재활 시점 찾기
shadow_wins = 0
cursor_idx = 0
rehab_dt = None
while cursor_idx < len(prices):
buy_price, buy_dt = prices[cursor_idx]
remaining = prices[cursor_idx + 1:]
if not remaining:
break
is_win, sell_price, sell_dt, reason, pnl = simulate_one(remaining, buy_price, buy_dt)
if is_win:
shadow_wins += 1
else:
shadow_wins = 0
if shadow_wins >= REHABILITATE_WINS:
rehab_dt = sell_dt
break
next_idx = next((i for i, (_, ts) in enumerate(prices) if ts > sell_dt), None)
if next_idx is None:
break
cursor_idx = next_idx
if rehab_dt is None:
return None # 재활 실패
# 2단계: 재활 이후 실제 거래 시뮬레이션
print(f"\n ★ WF 해제 시점: {rehab_dt.strftime('%m-%d %H:%M')}")
print(f" ─ 이후 실제 진입 시뮬레이션 ─")
post_prices = get_price_series(cur, ticker, rehab_dt)
if not post_prices:
print(" → 재활 이후 가격 데이터 없음")
return
cursor_idx = 0
trade_no = 0
wins = 0
total_pnl = 0.0
while cursor_idx < len(post_prices):
buy_price, buy_dt = post_prices[cursor_idx]
remaining = post_prices[cursor_idx + 1:]
if not remaining:
break
is_win, sell_price, sell_dt, reason, pnl = simulate_one(remaining, buy_price, buy_dt)
trade_no += 1
if is_win:
wins += 1
total_pnl += pnl
mark = "" if is_win else ""
print(f" 실제#{trade_no}: {buy_price:.4f}{sell_price:.4f}원 | {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})")
next_idx = next((i for i, (_, ts) in enumerate(post_prices) if ts > sell_dt), None)
if next_idx is None:
break
cursor_idx = next_idx
if trade_no > 0:
wr = wins / trade_no * 100
print(f"\n 📊 재활 후 성과: {trade_no}건 중 {wins}승 | 승률={wr:.0f}% | 누적PnL={total_pnl:+.2f}%")
return {'trades': trade_no, 'wins': wins, 'win_rate': wr, 'total_pnl': total_pnl}
return None
def run():
conn = oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
cur = conn.cursor()
results = {}
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
wf_trigger_dt = rows[4][0]
print(f"\n{'='*60}")
print(f"[{ticker}] WF차단 발동: {wf_trigger_dt.strftime('%m-%d %H:%M')}")
r = run_shadow_then_real(cur, ticker, wf_trigger_dt)
if r:
results[ticker] = r
print(f"\n{'='*60}")
print("전체 요약")
print(f"{'='*60}")
for ticker, r in results.items():
print(f"{ticker}: 재활 후 {r['trades']}건 | 승률={r['win_rate']:.0f}% | 누적={r['total_pnl']:+.2f}%")
conn.close()
if __name__ == "__main__":
run()

320
shadow_sim3.py Normal file
View File

@@ -0,0 +1,320 @@
"""Shadow 재활 시뮬레이션 v3 - 실제 전략 필터 포함.
전략 조건:
1. 추세: 현재가 vs 2h 전 가격 >= TREND_MIN_GAIN_PCT%
2. 모멘텀: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 * VOL_MULT
3. 15분 워치리스트: 첫 신호 후 15분 재확인
"""
import os, time
from datetime import datetime, timedelta
from collections import defaultdict
from dotenv import load_dotenv
load_dotenv()
import oracledb
import pyupbit
# ── 파라미터 ──────────────────────────────────────────
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_HOURS = 2
TREND_MIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # Neutral 기준
MA_PERIOD = 20
LOCAL_VOL_HOURS = 5
VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
CONFIRM_MINUTES = 15
FEE = 0.0005
REHABILITATE_WINS = 2
# ── DB 연결 ───────────────────────────────────────────
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
def load_price_history(cur, ticker, from_dt, to_dt):
"""price_history 전체 로드."""
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at BETWEEN :f AND :e
ORDER BY recorded_at
""", t=ticker, f=from_dt, e=to_dt)
return cur.fetchall() # [(price, dt), ...]
# ── pyupbit 과거 데이터 캐시 ───────────────────────────
_daily_cache = {} # (ticker, date_str) → df
_hourly_cache = {} # (ticker, hour_str) → df
def get_ma20(ticker, as_of_dt):
"""as_of_dt 기준 MA20 (일봉 종가)."""
date_str = as_of_dt.strftime("%Y-%m-%d")
key = (ticker, date_str)
if key not in _daily_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD + 2,
to=as_of_dt.strftime("%Y-%m-%d 09:00:00"))
_daily_cache[key] = df
time.sleep(0.1)
except Exception as e:
_daily_cache[key] = None
df = _daily_cache[key]
if df is None or len(df) < MA_PERIOD:
return None
return df["close"].iloc[-MA_PERIOD:].mean()
def get_volume_ratio(ticker, as_of_dt):
"""최근 1h 거래량 / 로컬 5h 평균. (직전 완성봉 기준)"""
# 시간봉은 해당 시각의 이전 7h 데이터 필요
hour_str = as_of_dt.strftime("%Y-%m-%d %H:00:00")
key = (ticker, hour_str)
if key not in _hourly_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS + 3,
to=as_of_dt.strftime("%Y-%m-%d %H:%M:%S"))
_hourly_cache[key] = df
time.sleep(0.1)
except Exception as e:
_hourly_cache[key] = None
df = _hourly_cache[key]
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
return 0.0
recent_vol = df["volume"].iloc[-2]
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
if local_avg <= 0:
return 0.0
return recent_vol / local_avg
# ── 전략 조건 체크 ────────────────────────────────────
def check_trend(prices, idx):
"""현재 idx 기준 2h 전(12틱 전) 대비 +TREND_MIN_PCT% 이상."""
lookback = TREND_HOURS * 6 # 10분봉 기준 2h = 12틱
if idx < lookback:
return False
current = prices[idx][0]
past = prices[idx - lookback][0]
if past <= 0:
return False
gain = (current - past) / past * 100
return gain >= TREND_MIN_PCT
def check_momentum(ticker, current_price, as_of_dt):
"""현재가 > MA20 AND 거래량 비율 >= VOL_MULT."""
ma20 = get_ma20(ticker, as_of_dt)
if ma20 is None:
return False
if current_price <= ma20:
return False
vol_ratio = get_volume_ratio(ticker, as_of_dt)
return vol_ratio >= VOL_MULT
# ── 단일 포지션 시뮬레이션 ────────────────────────────
def simulate_position(prices, buy_idx, buy_price):
"""buy_idx 이후 가격으로 포지션 시뮬레이션."""
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx + 1:]:
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net)
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", net)
last_price, last_ts = prices[-1]
net = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, last_price, last_ts, "데이터종료", net)
# ── Shadow → 재활 → 실제 진입 시뮬레이션 ─────────────
def run_full_sim(cur, ticker, wf_trigger_dt, end_dt):
"""
1. WF차단 시점부터 shadow 포지션 (전략 필터 적용)
2. REHABILITATE_WINS 연승 → WF 해제
3. 해제 이후 실제 진입 성과
"""
prices = load_price_history(cur, ticker, wf_trigger_dt, end_dt)
if not prices:
print(f" 가격 데이터 없음")
return
print(f" 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})")
# ── Phase 1: Shadow (WF 해제까지) ────────────────
shadow_wins = 0
rehab_dt = None
watchlist_dt = None # 신호 첫 발생 시각
in_position = False
pos_buy_idx = None
pos_buy_price = None
shadow_trade_no = 0
idx = 0
while idx < len(prices) and rehab_dt is None:
current_price, current_dt = prices[idx]
if in_position:
# 포지션 청산 체크
is_win, sell_price, sell_dt, reason, pnl = simulate_position(prices, pos_buy_idx, pos_buy_price)
# 청산 시각에 해당하는 idx로 점프
sell_idx = next((i for i, (_, ts) in enumerate(prices) if ts >= sell_dt), len(prices)-1)
shadow_trade_no += 1
if is_win:
shadow_wins += 1
else:
shadow_wins = 0
mark = "" if is_win else ""
print(f" [Shadow#{shadow_trade_no}] {pos_buy_price:.4f}{sell_price:.4f}"
f"| {mark} {pnl:+.2f}% | {reason} | 연속승={shadow_wins}/{REHABILITATE_WINS}"
f" ({sell_dt.strftime('%m-%d %H:%M')})")
if shadow_wins >= REHABILITATE_WINS:
rehab_dt = sell_dt
idx = sell_idx
break
in_position = False
watchlist_dt = None
idx = sell_idx
continue
# 전략 조건 체크
trend_ok = check_trend(prices, idx)
if trend_ok:
mom_ok = check_momentum(ticker, current_price, current_dt)
else:
mom_ok = False
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = current_dt # 첫 신호
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
# 15분 재확인 → shadow 진입
in_position = True
pos_buy_idx = idx
pos_buy_price = current_price
watchlist_dt = None
# 청산은 다음 루프에서
else:
watchlist_dt = None # 조건 깨지면 초기화
idx += 1
if rehab_dt is None:
print(f"\n ⛔ 전략 필터 적용 시 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})")
return
print(f"\n 🎉 WF 해제: {rehab_dt.strftime('%m-%d %H:%M')} ({shadow_trade_no}번 shadow 거래)")
# ── Phase 2: 실제 진입 (재활 이후) ───────────────
print(f"\n ── 재활 후 실제 진입 ──")
post_prices = [(p, dt) for p, dt in prices if dt >= rehab_dt]
if not post_prices:
print(" 재활 이후 데이터 없음")
return
real_wins = 0
real_total = 0
real_pnl_sum = 0.0
watchlist_dt = None
in_position = False
pos_buy_idx_g = None # global idx in post_prices
pos_buy_price = None
idx2 = 0
while idx2 < len(post_prices):
current_price, current_dt = post_prices[idx2]
if in_position:
is_win, sell_price, sell_dt, reason, pnl = simulate_position(post_prices, pos_buy_idx_g, pos_buy_price)
sell_idx2 = next((i for i, (_, ts) in enumerate(post_prices) if ts >= sell_dt), len(post_prices)-1)
real_total += 1
if is_win:
real_wins += 1
real_pnl_sum += pnl
mark = "" if is_win else ""
print(f" 실제#{real_total}: {pos_buy_price:.4f}{sell_price:.4f}"
f"| {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})")
in_position = False
watchlist_dt = None
idx2 = sell_idx2
continue
trend_ok = check_trend(post_prices, idx2)
if trend_ok:
mom_ok = check_momentum(ticker, current_price, current_dt)
else:
mom_ok = False
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = current_dt
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_position = True
pos_buy_idx_g = idx2
pos_buy_price = current_price
watchlist_dt = None
else:
watchlist_dt = None
idx2 += 1
if real_total == 0:
print(" 재활 후 전략 조건 충족 진입 없음")
else:
wr = real_wins / real_total * 100
print(f"\n 📊 재활 후: {real_total}건 | 승률={wr:.0f}% | 누적={real_pnl_sum:+.2f}%")
return {'trades': real_total, 'wins': real_wins, 'wr': wr, 'pnl': real_pnl_sum}
def main():
conn = get_conn()
cur = conn.cursor()
# price_history 최대 시각
cur.execute("SELECT MAX(recorded_at) FROM price_history")
end_dt = cur.fetchone()[0]
summary = {}
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
wf_dt = rows[4][0]
print(f"\n{'='*62}")
print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')}")
print(f"{'='*62}")
r = run_full_sim(cur, ticker, wf_dt, end_dt)
if r:
summary[ticker] = r
print(f"\n{'='*62}")
print("전체 요약 (전략 필터 적용)")
print(f"{'='*62}")
for ticker, r in summary.items():
print(f"{ticker}: {r['trades']}건 | 승률={r['wr']:.0f}% | 누적={r['pnl']:+.2f}%")
conn.close()
if __name__ == "__main__":
main()

42
trend_check.py Normal file
View File

@@ -0,0 +1,42 @@
import os
from dotenv import load_dotenv
load_dotenv()
import oracledb
conn = oracledb.connect(user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
cur = conn.cursor()
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= TIMESTAMP '2026-02-28 20:00:00'
ORDER BY recorded_at
""", t=ticker)
rows = cur.fetchall()
lookback = 12 # 10분봉 * 12 = 2h
gains = []
for i in range(lookback, len(rows)):
curr = rows[i][0]
past = rows[i - lookback][0]
if past > 0:
gains.append((curr - past) / past * 100)
if not gains:
continue
above_5 = sum(1 for g in gains if g >= 5.0)
above_3 = sum(1 for g in gains if g >= 3.0)
above_0 = sum(1 for g in gains if g >= 0.0)
negative = sum(1 for g in gains if g < 0.0)
print(f"[{ticker}] 2h 등락률 분포 ({len(gains)}개 틱)")
print(f" 평균={sum(gains)/len(gains):+.2f}% 최고={max(gains):+.2f}% 최저={min(gains):+.2f}%")
print(f" +5% 이상(신호): {above_5}건 ({above_5/len(gains)*100:.0f}%)")
print(f" +3%~+5%: {above_3-above_5}건 ({(above_3-above_5)/len(gains)*100:.0f}%)")
print(f" 0%~+3%: {above_0-above_3}건 ({(above_0-above_3)/len(gains)*100:.0f}%)")
print(f" 음전(하락): {negative}건 ({negative/len(gains)*100:.0f}%)")
print()
conn.close()

140
wf_cmp.py Normal file
View File

@@ -0,0 +1,140 @@
"""WF 윈도우 크기별 비교 시뮬레이션.
실제 42건 거래를 시간순으로 재생하며
WF_WINDOW 크기(2, 3, 5)에 따라 차단/허용 여부를 시뮬레이션.
차단된 거래 → P&L 0 (진입 안 함)
허용된 거래 → 실제 P&L 반영
"""
import os
from dotenv import load_dotenv
load_dotenv()
import oracledb
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
def simulate_wf(trades, window, min_wr):
"""
trades: [(ticker, is_win, pnl_pct, krw_profit, traded_at), ...] 시간순
window: WF 윈도우 크기
min_wr: 최소 승률 임계값
Returns: 허용된 거래 목록, 차단된 거래 목록, 요약
"""
history = {} # ticker → [bool, ...]
allowed = []
blocked = []
for t in trades:
ticker, is_win, pnl, profit, dt = t
hist = history.get(ticker, [])
# WF 차단 여부 판단
is_blocked = False
if len(hist) >= window:
recent_wr = sum(hist[-window:]) / window
if recent_wr < min_wr:
is_blocked = True
if is_blocked:
blocked.append(t)
else:
allowed.append(t)
# 실제 결과를 이력에 추가
hist = hist + [bool(is_win)]
if len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
total = len(allowed)
wins = sum(1 for t in allowed if t[1])
pnl = sum(t[2] for t in allowed)
profit = sum(t[3] for t in allowed)
return allowed, blocked, {
'total': total, 'wins': wins,
'wr': wins/total*100 if total else 0,
'pnl': pnl, 'profit': profit,
'blocked_count': len(blocked),
}
def main():
conn = get_conn()
cur = conn.cursor()
# 전체 거래 시간순 로드
cur.execute("""
SELECT ticker, is_win, pnl_pct, NVL(krw_profit,0), traded_at
FROM trade_results
ORDER BY traded_at
""")
trades = cur.fetchall()
print(f"전체 거래: {len(trades)}\n")
configs = [
(2, 0.5, "WF=2 (2연패→차단, 1승→해제)"),
(3, 0.34, "WF=3 (3건중 1승 이상 필요)"),
(5, 0.40, "WF=5 (5건중 2승 이상, 현행)"),
]
results = []
for window, min_wr, label in configs:
allowed, blocked, stats = simulate_wf(trades, window, min_wr)
stats['label'] = label
stats['window'] = window
results.append((label, allowed, blocked, stats))
print(f"[{label}]")
print(f" 허용: {stats['total']}건 | 승률={stats['wr']:.1f}% | "
f"누적수익={stats['profit']:+,.0f}원 | 차단={stats['blocked_count']}")
# 차단된 거래 상세
if blocked:
print(f" 차단된 거래:")
blocked_by_ticker = {}
for t in blocked:
blocked_by_ticker.setdefault(t[0], []).append(t)
for ticker, ts in blocked_by_ticker.items():
pnls = [f"{t[2]:+.1f}%" for t in ts]
print(f" {ticker}: {len(ts)}{pnls}")
print()
# 상세 비교표: 거래별 허용/차단 여부
print("=" * 70)
print(f"{'날짜':>12} {'종목':>12} {'결과':>6} {'PnL':>8}"
f"{'WF=2':>6} {'WF=3':>6} {'WF=5':>6}")
print("" * 70)
# 각 설정별 허용 set 구성 (traded_at + ticker로 식별)
allowed_sets = []
for _, allowed, _, _ in results:
allowed_sets.append(set((t[0], t[4]) for t in allowed))
for t in trades:
ticker, is_win, pnl, profit, dt = t
win_mark = "" if is_win else ""
cols = []
for aset in allowed_sets:
if (ticker, dt) in aset:
cols.append("허용")
else:
cols.append("🔴차단")
print(f"{dt.strftime('%m-%d %H:%M'):>12} {ticker:>12} {win_mark:>4} {pnl:>+7.1f}% │ "
f"{cols[0]:>6} {cols[1]:>6} {cols[2]:>6}")
# 최종 요약
print("\n" + "=" * 70)
print(f"{'설정':<35} {'거래':>5} {'승률':>7} {'KRW수익':>12} {'차단':>5}")
print("" * 70)
for label, _, _, s in results:
print(f"{label:<35} {s['total']:>5}{s['wr']:>6.1f}% {s['profit']:>+12,.0f}{s['blocked_count']:>4}건 차단")
conn.close()
if __name__ == "__main__":
main()

211
wf_cmp2.py Normal file
View File

@@ -0,0 +1,211 @@
"""WF 윈도우 비교 시뮬레이션 v2 - 실거래 + 이후 시뮬 거래 통합.
Phase 1: 실제 42건 거래를 WF 설정별로 허용/차단 재생
Phase 2: 마지막 실거래 이후 price_history 기반 신호로 추가 거래 시뮬
(추세 2h+5% + 15분 워치리스트, 모멘텀은 API 한계로 생략)
→ WF 상태는 Phase1에서 이어짐
비교 설정:
A: WF=2 (min_wr=0.0, 즉 2연패시만 차단 — last2=[L,L]이면 차단)
B: WF=3 (min_wr=0.34)
C: WF=5 현행 (min_wr=0.40)
D: WF 없음
"""
import os, time
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_MIN_PCT = 5.0
CONFIRM_MINUTES = 15
FEE = 0.0005
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
# ── WF 판단 ───────────────────────────────────────────
def is_wf_blocked(hist, window, min_wr):
if window == 0: return False
if len(hist) < window: return False
wr = sum(hist[-window:]) / window
return wr < min_wr
# ── 추세 체크 (price_history 기반) ────────────────────
def check_trend(prices, idx):
lb = 12 # 2h = 12 * 10분봉
if idx < lb: return False
curr, past = prices[idx][0], prices[idx-lb][0]
return past > 0 and (curr-past)/past*100 >= TREND_MIN_PCT
# ── 포지션 시뮬 ───────────────────────────────────────
def simulate_pos(prices, buy_idx, buy_price):
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx+1:]:
if price > peak: peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
if (peak - price) / peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, price, ts, f"트레일링({pnl*100:+.1f}%)", net
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, price, ts, "타임스탑", net
lp, lt = prices[-1]
net = (lp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, lp, lt, "데이터종료", net
# ── Phase1: 실거래 재생 ───────────────────────────────
def phase1(real_trades, window, min_wr):
"""42건 실거래 재생. Returns (허용목록, 차단목록, history_per_ticker)"""
history = {}
allowed = []
blocked = []
for t in real_trades:
ticker, is_win, pnl, profit, dt = t
hist = history.get(ticker, [])
if is_wf_blocked(hist, window, min_wr):
blocked.append(('block', ticker, is_win, pnl, profit, dt))
else:
allowed.append(('real', ticker, is_win, pnl, profit, dt))
hist = hist + [bool(is_win)]
if window > 0 and len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
return allowed, blocked, history
# ── Phase2: price_history 신호 시뮬 ──────────────────
def phase2(cur, history, real_last_dt, window, min_wr):
"""실거래 종료 이후 price_history 기반 신호 시뮬레이션."""
# 스캔 대상: 실거래에 등장한 종목 전체
tickers = list(history.keys()) if history else []
# 실거래 후 WF 해제 가능한 종목만
# (차단됐어도 shadow 없이는 해제 불가 → 차단 상태 종목 제외)
active_tickers = []
for ticker in tickers:
hist = history.get(ticker, [])
if not is_wf_blocked(hist, window, min_wr):
active_tickers.append(ticker)
if not active_tickers:
return [], history
sim_trades = []
for ticker in active_tickers:
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker=:t AND recorded_at > :dt
ORDER BY recorded_at
""", t=ticker, dt=real_last_dt)
prices = cur.fetchall()
if len(prices) < 13: continue
hist = list(history.get(ticker, []))
watchlist_dt = None
in_pos = False
buy_idx = buy_price = None
idx = 0
while idx < len(prices):
price, dt = prices[idx]
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price)
next_idx = next((i for i,(_, ts) in enumerate(prices) if ts > sdt), len(prices))
profit = pnl * 3333333 / 100 # 포지션당 예산 기준 근사
sim_trades.append(('sim', ticker, is_win, pnl, profit, dt))
hist = hist + [bool(is_win)]
if window > 0 and len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
in_pos = False
watchlist_dt = None
idx = next_idx
continue
if is_wf_blocked(hist, window, min_wr):
idx += 1
continue
trend_ok = check_trend(prices, idx)
if trend_ok:
if watchlist_dt is None:
watchlist_dt = dt
elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_pos = True
buy_idx = idx
buy_price = price
watchlist_dt = None
else:
watchlist_dt = None
idx += 1
return sim_trades, history
# ── 요약 출력 ─────────────────────────────────────────
def print_summary(label, p1_allowed, p1_blocked, p2_trades):
all_trades = p1_allowed + p2_trades
total = len(all_trades)
wins = sum(1 for t in all_trades if t[2])
pnl = sum(t[4] for t in all_trades)
wr = wins/total*100 if total else 0
blk = len(p1_blocked)
p2_cnt = len(p2_trades)
p2_win = sum(1 for t in p2_trades if t[2])
print(f"\n[{label}]")
print(f" 실거래 허용: {len(p1_allowed)}건 | 차단: {blk}")
print(f" 추가 시뮬: {p2_cnt}건 ({p2_win}승)")
print(f" ─────────────────────────────────────")
print(f" 합계: {total}건 | 승률={wr:.1f}% | KRW={pnl:+,.0f}")
return {'label': label, 'total': total, 'wins': wins, 'wr': wr, 'pnl': pnl,
'blk': blk, 'p2': p2_cnt}
def main():
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT ticker, is_win, pnl_pct, NVL(krw_profit,0), traded_at
FROM trade_results ORDER BY traded_at
""")
real_trades = cur.fetchall()
real_last_dt = real_trades[-1][4]
print(f"실거래: {len(real_trades)}건 (마지막: {real_last_dt.strftime('%m-%d %H:%M')})")
cur.execute("SELECT MAX(recorded_at) FROM price_history")
ph_last = cur.fetchone()[0]
print(f"price_history 끝: {ph_last.strftime('%m-%d %H:%M')}\n")
configs = [
(2, 0.01, "WF=2 (2연패→차단)"),
(3, 0.34, "WF=3"),
(5, 0.40, "WF=5 현행"),
(0, 0.00, "WF없음"),
]
summary = []
for window, min_wr, label in configs:
p1_allowed, p1_blocked, history = phase1(real_trades, window, min_wr)
p2_trades, _ = phase2(cur, history, real_last_dt, window, min_wr)
s = print_summary(label, p1_allowed, p1_blocked, p2_trades)
summary.append(s)
print(f"\n{'='*62}")
print(f"{'설정':<22} {'허용':>5} {'차단':>5} {'추가시뮬':>8} {'승률':>7} {'KRW수익':>13}")
print(f"{''*62}")
for s in summary:
print(f"{s['label']:<22} {s['total']-s['p2']:>5}{s['blk']:>5}"
f"{s['p2']:>6}{s['wr']:>6.1f}% {s['pnl']:>+12,.0f}")
conn.close()
if __name__ == "__main__":
main()