- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리: - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal) - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회) - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건 - type hints, Google docstring, 구체적 예외 타입 적용 - 50줄 초과 함수 분리 (process_signal, restore_positions) - 미사용 파일 58개 archive/ 폴더로 이동 - README.md 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
324 lines
12 KiB
Python
324 lines
12 KiB
Python
"""Shadow 재활 시뮬레이션 v3 - 실제 전략 필터 포함.
|
|
|
|
전략 조건:
|
|
1. 추세: 현재가 vs 2h 전 가격 >= TREND_MIN_GAIN_PCT%
|
|
2. 모멘텀: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 * VOL_MULT
|
|
3. 15분 워치리스트: 첫 신호 후 15분 재확인
|
|
"""
|
|
|
|
import os as _os, sys as _sys
|
|
_sys.path.insert(0, _os.path.dirname(_os.path.dirname(_os.path.abspath(__file__))))
|
|
|
|
import os, time
|
|
from datetime import datetime, timedelta
|
|
from collections import defaultdict
|
|
from dotenv import load_dotenv
|
|
load_dotenv()
|
|
import oracledb
|
|
import pyupbit
|
|
|
|
# ── 파라미터 ──────────────────────────────────────────
|
|
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
|
|
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
|
|
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
|
|
TREND_HOURS = 2
|
|
TREND_MIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # Neutral 기준
|
|
MA_PERIOD = 20
|
|
LOCAL_VOL_HOURS = 5
|
|
VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
|
|
CONFIRM_MINUTES = 15
|
|
FEE = 0.0005
|
|
REHABILITATE_WINS = 2
|
|
|
|
|
|
# ── DB 연결 ───────────────────────────────────────────
|
|
def get_conn():
|
|
return oracledb.connect(
|
|
user=os.getenv('ORACLE_USER'),
|
|
password=os.getenv('ORACLE_PASSWORD'),
|
|
dsn=os.getenv('ORACLE_DSN'),
|
|
config_dir=os.getenv('ORACLE_WALLET')
|
|
)
|
|
|
|
|
|
def load_price_history(cur, ticker, from_dt, to_dt):
|
|
"""price_history 전체 로드."""
|
|
cur.execute("""
|
|
SELECT price, recorded_at FROM price_history
|
|
WHERE ticker = :t AND recorded_at BETWEEN :f AND :e
|
|
ORDER BY recorded_at
|
|
""", t=ticker, f=from_dt, e=to_dt)
|
|
return cur.fetchall() # [(price, dt), ...]
|
|
|
|
|
|
# ── pyupbit 과거 데이터 캐시 ───────────────────────────
|
|
_daily_cache = {} # (ticker, date_str) → df
|
|
_hourly_cache = {} # (ticker, hour_str) → df
|
|
|
|
def get_ma20(ticker, as_of_dt):
|
|
"""as_of_dt 기준 MA20 (일봉 종가)."""
|
|
date_str = as_of_dt.strftime("%Y-%m-%d")
|
|
key = (ticker, date_str)
|
|
if key not in _daily_cache:
|
|
try:
|
|
df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD + 2,
|
|
to=as_of_dt.strftime("%Y-%m-%d 09:00:00"))
|
|
_daily_cache[key] = df
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
_daily_cache[key] = None
|
|
df = _daily_cache[key]
|
|
if df is None or len(df) < MA_PERIOD:
|
|
return None
|
|
return df["close"].iloc[-MA_PERIOD:].mean()
|
|
|
|
|
|
def get_volume_ratio(ticker, as_of_dt):
|
|
"""최근 1h 거래량 / 로컬 5h 평균. (직전 완성봉 기준)"""
|
|
# 시간봉은 해당 시각의 이전 7h 데이터 필요
|
|
hour_str = as_of_dt.strftime("%Y-%m-%d %H:00:00")
|
|
key = (ticker, hour_str)
|
|
if key not in _hourly_cache:
|
|
try:
|
|
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS + 3,
|
|
to=as_of_dt.strftime("%Y-%m-%d %H:%M:%S"))
|
|
_hourly_cache[key] = df
|
|
time.sleep(0.1)
|
|
except Exception as e:
|
|
_hourly_cache[key] = None
|
|
df = _hourly_cache[key]
|
|
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
|
|
return 0.0
|
|
recent_vol = df["volume"].iloc[-2]
|
|
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
|
|
if local_avg <= 0:
|
|
return 0.0
|
|
return recent_vol / local_avg
|
|
|
|
|
|
# ── 전략 조건 체크 ────────────────────────────────────
|
|
def check_trend(prices, idx):
|
|
"""현재 idx 기준 2h 전(12틱 전) 대비 +TREND_MIN_PCT% 이상."""
|
|
lookback = TREND_HOURS * 6 # 10분봉 기준 2h = 12틱
|
|
if idx < lookback:
|
|
return False
|
|
current = prices[idx][0]
|
|
past = prices[idx - lookback][0]
|
|
if past <= 0:
|
|
return False
|
|
gain = (current - past) / past * 100
|
|
return gain >= TREND_MIN_PCT
|
|
|
|
|
|
def check_momentum(ticker, current_price, as_of_dt):
|
|
"""현재가 > MA20 AND 거래량 비율 >= VOL_MULT."""
|
|
ma20 = get_ma20(ticker, as_of_dt)
|
|
if ma20 is None:
|
|
return False
|
|
if current_price <= ma20:
|
|
return False
|
|
vol_ratio = get_volume_ratio(ticker, as_of_dt)
|
|
return vol_ratio >= VOL_MULT
|
|
|
|
|
|
# ── 단일 포지션 시뮬레이션 ────────────────────────────
|
|
def simulate_position(prices, buy_idx, buy_price):
|
|
"""buy_idx 이후 가격으로 포지션 시뮬레이션."""
|
|
buy_dt = prices[buy_idx][1]
|
|
peak = buy_price
|
|
|
|
for price, ts in prices[buy_idx + 1:]:
|
|
if price > peak:
|
|
peak = price
|
|
elapsed_h = (ts - buy_dt).total_seconds() / 3600
|
|
pnl = (price - buy_price) / buy_price
|
|
|
|
drop_from_peak = (peak - price) / peak
|
|
if drop_from_peak >= STOP_LOSS_PCT:
|
|
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
|
|
return (net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net)
|
|
|
|
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
|
|
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
|
|
return (net > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", net)
|
|
|
|
last_price, last_ts = prices[-1]
|
|
net = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
|
|
return (net > 0, last_price, last_ts, "데이터종료", net)
|
|
|
|
|
|
# ── Shadow → 재활 → 실제 진입 시뮬레이션 ─────────────
|
|
def run_full_sim(cur, ticker, wf_trigger_dt, end_dt):
|
|
"""
|
|
1. WF차단 시점부터 shadow 포지션 (전략 필터 적용)
|
|
2. REHABILITATE_WINS 연승 → WF 해제
|
|
3. 해제 이후 실제 진입 성과
|
|
"""
|
|
prices = load_price_history(cur, ticker, wf_trigger_dt, end_dt)
|
|
if not prices:
|
|
print(f" 가격 데이터 없음")
|
|
return
|
|
|
|
print(f" 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})")
|
|
|
|
# ── Phase 1: Shadow (WF 해제까지) ────────────────
|
|
shadow_wins = 0
|
|
rehab_dt = None
|
|
watchlist_dt = None # 신호 첫 발생 시각
|
|
in_position = False
|
|
pos_buy_idx = None
|
|
pos_buy_price = None
|
|
shadow_trade_no = 0
|
|
idx = 0
|
|
|
|
while idx < len(prices) and rehab_dt is None:
|
|
current_price, current_dt = prices[idx]
|
|
|
|
if in_position:
|
|
# 포지션 청산 체크
|
|
is_win, sell_price, sell_dt, reason, pnl = simulate_position(prices, pos_buy_idx, pos_buy_price)
|
|
# 청산 시각에 해당하는 idx로 점프
|
|
sell_idx = next((i for i, (_, ts) in enumerate(prices) if ts >= sell_dt), len(prices)-1)
|
|
shadow_trade_no += 1
|
|
if is_win:
|
|
shadow_wins += 1
|
|
else:
|
|
shadow_wins = 0
|
|
mark = "✅" if is_win else "❌"
|
|
print(f" [Shadow#{shadow_trade_no}] {pos_buy_price:.4f}→{sell_price:.4f}원 "
|
|
f"| {mark} {pnl:+.2f}% | {reason} | 연속승={shadow_wins}/{REHABILITATE_WINS}"
|
|
f" ({sell_dt.strftime('%m-%d %H:%M')})")
|
|
if shadow_wins >= REHABILITATE_WINS:
|
|
rehab_dt = sell_dt
|
|
idx = sell_idx
|
|
break
|
|
in_position = False
|
|
watchlist_dt = None
|
|
idx = sell_idx
|
|
continue
|
|
|
|
# 전략 조건 체크
|
|
trend_ok = check_trend(prices, idx)
|
|
if trend_ok:
|
|
mom_ok = check_momentum(ticker, current_price, current_dt)
|
|
else:
|
|
mom_ok = False
|
|
|
|
if trend_ok and mom_ok:
|
|
if watchlist_dt is None:
|
|
watchlist_dt = current_dt # 첫 신호
|
|
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
|
|
# 15분 재확인 → shadow 진입
|
|
in_position = True
|
|
pos_buy_idx = idx
|
|
pos_buy_price = current_price
|
|
watchlist_dt = None
|
|
# 청산은 다음 루프에서
|
|
else:
|
|
watchlist_dt = None # 조건 깨지면 초기화
|
|
|
|
idx += 1
|
|
|
|
if rehab_dt is None:
|
|
print(f"\n ⛔ 전략 필터 적용 시 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})")
|
|
return
|
|
|
|
print(f"\n 🎉 WF 해제: {rehab_dt.strftime('%m-%d %H:%M')} ({shadow_trade_no}번 shadow 거래)")
|
|
|
|
# ── Phase 2: 실제 진입 (재활 이후) ───────────────
|
|
print(f"\n ── 재활 후 실제 진입 ──")
|
|
post_prices = [(p, dt) for p, dt in prices if dt >= rehab_dt]
|
|
if not post_prices:
|
|
print(" 재활 이후 데이터 없음")
|
|
return
|
|
|
|
real_wins = 0
|
|
real_total = 0
|
|
real_pnl_sum = 0.0
|
|
watchlist_dt = None
|
|
in_position = False
|
|
pos_buy_idx_g = None # global idx in post_prices
|
|
pos_buy_price = None
|
|
idx2 = 0
|
|
|
|
while idx2 < len(post_prices):
|
|
current_price, current_dt = post_prices[idx2]
|
|
|
|
if in_position:
|
|
is_win, sell_price, sell_dt, reason, pnl = simulate_position(post_prices, pos_buy_idx_g, pos_buy_price)
|
|
sell_idx2 = next((i for i, (_, ts) in enumerate(post_prices) if ts >= sell_dt), len(post_prices)-1)
|
|
real_total += 1
|
|
if is_win:
|
|
real_wins += 1
|
|
real_pnl_sum += pnl
|
|
mark = "✅" if is_win else "❌"
|
|
print(f" 실제#{real_total}: {pos_buy_price:.4f}→{sell_price:.4f}원 "
|
|
f"| {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})")
|
|
in_position = False
|
|
watchlist_dt = None
|
|
idx2 = sell_idx2
|
|
continue
|
|
|
|
trend_ok = check_trend(post_prices, idx2)
|
|
if trend_ok:
|
|
mom_ok = check_momentum(ticker, current_price, current_dt)
|
|
else:
|
|
mom_ok = False
|
|
|
|
if trend_ok and mom_ok:
|
|
if watchlist_dt is None:
|
|
watchlist_dt = current_dt
|
|
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
|
|
in_position = True
|
|
pos_buy_idx_g = idx2
|
|
pos_buy_price = current_price
|
|
watchlist_dt = None
|
|
else:
|
|
watchlist_dt = None
|
|
|
|
idx2 += 1
|
|
|
|
if real_total == 0:
|
|
print(" 재활 후 전략 조건 충족 진입 없음")
|
|
else:
|
|
wr = real_wins / real_total * 100
|
|
print(f"\n 📊 재활 후: {real_total}건 | 승률={wr:.0f}% | 누적={real_pnl_sum:+.2f}%")
|
|
return {'trades': real_total, 'wins': real_wins, 'wr': wr, 'pnl': real_pnl_sum}
|
|
|
|
|
|
def main():
|
|
conn = get_conn()
|
|
cur = conn.cursor()
|
|
|
|
# price_history 최대 시각
|
|
cur.execute("SELECT MAX(recorded_at) FROM price_history")
|
|
end_dt = cur.fetchone()[0]
|
|
|
|
summary = {}
|
|
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
|
|
cur.execute("""
|
|
SELECT traded_at FROM trade_results
|
|
WHERE ticker = :t ORDER BY traded_at
|
|
""", t=ticker)
|
|
rows = cur.fetchall()
|
|
wf_dt = rows[4][0]
|
|
|
|
print(f"\n{'='*62}")
|
|
print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')}")
|
|
print(f"{'='*62}")
|
|
r = run_full_sim(cur, ticker, wf_dt, end_dt)
|
|
if r:
|
|
summary[ticker] = r
|
|
|
|
print(f"\n{'='*62}")
|
|
print("전체 요약 (전략 필터 적용)")
|
|
print(f"{'='*62}")
|
|
for ticker, r in summary.items():
|
|
print(f"{ticker}: {r['trades']}건 | 승률={r['wr']:.0f}% | 누적={r['pnl']:+.2f}%")
|
|
|
|
conn.close()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|