- tick_trader.py를 Controller로 축소, 로직을 3개 모듈로 분리: - core/signal.py: 시그널 감지, 지표 계산 (calc_vr, calc_atr, detect_signal) - core/order.py: Upbit 주문 실행 (매수/매도/취소/조회) - core/position_manager.py: 포지션 관리, DB sync, 복구, 청산 조건 - type hints, Google docstring, 구체적 예외 타입 적용 - 50줄 초과 함수 분리 (process_signal, restore_positions) - 미사용 파일 58개 archive/ 폴더로 이동 - README.md 추가 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
364 lines
15 KiB
Python
364 lines
15 KiB
Python
"""DB 기반 하이브리드 시뮬레이션 (신호=10분봉 / 스탑=1분봉).
|
|
|
|
실행 흐름:
|
|
1. pyupbit에서 10분봉(300봉) + 1분봉(2880봉) fetch → BACKTEST_OHLCV upsert
|
|
2. BACKTEST_OHLCV에서 데이터 로드
|
|
3. 하이브리드 시뮬 실행
|
|
- 10분봉 타임인덱스로 신호 감지 / 진입
|
|
- 각 10분봉 구간 내 1분봉으로 트레일링스탑 체크
|
|
4. 결과 출력
|
|
"""
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
from dotenv import load_dotenv
|
|
load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env'))
|
|
|
|
import time
|
|
import pyupbit
|
|
import pandas as pd
|
|
import numpy as np
|
|
import oracledb
|
|
|
|
# ── Oracle 연결 ─────────────────────────────────────────────────────────────
|
|
def _get_conn():
|
|
kwargs = dict(
|
|
user=os.environ["ORACLE_USER"],
|
|
password=os.environ["ORACLE_PASSWORD"],
|
|
dsn=os.environ["ORACLE_DSN"],
|
|
)
|
|
wallet = os.environ.get("ORACLE_WALLET")
|
|
if wallet:
|
|
kwargs["config_dir"] = wallet
|
|
return oracledb.connect(**kwargs)
|
|
|
|
|
|
# ── 전략 파라미터 ────────────────────────────────────────────────────────────
|
|
LOCAL_VOL_N = 28
|
|
QUIET_N = 12
|
|
QUIET_PCT = 2.0
|
|
THRESH = 4.8
|
|
SIGNAL_TO = 48
|
|
VOL_THRESH = 6.0
|
|
FNG = 14
|
|
ATR_N = 28
|
|
ATR_MULT = 1.5
|
|
ATR_MIN = 0.010
|
|
ATR_MAX = 0.020
|
|
TS_N = 48
|
|
TIME_STOP_PCT = 3.0
|
|
BUDGET = 15_000_000
|
|
MAX_POS = 3
|
|
FEE = 0.0005
|
|
PER_POS = BUDGET // MAX_POS
|
|
|
|
TICKERS = [
|
|
'KRW-XRP','KRW-BTC','KRW-ETH','KRW-SOL','KRW-DOGE',
|
|
'KRW-ADA','KRW-SUI','KRW-NEAR','KRW-KAVA','KRW-SXP',
|
|
'KRW-AKT','KRW-SONIC','KRW-IP','KRW-ORBS','KRW-VIRTUAL',
|
|
'KRW-BARD','KRW-XPL','KRW-KITE','KRW-ENSO','KRW-0G',
|
|
]
|
|
|
|
SIM_START = '2026-03-03 00:00:00'
|
|
SIM_END = '2026-03-04 13:30:00'
|
|
|
|
|
|
# ── BACKTEST_OHLCV 공통 함수 ─────────────────────────────────────────────────
|
|
|
|
def upsert_ohlcv(conn, ticker: str, interval_cd: str, df: pd.DataFrame) -> int:
|
|
"""DataFrame을 BACKTEST_OHLCV에 저장 (기존 TS 스킵). 반환: 신규 삽입 건수."""
|
|
cur = conn.cursor()
|
|
min_ts = df.index.min().to_pydatetime()
|
|
cur.execute(
|
|
"SELECT ts FROM backtest_ohlcv WHERE ticker=:t AND interval_cd=:iv AND ts >= :since",
|
|
{"t": ticker, "iv": interval_cd, "since": min_ts}
|
|
)
|
|
existing = {r[0] for r in cur.fetchall()}
|
|
|
|
new_rows = [
|
|
(ticker, interval_cd, ts_idx.to_pydatetime(),
|
|
float(row["open"]), float(row["high"]), float(row["low"]),
|
|
float(row["close"]), float(row["volume"]))
|
|
for ts_idx, row in df.iterrows()
|
|
if ts_idx.to_pydatetime() not in existing
|
|
]
|
|
if not new_rows:
|
|
return 0
|
|
|
|
cur.executemany(
|
|
"INSERT INTO backtest_ohlcv (ticker, interval_cd, ts, open_p, high_p, low_p, close_p, volume_p) "
|
|
"VALUES (:1, :2, :3, :4, :5, :6, :7, :8)",
|
|
new_rows
|
|
)
|
|
conn.commit()
|
|
return len(new_rows)
|
|
|
|
|
|
def load_from_db(conn, ticker: str, interval_cd: str, since: str):
|
|
"""BACKTEST_OHLCV에서 ticker의 OHLCV 로드."""
|
|
cur = conn.cursor()
|
|
cur.execute(
|
|
"SELECT ts, open_p, high_p, low_p, close_p, volume_p "
|
|
"FROM backtest_ohlcv "
|
|
"WHERE ticker=:ticker AND interval_cd=:iv "
|
|
"AND ts >= TO_TIMESTAMP(:since, 'YYYY-MM-DD HH24:MI:SS') "
|
|
"ORDER BY ts",
|
|
{"ticker": ticker, "iv": interval_cd, "since": since}
|
|
)
|
|
rows = cur.fetchall()
|
|
if not rows:
|
|
return None
|
|
df = pd.DataFrame(rows, columns=["ts","open","high","low","close","volume"])
|
|
df["ts"] = pd.to_datetime(df["ts"])
|
|
df.set_index("ts", inplace=True)
|
|
return df.astype(float)
|
|
|
|
|
|
# ── 1단계: DB 업데이트 ───────────────────────────────────────────────────────
|
|
|
|
def refresh_db(conn) -> None:
|
|
"""10분봉(300봉) + 1분봉(2880봉) fetch → BACKTEST_OHLCV upsert."""
|
|
print("BACKTEST_OHLCV 업데이트 중...", flush=True)
|
|
total10, total1 = 0, 0
|
|
for tk in TICKERS:
|
|
try:
|
|
# 10분봉
|
|
df10 = pyupbit.get_ohlcv(tk, interval='minute10', count=300)
|
|
if df10 is not None and len(df10) >= 10:
|
|
total10 += upsert_ohlcv(conn, tk, 'minute10', df10)
|
|
time.sleep(0.12)
|
|
|
|
# 1분봉 (2일치 ≈ 2880봉, 내부 자동 페이지네이션)
|
|
df1 = pyupbit.get_ohlcv(tk, interval='minute1', count=2880)
|
|
if df1 is not None and len(df1) >= 60:
|
|
total1 += upsert_ohlcv(conn, tk, 'minute1', df1)
|
|
time.sleep(0.15)
|
|
|
|
print(f" {tk}: 10m={len(df10) if df10 is not None else 0}봉 / "
|
|
f"1m={len(df1) if df1 is not None else 0}봉", flush=True)
|
|
except Exception as e:
|
|
print(f" {tk} 오류: {e}")
|
|
|
|
print(f"DB 업데이트 완료: 10분봉 신규 {total10}행 / 1분봉 신규 {total1}행\n", flush=True)
|
|
|
|
|
|
# ── ATR 계산 ────────────────────────────────────────────────────────────────
|
|
|
|
def calc_atr(df, i, n=28):
|
|
start = max(0, i - n)
|
|
sub = df.iloc[start:i]
|
|
if len(sub) < 5:
|
|
return ATR_MIN
|
|
hi = sub['high'].values
|
|
lo = sub['low'].values
|
|
cl = sub['close'].values
|
|
tr = [max(hi[j]-lo[j], abs(hi[j]-cl[j-1]) if j>0 else 0,
|
|
abs(lo[j]-cl[j-1]) if j>0 else 0) for j in range(len(sub))]
|
|
atr_pct = np.mean(tr) / cl[-1] if cl[-1] > 0 else ATR_MIN
|
|
return max(ATR_MIN, min(ATR_MAX, atr_pct * ATR_MULT))
|
|
|
|
|
|
# ── 하이브리드 시뮬 ──────────────────────────────────────────────────────────
|
|
|
|
def run_sim(raw10: dict, raw1: dict) -> None:
|
|
"""신호=10분봉 / 스탑=1분봉 하이브리드 시뮬."""
|
|
|
|
# 10분봉 공통 타임인덱스
|
|
all_idx10 = None
|
|
for df in raw10.values():
|
|
all_idx10 = df.index if all_idx10 is None else all_idx10.union(df.index)
|
|
all_idx10 = all_idx10.sort_values()
|
|
mask = (all_idx10 >= SIM_START) & (all_idx10 <= SIM_END)
|
|
sim_idx10 = all_idx10[mask]
|
|
|
|
if len(sim_idx10) == 0:
|
|
print("시뮬 구간 데이터 없음")
|
|
return
|
|
|
|
print(f"시뮬 구간(10분봉): {sim_idx10[0]} ~ {sim_idx10[-1]} ({len(sim_idx10)}봉)")
|
|
# 1분봉 커버리지 확인
|
|
n1m = sum(
|
|
len(df[(df.index >= SIM_START) & (df.index <= SIM_END)])
|
|
for df in raw1.values() if len(raw1) > 0
|
|
)
|
|
print(f"1분봉 총 {n1m}봉 (스탑 체크용)\n")
|
|
|
|
positions = {} # ticker → {buy_price, peak, entry_i, invested, atr_stop}
|
|
signals = {} # ticker → {price, vol_r, sig_i}
|
|
trades = []
|
|
|
|
for i, ts10 in enumerate(sim_idx10):
|
|
ts10_prev = sim_idx10[i - 1] if i > 0 else ts10
|
|
|
|
# ── 1) 1분봉으로 스탑 체크 ────────────────────────────────────────
|
|
for tk in list(positions.keys()):
|
|
pos = positions[tk]
|
|
# 진입 캔들 종료 전엔 체크 금지
|
|
# 10분봉 ts=X 는 [X, X+10min) 구간이므로 실제 진입은 X+9:59
|
|
# → X+10min 이후 1분봉부터 체크
|
|
entry_candle_end = sim_idx10[pos['entry_i']] + pd.Timedelta(minutes=10)
|
|
|
|
if tk not in raw1:
|
|
# 1분봉 없으면 10분봉 종가로 fallback (단, 진입 다음 봉부터)
|
|
if ts10 <= sim_idx10[pos['entry_i']]:
|
|
continue
|
|
if tk not in raw10 or ts10 not in raw10[tk].index:
|
|
continue
|
|
current = float(raw10[tk].loc[ts10, 'close'])
|
|
_check_stop(pos, tk, current, ts10, i, sim_idx10, trades, positions)
|
|
continue
|
|
|
|
df1 = raw1[tk]
|
|
# 진입 캔들 종료(entry_candle_end) 이후 + 현재 10분봉 구간 이내
|
|
mask1 = (df1.index >= entry_candle_end) & (df1.index > ts10_prev) & (df1.index <= ts10)
|
|
sub1 = df1[mask1]
|
|
|
|
for ts1m, row1m in sub1.iterrows():
|
|
current = float(row1m['close'])
|
|
if _check_stop(pos, tk, current, ts1m, i, sim_idx10, trades, positions):
|
|
break # 이미 청산됨
|
|
|
|
# ── 2) 신호 만료 ────────────────────────────────────────────────
|
|
for tk in list(signals.keys()):
|
|
if i - signals[tk]['sig_i'] > SIGNAL_TO:
|
|
del signals[tk]
|
|
|
|
# ── 3) 신호 감지 + 진입 (10분봉 기준) ──────────────────────────
|
|
for tk in TICKERS:
|
|
if tk in positions: continue
|
|
if len(positions) >= MAX_POS: break
|
|
if tk not in raw10: continue
|
|
df10 = raw10[tk]
|
|
if ts10 not in df10.index: continue
|
|
loc = df10.index.get_loc(ts10)
|
|
if loc < LOCAL_VOL_N + QUIET_N + 2: continue
|
|
|
|
vol_prev = float(df10['volume'].iloc[loc - 1])
|
|
vol_avg = float(df10['volume'].iloc[loc - LOCAL_VOL_N - 1:loc - 1].mean())
|
|
vol_r = vol_prev / vol_avg if vol_avg > 0 else 0.0
|
|
current = float(df10['close'].iloc[loc])
|
|
close_qn = float(df10['close'].iloc[loc - QUIET_N])
|
|
chg = abs(current - close_qn) / close_qn * 100 if close_qn > 0 else 999.0
|
|
|
|
if vol_r >= VOL_THRESH and chg < QUIET_PCT:
|
|
if tk not in signals or vol_r > signals[tk]['vol_r']:
|
|
signals[tk] = {'price': current, 'vol_r': vol_r, 'sig_i': i}
|
|
|
|
if tk in signals:
|
|
sig_p = signals[tk]['price']
|
|
move = (current - sig_p) / sig_p * 100
|
|
if move >= THRESH:
|
|
atr_stop = calc_atr(df10, loc, ATR_N)
|
|
positions[tk] = {
|
|
'buy_price': current, 'peak': current,
|
|
'entry_i': i, 'invested': PER_POS,
|
|
'atr_stop': atr_stop,
|
|
}
|
|
del signals[tk]
|
|
|
|
# ── 미청산 강제 청산 ─────────────────────────────────────────────────────
|
|
last_ts10 = sim_idx10[-1]
|
|
for tk, pos in list(positions.items()):
|
|
if tk not in raw10: continue
|
|
df10 = raw10[tk]
|
|
current = float(df10.loc[last_ts10, 'close']) if last_ts10 in df10.index else pos['buy_price']
|
|
pnl = (current - pos['buy_price']) / pos['buy_price'] * 100
|
|
fee = pos['invested'] * FEE + current * (pos['invested']/pos['buy_price']) * FEE
|
|
krw = current * (pos['invested']/pos['buy_price']) - pos['invested'] - fee
|
|
trades.append({
|
|
'date': last_ts10.date(), 'ticker': tk,
|
|
'buy_price': pos['buy_price'], 'sell_price': current,
|
|
'pnl_pct': pnl, 'krw': krw, 'reason': '미청산(현재가)',
|
|
'entry_ts': sim_idx10[pos['entry_i']], 'exit_ts': last_ts10,
|
|
})
|
|
|
|
# ── 결과 출력 ────────────────────────────────────────────────────────────
|
|
if not trades:
|
|
print("거래 없음")
|
|
return
|
|
|
|
import collections
|
|
by_date = collections.defaultdict(list)
|
|
for t in trades:
|
|
by_date[t['date']].append(t)
|
|
|
|
total_krw, total_wins = 0, 0
|
|
for date in sorted(by_date.keys()):
|
|
day_trades = by_date[date]
|
|
day_krw = sum(t['krw'] for t in day_trades)
|
|
day_wins = sum(1 for t in day_trades if t['pnl_pct'] > 0)
|
|
total_krw += day_krw
|
|
total_wins += day_wins
|
|
print(f"\n{'='*62}")
|
|
print(f"【{date}】 {len(day_trades)}건 | 승률={day_wins/len(day_trades)*100:.0f}% | 일손익={day_krw:+,.0f}원")
|
|
print(f"{'='*62}")
|
|
for t in sorted(day_trades, key=lambda x: x['entry_ts']):
|
|
e = '✅' if t['pnl_pct'] > 0 else '❌'
|
|
print(f" {e} {t['ticker']:12s} "
|
|
f"매수={t['buy_price']:,.0f} @ {str(t['entry_ts'])[5:16]} → "
|
|
f"매도={t['sell_price']:,.0f} @ {str(t['exit_ts'])[5:16]} "
|
|
f"| {t['pnl_pct']:+.1f}% ({t['krw']:+,.0f}원) [{t['reason']}]")
|
|
|
|
print(f"\n{'='*62}")
|
|
print(f"【2일 합계】 {len(trades)}건 | "
|
|
f"승률={total_wins/len(trades)*100:.0f}% | 총손익={total_krw:+,.0f}원")
|
|
print(f" [1분봉 스탑 시뮬] VOL≥{VOL_THRESH}x + 횡보<{QUIET_PCT}% → +{THRESH}% 진입 (F&G={FNG})")
|
|
|
|
|
|
def _check_stop(pos, tk, current, ts, i, sim_idx10, trades, positions) -> bool:
|
|
"""스탑 체크. 청산 시 True 반환."""
|
|
if current > pos['peak']:
|
|
pos['peak'] = current
|
|
age = i - pos['entry_i']
|
|
pnl = (current - pos['buy_price']) / pos['buy_price'] * 100
|
|
peak_drop = (pos['peak'] - current) / pos['peak'] * 100
|
|
atr_stop = pos['atr_stop']
|
|
reason = None
|
|
if peak_drop >= atr_stop * 100:
|
|
reason = f"트레일링스탑({atr_stop*100:.1f}%)"
|
|
elif age >= TS_N and pnl < TIME_STOP_PCT:
|
|
reason = f"타임스탑(+{pnl:.1f}%<{TIME_STOP_PCT}%)"
|
|
if reason:
|
|
fee = pos['invested'] * FEE + current * (pos['invested']/pos['buy_price']) * FEE
|
|
krw = current * (pos['invested']/pos['buy_price']) - pos['invested'] - fee
|
|
trades.append({
|
|
'date': ts.date() if hasattr(ts, 'date') else ts,
|
|
'ticker': tk,
|
|
'buy_price': pos['buy_price'], 'sell_price': current,
|
|
'pnl_pct': pnl, 'krw': krw, 'reason': reason,
|
|
'entry_ts': sim_idx10[pos['entry_i']], 'exit_ts': ts,
|
|
})
|
|
del positions[tk]
|
|
return True
|
|
return False
|
|
|
|
|
|
# ── main ─────────────────────────────────────────────────────────────────────
|
|
|
|
conn = _get_conn()
|
|
|
|
# 1) DB 업데이트
|
|
refresh_db(conn)
|
|
|
|
# 2) DB에서 데이터 로드
|
|
print("DB에서 OHLCV 로드 중...", flush=True)
|
|
LOAD_SINCE = '2026-03-01 00:00:00'
|
|
raw10, raw1 = {}, {}
|
|
for tk in TICKERS:
|
|
df10 = load_from_db(conn, tk, 'minute10', LOAD_SINCE)
|
|
if df10 is not None and len(df10) > LOCAL_VOL_N + QUIET_N:
|
|
raw10[tk] = df10
|
|
|
|
df1 = load_from_db(conn, tk, 'minute1', LOAD_SINCE)
|
|
if df1 is not None and len(df1) > 60:
|
|
raw1[tk] = df1
|
|
|
|
print(f"로드 완료: 10분봉 {len(raw10)}종목 / 1분봉 {len(raw1)}종목\n")
|
|
conn.close()
|
|
|
|
# 3) 시뮬 실행
|
|
print("="*62)
|
|
print(f"하이브리드 시뮬 (신호=10분봉 / 스탑=1분봉) | 2026-03-03 ~ 03-04")
|
|
print("="*62)
|
|
run_sim(raw10, raw1)
|