Compare commits

...

4 Commits

Author SHA1 Message Date
joungmin
324d69dde0 feat: volume-lead strategy with compounding, WF filter, and DB-backed simulation
- core/strategy.py: replace trend strategy with volume-lead accumulation
  (vol spike + 2h quiet → signal, +4.8% rise → entry)
- core/trader.py: compound budget adjusts on both profit and loss (floor 30%)
- core/notify.py: add accumulation signal telegram notification
- ohlcv_db.py: Oracle ADB OHLCV cache (insert, load, incremental update)
- sim_365.py: 365-day compounding simulation loading from DB
- krw_sim.py: KRW-based simulation with MAX_POSITIONS constraint
- ticker_sim.py: ticker count expansion comparison
- STRATEGY.md: full strategy documentation
- .gitignore: exclude *.pkl cache files

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-02 01:46:03 +09:00
joungmin
7c7fb08693 feat: replace trend strategy with volume-lead accumulation strategy
- strategy.py: rewrite should_buy() with volume-lead logic
  - detect accumulation: vol spike + 2h quiet price → record signal_price
  - entry: price rises ≥ TREND_AFTER_VOL% from signal_price
  - signal reset: timeout (8h) or price drops below signal_price
- .env: add PRICE_QUIET_PCT=2.0, TREND_AFTER_VOL=4.8, SIGNAL_TIMEOUT_H=8.0
- vol_lead_sim.py: add parameter sweep 0.5~5.0% + fine sweep 4.0~5.0%

Backtest result (9 tickers, 2026-01-15~): +4.8% threshold
  26 trades | 69% win rate | +73.38% cumulative (vs A 33 trades 45% +24.25%)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-02 00:22:20 +09:00
joungmin
54ce327c50 chore: add WF/shadow/momentum analysis simulation scripts
Scripts used to analyze and validate strategy changes:
- wf_cmp.py: WF window size comparison on 42 real trades
- wf_cmp2.py: WF comparison extended with price_history simulation
- shadow_sim.py: shadow rehabilitation sim without strategy filters
- shadow_sim2.py: post-rehabilitation performance simulation
- shadow_sim3.py: shadow rehabilitation sim with full strategy filters
- momentum_cmp.py: momentum filter A/B comparison
- trend_check.py: 2h price gain distribution analysis per ticker

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 23:58:42 +09:00
joungmin
29d48f0fe9 feat: add shadow trading rehabilitation for WF-blocked tickers
When WF filter blocks a ticker, automatically start a virtual (shadow)
position with the same trailing/time stop logic. After WF_SHADOW_WINS
consecutive shadow wins, reset WF history to re-enable real trading.

- trader.py: add _shadow_positions, _shadow_cons_wins state;
  _shadow_enter(), get_shadow_positions(), update_shadow_peak(),
  close_shadow() functions; trigger shadow entry on WF block in buy()
- monitor.py: add _check_shadow_position() with ATR trailing + time stop;
  check shadow positions every 10s in run_monitor()
- Env: WF_SHADOW_WINS=2 (2 consecutive wins to rehabilitate)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-01 23:57:45 +09:00
18 changed files with 2867 additions and 183 deletions

1
.gitignore vendored
View File

@@ -3,3 +3,4 @@ __pycache__/
*.pyc *.pyc
.venv/ .venv/
*.log *.log
*.pkl

158
STRATEGY.md Normal file
View File

@@ -0,0 +1,158 @@
# Volume Lead 전략 가이드
## 전략 개요
**거래량 선행(Volume Lead) 매집 전략** — 가격이 횡보하는 중 거래량 급증이 발생하면
매집 신호로 기록하고, 이후 일정 수준 이상 상승 시 진입하는 선진입 전략.
> 핵심 아이디어: 대형 매수자는 가격을 올리지 않고 조용히 매집한다.
> 거래량이 먼저 급증하고, 가격 상승은 그 뒤에 따라온다.
---
## 진입 조건 (2단계)
### 1단계: 매집 신호 감지
다음 두 조건 동시 충족 시 `signal_price` 기록:
| 조건 | 파라미터 | 기본값 |
|------|----------|--------|
| 2h 가격 변동 < N% (횡보) | `PRICE_QUIET_PCT` | 2.0% |
| 직전 1h 거래량 ≥ 로컬 5h 평균 × M배 | `VOLUME_MULTIPLIER` | 2.0x |
- 신호 발생 시 텔레그램 알림 발송
- `SIGNAL_TIMEOUT_H` 내 진입 조건 미달 시 신호 초기화 (기본: 8h)
- 신호가 이하 하락 시 즉시 초기화 (매집 실패 판단)
### 2단계: 추세 확인 후 진입
`signal_price` 대비 +`TREND_AFTER_VOL`% 이상 상승 확인 시 매수:
| 파라미터 | 기본값 | 설명 |
|----------|--------|------|
| `TREND_AFTER_VOL` | 4.8% | 신호가 대비 진입 임계값 |
---
## 청산 조건
### 트레일링 스탑 (ATR 기반)
- ATR 5봉 × 1.5 계수 → 동적 손절폭 산출
- 최소 1.0% / 최대 4.0% 범위 내 자동 조정
- 최고가 대비 손절폭 이하 하락 시 즉시 청산
### 타임 스탑
- 보유 `TIME_STOP_HOURS`h 경과 후 수익률 < `TIME_STOP_MIN_GAIN_PCT`% 이면 청산
- 기본값: 8시간 경과 / 수익률 3% 미만
---
## 리스크 관리
### Walk-Forward (WF) 필터
| 파라미터 | 기본값 | 설명 |
|----------|--------|------|
| `WF_WINDOW` | 2 | 이력 윈도우 (직전 N건) |
| `WF_MIN_WIN_RATE` | 0.01 | 최소 승률 임계값 (1%) |
| `WF_SHADOW_WINS` | 2 | 차단 해제 조건 (가상 N연승) |
- 직전 2건 모두 손실 → 해당 종목 진입 차단
- 차단 후 가상 추적으로 2연승 달성 시 자동 복귀
### 예산 관리 (복리)
- 수익 발생 시: `운용예산 = 초기예산 + 누적수익` (복리 증가)
- 손실 발생 시: `운용예산 = 초기예산 + 누적수익` (차감)
- 하한선: 초기예산의 30% (기본: 4,500,000원)
- 포지션당 크기: `운용예산 / MAX_POSITIONS`
---
## 시장 레짐 적응
| 레짐 | BTC 1h 변동 | 거래량 기준 |
|------|------------|------------|
| BULL | +5% 이상 | 1.5x |
| NEUTRAL | ±5% 이내 | 2.0x |
| BEAR | -5% 이하 | 진입 차단 |
- BEAR 레짐 감지 시 신규 진입 전면 차단
- 레짐별 `vol_mult` 조정으로 민감도 제어
---
## 운용 설정 (.env)
```env
# 핵심 전략
PRICE_QUIET_PCT=2.0 # 2h 횡보 기준 (%)
TREND_AFTER_VOL=4.8 # 진입 임계값 (신호가 대비 %)
SIGNAL_TIMEOUT_H=8.0 # 신호 유효 시간 (h)
VOLUME_MULTIPLIER=2.0 # 거래량 배수 기준
# 청산
STOP_LOSS_PCT=1.5 # ATR 트레일링 기본값 (동적 조정됨)
TIME_STOP_HOURS=8 # 타임스탑 보유 시간
TIME_STOP_MIN_GAIN_PCT=3 # 타임스탑 최소 수익률
# 포트폴리오
MAX_BUDGET=15000000 # 초기 운용 예산
MAX_POSITIONS=3 # 최대 동시 보유 종목
# WF 필터
WF_WINDOW=2
WF_MIN_WIN_RATE=0.01
WF_SHADOW_WINS=2
```
---
## 백테스트 결과 요약
### 365일 (2025-03-02 ~ 2026-03-02) — WF 적용
| 항목 | 값 |
|------|-----|
| 초기 예산 | 15,000,000원 |
| 최종 자산 | 29,996,109원 |
| 수익률 | **+100%** |
| 최대 낙폭 | -3.81% (-57만원) |
| 거래수 | 190건 (WF 183건 차단) |
| 승률 | 46% |
| 월평균 수익 | 약 115만원 |
### 45일 Walk-Forward 검증 (2026-01-15 ~ 2026-03-02)
| 기간 | 거래수 | 승률 | 수익률 |
|------|--------|------|--------|
| Train (77일) | 66건 | 42% | +22.5% |
| Test (45일) | 67건 | 61% | +49.9% |
Train/Test 모두 수익 → 오버피팅 아님.
---
## 주요 파일
| 파일 | 역할 |
|------|------|
| `core/strategy.py` | 진입 신호 로직 |
| `core/monitor.py` | ATR 트레일링 스탑 + 타임스탑 |
| `core/trader.py` | 주문 실행 + 복리 예산 관리 |
| `core/market_regime.py` | 시장 레짐 감지 |
| `ohlcv_db.py` | OHLCV 시계열 DB 캐시 관리 |
| `sim_365.py` | 365일 복리 시뮬레이션 |
| `vol_lead_sim.py` | 전략 파라미터 스윕 도구 |
---
## 시뮬레이션 실행
```bash
# 365일 복리 시뮬 (DB에서 로드)
python sim_365.py
# OHLCV DB 상태 확인
python ohlcv_db.py status
# 신규 봉 증분 업데이트
python ohlcv_db.py update
```

View File

@@ -112,6 +112,47 @@ def _check_time_stop(ticker: str, pos: dict, current: float) -> bool:
return True return True
def _check_shadow_position(ticker: str, spos: dict) -> None:
"""Shadow 포지션 청산 조건 체크 (트레일링 + 타임 스탑).
실제 포지션과 동일한 로직을 적용하되 주문 없이 결과만 기록.
"""
current = get_current_price(ticker)
if current is None:
return
trader.update_shadow_peak(ticker, current)
# 갱신 후 최신 값 재조회
spos = trader.get_shadow_positions().get(ticker)
if spos is None:
return
buy_price = spos["buy_price"]
peak = spos["peak_price"]
entry_time = spos["entry_time"]
stop_pct = _get_adaptive_stop(ticker)
drop_from_peak = (peak - current) / peak
elapsed_hours = (datetime.now() - entry_time).total_seconds() / 3600
pnl_pct = (current - buy_price) / buy_price * 100
reason = None
if drop_from_peak >= stop_pct:
reason = (
f"트레일링스탑 | 최고={peak:,.2f}→현재={current:,.2f}"
f" ({drop_from_peak:.2%} | 스탑={stop_pct:.2%})"
)
elif elapsed_hours >= TIME_STOP_HOURS and pnl_pct < TIME_STOP_MIN_GAIN_PCT:
reason = (
f"타임스탑 | {elapsed_hours:.1f}h 경과 "
f"수익률={pnl_pct:+.2f}% (기준={TIME_STOP_MIN_GAIN_PCT:+.2f}%)"
)
if reason:
trader.close_shadow(ticker, current, pnl_pct, reason)
def _check_position(ticker: str, pos: dict) -> None: def _check_position(ticker: str, pos: dict) -> None:
"""단일 포지션 전체 체크 (트레일링 스탑 → 타임 스탑 순서).""" """단일 포지션 전체 체크 (트레일링 스탑 → 타임 스탑 순서)."""
current = get_current_price(ticker) current = get_current_price(ticker)
@@ -149,10 +190,20 @@ def run_monitor(interval: int = CHECK_INTERVAL) -> None:
f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.2f}%" f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.2f}%"
) )
while True: while True:
# 실제 포지션 감시
positions_snapshot = dict(trader.get_positions()) positions_snapshot = dict(trader.get_positions())
for ticker, pos in positions_snapshot.items(): for ticker, pos in positions_snapshot.items():
try: try:
_check_position(ticker, pos) _check_position(ticker, pos)
except Exception as e: except Exception as e:
logger.error(f"모니터 오류 {ticker}: {e}") logger.error(f"모니터 오류 {ticker}: {e}")
# Shadow 포지션 감시 (WF차단 종목 재활 추적)
shadow_snapshot = trader.get_shadow_positions()
for ticker, spos in shadow_snapshot.items():
try:
_check_shadow_position(ticker, spos)
except Exception as e:
logger.error(f"Shadow 모니터 오류 {ticker}: {e}")
time.sleep(interval) time.sleep(interval)

View File

@@ -62,6 +62,16 @@ def notify_sell(
) )
def notify_signal(ticker: str, signal_price: float, vol_mult: float) -> None:
"""거래량 축적 신호 감지 알림."""
_send(
f"🔍 <b>[축적감지]</b> {ticker}\n"
f"신호가: {signal_price:,.2f}\n"
f"거래량: {vol_mult:.1f}x 급증 + 2h 횡보\n"
f"진입 목표: {signal_price * 1.048:,.2f}원 (+4.8%)"
)
def notify_error(message: str) -> None: def notify_error(message: str) -> None:
_send(f"⚠️ <b>[오류]</b>\n{message}") _send(f"⚠️ <b>[오류]</b>\n{message}")

View File

@@ -1,8 +1,11 @@
"""Strategy C: 현재 기준 N시간 전 대비 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호. """Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입.
추가 필터: 흐름:
1. 6h 추세 확인 (단기 급등 아닌 지속 추세) 1. 직전 1h 거래량 > 로컬 5h 평균 × VOL_MULT AND
2. 15분 확인 워치리스트 (신호 첫 발생 후 15분 내 재확인 시 진입) 2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
→ 신호가(signal_price) 기록
2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
""" """
from __future__ import annotations from __future__ import annotations
@@ -12,174 +15,139 @@ import os
import time import time
import pyupbit import pyupbit
from .market import get_current_price, get_ohlcv
from .market import get_current_price
from .market_regime import get_regime from .market_regime import get_regime
from .notify import notify_signal
from .price_db import get_price_n_hours_ago from .price_db import get_price_n_hours_ago
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중 # 축적 감지 파라미터
TREND_HOURS = float(os.getenv("TREND_HOURS", "12")) PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 2h 횡보 기준 (%)
TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # 레짐이 없을 때 기본값 TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계값 (신호가 대비 %)
SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h)
# 6h 단기 추세 최소 상승률 (추세 지속형 필터) # 거래량 파라미터
TREND_6H_MIN_PCT = float(os.getenv("TREND_6H_MIN_PCT", "1.0"))
# 모멘텀: MA 기간, 거래량 급증 배수
MA_PERIOD = 20
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) # 레짐이 없을 때 기본값
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h) LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
# 15분 확인 워치리스트: 신호 첫 발생 시각(unix ts) 기록 # 축적 신호 상태: ticker → {"price": float, "time": float(unix)}
CONFIRM_SECONDS = int(os.getenv("CONFIRM_SECONDS", "900")) # 기본 15분 _accum_signals: dict[str, dict] = {}
_watchlist: dict[str, float] = {} # ticker → first_signal_time (unix timestamp)
def check_trend(ticker: str, min_gain_pct: float) -> bool: def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
"""상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +min_gain_pct% 이상.""" """직전 완성 1h 거래량이 로컬 5h 평균의 vol_mult 배 이상인지 확인."""
past_price = get_price_n_hours_ago(ticker, TREND_HOURS) fetch_count = LOCAL_VOL_HOURS + 3
if past_price is None:
logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)")
return False
current = get_current_price(ticker)
if not current:
return False
gain_pct = (current - past_price) / past_price * 100
result = gain_pct >= min_gain_pct
if result:
logger.info(
f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} "
f"현재={current:,.2f} (+{gain_pct:.1f}% ≥ {min_gain_pct}%)"
)
else:
logger.debug(
f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={min_gain_pct:+.0f}%)"
)
return result
def check_momentum(ticker: str, vol_mult: float) -> bool:
"""모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × vol_mult.
23h 평균은 낮 시간대 고거래량이 포함돼 새벽에 항상 미달하므로,
로컬 5h 평균(같은 시간대 컨텍스트)과 비교한다.
"""
# MA20: 일봉 기준
df_daily = get_ohlcv(ticker, count=MA_PERIOD + 1)
if df_daily is None or len(df_daily) < MA_PERIOD + 1:
return False
ma = df_daily["close"].iloc[-MA_PERIOD:].mean()
current = get_current_price(ticker)
if current is None:
return False
price_ok = current > ma
if not price_ok:
logger.debug(f"[모멘텀✗] {ticker} 현재={current:,.0f} < MA20={ma:,.0f} (가격 기준 미달)")
return False
# 거래량: 60분봉 기준 (최근 1h vs 이전 LOCAL_VOL_HOURS h 로컬 평균)
fetch_count = LOCAL_VOL_HOURS + 3 # 여유 있게 fetch
try: try:
df_hour = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count) df = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
except Exception: except Exception:
return False return False
if df_hour is None or len(df_hour) < LOCAL_VOL_HOURS + 1: if df is None or len(df) < LOCAL_VOL_HOURS + 1:
return False return False
recent_vol = df_hour["volume"].iloc[-2] # 직전 완성된 1h 봉 recent_vol = df["volume"].iloc[-2] # 직전 완성된 1h 봉
local_avg = df_hour["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 LOCAL_VOL_HOURS h 평균 local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 5h 평균
vol_ok = local_avg > 0 and recent_vol >= local_avg * vol_mult if local_avg <= 0:
ratio = recent_vol / local_avg if local_avg > 0 else 0
if vol_ok:
logger.info(
f"[모멘텀↑] {ticker} 현재={current:,.0f} MA20={ma:,.0f} "
f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
)
else:
logger.debug(
f"[모멘텀✗] {ticker} 1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} "
f"({ratio:.2f}x < {vol_mult}x)"
)
return vol_ok
def check_trend_6h(ticker: str) -> bool:
"""6h 추세 지속 확인: 6h 전 대비 +TREND_6H_MIN_PCT% 이상 상승 중이어야 진입 허용."""
past = get_price_n_hours_ago(ticker, 6)
if past is None:
logger.debug(f"[6h추세] {ticker} 6h 전 가격 없음 (수집 중)")
return True # 데이터 없으면 필터 패스 (수집 초기)
current = get_current_price(ticker)
if not current:
return False return False
gain_pct = (current - past) / past * 100 ratio = recent_vol / local_avg
result = gain_pct >= TREND_6H_MIN_PCT result = ratio >= vol_mult
if result: if result:
logger.debug( logger.debug(
f"[6h추세↑] {ticker} 6h전={past:,.2f} 현재={current:,.2f} " f"[거래량↑] {ticker} 1h={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
f"(+{gain_pct:.1f}% ≥ {TREND_6H_MIN_PCT}%)"
) )
else: else:
logger.debug( logger.debug(
f"[6h추세✗] {ticker} 6h {gain_pct:+.1f}% (기준={TREND_6H_MIN_PCT:+.1f}%)" f"[거래량✗] {ticker} {ratio:.2f}x < {vol_mult}x"
) )
return result return result
def should_buy(ticker: str) -> bool: def should_buy(ticker: str) -> bool:
"""Strategy C + 시장 레짐 + 추세 지속형 진입 (6h 추세 + 15분 확인 워치리스트). """Volume Lead 전략.
진입 조건: 1단계: 거래량 급증 + 2h 횡보 → 신호가 기록
1. 12h 추세 ≥ 레짐별 임계값 (bull 3% / neutral 5% / bear 8%) 2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
2. 6h 추세 ≥ 1% (단기 급등이 아닌 지속 추세)
3. 모멘텀 (MA20 초과 + 1h 거래량 급증)
4. 위 조건 최초 충족 후 15분 경과 시 실제 진입 (확인 필터)
""" """
regime = get_regime() regime = get_regime()
trend_pct = regime["trend_pct"]
vol_mult = regime["vol_mult"] vol_mult = regime["vol_mult"]
# 조건 평가 (순서: 가장 빠른 필터 먼저) current = get_current_price(ticker)
if not check_trend(ticker, trend_pct): if not current:
_watchlist.pop(ticker, None) # 조건 깨지면 워치리스트 초기화
return False return False
if not check_trend_6h(ticker):
_watchlist.pop(ticker, None)
return False
if not check_momentum(ticker, vol_mult):
_watchlist.pop(ticker, None)
return False
# 모든 조건 충족 — 15분 확인 워치리스트 처리
now = time.time() now = time.time()
if ticker not in _watchlist:
_watchlist[ticker] = now # ── 기존 신호 유효성 확인 ────────────────────────────────
logger.info( sig = _accum_signals.get(ticker)
f"[워치] {ticker} 신호 첫 발생 → {CONFIRM_SECONDS//60}분 후 진입 예정" if sig is not None:
) age_h = (now - sig["time"]) / 3600
if age_h > SIGNAL_TIMEOUT_H:
del _accum_signals[ticker]
sig = None
logger.debug(f"[축적타임아웃] {ticker} {age_h:.1f}h 경과 → 신호 초기화")
# ── 신호 없음: 축적 조건 체크 ────────────────────────────
if sig is None:
# 2h 가격 횡보 확인 (DB 가격 활용)
price_2h = get_price_n_hours_ago(ticker, 2)
if price_2h is None:
return False return False
elapsed = now - _watchlist[ticker] quiet = abs(current - price_2h) / price_2h * 100 < PRICE_QUIET_PCT
if elapsed < CONFIRM_SECONDS:
if not quiet:
logger.debug( logger.debug(
f"[워치] {ticker} 확인 대기 중 ({elapsed/60:.1f}분 / {CONFIRM_SECONDS//60}분)" f"[횡보✗] {ticker} 2h변동={(current - price_2h) / price_2h * 100:+.1f}% "
f"(기준={PRICE_QUIET_PCT}%)"
) )
return False return False
# 15분 경과 → 진입 # 거래량 급증
_watchlist.pop(ticker, None) if not _check_vol_spike(ticker, vol_mult):
return False
# 축적 신호 기록
_accum_signals[ticker] = {"price": current, "time": now}
logger.info( logger.info(
f"[워치확인] {ticker} {elapsed/60:.1f}분 경과 → 진입 확정" f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}"
)
# 거래량 비율 계산 후 알림 전송
try:
fetch_count = LOCAL_VOL_HOURS + 3
df_h = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
if df_h is not None and len(df_h) >= LOCAL_VOL_HOURS + 1:
recent_vol = df_h["volume"].iloc[-2]
local_avg = df_h["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
ratio = recent_vol / local_avg if local_avg > 0 else 0
notify_signal(ticker, current, ratio)
except Exception:
notify_signal(ticker, current, 0.0)
return False # 신호 첫 발생 시는 진입 안 함
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
signal_price = sig["price"]
move_pct = (current - signal_price) / signal_price * 100
if current < signal_price:
# 신호가 이하 하락 → 축적 실패
del _accum_signals[ticker]
logger.debug(
f"[축적실패] {ticker} 신호가={signal_price:,.2f} → 현재={current:,.2f} (하락) → 초기화"
)
return False
if move_pct >= TREND_AFTER_VOL:
del _accum_signals[ticker]
logger.info(
f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}"
f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}%)"
) )
return True return True
logger.debug(
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
f"(+{move_pct:.1f}% / 목표={TREND_AFTER_VOL}%)"
)
return False

View File

@@ -24,24 +24,31 @@ load_dotenv()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
SIMULATION_MODE = os.getenv("SIMULATION_MODE", "").lower() in ("true", "1", "yes")
if SIMULATION_MODE:
logging.getLogger(__name__).warning(
"*** SIMULATION MODE ACTIVE — 실제 주문이 실행되지 않습니다 ***"
)
INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정) INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정)
MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수 MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
# 복리 적용 예산 (매도 후 재계산) — 수익 발생 시만 증가, 손실 시 원금 유지 # 복리 적용 예산 (매도 후 재계산) — 수익 시 복리 증가, 손실 시 차감 (하한 30%)
MIN_BUDGET = INITIAL_BUDGET * 3 // 10 # 최소 예산: 초기값의 30%
MAX_BUDGET = INITIAL_BUDGET MAX_BUDGET = INITIAL_BUDGET
PER_POSITION = INITIAL_BUDGET // MAX_POSITIONS PER_POSITION = INITIAL_BUDGET // MAX_POSITIONS
def _recalc_compound_budget() -> None: def _recalc_compound_budget() -> None:
"""누적 수익을 반영해 MAX_BUDGET / PER_POSITION 재계산. """누적 수익/손실을 반영해 MAX_BUDGET / PER_POSITION 재계산.
수익이 발생한 만큼만 예산에 더함 (손실 시 원금 아래로 내려가지 않음). 수익 시 복리로 증가, 손실 시 차감 (최소 초기 예산의 30% 보장).
매도 완료 후 호출. 매도 완료 후 호출.
""" """
global MAX_BUDGET, PER_POSITION global MAX_BUDGET, PER_POSITION
try: try:
cum_profit = get_cumulative_krw_profit() cum_profit = get_cumulative_krw_profit()
effective = INITIAL_BUDGET + max(int(cum_profit), 0) effective = max(INITIAL_BUDGET + int(cum_profit), MIN_BUDGET)
MAX_BUDGET = effective MAX_BUDGET = effective
PER_POSITION = effective // MAX_POSITIONS PER_POSITION = effective // MAX_POSITIONS
logger.info( logger.info(
@@ -54,6 +61,7 @@ def _recalc_compound_budget() -> None:
# Walk-forward 필터 설정 # Walk-forward 필터 설정
WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기 WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기
WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값 WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값
WF_SHADOW_WINS = int(os.getenv("WF_SHADOW_WINS", "2")) # shadow N연승 → WF 해제
_lock = threading.Lock() _lock = threading.Lock()
_positions: dict = {} _positions: dict = {}
@@ -65,6 +73,13 @@ _last_sell_prices: dict[str, float] = {}
_trade_history: dict[str, list[bool]] = {} _trade_history: dict[str, list[bool]] = {}
# walk-forward 이력: { ticker: [True/False, ...] } (True=수익) # walk-forward 이력: { ticker: [True/False, ...] } (True=수익)
_shadow_lock = threading.Lock()
_shadow_positions: dict[str, dict] = {}
# WF차단 종목 가상 포지션: { ticker: { buy_price, peak_price, entry_time } }
_shadow_cons_wins: dict[str, int] = {}
# shadow 연속 승 횟수: { ticker: int }
_upbit: Optional[pyupbit.Upbit] = None _upbit: Optional[pyupbit.Upbit] = None
@@ -107,6 +122,88 @@ def _update_history(
logger.error(f"거래 이력 저장 실패 {ticker}: {e}") logger.error(f"거래 이력 저장 실패 {ticker}: {e}")
# ── Shadow 재활 ────────────────────────────────────────────────────────────────
def _shadow_enter(ticker: str) -> None:
"""WF 차단 종목에 shadow(가상) 포지션 진입.
buy() 내부(_lock 보유 중)에서 호출됨.
API 호출 후 _shadow_lock으로만 shadow 상태 보호 (deadlock 방지).
"""
# 이미 shadow 중이면 스킵
if ticker in _shadow_positions:
return
price = pyupbit.get_current_price(ticker)
if not price:
return
with _shadow_lock:
if ticker in _shadow_positions: # double-check
return
_shadow_positions[ticker] = {
"buy_price": price,
"peak_price": price,
"entry_time": datetime.now(),
}
cons = _shadow_cons_wins.get(ticker, 0)
logger.info(
f"[Shadow진입] {ticker} @ {price:,.0f}"
f"(가상 — WF 재활 {cons}/{WF_SHADOW_WINS}연승 필요)"
)
def get_shadow_positions() -> dict:
"""Shadow 포지션 스냅샷 반환 (monitor 에서 조회용)."""
with _shadow_lock:
return {k: dict(v) for k, v in _shadow_positions.items()}
def update_shadow_peak(ticker: str, price: float) -> None:
"""Shadow 포지션 최고가 갱신."""
with _shadow_lock:
if ticker in _shadow_positions:
if price > _shadow_positions[ticker]["peak_price"]:
_shadow_positions[ticker]["peak_price"] = price
def close_shadow(ticker: str, sell_price: float, pnl_pct: float, reason: str) -> None:
"""Shadow 포지션 청산 및 WF 재활 진행.
연속승 갱신 → WF_SHADOW_WINS 달성 시 WF 이력 초기화 + Telegram 알림.
"""
with _shadow_lock:
spos = _shadow_positions.pop(ticker, None)
if spos is None:
return
is_win = pnl_pct > 0
cons = _shadow_cons_wins.get(ticker, 0)
cons = cons + 1 if is_win else 0
_shadow_cons_wins[ticker] = cons
do_wf_reset = cons >= WF_SHADOW_WINS
if do_wf_reset:
_shadow_cons_wins.pop(ticker, None)
mark = "" if is_win else ""
logger.info(
f"[Shadow청산] {ticker} {spos['buy_price']:,.0f}{sell_price:,.0f}"
f"| {mark} {pnl_pct:+.1f}% | {reason} | 연속승={cons}/{WF_SHADOW_WINS}"
)
if do_wf_reset:
with _lock: # _shadow_lock은 이미 해제된 상태 (deadlock 없음)
_trade_history.pop(ticker, None)
logger.warning(
f"[WF해제] {ticker} Shadow {WF_SHADOW_WINS}연승 달성 → "
f"WF 이력 초기화, 실거래 재개"
)
notify_error(
f"🎉 [{ticker}] WF 재활 완료!\n"
f"Shadow {WF_SHADOW_WINS}연승 달성 → 실거래 재개"
)
def _db_upsert(ticker: str, pos: dict) -> None: def _db_upsert(ticker: str, pos: dict) -> None:
"""포지션을 Oracle DB에 저장 (실패해도 거래는 계속).""" """포지션을 Oracle DB에 저장 (실패해도 거래는 계속)."""
try: try:
@@ -172,6 +269,30 @@ def restore_positions() -> None:
logger.error(f"DB 포지션 로드 실패: {e}") logger.error(f"DB 포지션 로드 실패: {e}")
saved = {} saved = {}
if SIMULATION_MODE:
# --- 시뮬레이션: Upbit 잔고 조회 없이 DB 포지션만 복원 ---
logger.info("[SIMULATION] 시뮬레이션 모드 — Upbit 잔고 조회 생략, DB 포지션만 복원")
for ticker, s in saved.items():
current = pyupbit.get_current_price(ticker)
if not current:
continue
peak = max(s["peak_price"], current)
entry_time = datetime.fromisoformat(s["entry_time"]) if isinstance(s["entry_time"], str) else s["entry_time"]
with _lock:
_positions[ticker] = {
"buy_price": s["buy_price"],
"peak_price": peak,
"amount": s.get("amount", 0),
"invested_krw": s["invested_krw"],
"entry_time": entry_time,
"trade_id": s.get("trade_id", ""),
}
logger.info(
f"[SIMULATION][복원] {ticker} 매수가={s['buy_price']:,.0f}원 | "
f"현재가={current:,.0f}원 (DB 복원)"
)
return
upbit = _get_upbit() upbit = _get_upbit()
balances = upbit.get_balances() balances = upbit.get_balances()
upbit_tickers = set() upbit_tickers = set()
@@ -261,7 +382,7 @@ def buy(ticker: str) -> bool:
) )
return False return False
# Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단 # Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단 + shadow 진입
if WF_MIN_WIN_RATE > 0: if WF_MIN_WIN_RATE > 0:
hist = _get_history(ticker) hist = _get_history(ticker)
if len(hist) >= WF_WINDOW: if len(hist) >= WF_WINDOW:
@@ -269,8 +390,9 @@ def buy(ticker: str) -> bool:
if recent_wr < WF_MIN_WIN_RATE: if recent_wr < WF_MIN_WIN_RATE:
logger.info( logger.info(
f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%" f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%"
f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단" f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단 (shadow 재활 시작)"
) )
_shadow_enter(ticker) # 가상 포지션으로 WF 재활 추적
return False return False
if len(_positions) >= MAX_POSITIONS: if len(_positions) >= MAX_POSITIONS:
@@ -285,8 +407,22 @@ def buy(ticker: str) -> bool:
logger.info(f"잔여 예산 부족({order_krw:,}원), {ticker} 패스") logger.info(f"잔여 예산 부족({order_krw:,}원), {ticker} 패스")
return False return False
upbit = _get_upbit()
try: try:
if SIMULATION_MODE:
# --- 시뮬레이션 매수 ---
sim_price = pyupbit.get_current_price(ticker)
if not sim_price:
logger.error(f"[SIMULATION] 현재가 조회 실패: {ticker}")
return False
amount = order_krw / sim_price
actual_price = sim_price
logger.info(
f"[SIMULATION][매수] {ticker} @ {actual_price:,.0f}원 | "
f"수량={amount:.8f} | 투자금={order_krw:,}원 (모의 주문)"
)
else:
# --- 실제 매수 ---
upbit = _get_upbit()
result = upbit.buy_market_order(ticker, order_krw) result = upbit.buy_market_order(ticker, order_krw)
if not result or "error" in str(result): if not result or "error" in str(result):
logger.error(f"매수 실패: {result}") logger.error(f"매수 실패: {result}")
@@ -310,8 +446,9 @@ def buy(ticker: str) -> bool:
"trade_id": trade_id, "trade_id": trade_id,
} }
_db_upsert(ticker, _positions[ticker]) _db_upsert(ticker, _positions[ticker])
prefix = "[SIMULATION]" if SIMULATION_MODE else ""
logger.info( logger.info(
f"[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | " f"{prefix}[매수] {ticker} @ {actual_price:,.0f}원 (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}" f"수량={amount} | 투자금={order_krw:,}원 | trade_id={trade_id[:8]}"
) )
notify_buy(ticker, actual_price, amount, order_krw, notify_buy(ticker, actual_price, amount, order_krw,
@@ -383,8 +520,22 @@ def sell(ticker: str, reason: str = "") -> bool:
return False return False
pos = _positions[ticker] pos = _positions[ticker]
upbit = _get_upbit()
try: try:
if SIMULATION_MODE:
# --- 시뮬레이션 매도 ---
actual_amount = pos["amount"]
actual_sell_price = pyupbit.get_current_price(ticker) or pos["buy_price"]
sell_value = actual_sell_price * actual_amount
fee = pos["invested_krw"] * 0.0005 + sell_value * 0.0005
krw_profit = sell_value - pos["invested_krw"] - fee
pnl = (actual_sell_price - pos["buy_price"]) / pos["buy_price"] * 100
logger.info(
f"[SIMULATION][매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (모의 주문) | 사유={reason}"
)
else:
# --- 실제 매도 ---
upbit = _get_upbit()
currency = ticker.split("-")[1] currency = ticker.split("-")[1]
# 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비) # 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비)
@@ -420,8 +571,9 @@ def sell(ticker: str, reason: str = "") -> bool:
fee = actual_fee_from_order if actual_fee_from_order is not None \ fee = actual_fee_from_order if actual_fee_from_order is not None \
else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005) else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005)
krw_profit = sell_value - pos["invested_krw"] - fee krw_profit = sell_value - pos["invested_krw"] - fee
prefix = "[SIMULATION]" if SIMULATION_MODE else ""
logger.info( logger.info(
f"[매도] {ticker} @ {actual_sell_price:,.4f}원 | " f"{prefix}[매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}" f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}"
) )
try: try:

177
krw_sim.py Normal file
View File

@@ -0,0 +1,177 @@
"""천만원 시드 기준 KRW 시뮬레이션.
- 20개 종목 × vol-lead 4.8% 전략
- MAX_POSITIONS=3 동시 보유 제약 적용
- 포지션별 예산 = 포트폴리오 / MAX_POSITIONS (복리)
- 거래를 시간순으로 처리 → 3개 이상 동시 보유 시 신호 스킵
"""
import os
import pickle
import sys
from datetime import datetime
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
sys.path.insert(0, str(Path(__file__).parent))
from vol_lead_sim import run_trend, run_vol_lead_thresh
BUDGET = 10_000_000 # 초기 시드
MAX_POS = 3 # 최대 동시 보유
THRESH = 4.8 # 진입 임계값 (%)
CACHE_FILE = Path("vol_lead_cache_30.pkl")
TOP30_FILE = Path("top30_tickers.pkl")
def collect_all_trades(data: dict, tickers: list, thresh: float) -> list:
"""모든 종목의 거래를 (buy_dt, sell_dt, ticker, is_win, pnl, reason) 목록으로 반환."""
all_trades = []
for t in tickers:
if t not in data:
continue
trades = run_vol_lead_thresh(data[t], thresh)
for is_win, pnl, buy_dt, sell_dt, reason in trades:
all_trades.append((buy_dt, sell_dt, t, is_win, pnl, reason))
all_trades.sort(key=lambda x: x[0]) # 진입 시간순 정렬
return all_trades
def apply_max_positions(all_trades: list, max_pos: int) -> tuple[list, list]:
"""MAX_POSITIONS 제약 적용. (허용 거래, 스킵 거래) 반환."""
open_exits = [] # 현재 열린 포지션의 청산 시각 목록
accepted = []
skipped = []
for trade in all_trades:
buy_dt, sell_dt = trade[0], trade[1]
# 이미 청산된 포지션 제거
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < max_pos:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
def simulate_krw(accepted: list, budget: float, max_pos: int) -> dict:
"""복리 KRW 시뮬레이션. 포지션당 예산 = 포트폴리오 / MAX_POSITIONS."""
portfolio = budget
total_krw = 0.0
monthly = {} # YYYY-MM → {'trades':0,'wins':0,'pnl':0}
trade_log = []
for buy_dt, sell_dt, ticker, is_win, pnl, reason in accepted:
pos_size = portfolio / max_pos
krw_profit = pos_size * pnl / 100
portfolio += krw_profit
total_krw += krw_profit
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1
monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
trade_log.append({
"buy_dt": buy_dt,
"sell_dt": sell_dt,
"ticker": ticker,
"is_win": is_win,
"pnl_pct": pnl,
"krw_profit": krw_profit,
"portfolio": portfolio,
"reason": reason,
})
wins = sum(1 for t in accepted if t[3])
return {
"portfolio": portfolio,
"total_krw": total_krw,
"roi_pct": (portfolio - budget) / budget * 100,
"total": len(accepted),
"wins": wins,
"wr": wins / len(accepted) * 100 if accepted else 0,
"monthly": monthly,
"trade_log": trade_log,
}
def main() -> None:
data = pickle.load(open(CACHE_FILE, "rb"))
top30 = pickle.load(open(TOP30_FILE, "rb"))
valid = [t for t in top30 if t in data and len(data[t]) >= 400]
use20 = valid[:20]
print(f"{'='*65}")
print(f"천만원 시드 KRW 시뮬레이션 | vol-lead +{THRESH}% | 20종목")
print(f"MAX_POSITIONS={MAX_POS} | 복리 포지션 크기")
print(f"기간: 2026-01-15 ~ 2026-03-02 (46일)")
print(f"{'='*65}")
all_trades = collect_all_trades(data, use20, THRESH)
accepted, skipped = apply_max_positions(all_trades, MAX_POS)
result = simulate_krw(accepted, BUDGET, MAX_POS)
print(f"\n── 전체 결과 ─────────────────────────────────────────")
print(f" 신호 발생: {len(all_trades):>4}")
print(f" 실제 진입: {result['total']:>4}건 (MAX_POS={MAX_POS} 제약으로 {len(skipped)}건 스킵)")
print(f" 승/패: {result['wins']}{result['total']-result['wins']}패 (승률 {result['wr']:.0f}%)")
print(f" ─────────────────────────────────────────────────")
print(f" 초기 시드: {BUDGET:>14,.0f}")
print(f" 최종 자산: {result['portfolio']:>14,.0f}")
print(f" 순수익: {result['total_krw']:>+14,.0f}")
print(f" 수익률: {result['roi_pct']:>+13.2f}%")
# ── 월별 수익 ─────────────────────────────────────
print(f"\n── 월별 수익 ─────────────────────────────────────────")
print(f" {'':^8}{'거래':>4} {'승률':>5}{'월수익(KRW)':>14} {'누적수익(KRW)':>15}")
print(f" {''*58}")
cum = 0.0
for ym, m in sorted(result["monthly"].items()):
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
cum += m["pnl_krw"]
print(f" {ym:^8}{m['trades']:>4}{wr:>4.0f}% │ "
f"{m['pnl_krw']:>+14,.0f}{cum:>+14,.0f}")
# ── 종목별 수익 ───────────────────────────────────
print(f"\n── 종목별 수익 ───────────────────────────────────────")
print(f" {'종목':<14}{'거래':>4} {'승률':>5}{'KRW수익':>14} {'평균/건':>10}")
print(f" {''*58}")
ticker_stats: dict = {}
for t in result["trade_log"]:
k = t["ticker"]
if k not in ticker_stats:
ticker_stats[k] = {"n": 0, "wins": 0, "krw": 0.0}
ticker_stats[k]["n"] += 1
ticker_stats[k]["wins"] += int(t["is_win"])
ticker_stats[k]["krw"] += t["krw_profit"]
for t, s in sorted(ticker_stats.items(), key=lambda x: -x[1]["krw"]):
wr = s["wins"] / s["n"] * 100 if s["n"] else 0
avg = s["krw"] / s["n"] if s["n"] else 0
print(f" {t:<14}{s['n']:>4}{wr:>4.0f}% │ "
f"{s['krw']:>+14,.0f}{avg:>+9,.0f}원/건")
# ── 전체 거래 내역 ────────────────────────────────
print(f"\n── 전체 거래 내역 ({len(result['trade_log'])}건) ─────────────────────")
print(f" {'#':>3} {'종목':<14} {'매수':^13} {'매도':^13} "
f"{'수익률':>7} {'KRW수익':>12} {'잔고':>12} {'사유'}")
print(f" {''*90}")
for i, t in enumerate(result["trade_log"], 1):
mark = "" if t["is_win"] else ""
print(f" {i:>3} {t['ticker']:<14} "
f"{t['buy_dt'].strftime('%m-%d %H:%M'):^13} "
f"{t['sell_dt'].strftime('%m-%d %H:%M'):^13} "
f"{mark}{t['pnl_pct']:>+6.2f}% "
f"{t['krw_profit']:>+12,.0f}"
f"{t['portfolio']:>12,.0f}"
f"{t['reason']}")
if __name__ == "__main__":
main()

186
momentum_cmp.py Normal file
View File

@@ -0,0 +1,186 @@
"""모멘텀 필터 유/무 비교 시뮬레이션.
A안: 추세(2h +5%) + 15분 워치리스트 (모멘텀 없음)
B안: 추세(2h +5%) + 모멘텀 + 15분 워치리스트 (현행)
"""
import os, time
from dotenv import load_dotenv
load_dotenv()
import oracledb
import pyupbit
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_MIN_PCT = 5.0
MA_PERIOD = 20
LOCAL_VOL_HOURS = 5
VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
CONFIRM_MINUTES = 15
FEE = 0.0005
_daily_cache = {}
_hourly_cache = {}
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
def load_prices(cur, ticker, from_dt):
cur.execute("""SELECT price, recorded_at FROM price_history
WHERE ticker=:t AND recorded_at>=:f ORDER BY recorded_at""", t=ticker, f=from_dt)
return cur.fetchall()
def get_ma20(ticker, dt):
key = (ticker, dt.strftime("%Y-%m-%d"))
if key not in _daily_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD+2,
to=dt.strftime("%Y-%m-%d 09:00:00"))
_daily_cache[key] = df
time.sleep(0.1)
except:
_daily_cache[key] = None
df = _daily_cache[key]
if df is None or len(df) < MA_PERIOD:
return None
return df["close"].iloc[-MA_PERIOD:].mean()
def get_vol_ratio(ticker, dt):
key = (ticker, dt.strftime("%Y-%m-%d %H"))
if key not in _hourly_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS+3,
to=dt.strftime("%Y-%m-%d %H:%M:%S"))
_hourly_cache[key] = df
time.sleep(0.1)
except:
_hourly_cache[key] = None
df = _hourly_cache[key]
if df is None or len(df) < LOCAL_VOL_HOURS+1:
return 0.0
rv = df["volume"].iloc[-2]
la = df["volume"].iloc[-(LOCAL_VOL_HOURS+1):-2].mean()
return rv/la if la > 0 else 0.0
def check_trend(prices, idx):
lb = 12 # 2h = 12 * 10min
if idx < lb: return False
curr, past = prices[idx][0], prices[idx-lb][0]
return past > 0 and (curr-past)/past*100 >= TREND_MIN_PCT
def check_momentum(ticker, price, dt):
ma = get_ma20(ticker, dt)
if ma is None or price <= ma: return False
return get_vol_ratio(ticker, dt) >= VOL_MULT
def simulate_pos(prices, buy_idx, buy_price):
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx+1:]:
if price > peak: peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
if (peak - price) / peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, price, ts, f"타임스탑", net
lp, lt = prices[-1]
net = (lp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net > 0, lp, lt, "데이터종료", net
def run_scenario(prices, ticker, use_momentum, label):
wins = losses = 0
total_pnl = 0.0
watchlist_dt = None
in_pos = False
buy_idx = buy_price = None
idx = 0
trades = []
while idx < len(prices):
price, dt = prices[idx]
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price)
next_idx = next((i for i,(_, ts) in enumerate(prices) if ts > sdt), len(prices))
if is_win: wins += 1
else: losses += 1
total_pnl += pnl
trades.append((is_win, buy_price, sp, pnl, dt, sdt, reason))
in_pos = False
watchlist_dt = None
idx = next_idx
continue
trend_ok = check_trend(prices, idx)
mom_ok = check_momentum(ticker, price, dt) if (use_momentum and trend_ok) else True
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = dt
elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_pos = True
buy_idx = idx
buy_price = price
watchlist_dt = None
else:
watchlist_dt = None
idx += 1
total = wins + losses
wr = wins/total*100 if total else 0
return {'label': label, 'total': total, 'wins': wins, 'losses': losses,
'wr': wr, 'pnl': total_pnl, 'trades': trades}
def print_result(r):
print(f"\n [{r['label']}]")
print(f"{r['total']}건 | 승률={r['wr']:.0f}% ({r['wins']}{r['losses']}패) | 누적={r['pnl']:+.2f}%")
for i, (iw, bp, sp, pnl, bdt, sdt, reason) in enumerate(r['trades'], 1):
mark = "" if iw else ""
print(f" #{i}: {bp:.4f}{sp:.4f}원 | {mark} {pnl:+.2f}% | {reason}"
f" ({bdt.strftime('%m-%d %H:%M')}{sdt.strftime('%m-%d %H:%M')})")
def main():
conn = get_conn()
cur = conn.cursor()
cur.execute("SELECT MAX(recorded_at) FROM price_history")
end_dt = cur.fetchone()[0]
print("=" * 62)
print("모멘텀 필터 유/무 비교 (WF차단 발동 이후 전 기간)")
print("A안: 추세+워치리스트만 B안: 추세+모멘텀+워치리스트(현행)")
print("=" * 62)
summary = []
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("SELECT traded_at FROM trade_results WHERE ticker=:t ORDER BY traded_at", t=ticker)
rows = cur.fetchall()
wf_dt = rows[4][0]
prices = load_prices(cur, ticker, wf_dt)
print(f"\n{''*62}")
print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')} 데이터: {len(prices)}")
rA = run_scenario(prices, ticker, use_momentum=False, label="A: 추세+워치만")
rB = run_scenario(prices, ticker, use_momentum=True, label="B: 추세+모멘텀+워치(현행)")
print_result(rA)
print_result(rB)
summary.append((ticker, rA, rB))
print(f"\n{'='*62}")
print(f"{'종목':<12} {'A안 거래':>6} {'A안 승률':>8} {'A안 PnL':>10}{'B안 거래':>6} {'B안 승률':>8} {'B안 PnL':>10}")
print(f"{''*62}")
for ticker, rA, rB in summary:
print(f"{ticker:<12} {rA['total']:>6}{rA['wr']:>6.0f}% {rA['pnl']:>+9.2f}% │"
f" {rB['total']:>6}{rB['wr']:>6.0f}% {rB['pnl']:>+9.2f}%")
conn.close()
if __name__ == "__main__":
main()

213
ohlcv_db.py Normal file
View File

@@ -0,0 +1,213 @@
"""OHLCV 시계열 캐시 — Oracle ADB ohlcv_hourly 테이블.
기능:
- 테이블 생성 (없으면)
- pkl → DB 최초 적재
- DB → DataFrame dict 로드 (시뮬용)
- 증분 업데이트 (신규 봉만 API 페치)
"""
from __future__ import annotations
import os
import pickle
import time
from datetime import datetime
from pathlib import Path
import pandas as pd
import pyupbit
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
from core.price_db import _conn
# ── DDL ───────────────────────────────────────────────
_DDL = """
CREATE TABLE ohlcv_hourly (
ticker VARCHAR2(20) NOT NULL,
candle_time TIMESTAMP NOT NULL,
open_price NUMBER(20,8) NOT NULL,
high_price NUMBER(20,8) NOT NULL,
low_price NUMBER(20,8) NOT NULL,
close_price NUMBER(20,8) NOT NULL,
volume NUMBER(30,8) NOT NULL,
CONSTRAINT pk_ohlcv PRIMARY KEY (ticker, candle_time)
)
"""
def ensure_table() -> None:
with _conn() as conn:
cur = conn.cursor()
cur.execute("SELECT COUNT(*) FROM user_tables WHERE table_name='OHLCV_HOURLY'")
if cur.fetchone()[0] == 0:
conn.cursor().execute(_DDL)
print("ohlcv_hourly 테이블 생성 완료")
else:
print("ohlcv_hourly 테이블 이미 존재")
# ── 적재 ──────────────────────────────────────────────
def insert_df(ticker: str, df: pd.DataFrame, batch: int = 500) -> int:
"""DataFrame → ohlcv_hourly 배치 삽입.
신규 레코드만 삽입: 기존 candle_time 조회 후 Python에서 필터링.
"""
sql_existing = """
SELECT candle_time FROM ohlcv_hourly
WHERE ticker = :1
"""
sql_insert = """
INSERT INTO ohlcv_hourly
(ticker, candle_time, open_price, high_price, low_price, close_price, volume)
VALUES (:1, :2, :3, :4, :5, :6, :7)
"""
rows = [
(
ticker,
row.name.to_pydatetime().replace(tzinfo=None),
float(row["open"]),
float(row["high"]),
float(row["low"]),
float(row["close"]),
float(row["volume"]),
)
for _, row in df.iterrows()
]
with _conn() as conn:
cur = conn.cursor()
# 기존 candle_time 조회 → 중복 제거
cur.execute(sql_existing, [ticker])
existing = {r[0].replace(tzinfo=None) for r in cur.fetchall()}
new_rows = [r for r in rows if r[1] not in existing]
if not new_rows:
return 0
for i in range(0, len(new_rows), batch):
cur.executemany(sql_insert, new_rows[i : i + batch])
return len(new_rows)
def load_from_pkl(pkl_path: str | Path) -> None:
"""pkl 파일의 모든 종목을 DB에 적재."""
pkl_path = Path(pkl_path)
data = pickle.load(open(pkl_path, "rb"))
ensure_table()
total = 0
for ticker, df in data.items():
n = insert_df(ticker, df)
total += n
print(f" {ticker}: {n}건 적재")
print(f"\n{total:,}건 적재 완료")
# ── 로드 ──────────────────────────────────────────────
def load_from_db(tickers: list[str], from_date: str = "2025-03-02") -> dict:
"""DB → {ticker: DataFrame} 반환 (시뮬용)."""
from_dt = datetime.strptime(from_date, "%Y-%m-%d")
data = {}
sql = """
SELECT candle_time, open_price, high_price, low_price, close_price, volume
FROM ohlcv_hourly
WHERE ticker = :1 AND candle_time >= :2
ORDER BY candle_time
"""
with _conn() as conn:
for ticker in tickers:
cur = conn.cursor()
cur.execute(sql, [ticker, from_dt])
rows = cur.fetchall()
if not rows:
continue
df = pd.DataFrame(
rows,
columns=["candle_time", "open", "high", "low", "close", "volume"],
)
df.set_index("candle_time", inplace=True)
df.index = pd.to_datetime(df.index)
data[ticker] = df
return data
# ── 증분 업데이트 ──────────────────────────────────────
def update_incremental(tickers: list[str]) -> None:
"""각 종목의 최신 봉 이후 데이터를 API에서 가져와 적재."""
sql_max = "SELECT MAX(candle_time) FROM ohlcv_hourly WHERE ticker = :1"
for ticker in tickers:
with _conn() as conn:
cur = conn.cursor()
cur.execute(sql_max, [ticker])
row = cur.fetchone()
latest = row[0] if row and row[0] else None
if latest:
to_dt = None # 최신까지 fetch
kwargs: dict = dict(ticker=ticker, interval="minute60", count=200)
df = pyupbit.get_ohlcv(**kwargs)
if df is None or df.empty:
continue
df.index = df.index.tz_localize(None)
# latest 이후만 삽입
new_df = df[df.index > latest.replace(tzinfo=None)]
if new_df.empty:
print(f" {ticker}: 신규 봉 없음")
continue
n = insert_df(ticker, new_df)
print(f" {ticker}: +{n}봉 추가")
else:
print(f" {ticker}: DB에 없음, 전체 로드 필요")
time.sleep(0.2)
# ── CLI ───────────────────────────────────────────────
if __name__ == "__main__":
import sys
cmd = sys.argv[1] if len(sys.argv) > 1 else "status"
if cmd == "init":
# pkl → DB 최초 적재
pkl = sys.argv[2] if len(sys.argv) > 2 else "vol_lead_cache_365.pkl"
print(f"pkl 적재: {pkl}")
load_from_pkl(pkl)
elif cmd == "update":
# 증분 업데이트
import pickle as _pk
top30 = _pk.load(open("top30_tickers.pkl", "rb"))
print("증분 업데이트...")
update_incremental(top30)
elif cmd == "status":
# 종목별 레코드 수 확인
with _conn() as conn:
cur = conn.cursor()
try:
cur.execute("""
SELECT ticker, COUNT(*), MIN(candle_time), MAX(candle_time)
FROM ohlcv_hourly
GROUP BY ticker
ORDER BY ticker
""")
rows = cur.fetchall()
if rows:
print(f"{'종목':<16} {'봉수':>6} {'시작':^12} {'종료':^12}")
print("-" * 52)
for r in rows:
print(f"{r[0]:<16} {r[1]:>6}"
f"{r[2].strftime('%y-%m-%d'):^12} "
f"{r[3].strftime('%y-%m-%d'):^12}")
print(f"\n{sum(r[1] for r in rows):,}봉 / {len(rows)}종목")
else:
print("ohlcv_hourly 테이블이 비어 있거나 없음")
except Exception as e:
print(f"오류: {e}")

156
shadow_sim.py Normal file
View File

@@ -0,0 +1,156 @@
"""Shadow Trading 재활 시뮬레이션.
WF차단 종목들에 대해 shadow 포지션을 시뮬레이션하여
몇 번의 shadow 승리 후 WF차단이 해제될 수 있었는지 분석.
"""
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
FEE = 0.0005
REHABILITATE_WINS = 2 # shadow N승 → WF 해제
def get_price_series(cur, ticker, from_dt):
"""WF차단 이후 가격 시계열 조회."""
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= :dt
ORDER BY recorded_at
""", t=ticker, dt=from_dt)
return cur.fetchall()
def simulate_shadow(prices, buy_price, buy_dt):
"""
단일 shadow 포지션 시뮬레이션.
Returns: (is_win, sell_price, sell_dt, reason, pnl_pct)
"""
peak = buy_price
for price, ts in prices:
# 고점 갱신
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
# 트레일링 스탑 (최고가 대비 -STOP_LOSS_PCT)
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
sell_pnl = (price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, price, ts, f"트레일링스탑(peak={peak:.4f})", sell_pnl)
# 타임 스탑
if elapsed_h >= TIME_STOP_HOURS:
if pnl < TIME_STOP_MIN_PCT:
sell_pnl = (price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, price, ts, f"타임스탑({elapsed_h:.1f}h,{pnl*100:+.1f}%)", sell_pnl)
# 데이터 끝까지 보유 중
last_price, last_ts = prices[-1]
sell_pnl = (last_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return (sell_pnl > 0, last_price, last_ts, "데이터종료(보유중)", sell_pnl)
def run():
conn = oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
cur = conn.cursor()
# WF차단 종목별 5번째 패배 시점 조회
blocked_tickers = {}
for ticker in ['KRW-DKA', 'KRW-SAHARA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at, pnl_pct, sell_price FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
if len(rows) >= 5:
wf_trigger_dt = rows[4][0] # 5번째 거래 완료 시점
last_sell_price = rows[4][2]
blocked_tickers[ticker] = {
'wf_trigger_dt': wf_trigger_dt,
'last_sell_price': last_sell_price,
'trades': rows
}
print("=" * 60)
print("Shadow Trading 재활 시뮬레이션")
print("=" * 60)
for ticker, info in blocked_tickers.items():
wf_dt = info['wf_trigger_dt']
print(f"\n{''*60}")
print(f"[{ticker}] WF차단 발동: {wf_dt.strftime('%m-%d %H:%M')}")
print(f" 직전 5건: {[f'{r[1]:+.2f}%' for r in info['trades']]}")
# WF차단 이후 가격 시계열
prices = get_price_series(cur, ticker, wf_dt)
if not prices:
print(" → 이후 가격 데이터 없음")
continue
print(f" 이후 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})")
print()
# Shadow 포지션 시뮬레이션
shadow_wins = 0
pos_idx = 0
cursor_idx = 0
rehabilitated = False
while cursor_idx < len(prices) and not rehabilitated:
buy_price, buy_dt = prices[cursor_idx]
remaining = prices[cursor_idx + 1:]
if not remaining:
break
pos_idx += 1
is_win, sell_price, sell_dt, reason, pnl = simulate_shadow(remaining, buy_price, buy_dt)
win_mark = "✅ WIN" if is_win else "❌ LOSS"
if is_win:
shadow_wins += 1
else:
shadow_wins = 0 # 패배 시 카운터 리셋
print(f" Shadow #{pos_idx}: 진입={buy_price:.4f}원 ({buy_dt.strftime('%m-%d %H:%M')})")
print(f" 청산={sell_price:.4f}원 ({sell_dt.strftime('%m-%d %H:%M')}) | {reason}")
print(f" {win_mark} {pnl:+.2f}% | 연속승={shadow_wins}/{REHABILITATE_WINS}")
if shadow_wins >= REHABILITATE_WINS:
print(f"\n 🎉 {REHABILITATE_WINS}연승 달성 → WF 차단 해제! ({sell_dt.strftime('%m-%d %H:%M')})")
rehabilitated = True
break
# 다음 진입: 청산 시점 이후 첫 번째 가격
next_idx = next(
(i for i, (_, ts) in enumerate(prices) if ts > sell_dt),
None
)
if next_idx is None:
break
cursor_idx = next_idx
if not rehabilitated:
print(f"\n ⛔ 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})")
conn.close()
if __name__ == "__main__":
run()

155
shadow_sim2.py Normal file
View File

@@ -0,0 +1,155 @@
"""Shadow 재활 이후 실제 진입 성과 시뮬레이션."""
import os
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
FEE = 0.0005
REHABILITATE_WINS = 2
def get_price_series(cur, ticker, from_dt):
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= :dt
ORDER BY recorded_at
""", t=ticker, dt=from_dt)
return cur.fetchall()
def simulate_one(prices, buy_price, buy_dt):
"""단일 포지션 시뮬레이션. Returns (is_win, sell_price, sell_dt, reason, pnl_pct)"""
peak = buy_price
for price, ts in prices:
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
sell_pnl = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", sell_pnl)
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
sell_pnl = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", sell_pnl)
last_price, last_ts = prices[-1]
sell_pnl = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (sell_pnl > 0, last_price, last_ts, "데이터종료(보유중)", sell_pnl)
def run_shadow_then_real(cur, ticker, wf_trigger_dt):
"""shadow로 재활 후, 재활 시점 이후 실제 거래 성과 시뮬레이션."""
prices = get_price_series(cur, ticker, wf_trigger_dt)
if not prices:
return None
# 1단계: shadow로 재활 시점 찾기
shadow_wins = 0
cursor_idx = 0
rehab_dt = None
while cursor_idx < len(prices):
buy_price, buy_dt = prices[cursor_idx]
remaining = prices[cursor_idx + 1:]
if not remaining:
break
is_win, sell_price, sell_dt, reason, pnl = simulate_one(remaining, buy_price, buy_dt)
if is_win:
shadow_wins += 1
else:
shadow_wins = 0
if shadow_wins >= REHABILITATE_WINS:
rehab_dt = sell_dt
break
next_idx = next((i for i, (_, ts) in enumerate(prices) if ts > sell_dt), None)
if next_idx is None:
break
cursor_idx = next_idx
if rehab_dt is None:
return None # 재활 실패
# 2단계: 재활 이후 실제 거래 시뮬레이션
print(f"\n ★ WF 해제 시점: {rehab_dt.strftime('%m-%d %H:%M')}")
print(f" ─ 이후 실제 진입 시뮬레이션 ─")
post_prices = get_price_series(cur, ticker, rehab_dt)
if not post_prices:
print(" → 재활 이후 가격 데이터 없음")
return
cursor_idx = 0
trade_no = 0
wins = 0
total_pnl = 0.0
while cursor_idx < len(post_prices):
buy_price, buy_dt = post_prices[cursor_idx]
remaining = post_prices[cursor_idx + 1:]
if not remaining:
break
is_win, sell_price, sell_dt, reason, pnl = simulate_one(remaining, buy_price, buy_dt)
trade_no += 1
if is_win:
wins += 1
total_pnl += pnl
mark = "" if is_win else ""
print(f" 실제#{trade_no}: {buy_price:.4f}{sell_price:.4f}원 | {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})")
next_idx = next((i for i, (_, ts) in enumerate(post_prices) if ts > sell_dt), None)
if next_idx is None:
break
cursor_idx = next_idx
if trade_no > 0:
wr = wins / trade_no * 100
print(f"\n 📊 재활 후 성과: {trade_no}건 중 {wins}승 | 승률={wr:.0f}% | 누적PnL={total_pnl:+.2f}%")
return {'trades': trade_no, 'wins': wins, 'win_rate': wr, 'total_pnl': total_pnl}
return None
def run():
conn = oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
cur = conn.cursor()
results = {}
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
wf_trigger_dt = rows[4][0]
print(f"\n{'='*60}")
print(f"[{ticker}] WF차단 발동: {wf_trigger_dt.strftime('%m-%d %H:%M')}")
r = run_shadow_then_real(cur, ticker, wf_trigger_dt)
if r:
results[ticker] = r
print(f"\n{'='*60}")
print("전체 요약")
print(f"{'='*60}")
for ticker, r in results.items():
print(f"{ticker}: 재활 후 {r['trades']}건 | 승률={r['win_rate']:.0f}% | 누적={r['total_pnl']:+.2f}%")
conn.close()
if __name__ == "__main__":
run()

320
shadow_sim3.py Normal file
View File

@@ -0,0 +1,320 @@
"""Shadow 재활 시뮬레이션 v3 - 실제 전략 필터 포함.
전략 조건:
1. 추세: 현재가 vs 2h 전 가격 >= TREND_MIN_GAIN_PCT%
2. 모멘텀: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 * VOL_MULT
3. 15분 워치리스트: 첫 신호 후 15분 재확인
"""
import os, time
from datetime import datetime, timedelta
from collections import defaultdict
from dotenv import load_dotenv
load_dotenv()
import oracledb
import pyupbit
# ── 파라미터 ──────────────────────────────────────────
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_HOURS = 2
TREND_MIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # Neutral 기준
MA_PERIOD = 20
LOCAL_VOL_HOURS = 5
VOL_MULT = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
CONFIRM_MINUTES = 15
FEE = 0.0005
REHABILITATE_WINS = 2
# ── DB 연결 ───────────────────────────────────────────
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'),
password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'),
config_dir=os.getenv('ORACLE_WALLET')
)
def load_price_history(cur, ticker, from_dt, to_dt):
"""price_history 전체 로드."""
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at BETWEEN :f AND :e
ORDER BY recorded_at
""", t=ticker, f=from_dt, e=to_dt)
return cur.fetchall() # [(price, dt), ...]
# ── pyupbit 과거 데이터 캐시 ───────────────────────────
_daily_cache = {} # (ticker, date_str) → df
_hourly_cache = {} # (ticker, hour_str) → df
def get_ma20(ticker, as_of_dt):
"""as_of_dt 기준 MA20 (일봉 종가)."""
date_str = as_of_dt.strftime("%Y-%m-%d")
key = (ticker, date_str)
if key not in _daily_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="day", count=MA_PERIOD + 2,
to=as_of_dt.strftime("%Y-%m-%d 09:00:00"))
_daily_cache[key] = df
time.sleep(0.1)
except Exception as e:
_daily_cache[key] = None
df = _daily_cache[key]
if df is None or len(df) < MA_PERIOD:
return None
return df["close"].iloc[-MA_PERIOD:].mean()
def get_volume_ratio(ticker, as_of_dt):
"""최근 1h 거래량 / 로컬 5h 평균. (직전 완성봉 기준)"""
# 시간봉은 해당 시각의 이전 7h 데이터 필요
hour_str = as_of_dt.strftime("%Y-%m-%d %H:00:00")
key = (ticker, hour_str)
if key not in _hourly_cache:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=LOCAL_VOL_HOURS + 3,
to=as_of_dt.strftime("%Y-%m-%d %H:%M:%S"))
_hourly_cache[key] = df
time.sleep(0.1)
except Exception as e:
_hourly_cache[key] = None
df = _hourly_cache[key]
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
return 0.0
recent_vol = df["volume"].iloc[-2]
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean()
if local_avg <= 0:
return 0.0
return recent_vol / local_avg
# ── 전략 조건 체크 ────────────────────────────────────
def check_trend(prices, idx):
"""현재 idx 기준 2h 전(12틱 전) 대비 +TREND_MIN_PCT% 이상."""
lookback = TREND_HOURS * 6 # 10분봉 기준 2h = 12틱
if idx < lookback:
return False
current = prices[idx][0]
past = prices[idx - lookback][0]
if past <= 0:
return False
gain = (current - past) / past * 100
return gain >= TREND_MIN_PCT
def check_momentum(ticker, current_price, as_of_dt):
"""현재가 > MA20 AND 거래량 비율 >= VOL_MULT."""
ma20 = get_ma20(ticker, as_of_dt)
if ma20 is None:
return False
if current_price <= ma20:
return False
vol_ratio = get_volume_ratio(ticker, as_of_dt)
return vol_ratio >= VOL_MULT
# ── 단일 포지션 시뮬레이션 ────────────────────────────
def simulate_position(prices, buy_idx, buy_price):
"""buy_idx 이후 가격으로 포지션 시뮬레이션."""
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx + 1:]:
if price > peak:
peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
drop_from_peak = (peak - price) / peak
if drop_from_peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, price, ts, f"트레일링({pnl*100:+.1f}%)", net)
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, price, ts, f"타임스탑({elapsed_h:.1f}h)", net)
last_price, last_ts = prices[-1]
net = (last_price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return (net > 0, last_price, last_ts, "데이터종료", net)
# ── Shadow → 재활 → 실제 진입 시뮬레이션 ─────────────
def run_full_sim(cur, ticker, wf_trigger_dt, end_dt):
"""
1. WF차단 시점부터 shadow 포지션 (전략 필터 적용)
2. REHABILITATE_WINS 연승 → WF 해제
3. 해제 이후 실제 진입 성과
"""
prices = load_price_history(cur, ticker, wf_trigger_dt, end_dt)
if not prices:
print(f" 가격 데이터 없음")
return
print(f" 가격 데이터: {len(prices)}개 ({prices[0][1].strftime('%m-%d %H:%M')} ~ {prices[-1][1].strftime('%m-%d %H:%M')})")
# ── Phase 1: Shadow (WF 해제까지) ────────────────
shadow_wins = 0
rehab_dt = None
watchlist_dt = None # 신호 첫 발생 시각
in_position = False
pos_buy_idx = None
pos_buy_price = None
shadow_trade_no = 0
idx = 0
while idx < len(prices) and rehab_dt is None:
current_price, current_dt = prices[idx]
if in_position:
# 포지션 청산 체크
is_win, sell_price, sell_dt, reason, pnl = simulate_position(prices, pos_buy_idx, pos_buy_price)
# 청산 시각에 해당하는 idx로 점프
sell_idx = next((i for i, (_, ts) in enumerate(prices) if ts >= sell_dt), len(prices)-1)
shadow_trade_no += 1
if is_win:
shadow_wins += 1
else:
shadow_wins = 0
mark = "" if is_win else ""
print(f" [Shadow#{shadow_trade_no}] {pos_buy_price:.4f}{sell_price:.4f}"
f"| {mark} {pnl:+.2f}% | {reason} | 연속승={shadow_wins}/{REHABILITATE_WINS}"
f" ({sell_dt.strftime('%m-%d %H:%M')})")
if shadow_wins >= REHABILITATE_WINS:
rehab_dt = sell_dt
idx = sell_idx
break
in_position = False
watchlist_dt = None
idx = sell_idx
continue
# 전략 조건 체크
trend_ok = check_trend(prices, idx)
if trend_ok:
mom_ok = check_momentum(ticker, current_price, current_dt)
else:
mom_ok = False
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = current_dt # 첫 신호
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
# 15분 재확인 → shadow 진입
in_position = True
pos_buy_idx = idx
pos_buy_price = current_price
watchlist_dt = None
# 청산은 다음 루프에서
else:
watchlist_dt = None # 조건 깨지면 초기화
idx += 1
if rehab_dt is None:
print(f"\n ⛔ 전략 필터 적용 시 데이터 범위 내 재활 실패 (shadow_wins={shadow_wins})")
return
print(f"\n 🎉 WF 해제: {rehab_dt.strftime('%m-%d %H:%M')} ({shadow_trade_no}번 shadow 거래)")
# ── Phase 2: 실제 진입 (재활 이후) ───────────────
print(f"\n ── 재활 후 실제 진입 ──")
post_prices = [(p, dt) for p, dt in prices if dt >= rehab_dt]
if not post_prices:
print(" 재활 이후 데이터 없음")
return
real_wins = 0
real_total = 0
real_pnl_sum = 0.0
watchlist_dt = None
in_position = False
pos_buy_idx_g = None # global idx in post_prices
pos_buy_price = None
idx2 = 0
while idx2 < len(post_prices):
current_price, current_dt = post_prices[idx2]
if in_position:
is_win, sell_price, sell_dt, reason, pnl = simulate_position(post_prices, pos_buy_idx_g, pos_buy_price)
sell_idx2 = next((i for i, (_, ts) in enumerate(post_prices) if ts >= sell_dt), len(post_prices)-1)
real_total += 1
if is_win:
real_wins += 1
real_pnl_sum += pnl
mark = "" if is_win else ""
print(f" 실제#{real_total}: {pos_buy_price:.4f}{sell_price:.4f}"
f"| {mark} {pnl:+.2f}% | {reason} ({sell_dt.strftime('%m-%d %H:%M')})")
in_position = False
watchlist_dt = None
idx2 = sell_idx2
continue
trend_ok = check_trend(post_prices, idx2)
if trend_ok:
mom_ok = check_momentum(ticker, current_price, current_dt)
else:
mom_ok = False
if trend_ok and mom_ok:
if watchlist_dt is None:
watchlist_dt = current_dt
elif (current_dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_position = True
pos_buy_idx_g = idx2
pos_buy_price = current_price
watchlist_dt = None
else:
watchlist_dt = None
idx2 += 1
if real_total == 0:
print(" 재활 후 전략 조건 충족 진입 없음")
else:
wr = real_wins / real_total * 100
print(f"\n 📊 재활 후: {real_total}건 | 승률={wr:.0f}% | 누적={real_pnl_sum:+.2f}%")
return {'trades': real_total, 'wins': real_wins, 'wr': wr, 'pnl': real_pnl_sum}
def main():
conn = get_conn()
cur = conn.cursor()
# price_history 최대 시각
cur.execute("SELECT MAX(recorded_at) FROM price_history")
end_dt = cur.fetchone()[0]
summary = {}
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT traded_at FROM trade_results
WHERE ticker = :t ORDER BY traded_at
""", t=ticker)
rows = cur.fetchall()
wf_dt = rows[4][0]
print(f"\n{'='*62}")
print(f"[{ticker}] WF차단: {wf_dt.strftime('%m-%d %H:%M')}")
print(f"{'='*62}")
r = run_full_sim(cur, ticker, wf_dt, end_dt)
if r:
summary[ticker] = r
print(f"\n{'='*62}")
print("전체 요약 (전략 필터 적용)")
print(f"{'='*62}")
for ticker, r in summary.items():
print(f"{ticker}: {r['trades']}건 | 승률={r['wr']:.0f}% | 누적={r['pnl']:+.2f}%")
conn.close()
if __name__ == "__main__":
main()

229
sim_365.py Normal file
View File

@@ -0,0 +1,229 @@
"""365일 복리 KRW 시뮬레이션.
- 상위 20개 종목 × vol-lead +4.8% 전략
- MAX_POSITIONS=3, 복리 포지션 크기 (이득 시 증가 / 손실 시 차감)
- 최소 예산 = 초기 예산의 30%
- 데이터: Oracle ADB ohlcv_hourly 테이블
"""
import pickle
import sys
from pathlib import Path
from dotenv import load_dotenv
load_dotenv(dotenv_path=Path(__file__).parent / ".env")
sys.path.insert(0, str(Path(__file__).parent))
from ohlcv_db import load_from_db
from vol_lead_sim import run_vol_lead_thresh
# ── 파라미터 ───────────────────────────────────────────
BUDGET = 15_000_000
MIN_BUDGET = BUDGET * 3 // 10 # 하한 30% = 4,500,000원
MAX_POS = 3
THRESH = 4.8
FROM_DATE = "2025-03-02"
TOP30_FILE = Path("top30_tickers.pkl")
def load_data() -> dict:
top30 = pickle.load(open(TOP30_FILE, "rb"))
print(f"DB 로드 중... ({len(top30)}종목)")
data = load_from_db(top30, from_date=FROM_DATE)
valid = {t: df for t, df in data.items() if len(df) >= 500}
print(f"유효 종목: {len(valid)}개 로드 완료")
return valid
pickle.dump(data, open(CACHE_FILE, "wb"))
print(f"\n캐시 저장: {CACHE_FILE} ({len(data)}종목)\n")
return data
# ── WF 필터 (종목별 적용) ──────────────────────────────
WF_WINDOW = 2
WF_MIN_WIN_RATE = 0.01
WF_SHADOW_WINS = 2
def apply_wf(trades: list) -> tuple:
"""종목별 WF 필터: 2연패 시 차단, shadow 2연승 시 복귀."""
history = []
shadow_streak = 0
blocked = False
accepted = []
blocked_cnt = 0
for trade in trades:
is_win = int(trade[0])
if not blocked:
accepted.append(trade)
history.append(is_win)
if len(history) >= WF_WINDOW:
wr = sum(history[-WF_WINDOW:]) / WF_WINDOW
if wr < WF_MIN_WIN_RATE:
blocked = True
shadow_streak = 0
else:
blocked_cnt += 1
if is_win:
shadow_streak += 1
if shadow_streak >= WF_SHADOW_WINS:
blocked = False
history = []
shadow_streak = 0
else:
shadow_streak = 0
return accepted, blocked_cnt
# ── MAX_POSITIONS 필터 ─────────────────────────────────
def collect_trades(data: dict, tickers: list) -> list:
all_trades = []
wf_total_blocked = 0
for t in tickers:
if t not in data:
continue
raw = [(is_win, pnl, buy_dt, sell_dt, reason)
for is_win, pnl, buy_dt, sell_dt, reason
in run_vol_lead_thresh(data[t], THRESH)]
filtered, blocked = apply_wf(raw)
wf_total_blocked += blocked
for is_win, pnl, buy_dt, sell_dt, reason in filtered:
all_trades.append((buy_dt, sell_dt, t, is_win, pnl, reason))
print(f" WF 필터 차단: {wf_total_blocked}")
all_trades.sort(key=lambda x: x[0])
return all_trades
def apply_max_positions(all_trades: list) -> tuple:
open_exits, accepted, skipped = [], [], []
for trade in all_trades:
buy_dt, sell_dt = trade[0], trade[1]
open_exits = [s for s in open_exits if s > buy_dt]
if len(open_exits) < MAX_POS:
open_exits.append(sell_dt)
accepted.append(trade)
else:
skipped.append(trade)
return accepted, skipped
# ── 복리 시뮬레이션 ────────────────────────────────────
def simulate(accepted: list) -> dict:
portfolio = float(BUDGET)
total_krw = 0.0
monthly = {}
trade_log = []
for buy_dt, sell_dt, ticker, is_win, pnl, reason in accepted:
pos_size = max(portfolio, MIN_BUDGET) / MAX_POS
krw_profit = pos_size * pnl / 100
portfolio = max(portfolio + krw_profit, MIN_BUDGET)
total_krw += krw_profit
ym = buy_dt.strftime("%Y-%m")
if ym not in monthly:
monthly[ym] = {"trades": 0, "wins": 0, "pnl_krw": 0.0}
monthly[ym]["trades"] += 1
monthly[ym]["wins"] += int(is_win)
monthly[ym]["pnl_krw"] += krw_profit
trade_log.append({
"buy_dt": buy_dt,
"sell_dt": sell_dt,
"ticker": ticker,
"is_win": is_win,
"pnl_pct": pnl,
"pos_size": pos_size,
"krw_profit": krw_profit,
"portfolio": portfolio,
"reason": reason,
})
wins = sum(1 for t in accepted if t[3])
return {
"portfolio": portfolio,
"total_krw": total_krw,
"roi_pct": (portfolio - BUDGET) / BUDGET * 100,
"total": len(accepted),
"wins": wins,
"wr": wins / len(accepted) * 100 if accepted else 0,
"monthly": monthly,
"trade_log": trade_log,
}
# ── 메인 ──────────────────────────────────────────────
def main() -> None:
data = load_data()
top30 = pickle.load(open(TOP30_FILE, "rb"))
valid = [t for t in top30 if t in data and len(data[t]) >= 500]
use20 = valid[:20]
print(f"{'='*65}")
print(f"365일 복리 시뮬레이션 | vol-lead +{THRESH}% | {len(use20)}종목")
print(f"초기 예산: {BUDGET:,}원 | 최소 예산(하한): {MIN_BUDGET:,}")
print(f"기간: {FROM_DATE[:10]} ~ 2026-03-02")
print(f"{'='*65}")
all_trades = collect_trades(data, use20)
accepted, skipped = apply_max_positions(all_trades)
result = simulate(accepted)
print(f"\n── 전체 결과 ──────────────────────────────────────────")
print(f" 신호 발생: {len(all_trades):>4}")
print(f" 실제 진입: {result['total']:>4}건 ({len(skipped)}건 MAX_POS 스킵)")
print(f" 승/패: {result['wins']}{result['total']-result['wins']}패 (승률 {result['wr']:.0f}%)")
print(f" ─────────────────────────────────────────────────")
print(f" 초기 예산: {BUDGET:>14,}")
print(f" 최종 자산: {result['portfolio']:>14,.0f}")
print(f" 순수익: {result['total_krw']:>+14,.0f}")
print(f" 수익률: {result['roi_pct']:>+13.2f}%")
print(f" 연환산: {result['roi_pct']:>+13.2f}% (이미 1년)")
# 최대 낙폭
peak = BUDGET
max_dd = 0.0
for t in result["trade_log"]:
peak = max(peak, t["portfolio"])
dd = (peak - t["portfolio"]) / peak * 100
max_dd = max(max_dd, dd)
print(f" 최대 낙폭: {-max_dd:>+13.2f}% ({-max_dd/100*BUDGET:>+,.0f}원)")
# 월별
print(f"\n── 월별 수익 ──────────────────────────────────────────")
print(f" {'':^8}{'거래':>4} {'승률':>5}{'월수익(KRW)':>14} {'누적수익(KRW)':>15} {'예산':>14}")
print(f" {''*70}")
cum = 0.0
budget_now = float(BUDGET)
for ym, m in sorted(result["monthly"].items()):
wr = m["wins"] / m["trades"] * 100 if m["trades"] else 0
cum += m["pnl_krw"]
budget_now = max(BUDGET + cum, MIN_BUDGET)
print(f" {ym:^8}{m['trades']:>4}{wr:>4.0f}% │ "
f"{m['pnl_krw']:>+14,.0f}{cum:>+14,.0f}{budget_now:>13,.0f}")
# 종목별
print(f"\n── 종목별 기여 ({len(use20)}종목) ──────────────────────────")
print(f" {'종목':<14}{'거래':>4} {'승률':>5}{'KRW수익':>14} {'평균/건':>10}")
print(f" {''*58}")
stats: dict = {}
for t in result["trade_log"]:
k = t["ticker"]
if k not in stats:
stats[k] = {"n": 0, "wins": 0, "krw": 0.0}
stats[k]["n"] += 1
stats[k]["wins"] += int(t["is_win"])
stats[k]["krw"] += t["krw_profit"]
for t, s in sorted(stats.items(), key=lambda x: -x[1]["krw"]):
wr = s["wins"] / s["n"] * 100 if s["n"] else 0
avg = s["krw"] / s["n"] if s["n"] else 0
print(f" {t:<14}{s['n']:>4}{wr:>4.0f}% │ "
f"{s['krw']:>+14,.0f}{avg:>+9,.0f}원/건")
if __name__ == "__main__":
main()

119
ticker_sim.py Normal file
View File

@@ -0,0 +1,119 @@
"""종목 수 확장 시뮬레이션 - 거래량 상위 N개 종목별 vol-lead 전략 비교."""
import os
import pickle
import sys
from pathlib import Path
from dotenv import load_dotenv
load_dotenv()
import pandas as pd
import pyupbit
# vol_lead_sim.py의 공통 파라미터/함수 재사용
sys.path.insert(0, str(Path(__file__).parent))
from vol_lead_sim import (
STOP_LOSS_PCT, TIME_STOP_HOURS, TIME_STOP_MIN_PCT,
FEE, LOCAL_VOL_HOURS, VOL_MULT, PRICE_QUIET_PCT, SIGNAL_TIMEOUT_H,
FROM_DATE, simulate_pos, run_trend, run_vol_lead_thresh,
)
CACHE_FILE = Path("vol_lead_cache_30.pkl")
TOP30_FILE = Path("top30_tickers.pkl")
DAYS = 46.0
def load_data() -> dict:
return pickle.load(open(CACHE_FILE, "rb"))
def run_subset(data: dict, tickers: list, thresh: float) -> dict:
agg = {"total": 0, "wins": 0, "pnl": 0.0, "per_ticker": []}
for t in tickers:
if t not in data:
continue
trades = run_vol_lead_thresh(data[t], thresh)
n = len(trades)
w = sum(1 for x in trades if x[0])
p = sum(x[1] for x in trades)
agg["total"] += n
agg["wins"] += w
agg["pnl"] += p
agg["per_ticker"].append((t, n, w, p))
agg["wr"] = agg["wins"] / agg["total"] * 100 if agg["total"] else 0
return agg
def main() -> None:
data = load_data()
top30 = pickle.load(open(TOP30_FILE, "rb"))
# 데이터 충분한 종목만 (400봉 이상 = 16일 이상)
valid = [t for t in top30 if t in data and len(data[t]) >= 400]
n_max = len(valid)
print(f"유효 종목: {n_max}")
print(f"기간: 46일 (2026-01-15 ~ 2026-03-02)\n")
# ── A 현행 기준선 (9종목) ─────────────────────────
orig9 = ["KRW-DKA","KRW-LAYER","KRW-SIGN","KRW-SOL","KRW-ETH",
"KRW-XRP","KRW-HOLO","KRW-OM","KRW-ORBS"]
orig9_valid = [t for t in orig9 if t in data]
a_agg = {"total": 0, "wins": 0, "pnl": 0.0}
for t in orig9_valid:
trades = run_trend(data[t])
a_agg["total"] += len(trades)
a_agg["wins"] += sum(1 for x in trades if x[0])
a_agg["pnl"] += sum(x[1] for x in trades)
a_wr = a_agg["wins"] / a_agg["total"] * 100 if a_agg["total"] else 0
print(f"[기준: A 현행 9종목] {a_agg['total']}건 | 승률={a_wr:.0f}% | 누적={a_agg['pnl']:+.2f}%\n")
# ── 종목수별 비교 (임계값 4.8% 고정) ────────────────
THRESH = 4.8
subset_ns = [9, 15, 20, n_max]
print(f"임계값 +{THRESH}% | 종목 수 확장 효과")
print(f"{'종목수':>6}{'총거래':>6} {'일평균':>7} {'월환산':>7}{'승률':>5} {'누적PnL':>10}")
print("" * 56)
for n in subset_ns:
s = run_subset(data, valid[:n], THRESH)
pdm = s["total"] / DAYS
pmm = pdm * 30
marker = " ← 현재설정" if n == 9 else ""
print(f"{n:>5}종목 │ {s['total']:>6}{pdm:>6.2f}회/일 {pmm:>6.1f}회/월 │ "
f"{s['wr']:>4.0f}% {s['pnl']:>+9.2f}%{marker}")
# ── 임계값 × 종목수 매트릭스 ─────────────────────
thresholds = [3.6, 4.0, 4.4, 4.8]
col_ns = [9, 15, 20, n_max]
print(f"\n임계값 × 종목수 매트릭스 (건수 / 승률 / 누적PnL)")
col_w = 20
header = f"{'임계값':>6}"
for n in col_ns:
header += f" {f'{n}종목':^{col_w}}"
print(header)
print("" * (10 + col_w * len(col_ns)))
for thresh in thresholds:
row = f"+{thresh:.1f}% │"
for n in col_ns:
s = run_subset(data, valid[:n], thresh)
wr = s["wins"] / s["total"] * 100 if s["total"] else 0
cell = f"{s['total']}{wr:.0f}% {s['pnl']:+.1f}%"
row += f" {cell:<{col_w}}"
print(row)
# ── 전체 종목별 기여도 (4.8%) ────────────────────
print(f"\n종목별 기여도 ({n_max}종목, +4.8%)")
print(f"{'종목':<16} {'거래':>5} {'승률':>6} {'누적PnL':>10} {'평균PnL/거래':>12}")
print("" * 55)
s = run_subset(data, valid, THRESH)
s["per_ticker"].sort(key=lambda x: x[3], reverse=True)
for t, n, w, p in s["per_ticker"]:
wr = w / n * 100 if n else 0
avg = p / n if n else 0
print(f"{t:<16} {n:>5}{wr:>5.0f}% {p:>+9.2f}% {avg:>+10.2f}%/건")
if __name__ == "__main__":
main()

42
trend_check.py Normal file
View File

@@ -0,0 +1,42 @@
import os
from dotenv import load_dotenv
load_dotenv()
import oracledb
conn = oracledb.connect(user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
cur = conn.cursor()
for ticker in ['KRW-DKA', 'KRW-LAYER', 'KRW-SIGN']:
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker = :t AND recorded_at >= TIMESTAMP '2026-02-28 20:00:00'
ORDER BY recorded_at
""", t=ticker)
rows = cur.fetchall()
lookback = 12 # 10분봉 * 12 = 2h
gains = []
for i in range(lookback, len(rows)):
curr = rows[i][0]
past = rows[i - lookback][0]
if past > 0:
gains.append((curr - past) / past * 100)
if not gains:
continue
above_5 = sum(1 for g in gains if g >= 5.0)
above_3 = sum(1 for g in gains if g >= 3.0)
above_0 = sum(1 for g in gains if g >= 0.0)
negative = sum(1 for g in gains if g < 0.0)
print(f"[{ticker}] 2h 등락률 분포 ({len(gains)}개 틱)")
print(f" 평균={sum(gains)/len(gains):+.2f}% 최고={max(gains):+.2f}% 최저={min(gains):+.2f}%")
print(f" +5% 이상(신호): {above_5}건 ({above_5/len(gains)*100:.0f}%)")
print(f" +3%~+5%: {above_3-above_5}건 ({(above_3-above_5)/len(gains)*100:.0f}%)")
print(f" 0%~+3%: {above_0-above_3}건 ({(above_0-above_3)/len(gains)*100:.0f}%)")
print(f" 음전(하락): {negative}건 ({negative/len(gains)*100:.0f}%)")
print()
conn.close()

396
vol_lead_sim.py Normal file
View File

@@ -0,0 +1,396 @@
"""거래량 선행(Volume Lead) 진입 전략 시뮬레이션.
3가지 전략 비교:
A (현행): 12h 가격 +5% 확인 + 1h 거래량 급증 → 진입 (이미 오른 뒤 추격)
B (신규): 가격 횡보 중 거래량 급증(축적) → 그 후 추세 +N% 시작 시 선진입
C (단순): 거래량 급증만 (베이스라인, 노이즈 확인용)
"""
import os
import pickle
import time
from datetime import datetime
from pathlib import Path
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
import pyupbit
# ── 공통 파라미터 ─────────────────────────────────────
STOP_LOSS_PCT = 0.015 # 트레일링 스탑 1.5%
TIME_STOP_HOURS = 8
TIME_STOP_MIN_PCT = 3.0
FEE = 0.0005
LOCAL_VOL_HOURS = 5 # 거래량 기준 이전 N시간
VOL_MULT = 2.0 # 거래량 배수 기준
# 현행 전략 파라미터
TREND_HOURS = 12
TREND_MIN_PCT = 5.0
# B 전략 파라미터: 거래량 선행 + 이후 소규모 추세 확인
PRICE_QUIET_PCT = 2.0 # 거래량 급증 시점 가격 횡보 기준 (2h 변동 < N%)
TREND_AFTER_VOL = 1.5 # 축적 신호 후 진입 기준 (vol 시점 대비 +N% 상승 시)
SIGNAL_TIMEOUT_H = 8 # 축적 신호 후 N시간 내 추세 미발생 시 초기화
FROM_DATE = "2026-01-15 00:00:00"
TICKERS = [
'KRW-DKA', 'KRW-LAYER', 'KRW-SIGN',
'KRW-SOL', 'KRW-ETH', 'KRW-XRP',
'KRW-HOLO', 'KRW-OM', 'KRW-ORBS',
]
CACHE_FILE = Path("vol_lead_cache.pkl")
# ── 데이터 로드 ───────────────────────────────────────
def fetch_all(ticker: str, from_date: str):
"""1h봉 전체 로드 (from_date 이후, 페이지 역방향 수집)."""
target = datetime.strptime(from_date, "%Y-%m-%d %H:%M:%S")
frames = []
to_dt = None
for _ in range(15): # 최대 15페이지 = 3000h ≈ 125일
kwargs: dict = dict(ticker=ticker, interval="minute60", count=200)
if to_dt:
kwargs["to"] = to_dt.strftime("%Y-%m-%d %H:%M:%S")
df = pyupbit.get_ohlcv(**kwargs)
if df is None or df.empty:
break
frames.append(df)
oldest = df.index[0].to_pydatetime().replace(tzinfo=None)
if oldest <= target:
break
to_dt = oldest
time.sleep(0.2)
if not frames:
return None
result = pd.concat(frames).sort_index().drop_duplicates()
result.index = result.index.tz_localize(None)
return result[result.index >= target]
def load_data() -> dict:
if CACHE_FILE.exists():
print(f"캐시 로드: {CACHE_FILE}")
return pickle.load(open(CACHE_FILE, "rb"))
data = {}
for ticker in TICKERS:
print(f" {ticker} 로딩...", end=" ", flush=True)
df = fetch_all(ticker, FROM_DATE)
if df is not None:
data[ticker] = df
print(f"{len(df)}봉 ({df.index[0].strftime('%m-%d')}~{df.index[-1].strftime('%m-%d')})")
else:
print("실패")
time.sleep(0.3)
pickle.dump(data, open(CACHE_FILE, "wb"))
return data
# ── 포지션 시뮬 ───────────────────────────────────────
def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float):
"""매수 후 청산 시뮬레이션.
- 최고가: 각 봉의 high 기준
- 스탑 발동 체크: 각 봉의 low 기준 (intra-candle 포착)
- 청산가: peak × (1 - stop_pct) 근사
"""
buy_dt = df.index[buy_idx]
peak = buy_price
for i in range(buy_idx + 1, len(df)):
row = df.iloc[i]
ts = df.index[i]
if row["high"] > peak:
peak = row["high"]
elapsed_h = (ts - buy_dt).total_seconds() / 3600
stop_price = peak * (1 - STOP_LOSS_PCT)
# 트레일링 스탑 (low가 stop_price 이하 진입 시)
if row["low"] <= stop_price:
sell_price = stop_price
pnl = (sell_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, sell_price, ts, f"트레일링({pnl:+.1f}%)", pnl
# 타임 스탑
pnl_now = (row["close"] - buy_price) / buy_price * 100
if elapsed_h >= TIME_STOP_HOURS and pnl_now < TIME_STOP_MIN_PCT:
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, row["close"], ts, "타임스탑", pnl
last = df.iloc[-1]["close"]
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
return pnl > 0, last, df.index[-1], "데이터종료", pnl
# ── 현행 전략 (추세 확인형) ───────────────────────────
def run_trend(df: pd.DataFrame) -> list:
"""12h 가격 +5% 확인 + 1h 거래량 급증 + 1h 워치리스트."""
trades = []
watchlist_i = None
in_pos = False
buy_idx = buy_price = None
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
while i < len(df):
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
in_pos = False
watchlist_i = None
i = next_i
continue
close = df.iloc[i]["close"]
past12 = df.iloc[i - TREND_HOURS]["close"]
trend_ok = (close - past12) / past12 * 100 >= TREND_MIN_PCT
vol_recent = df.iloc[i - 1]["volume"]
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
vol_ok = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
if trend_ok and vol_ok:
if watchlist_i is None:
watchlist_i = i
elif i - watchlist_i >= 1: # 1h 확인
in_pos = True
buy_idx = i
buy_price = close
watchlist_i = None
else:
watchlist_i = None
i += 1
return trades
# ── B 전략: 거래량 선행 + 소규모 추세 확인 ───────────
def run_vol_lead(df: pd.DataFrame) -> list:
"""거래량 급증(축적) 감지 후 소규모 추세 확인 시 선진입.
흐름:
1. 직전 1h 거래량 > 이전 5h 평균 × VOL_MULT AND
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
→ 축적 신호 기록 (signal_price = 현재가)
2. 신호 후 현재가가 signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
(현행 +5% 대신 작은 기준으로 더 일찍 진입)
3. SIGNAL_TIMEOUT_H 시간 내 추세 미발생 → 신호 초기화
"""
trades = []
signal_i = None
signal_price = None
in_pos = False
buy_idx = buy_price = None
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
while i < len(df):
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
in_pos = False
signal_i = None
signal_price = None
i = next_i
continue
close = df.iloc[i]["close"]
close_2h = df.iloc[i - 2]["close"]
quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT
vol_recent = df.iloc[i - 1]["volume"]
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
# 축적 신호 갱신
if quiet and vol_spike:
if signal_i is None:
signal_i = i
signal_price = close
else:
if signal_i is not None and close < signal_price:
# 가격 하락 → 축적 실패, 초기화
signal_i = None
signal_price = None
# 타임아웃
if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H:
signal_i = None
signal_price = None
# 진입: 축적 신호 후 가격 +TREND_AFTER_VOL% 이상 상승
if signal_i is not None:
move = (close - signal_price) / signal_price * 100
if move >= TREND_AFTER_VOL:
in_pos = True
buy_idx = i
buy_price = close
signal_i = None
signal_price = None
i += 1
return trades
# ── 결과 출력 ─────────────────────────────────────────
def summarize(label: str, trades: list) -> dict:
if not trades:
print(f" [{label}] 거래 없음")
return {"total": 0, "wins": 0, "wr": 0.0, "pnl": 0.0}
wins = sum(1 for t in trades if t[0])
total = len(trades)
pnl = sum(t[1] for t in trades)
wr = wins / total * 100
print(f" [{label}] {total}건 | 승률={wr:.0f}% ({wins}{total-wins}패) | 누적={pnl:+.2f}%")
for idx, (is_win, p, bdt, sdt, reason) in enumerate(trades, 1):
mark = "" if is_win else ""
print(f" #{idx}: {mark} {p:+.2f}% | {reason}"
f" ({bdt.strftime('%m-%d %H:%M')}{sdt.strftime('%m-%d %H:%M')})")
return {"total": total, "wins": wins, "wr": wr, "pnl": pnl}
def run_vol_lead_thresh(df: pd.DataFrame, thresh: float) -> list:
"""run_vol_lead의 TREND_AFTER_VOL 파라미터를 동적으로 받는 버전."""
trades = []
signal_i = None
signal_price = None
in_pos = False
buy_idx = buy_price = None
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
while i < len(df):
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
in_pos = False
signal_i = None
signal_price = None
i = next_i
continue
close = df.iloc[i]["close"]
close_2h = df.iloc[i - 2]["close"]
quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT
vol_recent = df.iloc[i - 1]["volume"]
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
if quiet and vol_spike:
if signal_i is None:
signal_i = i
signal_price = close
else:
if signal_i is not None and close < signal_price:
signal_i = None
signal_price = None
if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H:
signal_i = None
signal_price = None
if signal_i is not None:
move = (close - signal_price) / signal_price * 100
if move >= thresh:
in_pos = True
buy_idx = i
buy_price = close
signal_i = None
signal_price = None
i += 1
return trades
def main() -> None:
print("데이터 로딩 중...")
data = load_data()
# ── A 현행 전략 (기준선) ─────────────────────────────
print(f"\n{'='*72}")
print(f"A(현행 12h+5%+거래량) 기준선 | {FROM_DATE[:10]} ~ 현재")
print(f"{'='*72}")
agg_a = {"total": 0, "wins": 0, "pnl": 0.0}
trend_results = {}
for ticker, df in data.items():
t = run_trend(df)
trend_results[ticker] = t
s = {"total": len(t), "wins": sum(1 for x in t if x[0]),
"pnl": sum(x[1] for x in t)}
agg_a["total"] += s["total"]
agg_a["wins"] += s["wins"]
agg_a["pnl"] += s["pnl"]
a_wr = agg_a["wins"] / agg_a["total"] * 100 if agg_a["total"] else 0
print(f"A 합계: {agg_a['total']}건 | 승률={a_wr:.0f}% | 누적={agg_a['pnl']:+.2f}%")
# ── B 전략: TREND_AFTER_VOL 파라미터 스윕 ───────────
THRESHOLDS = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0]
print(f"\n{'='*72}")
print(f"B(거래량→+N% 선진입) 파라미터 스윕")
print(f"{''*72}")
print(f"{'임계값':>6}{'거래':>5} {'승률':>6} {'누적PnL':>10} │ vs A PnL")
print(f"{''*72}")
best = None
for thresh in THRESHOLDS:
agg = {"total": 0, "wins": 0, "pnl": 0.0}
for ticker, df in data.items():
t = run_vol_lead_thresh(df, thresh)
agg["total"] += len(t)
agg["wins"] += sum(1 for x in t if x[0])
agg["pnl"] += sum(x[1] for x in t)
wr = agg["wins"] / agg["total"] * 100 if agg["total"] else 0
diff = agg["pnl"] - agg_a["pnl"]
marker = " ← best" if (best is None or agg["pnl"] > best["pnl"]) else ""
if marker:
best = {**agg, "thresh": thresh, "wr": wr}
print(f"+{thresh:>4.1f}% │ {agg['total']:>5}{wr:>5.0f}% {agg['pnl']:>+9.2f}% │ {diff:>+8.2f}%{marker}")
print(f"{''*72}")
print(f"\n★ 최적 임계값: +{best['thresh']}% → "
f"{best['total']}건 | 승률={best['wr']:.0f}% | 누적={best['pnl']:+.2f}%")
# ── 최적 임계값으로 종목별 상세 출력 ─────────────────
best_thresh = best["thresh"]
print(f"\n{'='*72}")
print(f"★ B(vol→+{best_thresh}%) vs A(12h+5%+vol) 종목별 비교")
print(f"{''*72}")
print(f"{'종목':<14}{'A 현행':^24}{'B +{:.1f}%'.format(best_thresh):^24}")
print(f"{'':14}{'거래':>4} {'승률':>5} {'누적':>9}{'거래':>4} {'승률':>5} {'누적':>9}")
print(f"{''*72}")
for ticker, df in data.items():
t_a = trend_results[ticker]
t_b = run_vol_lead_thresh(df, best_thresh)
wa = sum(1 for x in t_a if x[0])
wb = sum(1 for x in t_b if x[0])
pa = sum(x[1] for x in t_a)
pb = sum(x[1] for x in t_b)
wr_a = wa / len(t_a) * 100 if t_a else 0
wr_b = wb / len(t_b) * 100 if t_b else 0
print(f"{ticker:<14}{len(t_a):>4}{wr_a:>4.0f}% {pa:>+8.2f}% │"
f" {len(t_b):>4}{wr_b:>4.0f}% {pb:>+8.2f}%")
if __name__ == "__main__":
main()

140
wf_cmp.py Normal file
View File

@@ -0,0 +1,140 @@
"""WF 윈도우 크기별 비교 시뮬레이션.
실제 42건 거래를 시간순으로 재생하며
WF_WINDOW 크기(2, 3, 5)에 따라 차단/허용 여부를 시뮬레이션.
차단된 거래 → P&L 0 (진입 안 함)
허용된 거래 → 실제 P&L 반영
"""
import os
from dotenv import load_dotenv
load_dotenv()
import oracledb
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
def simulate_wf(trades, window, min_wr):
"""
trades: [(ticker, is_win, pnl_pct, krw_profit, traded_at), ...] 시간순
window: WF 윈도우 크기
min_wr: 최소 승률 임계값
Returns: 허용된 거래 목록, 차단된 거래 목록, 요약
"""
history = {} # ticker → [bool, ...]
allowed = []
blocked = []
for t in trades:
ticker, is_win, pnl, profit, dt = t
hist = history.get(ticker, [])
# WF 차단 여부 판단
is_blocked = False
if len(hist) >= window:
recent_wr = sum(hist[-window:]) / window
if recent_wr < min_wr:
is_blocked = True
if is_blocked:
blocked.append(t)
else:
allowed.append(t)
# 실제 결과를 이력에 추가
hist = hist + [bool(is_win)]
if len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
total = len(allowed)
wins = sum(1 for t in allowed if t[1])
pnl = sum(t[2] for t in allowed)
profit = sum(t[3] for t in allowed)
return allowed, blocked, {
'total': total, 'wins': wins,
'wr': wins/total*100 if total else 0,
'pnl': pnl, 'profit': profit,
'blocked_count': len(blocked),
}
def main():
conn = get_conn()
cur = conn.cursor()
# 전체 거래 시간순 로드
cur.execute("""
SELECT ticker, is_win, pnl_pct, NVL(krw_profit,0), traded_at
FROM trade_results
ORDER BY traded_at
""")
trades = cur.fetchall()
print(f"전체 거래: {len(trades)}\n")
configs = [
(2, 0.5, "WF=2 (2연패→차단, 1승→해제)"),
(3, 0.34, "WF=3 (3건중 1승 이상 필요)"),
(5, 0.40, "WF=5 (5건중 2승 이상, 현행)"),
]
results = []
for window, min_wr, label in configs:
allowed, blocked, stats = simulate_wf(trades, window, min_wr)
stats['label'] = label
stats['window'] = window
results.append((label, allowed, blocked, stats))
print(f"[{label}]")
print(f" 허용: {stats['total']}건 | 승률={stats['wr']:.1f}% | "
f"누적수익={stats['profit']:+,.0f}원 | 차단={stats['blocked_count']}")
# 차단된 거래 상세
if blocked:
print(f" 차단된 거래:")
blocked_by_ticker = {}
for t in blocked:
blocked_by_ticker.setdefault(t[0], []).append(t)
for ticker, ts in blocked_by_ticker.items():
pnls = [f"{t[2]:+.1f}%" for t in ts]
print(f" {ticker}: {len(ts)}{pnls}")
print()
# 상세 비교표: 거래별 허용/차단 여부
print("=" * 70)
print(f"{'날짜':>12} {'종목':>12} {'결과':>6} {'PnL':>8}"
f"{'WF=2':>6} {'WF=3':>6} {'WF=5':>6}")
print("" * 70)
# 각 설정별 허용 set 구성 (traded_at + ticker로 식별)
allowed_sets = []
for _, allowed, _, _ in results:
allowed_sets.append(set((t[0], t[4]) for t in allowed))
for t in trades:
ticker, is_win, pnl, profit, dt = t
win_mark = "" if is_win else ""
cols = []
for aset in allowed_sets:
if (ticker, dt) in aset:
cols.append("허용")
else:
cols.append("🔴차단")
print(f"{dt.strftime('%m-%d %H:%M'):>12} {ticker:>12} {win_mark:>4} {pnl:>+7.1f}% │ "
f"{cols[0]:>6} {cols[1]:>6} {cols[2]:>6}")
# 최종 요약
print("\n" + "=" * 70)
print(f"{'설정':<35} {'거래':>5} {'승률':>7} {'KRW수익':>12} {'차단':>5}")
print("" * 70)
for label, _, _, s in results:
print(f"{label:<35} {s['total']:>5}{s['wr']:>6.1f}% {s['profit']:>+12,.0f}{s['blocked_count']:>4}건 차단")
conn.close()
if __name__ == "__main__":
main()

211
wf_cmp2.py Normal file
View File

@@ -0,0 +1,211 @@
"""WF 윈도우 비교 시뮬레이션 v2 - 실거래 + 이후 시뮬 거래 통합.
Phase 1: 실제 42건 거래를 WF 설정별로 허용/차단 재생
Phase 2: 마지막 실거래 이후 price_history 기반 신호로 추가 거래 시뮬
(추세 2h+5% + 15분 워치리스트, 모멘텀은 API 한계로 생략)
→ WF 상태는 Phase1에서 이어짐
비교 설정:
A: WF=2 (min_wr=0.0, 즉 2연패시만 차단 — last2=[L,L]이면 차단)
B: WF=3 (min_wr=0.34)
C: WF=5 현행 (min_wr=0.40)
D: WF 없음
"""
import os, time
from datetime import datetime
from dotenv import load_dotenv
load_dotenv()
import oracledb
STOP_LOSS_PCT = float(os.getenv("STOP_LOSS_PCT", "1.5")) / 100
TIME_STOP_HOURS = int(os.getenv("TIME_STOP_HOURS", "8"))
TIME_STOP_MIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3")) / 100
TREND_MIN_PCT = 5.0
CONFIRM_MINUTES = 15
FEE = 0.0005
def get_conn():
return oracledb.connect(
user=os.getenv('ORACLE_USER'), password=os.getenv('ORACLE_PASSWORD'),
dsn=os.getenv('ORACLE_DSN'), config_dir=os.getenv('ORACLE_WALLET'))
# ── WF 판단 ───────────────────────────────────────────
def is_wf_blocked(hist, window, min_wr):
if window == 0: return False
if len(hist) < window: return False
wr = sum(hist[-window:]) / window
return wr < min_wr
# ── 추세 체크 (price_history 기반) ────────────────────
def check_trend(prices, idx):
lb = 12 # 2h = 12 * 10분봉
if idx < lb: return False
curr, past = prices[idx][0], prices[idx-lb][0]
return past > 0 and (curr-past)/past*100 >= TREND_MIN_PCT
# ── 포지션 시뮬 ───────────────────────────────────────
def simulate_pos(prices, buy_idx, buy_price):
buy_dt = prices[buy_idx][1]
peak = buy_price
for price, ts in prices[buy_idx+1:]:
if price > peak: peak = price
elapsed_h = (ts - buy_dt).total_seconds() / 3600
pnl = (price - buy_price) / buy_price
if (peak - price) / peak >= STOP_LOSS_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, price, ts, f"트레일링({pnl*100:+.1f}%)", net
if elapsed_h >= TIME_STOP_HOURS and pnl < TIME_STOP_MIN_PCT:
net = (price*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, price, ts, "타임스탑", net
lp, lt = prices[-1]
net = (lp*(1-FEE) - buy_price*(1+FEE)) / (buy_price*(1+FEE)) * 100
return net>0, lp, lt, "데이터종료", net
# ── Phase1: 실거래 재생 ───────────────────────────────
def phase1(real_trades, window, min_wr):
"""42건 실거래 재생. Returns (허용목록, 차단목록, history_per_ticker)"""
history = {}
allowed = []
blocked = []
for t in real_trades:
ticker, is_win, pnl, profit, dt = t
hist = history.get(ticker, [])
if is_wf_blocked(hist, window, min_wr):
blocked.append(('block', ticker, is_win, pnl, profit, dt))
else:
allowed.append(('real', ticker, is_win, pnl, profit, dt))
hist = hist + [bool(is_win)]
if window > 0 and len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
return allowed, blocked, history
# ── Phase2: price_history 신호 시뮬 ──────────────────
def phase2(cur, history, real_last_dt, window, min_wr):
"""실거래 종료 이후 price_history 기반 신호 시뮬레이션."""
# 스캔 대상: 실거래에 등장한 종목 전체
tickers = list(history.keys()) if history else []
# 실거래 후 WF 해제 가능한 종목만
# (차단됐어도 shadow 없이는 해제 불가 → 차단 상태 종목 제외)
active_tickers = []
for ticker in tickers:
hist = history.get(ticker, [])
if not is_wf_blocked(hist, window, min_wr):
active_tickers.append(ticker)
if not active_tickers:
return [], history
sim_trades = []
for ticker in active_tickers:
cur.execute("""
SELECT price, recorded_at FROM price_history
WHERE ticker=:t AND recorded_at > :dt
ORDER BY recorded_at
""", t=ticker, dt=real_last_dt)
prices = cur.fetchall()
if len(prices) < 13: continue
hist = list(history.get(ticker, []))
watchlist_dt = None
in_pos = False
buy_idx = buy_price = None
idx = 0
while idx < len(prices):
price, dt = prices[idx]
if in_pos:
is_win, sp, sdt, reason, pnl = simulate_pos(prices, buy_idx, buy_price)
next_idx = next((i for i,(_, ts) in enumerate(prices) if ts > sdt), len(prices))
profit = pnl * 3333333 / 100 # 포지션당 예산 기준 근사
sim_trades.append(('sim', ticker, is_win, pnl, profit, dt))
hist = hist + [bool(is_win)]
if window > 0 and len(hist) > window * 2:
hist = hist[-window:]
history[ticker] = hist
in_pos = False
watchlist_dt = None
idx = next_idx
continue
if is_wf_blocked(hist, window, min_wr):
idx += 1
continue
trend_ok = check_trend(prices, idx)
if trend_ok:
if watchlist_dt is None:
watchlist_dt = dt
elif (dt - watchlist_dt).total_seconds() >= CONFIRM_MINUTES * 60:
in_pos = True
buy_idx = idx
buy_price = price
watchlist_dt = None
else:
watchlist_dt = None
idx += 1
return sim_trades, history
# ── 요약 출력 ─────────────────────────────────────────
def print_summary(label, p1_allowed, p1_blocked, p2_trades):
all_trades = p1_allowed + p2_trades
total = len(all_trades)
wins = sum(1 for t in all_trades if t[2])
pnl = sum(t[4] for t in all_trades)
wr = wins/total*100 if total else 0
blk = len(p1_blocked)
p2_cnt = len(p2_trades)
p2_win = sum(1 for t in p2_trades if t[2])
print(f"\n[{label}]")
print(f" 실거래 허용: {len(p1_allowed)}건 | 차단: {blk}")
print(f" 추가 시뮬: {p2_cnt}건 ({p2_win}승)")
print(f" ─────────────────────────────────────")
print(f" 합계: {total}건 | 승률={wr:.1f}% | KRW={pnl:+,.0f}")
return {'label': label, 'total': total, 'wins': wins, 'wr': wr, 'pnl': pnl,
'blk': blk, 'p2': p2_cnt}
def main():
conn = get_conn()
cur = conn.cursor()
cur.execute("""
SELECT ticker, is_win, pnl_pct, NVL(krw_profit,0), traded_at
FROM trade_results ORDER BY traded_at
""")
real_trades = cur.fetchall()
real_last_dt = real_trades[-1][4]
print(f"실거래: {len(real_trades)}건 (마지막: {real_last_dt.strftime('%m-%d %H:%M')})")
cur.execute("SELECT MAX(recorded_at) FROM price_history")
ph_last = cur.fetchone()[0]
print(f"price_history 끝: {ph_last.strftime('%m-%d %H:%M')}\n")
configs = [
(2, 0.01, "WF=2 (2연패→차단)"),
(3, 0.34, "WF=3"),
(5, 0.40, "WF=5 현행"),
(0, 0.00, "WF없음"),
]
summary = []
for window, min_wr, label in configs:
p1_allowed, p1_blocked, history = phase1(real_trades, window, min_wr)
p2_trades, _ = phase2(cur, history, real_last_dt, window, min_wr)
s = print_summary(label, p1_allowed, p1_blocked, p2_trades)
summary.append(s)
print(f"\n{'='*62}")
print(f"{'설정':<22} {'허용':>5} {'차단':>5} {'추가시뮬':>8} {'승률':>7} {'KRW수익':>13}")
print(f"{''*62}")
for s in summary:
print(f"{s['label']:<22} {s['total']-s['p2']:>5}{s['blk']:>5}"
f"{s['p2']:>6}{s['wr']:>6.1f}% {s['pnl']:>+12,.0f}")
conn.close()
if __name__ == "__main__":
main()