feat: replace trend strategy with volume-lead accumulation strategy
- strategy.py: rewrite should_buy() with volume-lead logic - detect accumulation: vol spike + 2h quiet price → record signal_price - entry: price rises ≥ TREND_AFTER_VOL% from signal_price - signal reset: timeout (8h) or price drops below signal_price - .env: add PRICE_QUIET_PCT=2.0, TREND_AFTER_VOL=4.8, SIGNAL_TIMEOUT_H=8.0 - vol_lead_sim.py: add parameter sweep 0.5~5.0% + fine sweep 4.0~5.0% Backtest result (9 tickers, 2026-01-15~): +4.8% threshold 26 trades | 69% win rate | +73.38% cumulative (vs A 33 trades 45% +24.25%) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
218
core/strategy.py
218
core/strategy.py
@@ -1,8 +1,11 @@
|
||||
"""Strategy C: 현재 기준 N시간 전 대비 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호.
|
||||
"""Volume Lead 전략: 거래량 축적(급증+횡보) 감지 후 +TREND_AFTER_VOL% 상승 시 선진입.
|
||||
|
||||
추가 필터:
|
||||
1. 6h 추세 확인 (단기 급등 아닌 지속 추세)
|
||||
2. 15분 확인 워치리스트 (신호 첫 발생 후 15분 내 재확인 시 진입)
|
||||
흐름:
|
||||
1. 직전 1h 거래량 > 로컬 5h 평균 × VOL_MULT AND
|
||||
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
|
||||
→ 신호가(signal_price) 기록
|
||||
2. signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
|
||||
3. SIGNAL_TIMEOUT_H 내 임계값 미달 또는 신호가 이하 하락 시 신호 초기화
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
@@ -12,174 +15,127 @@ import os
|
||||
import time
|
||||
|
||||
import pyupbit
|
||||
from .market import get_current_price, get_ohlcv
|
||||
|
||||
from .market import get_current_price
|
||||
from .market_regime import get_regime
|
||||
from .price_db import get_price_n_hours_ago
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중
|
||||
TREND_HOURS = float(os.getenv("TREND_HOURS", "12"))
|
||||
TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # 레짐이 없을 때 기본값
|
||||
# 축적 감지 파라미터
|
||||
PRICE_QUIET_PCT = float(os.getenv("PRICE_QUIET_PCT", "2.0")) # 2h 횡보 기준 (%)
|
||||
TREND_AFTER_VOL = float(os.getenv("TREND_AFTER_VOL", "5.0")) # 진입 임계값 (신호가 대비 %)
|
||||
SIGNAL_TIMEOUT_H = float(os.getenv("SIGNAL_TIMEOUT_H", "8.0")) # 신호 유효 시간 (h)
|
||||
|
||||
# 6h 단기 추세 최소 상승률 (추세 지속형 필터)
|
||||
TREND_6H_MIN_PCT = float(os.getenv("TREND_6H_MIN_PCT", "1.0"))
|
||||
|
||||
# 모멘텀: MA 기간, 거래량 급증 배수
|
||||
MA_PERIOD = 20
|
||||
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) # 레짐이 없을 때 기본값
|
||||
# 거래량 파라미터
|
||||
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
|
||||
VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0"))
|
||||
|
||||
# 15분 확인 워치리스트: 신호 첫 발생 시각(unix ts) 기록
|
||||
CONFIRM_SECONDS = int(os.getenv("CONFIRM_SECONDS", "900")) # 기본 15분
|
||||
_watchlist: dict[str, float] = {} # ticker → first_signal_time (unix timestamp)
|
||||
# 축적 신호 상태: ticker → {"price": float, "time": float(unix)}
|
||||
_accum_signals: dict[str, dict] = {}
|
||||
|
||||
|
||||
def check_trend(ticker: str, min_gain_pct: float) -> bool:
|
||||
"""상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +min_gain_pct% 이상."""
|
||||
past_price = get_price_n_hours_ago(ticker, TREND_HOURS)
|
||||
if past_price is None:
|
||||
logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)")
|
||||
return False
|
||||
|
||||
current = get_current_price(ticker)
|
||||
if not current:
|
||||
return False
|
||||
|
||||
gain_pct = (current - past_price) / past_price * 100
|
||||
result = gain_pct >= min_gain_pct
|
||||
|
||||
if result:
|
||||
logger.info(
|
||||
f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} "
|
||||
f"현재={current:,.2f} (+{gain_pct:.1f}% ≥ {min_gain_pct}%)"
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={min_gain_pct:+.0f}%)"
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def check_momentum(ticker: str, vol_mult: float) -> bool:
|
||||
"""모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × vol_mult.
|
||||
|
||||
23h 평균은 낮 시간대 고거래량이 포함돼 새벽에 항상 미달하므로,
|
||||
로컬 5h 평균(같은 시간대 컨텍스트)과 비교한다.
|
||||
"""
|
||||
# MA20: 일봉 기준
|
||||
df_daily = get_ohlcv(ticker, count=MA_PERIOD + 1)
|
||||
if df_daily is None or len(df_daily) < MA_PERIOD + 1:
|
||||
return False
|
||||
|
||||
ma = df_daily["close"].iloc[-MA_PERIOD:].mean()
|
||||
current = get_current_price(ticker)
|
||||
if current is None:
|
||||
return False
|
||||
|
||||
price_ok = current > ma
|
||||
if not price_ok:
|
||||
logger.debug(f"[모멘텀✗] {ticker} 현재={current:,.0f} < MA20={ma:,.0f} (가격 기준 미달)")
|
||||
return False
|
||||
|
||||
# 거래량: 60분봉 기준 (최근 1h vs 이전 LOCAL_VOL_HOURS h 로컬 평균)
|
||||
fetch_count = LOCAL_VOL_HOURS + 3 # 여유 있게 fetch
|
||||
def _check_vol_spike(ticker: str, vol_mult: float) -> bool:
|
||||
"""직전 완성 1h 거래량이 로컬 5h 평균의 vol_mult 배 이상인지 확인."""
|
||||
fetch_count = LOCAL_VOL_HOURS + 3
|
||||
try:
|
||||
df_hour = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
|
||||
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
|
||||
except Exception:
|
||||
return False
|
||||
if df_hour is None or len(df_hour) < LOCAL_VOL_HOURS + 1:
|
||||
if df is None or len(df) < LOCAL_VOL_HOURS + 1:
|
||||
return False
|
||||
|
||||
recent_vol = df_hour["volume"].iloc[-2] # 직전 완성된 1h 봉
|
||||
local_avg = df_hour["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 LOCAL_VOL_HOURS h 평균
|
||||
vol_ok = local_avg > 0 and recent_vol >= local_avg * vol_mult
|
||||
|
||||
ratio = recent_vol / local_avg if local_avg > 0 else 0
|
||||
if vol_ok:
|
||||
logger.info(
|
||||
f"[모멘텀↑] {ticker} 현재={current:,.0f} MA20={ma:,.0f} "
|
||||
f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"[모멘텀✗] {ticker} 1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} "
|
||||
f"({ratio:.2f}x < {vol_mult}x)"
|
||||
)
|
||||
return vol_ok
|
||||
|
||||
|
||||
def check_trend_6h(ticker: str) -> bool:
|
||||
"""6h 추세 지속 확인: 6h 전 대비 +TREND_6H_MIN_PCT% 이상 상승 중이어야 진입 허용."""
|
||||
past = get_price_n_hours_ago(ticker, 6)
|
||||
if past is None:
|
||||
logger.debug(f"[6h추세] {ticker} 6h 전 가격 없음 (수집 중)")
|
||||
return True # 데이터 없으면 필터 패스 (수집 초기)
|
||||
|
||||
current = get_current_price(ticker)
|
||||
if not current:
|
||||
recent_vol = df["volume"].iloc[-2] # 직전 완성된 1h 봉
|
||||
local_avg = df["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 5h 평균
|
||||
if local_avg <= 0:
|
||||
return False
|
||||
|
||||
gain_pct = (current - past) / past * 100
|
||||
result = gain_pct >= TREND_6H_MIN_PCT
|
||||
|
||||
ratio = recent_vol / local_avg
|
||||
result = ratio >= vol_mult
|
||||
if result:
|
||||
logger.debug(
|
||||
f"[6h추세↑] {ticker} 6h전={past:,.2f} 현재={current:,.2f} "
|
||||
f"(+{gain_pct:.1f}% ≥ {TREND_6H_MIN_PCT}%)"
|
||||
f"[거래량↑] {ticker} 1h={recent_vol:.0f} / 5h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
|
||||
)
|
||||
else:
|
||||
logger.debug(
|
||||
f"[6h추세✗] {ticker} 6h {gain_pct:+.1f}% (기준={TREND_6H_MIN_PCT:+.1f}%)"
|
||||
f"[거래량✗] {ticker} {ratio:.2f}x < {vol_mult}x"
|
||||
)
|
||||
return result
|
||||
|
||||
|
||||
def should_buy(ticker: str) -> bool:
|
||||
"""Strategy C + 시장 레짐 + 추세 지속형 진입 (6h 추세 + 15분 확인 워치리스트).
|
||||
"""Volume Lead 전략.
|
||||
|
||||
진입 조건:
|
||||
1. 12h 추세 ≥ 레짐별 임계값 (bull 3% / neutral 5% / bear 8%)
|
||||
2. 6h 추세 ≥ 1% (단기 급등이 아닌 지속 추세)
|
||||
3. 모멘텀 (MA20 초과 + 1h 거래량 급증)
|
||||
4. 위 조건 최초 충족 후 15분 경과 시 실제 진입 (확인 필터)
|
||||
1단계: 거래량 급증 + 2h 횡보 → 신호가 기록
|
||||
2단계: 신호가 대비 +TREND_AFTER_VOL% 상승 확인 시 진입
|
||||
"""
|
||||
regime = get_regime()
|
||||
trend_pct = regime["trend_pct"]
|
||||
vol_mult = regime["vol_mult"]
|
||||
|
||||
# 조건 평가 (순서: 가장 빠른 필터 먼저)
|
||||
if not check_trend(ticker, trend_pct):
|
||||
_watchlist.pop(ticker, None) # 조건 깨지면 워치리스트 초기화
|
||||
current = get_current_price(ticker)
|
||||
if not current:
|
||||
return False
|
||||
|
||||
if not check_trend_6h(ticker):
|
||||
_watchlist.pop(ticker, None)
|
||||
return False
|
||||
|
||||
if not check_momentum(ticker, vol_mult):
|
||||
_watchlist.pop(ticker, None)
|
||||
return False
|
||||
|
||||
# 모든 조건 충족 — 15분 확인 워치리스트 처리
|
||||
now = time.time()
|
||||
if ticker not in _watchlist:
|
||||
_watchlist[ticker] = now
|
||||
logger.info(
|
||||
f"[워치] {ticker} 신호 첫 발생 → {CONFIRM_SECONDS//60}분 후 진입 예정"
|
||||
)
|
||||
|
||||
# ── 기존 신호 유효성 확인 ────────────────────────────────
|
||||
sig = _accum_signals.get(ticker)
|
||||
if sig is not None:
|
||||
age_h = (now - sig["time"]) / 3600
|
||||
if age_h > SIGNAL_TIMEOUT_H:
|
||||
del _accum_signals[ticker]
|
||||
sig = None
|
||||
logger.debug(f"[축적타임아웃] {ticker} {age_h:.1f}h 경과 → 신호 초기화")
|
||||
|
||||
# ── 신호 없음: 축적 조건 체크 ────────────────────────────
|
||||
if sig is None:
|
||||
# 2h 가격 횡보 확인 (DB 가격 활용)
|
||||
price_2h = get_price_n_hours_ago(ticker, 2)
|
||||
if price_2h is None:
|
||||
return False
|
||||
|
||||
elapsed = now - _watchlist[ticker]
|
||||
if elapsed < CONFIRM_SECONDS:
|
||||
quiet = abs(current - price_2h) / price_2h * 100 < PRICE_QUIET_PCT
|
||||
|
||||
if not quiet:
|
||||
logger.debug(
|
||||
f"[워치] {ticker} 확인 대기 중 ({elapsed/60:.1f}분 / {CONFIRM_SECONDS//60}분)"
|
||||
f"[횡보✗] {ticker} 2h변동={(current - price_2h) / price_2h * 100:+.1f}% "
|
||||
f"(기준={PRICE_QUIET_PCT}%)"
|
||||
)
|
||||
return False
|
||||
|
||||
# 15분 경과 → 진입 확정
|
||||
_watchlist.pop(ticker, None)
|
||||
# 거래량 급증 확인
|
||||
if not _check_vol_spike(ticker, vol_mult):
|
||||
return False
|
||||
|
||||
# 축적 신호 기록
|
||||
_accum_signals[ticker] = {"price": current, "time": now}
|
||||
logger.info(
|
||||
f"[워치확인] {ticker} {elapsed/60:.1f}분 경과 → 진입 확정"
|
||||
f"[축적감지] {ticker} 거래량 급증 + 2h 횡보 → 신호가={current:,.2f}원"
|
||||
)
|
||||
return False # 신호 첫 발생 시는 진입 안 함
|
||||
|
||||
# ── 신호 있음: 상승 확인 → 진입 ─────────────────────────
|
||||
signal_price = sig["price"]
|
||||
move_pct = (current - signal_price) / signal_price * 100
|
||||
|
||||
if current < signal_price:
|
||||
# 신호가 이하 하락 → 축적 실패
|
||||
del _accum_signals[ticker]
|
||||
logger.debug(
|
||||
f"[축적실패] {ticker} 신호가={signal_price:,.2f} → 현재={current:,.2f} (하락) → 초기화"
|
||||
)
|
||||
return False
|
||||
|
||||
if move_pct >= TREND_AFTER_VOL:
|
||||
del _accum_signals[ticker]
|
||||
logger.info(
|
||||
f"[축적진입] {ticker} 신호가={signal_price:,.2f}원 → 현재={current:,.2f}원 "
|
||||
f"(+{move_pct:.1f}% ≥ {TREND_AFTER_VOL}%)"
|
||||
)
|
||||
return True
|
||||
|
||||
logger.debug(
|
||||
f"[축적대기] {ticker} 신호가={signal_price:,.2f} 현재={current:,.2f} "
|
||||
f"(+{move_pct:.1f}% / 목표={TREND_AFTER_VOL}%)"
|
||||
)
|
||||
return False
|
||||
|
||||
396
vol_lead_sim.py
Normal file
396
vol_lead_sim.py
Normal file
@@ -0,0 +1,396 @@
|
||||
"""거래량 선행(Volume Lead) 진입 전략 시뮬레이션.
|
||||
|
||||
3가지 전략 비교:
|
||||
A (현행): 12h 가격 +5% 확인 + 1h 거래량 급증 → 진입 (이미 오른 뒤 추격)
|
||||
B (신규): 가격 횡보 중 거래량 급증(축적) → 그 후 추세 +N% 시작 시 선진입
|
||||
C (단순): 거래량 급증만 (베이스라인, 노이즈 확인용)
|
||||
"""
|
||||
|
||||
import os
|
||||
import pickle
|
||||
import time
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
import pandas as pd
|
||||
from dotenv import load_dotenv
|
||||
|
||||
load_dotenv()
|
||||
import pyupbit
|
||||
|
||||
# ── 공통 파라미터 ─────────────────────────────────────
|
||||
STOP_LOSS_PCT = 0.015 # 트레일링 스탑 1.5%
|
||||
TIME_STOP_HOURS = 8
|
||||
TIME_STOP_MIN_PCT = 3.0
|
||||
FEE = 0.0005
|
||||
LOCAL_VOL_HOURS = 5 # 거래량 기준 이전 N시간
|
||||
VOL_MULT = 2.0 # 거래량 배수 기준
|
||||
|
||||
# 현행 전략 파라미터
|
||||
TREND_HOURS = 12
|
||||
TREND_MIN_PCT = 5.0
|
||||
|
||||
# B 전략 파라미터: 거래량 선행 + 이후 소규모 추세 확인
|
||||
PRICE_QUIET_PCT = 2.0 # 거래량 급증 시점 가격 횡보 기준 (2h 변동 < N%)
|
||||
TREND_AFTER_VOL = 1.5 # 축적 신호 후 진입 기준 (vol 시점 대비 +N% 상승 시)
|
||||
SIGNAL_TIMEOUT_H = 8 # 축적 신호 후 N시간 내 추세 미발생 시 초기화
|
||||
|
||||
FROM_DATE = "2026-01-15 00:00:00"
|
||||
|
||||
TICKERS = [
|
||||
'KRW-DKA', 'KRW-LAYER', 'KRW-SIGN',
|
||||
'KRW-SOL', 'KRW-ETH', 'KRW-XRP',
|
||||
'KRW-HOLO', 'KRW-OM', 'KRW-ORBS',
|
||||
]
|
||||
|
||||
CACHE_FILE = Path("vol_lead_cache.pkl")
|
||||
|
||||
|
||||
# ── 데이터 로드 ───────────────────────────────────────
|
||||
def fetch_all(ticker: str, from_date: str):
|
||||
"""1h봉 전체 로드 (from_date 이후, 페이지 역방향 수집)."""
|
||||
target = datetime.strptime(from_date, "%Y-%m-%d %H:%M:%S")
|
||||
frames = []
|
||||
to_dt = None
|
||||
|
||||
for _ in range(15): # 최대 15페이지 = 3000h ≈ 125일
|
||||
kwargs: dict = dict(ticker=ticker, interval="minute60", count=200)
|
||||
if to_dt:
|
||||
kwargs["to"] = to_dt.strftime("%Y-%m-%d %H:%M:%S")
|
||||
|
||||
df = pyupbit.get_ohlcv(**kwargs)
|
||||
if df is None or df.empty:
|
||||
break
|
||||
|
||||
frames.append(df)
|
||||
oldest = df.index[0].to_pydatetime().replace(tzinfo=None)
|
||||
if oldest <= target:
|
||||
break
|
||||
to_dt = oldest
|
||||
time.sleep(0.2)
|
||||
|
||||
if not frames:
|
||||
return None
|
||||
|
||||
result = pd.concat(frames).sort_index().drop_duplicates()
|
||||
result.index = result.index.tz_localize(None)
|
||||
return result[result.index >= target]
|
||||
|
||||
|
||||
def load_data() -> dict:
|
||||
if CACHE_FILE.exists():
|
||||
print(f"캐시 로드: {CACHE_FILE}")
|
||||
return pickle.load(open(CACHE_FILE, "rb"))
|
||||
|
||||
data = {}
|
||||
for ticker in TICKERS:
|
||||
print(f" {ticker} 로딩...", end=" ", flush=True)
|
||||
df = fetch_all(ticker, FROM_DATE)
|
||||
if df is not None:
|
||||
data[ticker] = df
|
||||
print(f"{len(df)}봉 ({df.index[0].strftime('%m-%d')}~{df.index[-1].strftime('%m-%d')})")
|
||||
else:
|
||||
print("실패")
|
||||
time.sleep(0.3)
|
||||
|
||||
pickle.dump(data, open(CACHE_FILE, "wb"))
|
||||
return data
|
||||
|
||||
|
||||
# ── 포지션 시뮬 ───────────────────────────────────────
|
||||
def simulate_pos(df: pd.DataFrame, buy_idx: int, buy_price: float):
|
||||
"""매수 후 청산 시뮬레이션.
|
||||
|
||||
- 최고가: 각 봉의 high 기준
|
||||
- 스탑 발동 체크: 각 봉의 low 기준 (intra-candle 포착)
|
||||
- 청산가: peak × (1 - stop_pct) 근사
|
||||
"""
|
||||
buy_dt = df.index[buy_idx]
|
||||
peak = buy_price
|
||||
|
||||
for i in range(buy_idx + 1, len(df)):
|
||||
row = df.iloc[i]
|
||||
ts = df.index[i]
|
||||
|
||||
if row["high"] > peak:
|
||||
peak = row["high"]
|
||||
|
||||
elapsed_h = (ts - buy_dt).total_seconds() / 3600
|
||||
stop_price = peak * (1 - STOP_LOSS_PCT)
|
||||
|
||||
# 트레일링 스탑 (low가 stop_price 이하 진입 시)
|
||||
if row["low"] <= stop_price:
|
||||
sell_price = stop_price
|
||||
pnl = (sell_price * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||||
return pnl > 0, sell_price, ts, f"트레일링({pnl:+.1f}%)", pnl
|
||||
|
||||
# 타임 스탑
|
||||
pnl_now = (row["close"] - buy_price) / buy_price * 100
|
||||
if elapsed_h >= TIME_STOP_HOURS and pnl_now < TIME_STOP_MIN_PCT:
|
||||
pnl = (row["close"] * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||||
return pnl > 0, row["close"], ts, "타임스탑", pnl
|
||||
|
||||
last = df.iloc[-1]["close"]
|
||||
pnl = (last * (1 - FEE) - buy_price * (1 + FEE)) / (buy_price * (1 + FEE)) * 100
|
||||
return pnl > 0, last, df.index[-1], "데이터종료", pnl
|
||||
|
||||
|
||||
# ── 현행 전략 (추세 확인형) ───────────────────────────
|
||||
def run_trend(df: pd.DataFrame) -> list:
|
||||
"""12h 가격 +5% 확인 + 1h 거래량 급증 + 1h 워치리스트."""
|
||||
trades = []
|
||||
watchlist_i = None
|
||||
in_pos = False
|
||||
buy_idx = buy_price = None
|
||||
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
|
||||
|
||||
while i < len(df):
|
||||
if in_pos:
|
||||
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
|
||||
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
|
||||
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
|
||||
in_pos = False
|
||||
watchlist_i = None
|
||||
i = next_i
|
||||
continue
|
||||
|
||||
close = df.iloc[i]["close"]
|
||||
past12 = df.iloc[i - TREND_HOURS]["close"]
|
||||
trend_ok = (close - past12) / past12 * 100 >= TREND_MIN_PCT
|
||||
|
||||
vol_recent = df.iloc[i - 1]["volume"]
|
||||
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
|
||||
vol_ok = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
|
||||
|
||||
if trend_ok and vol_ok:
|
||||
if watchlist_i is None:
|
||||
watchlist_i = i
|
||||
elif i - watchlist_i >= 1: # 1h 확인
|
||||
in_pos = True
|
||||
buy_idx = i
|
||||
buy_price = close
|
||||
watchlist_i = None
|
||||
else:
|
||||
watchlist_i = None
|
||||
|
||||
i += 1
|
||||
|
||||
return trades
|
||||
|
||||
|
||||
# ── B 전략: 거래량 선행 + 소규모 추세 확인 ───────────
|
||||
def run_vol_lead(df: pd.DataFrame) -> list:
|
||||
"""거래량 급증(축적) 감지 후 소규모 추세 확인 시 선진입.
|
||||
|
||||
흐름:
|
||||
1. 직전 1h 거래량 > 이전 5h 평균 × VOL_MULT AND
|
||||
2h 가격 변동 < PRICE_QUIET_PCT% (횡보 중 축적)
|
||||
→ 축적 신호 기록 (signal_price = 현재가)
|
||||
2. 신호 후 현재가가 signal_price 대비 +TREND_AFTER_VOL% 이상 상승 시 진입
|
||||
(현행 +5% 대신 작은 기준으로 더 일찍 진입)
|
||||
3. SIGNAL_TIMEOUT_H 시간 내 추세 미발생 → 신호 초기화
|
||||
"""
|
||||
trades = []
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
in_pos = False
|
||||
buy_idx = buy_price = None
|
||||
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
|
||||
|
||||
while i < len(df):
|
||||
if in_pos:
|
||||
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
|
||||
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
|
||||
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
|
||||
in_pos = False
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
i = next_i
|
||||
continue
|
||||
|
||||
close = df.iloc[i]["close"]
|
||||
close_2h = df.iloc[i - 2]["close"]
|
||||
quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT
|
||||
|
||||
vol_recent = df.iloc[i - 1]["volume"]
|
||||
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
|
||||
vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
|
||||
|
||||
# 축적 신호 갱신
|
||||
if quiet and vol_spike:
|
||||
if signal_i is None:
|
||||
signal_i = i
|
||||
signal_price = close
|
||||
else:
|
||||
if signal_i is not None and close < signal_price:
|
||||
# 가격 하락 → 축적 실패, 초기화
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
# 타임아웃
|
||||
if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H:
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
# 진입: 축적 신호 후 가격 +TREND_AFTER_VOL% 이상 상승
|
||||
if signal_i is not None:
|
||||
move = (close - signal_price) / signal_price * 100
|
||||
if move >= TREND_AFTER_VOL:
|
||||
in_pos = True
|
||||
buy_idx = i
|
||||
buy_price = close
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
i += 1
|
||||
|
||||
return trades
|
||||
|
||||
|
||||
# ── 결과 출력 ─────────────────────────────────────────
|
||||
def summarize(label: str, trades: list) -> dict:
|
||||
if not trades:
|
||||
print(f" [{label}] 거래 없음")
|
||||
return {"total": 0, "wins": 0, "wr": 0.0, "pnl": 0.0}
|
||||
|
||||
wins = sum(1 for t in trades if t[0])
|
||||
total = len(trades)
|
||||
pnl = sum(t[1] for t in trades)
|
||||
wr = wins / total * 100
|
||||
|
||||
print(f" [{label}] {total}건 | 승률={wr:.0f}% ({wins}승 {total-wins}패) | 누적={pnl:+.2f}%")
|
||||
for idx, (is_win, p, bdt, sdt, reason) in enumerate(trades, 1):
|
||||
mark = "✅" if is_win else "❌"
|
||||
print(f" #{idx}: {mark} {p:+.2f}% | {reason}"
|
||||
f" ({bdt.strftime('%m-%d %H:%M')}→{sdt.strftime('%m-%d %H:%M')})")
|
||||
|
||||
return {"total": total, "wins": wins, "wr": wr, "pnl": pnl}
|
||||
|
||||
|
||||
def run_vol_lead_thresh(df: pd.DataFrame, thresh: float) -> list:
|
||||
"""run_vol_lead의 TREND_AFTER_VOL 파라미터를 동적으로 받는 버전."""
|
||||
trades = []
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
in_pos = False
|
||||
buy_idx = buy_price = None
|
||||
i = max(TREND_HOURS, LOCAL_VOL_HOURS + 2)
|
||||
|
||||
while i < len(df):
|
||||
if in_pos:
|
||||
is_win, sp, sdt, reason, pnl = simulate_pos(df, buy_idx, buy_price)
|
||||
next_i = next((j for j in range(i, len(df)) if df.index[j] > sdt), len(df))
|
||||
trades.append((is_win, pnl, df.index[buy_idx], sdt, reason))
|
||||
in_pos = False
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
i = next_i
|
||||
continue
|
||||
|
||||
close = df.iloc[i]["close"]
|
||||
close_2h = df.iloc[i - 2]["close"]
|
||||
quiet = abs(close - close_2h) / close_2h * 100 < PRICE_QUIET_PCT
|
||||
|
||||
vol_recent = df.iloc[i - 1]["volume"]
|
||||
vol_avg = df.iloc[i - LOCAL_VOL_HOURS - 1:i - 1]["volume"].mean()
|
||||
vol_spike = vol_avg > 0 and vol_recent >= vol_avg * VOL_MULT
|
||||
|
||||
if quiet and vol_spike:
|
||||
if signal_i is None:
|
||||
signal_i = i
|
||||
signal_price = close
|
||||
else:
|
||||
if signal_i is not None and close < signal_price:
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
if signal_i is not None and (i - signal_i) > SIGNAL_TIMEOUT_H:
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
if signal_i is not None:
|
||||
move = (close - signal_price) / signal_price * 100
|
||||
if move >= thresh:
|
||||
in_pos = True
|
||||
buy_idx = i
|
||||
buy_price = close
|
||||
signal_i = None
|
||||
signal_price = None
|
||||
|
||||
i += 1
|
||||
|
||||
return trades
|
||||
|
||||
|
||||
def main() -> None:
|
||||
print("데이터 로딩 중...")
|
||||
data = load_data()
|
||||
|
||||
# ── A 현행 전략 (기준선) ─────────────────────────────
|
||||
print(f"\n{'='*72}")
|
||||
print(f"A(현행 12h+5%+거래량) 기준선 | {FROM_DATE[:10]} ~ 현재")
|
||||
print(f"{'='*72}")
|
||||
agg_a = {"total": 0, "wins": 0, "pnl": 0.0}
|
||||
trend_results = {}
|
||||
for ticker, df in data.items():
|
||||
t = run_trend(df)
|
||||
trend_results[ticker] = t
|
||||
s = {"total": len(t), "wins": sum(1 for x in t if x[0]),
|
||||
"pnl": sum(x[1] for x in t)}
|
||||
agg_a["total"] += s["total"]
|
||||
agg_a["wins"] += s["wins"]
|
||||
agg_a["pnl"] += s["pnl"]
|
||||
a_wr = agg_a["wins"] / agg_a["total"] * 100 if agg_a["total"] else 0
|
||||
print(f"A 합계: {agg_a['total']}건 | 승률={a_wr:.0f}% | 누적={agg_a['pnl']:+.2f}%")
|
||||
|
||||
# ── B 전략: TREND_AFTER_VOL 파라미터 스윕 ───────────
|
||||
THRESHOLDS = [0.5, 1.0, 1.5, 2.0, 2.5, 3.0, 4.0, 5.0]
|
||||
|
||||
print(f"\n{'='*72}")
|
||||
print(f"B(거래량→+N% 선진입) 파라미터 스윕")
|
||||
print(f"{'─'*72}")
|
||||
print(f"{'임계값':>6} │ {'거래':>5} {'승률':>6} {'누적PnL':>10} │ vs A PnL")
|
||||
print(f"{'─'*72}")
|
||||
|
||||
best = None
|
||||
for thresh in THRESHOLDS:
|
||||
agg = {"total": 0, "wins": 0, "pnl": 0.0}
|
||||
for ticker, df in data.items():
|
||||
t = run_vol_lead_thresh(df, thresh)
|
||||
agg["total"] += len(t)
|
||||
agg["wins"] += sum(1 for x in t if x[0])
|
||||
agg["pnl"] += sum(x[1] for x in t)
|
||||
wr = agg["wins"] / agg["total"] * 100 if agg["total"] else 0
|
||||
diff = agg["pnl"] - agg_a["pnl"]
|
||||
marker = " ← best" if (best is None or agg["pnl"] > best["pnl"]) else ""
|
||||
if marker:
|
||||
best = {**agg, "thresh": thresh, "wr": wr}
|
||||
print(f"+{thresh:>4.1f}% │ {agg['total']:>5}건 {wr:>5.0f}% {agg['pnl']:>+9.2f}% │ {diff:>+8.2f}%{marker}")
|
||||
|
||||
print(f"{'─'*72}")
|
||||
print(f"\n★ 최적 임계값: +{best['thresh']}% → "
|
||||
f"{best['total']}건 | 승률={best['wr']:.0f}% | 누적={best['pnl']:+.2f}%")
|
||||
|
||||
# ── 최적 임계값으로 종목별 상세 출력 ─────────────────
|
||||
best_thresh = best["thresh"]
|
||||
print(f"\n{'='*72}")
|
||||
print(f"★ B(vol→+{best_thresh}%) vs A(12h+5%+vol) 종목별 비교")
|
||||
print(f"{'─'*72}")
|
||||
print(f"{'종목':<14} │ {'A 현행':^24} │ {'B +{:.1f}%'.format(best_thresh):^24}")
|
||||
print(f"{'':14} │ {'거래':>4} {'승률':>5} {'누적':>9} │ {'거래':>4} {'승률':>5} {'누적':>9}")
|
||||
print(f"{'─'*72}")
|
||||
for ticker, df in data.items():
|
||||
t_a = trend_results[ticker]
|
||||
t_b = run_vol_lead_thresh(df, best_thresh)
|
||||
wa = sum(1 for x in t_a if x[0])
|
||||
wb = sum(1 for x in t_b if x[0])
|
||||
pa = sum(x[1] for x in t_a)
|
||||
pb = sum(x[1] for x in t_b)
|
||||
wr_a = wa / len(t_a) * 100 if t_a else 0
|
||||
wr_b = wb / len(t_b) * 100 if t_b else 0
|
||||
print(f"{ticker:<14} │ {len(t_a):>4}건 {wr_a:>4.0f}% {pa:>+8.2f}% │"
|
||||
f" {len(t_b):>4}건 {wr_b:>4.0f}% {pb:>+8.2f}%")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user