Compare commits

..

8 Commits

Author SHA1 Message Date
joungmin
83a229dd26 feat: add market regime filter and compound reinvestment
- Add market_regime.py: BTC/ETH/SOL/XRP weighted 2h trend score
  Bull(≥+1.5%) / Neutral / Bear(<-1%) regime detection with 10min cache
- strategy.py: dynamic TREND/VOL thresholds based on current regime
  Bull: 3%/1.5x, Neutral: 5%/2.0x, Bear: 8%/3.5x
- price_collector.py: always include leader coins in price history
- trader.py: compound reinvestment (profit added to budget, floor at initial)
- notify.py: regime info in hourly report, P&L icons (/, 💚/🔴)
- main.py: hourly status at top-of-hour, filter positions held 1h+
- backtest.py: timestop/combo comparison modes

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 10:14:36 +09:00
joungmin
035b3e2f30 fix: use actual order fills for weighted avg sell price
After sell_market_order, query Upbit /v1/order API to get actual
trade fills. If split across multiple fills, compute weighted average
price and use actual paid_fee instead of estimate.

Falls back to get_current_price if order query fails.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 06:11:29 +09:00
joungmin
bcef128155 feat: add trade_id + full trade record to trade_results
Each buy generates a UUID trade_id stored in positions table.
Each sell links via same trade_id in trade_results, enabling
round-trip grouping of buy→sell pairs.

Additional fields saved per trade:
- fee_krw: commission amount (0.05% each side)
- krw_profit: net KRW profit/loss after fees
- buy_price / sell_price: exact prices
- invested_krw: capital deployed
- sell_reason: trailing stop / time stop / etc.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 05:54:06 +09:00
joungmin
60e739d18b fix: use local 5h volume baseline instead of 23h global average
23h average includes high-volume daytime periods, causing false negatives
at early morning hours. Now compare last 1h candle against the previous
5h local average (same time-of-day context) with 1.2x multiplier.

Also add momentum failure debug logs to show exact reason for rejection.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 05:46:25 +09:00
joungmin
5df56a933e feat: use 60min volume, add KRW P&L log, relax re-entry after win
- strategy: replace daily volume check with 60-min candle volume
  (daily volume at 5am is tiny -> BTC/ETH never matched; now uses
  last 1h candle vs previous 23h avg × 2)
- trader: log actual KRW net profit and fee on every sell
- trader: skip re-entry +1% block when last trade was a win
  (allow re-entry on new trend signal even below last sell price)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 05:38:37 +09:00
joungmin
d2a5c3ae9e fix: persist sell prices to DB and add WF filter bootstrap
- price_db: add sell_prices table (ensure/upsert/load/delete)
- trader: restore _last_sell_prices from DB on startup so re-entry
  block survives restarts; persist each sell price immediately
- market: retry chunk requests up to 3 times with backoff on 429

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-01 05:19:22 +09:00
joungmin
0b264b304c feat: add backtest module with DB cache and scenario comparison
Backtest improvements:
- Add backtest.py with Oracle DB-backed OHLCV cache (no repeated API calls)
- Add backtest_trades table to cache simulation results by params hash
  (same params -> instant load, skip re-simulation)
- Add walk-forward scenario comparison (--walkforward-cmp)
- Add trend ceiling filter (--trend-cmp, max gain threshold)
- Add ticker win-rate filter (--ticker-cmp, SQL-based instant analysis)
- Precompute daily_features once per data load (not per scenario)

Live bot fixes:
- monitor: add hard stop-loss from buy price (in addition to trailing)
- strategy: fix re-entry condition to require +1% above last sell price
- price_collector: add 48h backfill on startup for trend calculation
- main: call backfill_prices() at startup

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 23:28:27 +09:00
joungmin
4888aa0faa feat: add walk-forward trade filter to prevent re-entry on losing tickers
- Add trade_results table to Oracle DB for persistent trade history
- Record win/loss after each sell with pnl_pct
- Load last N trades per ticker from DB on startup (survives restarts)
- Block buy() when recent win rate (last 5 trades) < 40% threshold
- Configurable via WF_WINDOW and WF_MIN_WIN_RATE env vars
- Backtest showed improvement from -7.5% to +37.4% cumulative return

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-02-28 23:28:07 +09:00
10 changed files with 2588 additions and 99 deletions

1701
backtest.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -29,15 +29,27 @@ def get_top_tickers() -> list[str]:
if not all_tickers: if not all_tickers:
return [] return []
# 100개씩 나눠서 조회 (URL 길이 제한) # 100개씩 나눠서 조회 (URL 길이 제한, 429 재시도 포함)
chunk_size = 100 chunk_size = 100
ticker_data = [] ticker_data = []
for i in range(0, len(all_tickers), chunk_size): for i in range(0, len(all_tickers), chunk_size):
chunk = all_tickers[i:i + chunk_size] chunk = all_tickers[i:i + chunk_size]
params = {"markets": ",".join(chunk)} params = {"markets": ",".join(chunk)}
resp = requests.get(_TICKER_URL, params=params, timeout=5) for attempt in range(3):
resp.raise_for_status() try:
ticker_data.extend(resp.json()) resp = requests.get(_TICKER_URL, params=params, timeout=5)
if resp.status_code == 429:
wait = 2 ** attempt # 1s → 2s → 4s
logger.warning(f"429 Rate Limit, {wait}s 대기 후 재시도 ({attempt+1}/3)")
time.sleep(wait)
continue
resp.raise_for_status()
ticker_data.extend(resp.json())
break
except Exception as e:
if attempt == 2:
raise
time.sleep(1)
# 스테이블코인 제외 # 스테이블코인 제외
EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"} EXCLUDE = {"KRW-USDT", "KRW-USDC", "KRW-DAI", "KRW-BUSD"}

110
core/market_regime.py Normal file
View File

@@ -0,0 +1,110 @@
"""시장 레짐(Bull/Neutral/Bear) 판단.
BTC·ETH·SOL·XRP 가중 평균 2h 추세로 레짐을 결정하고
매수 조건 파라미터(trend_pct, vol_mult)를 동적으로 반환한다.
계산된 현재가는 price_history DB에 저장해 재활용한다.
"""
from __future__ import annotations
import logging
import time
import pyupbit
from .price_db import get_price_n_hours_ago, insert_prices
logger = logging.getLogger(__name__)
# 대장 코인 가중치
LEADERS: dict[str, float] = {
"KRW-BTC": 0.40,
"KRW-ETH": 0.30,
"KRW-SOL": 0.15,
"KRW-XRP": 0.15,
}
TREND_HOURS = 2 # 2h 추세 기준
BULL_THRESHOLD = 1.5 # score ≥ 1.5% → Bull
BEAR_THRESHOLD = -1.0 # score < -1.0% → Bear
# 레짐별 매수 조건 파라미터
REGIME_PARAMS: dict[str, dict] = {
"bull": {"trend_pct": 3.0, "vol_mult": 1.5, "emoji": "🟢"},
"neutral": {"trend_pct": 5.0, "vol_mult": 2.0, "emoji": "🟡"},
"bear": {"trend_pct": 8.0, "vol_mult": 3.5, "emoji": "🔴"},
}
# 10분 캐시 (스캔 루프마다 API 호출 방지)
_cache: dict = {}
_cache_ts: float = 0.0
_CACHE_TTL = 600
def get_regime() -> dict:
"""현재 시장 레짐 반환.
Returns:
{
'name': 'bull' | 'neutral' | 'bear',
'score': float, # 가중 평균 2h 추세(%)
'trend_pct': float, # 매수 추세 임계값
'vol_mult': float, # 거래량 배수 임계값
'emoji': str,
}
"""
global _cache, _cache_ts
if _cache and (time.time() - _cache_ts) < _CACHE_TTL:
return _cache
score = 0.0
current_prices: dict[str, float] = {}
for ticker, weight in LEADERS.items():
try:
current = pyupbit.get_current_price(ticker)
if not current:
continue
current_prices[ticker] = current
# DB에서 2h 전 가격 조회 → 없으면 API 캔들로 대체
past = get_price_n_hours_ago(ticker, TREND_HOURS)
if past is None:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=4)
if df is not None and len(df) >= 3:
past = float(df["close"].iloc[-3])
if past:
trend = (current - past) / past * 100
score += trend * weight
logger.debug(f"[레짐] {ticker} {trend:+.2f}% (기여 {trend*weight:+.3f})")
except Exception as e:
logger.warning(f"[레짐] {ticker} 오류: {e}")
# 현재가 DB 저장 (다음 레짐 계산 및 추세 판단에 재활용)
if current_prices:
try:
insert_prices(current_prices)
except Exception as e:
logger.warning(f"[레짐] 가격 저장 오류: {e}")
# 레짐 결정
if score >= BULL_THRESHOLD:
name = "bull"
elif score < BEAR_THRESHOLD:
name = "bear"
else:
name = "neutral"
params = REGIME_PARAMS[name]
result = {"name": name, "score": round(score, 3), **params}
logger.info(
f"[레짐] score={score:+.3f}% → {params['emoji']} {name.upper()} "
f"(TREND≥{params['trend_pct']}% / VOL≥{params['vol_mult']}x)"
)
_cache = result
_cache_ts = time.time()
return result

View File

@@ -19,7 +19,7 @@ TIME_STOP_MIN_GAIN_PCT = float(os.getenv("TIME_STOP_MIN_GAIN_PCT", "3"))
def _check_trailing_stop(ticker: str, pos: dict, current: float) -> bool: def _check_trailing_stop(ticker: str, pos: dict, current: float) -> bool:
"""트레일링 스탑 체크. 매도 시 True 반환.""" """트레일링 스탑(최고가 기준) + 고정 스탑(매수가 기준) 체크. 매도 시 True 반환."""
trader.update_peak(ticker, current) trader.update_peak(ticker, current)
pos = trader.get_positions().get(ticker) pos = trader.get_positions().get(ticker)
@@ -27,15 +27,24 @@ def _check_trailing_stop(ticker: str, pos: dict, current: float) -> bool:
return False return False
peak = pos["peak_price"] peak = pos["peak_price"]
buy_price = pos["buy_price"]
drop_from_peak = (peak - current) / peak drop_from_peak = (peak - current) / peak
drop_from_buy = (buy_price - current) / buy_price # 구매가 대비 하락률
if drop_from_peak >= STOP_LOSS_PCT: if drop_from_peak >= STOP_LOSS_PCT:
reason = ( reason = (
f"트레일링스탑 | 최고가={peak:,.0f}원 → " f"트레일링스탑 | 최고가={peak:,.0f}원 → "
f"현재={current:,.0f}원 ({drop_from_peak:.1%} 하락)" f"현재={current:,.0f}원 ({drop_from_peak:.1%} 하락)"
) )
trader.sell(ticker, reason=reason) return trader.sell(ticker, reason=reason)
return True
if drop_from_buy >= STOP_LOSS_PCT:
reason = (
f"스탑로스 | 매수가={buy_price:,.0f}원 → "
f"현재={current:,.0f}원 ({drop_from_buy:.1%} 하락)"
)
return trader.sell(ticker, reason=reason)
return False return False
@@ -70,15 +79,17 @@ def _check_position(ticker: str, pos: dict) -> None:
if current is None: if current is None:
return return
pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100 buy_price = pos["buy_price"]
pnl = (current - buy_price) / buy_price * 100
peak = pos["peak_price"] peak = pos["peak_price"]
drop_from_peak = (peak - current) / peak drop_from_peak = (peak - current) / peak
drop_from_buy = (buy_price - current) / buy_price
entry_time = pos.get("entry_time", datetime.now()) entry_time = pos.get("entry_time", datetime.now())
elapsed_hours = (datetime.now() - entry_time).total_seconds() / 3600 elapsed_hours = (datetime.now() - entry_time).total_seconds() / 3600
logger.info( logger.info(
f"[감시] {ticker} 현재={current:,.0f} | 최고={peak:,.0f} | " f"[감시] {ticker} 현재={current:,.0f} | 매수가={buy_price:,.0f} | 최고={peak:,.0f} | "
f"하락={drop_from_peak:.1%} | 수익률={pnl:+.1f}% | " f"수익률={pnl:+.1f}% | peak하락={drop_from_peak:.1%} | buy하락={drop_from_buy:.1%} | "
f"보유={elapsed_hours:.1f}h" f"보유={elapsed_hours:.1f}h"
) )
@@ -94,7 +105,7 @@ def run_monitor(interval: int = CHECK_INTERVAL) -> None:
"""전체 포지션 감시 루프.""" """전체 포지션 감시 루프."""
logger.info( logger.info(
f"모니터 시작 | 체크={interval}초 | " f"모니터 시작 | 체크={interval}초 | "
f"트레일링스탑={STOP_LOSS_PCT:.0%} | " f"트레일링스탑={STOP_LOSS_PCT:.1%} | "
f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.0f}%" f"타임스탑={TIME_STOP_HOURS:.0f}h/{TIME_STOP_MIN_GAIN_PCT:+.0f}%"
) )
while True: while True:

View File

@@ -28,21 +28,36 @@ def _send(text: str) -> None:
logger.error(f"Telegram 알림 실패: {e}") logger.error(f"Telegram 알림 실패: {e}")
def notify_buy(ticker: str, price: float, amount: float, invested_krw: int) -> None: def notify_buy(
ticker: str, price: float, amount: float, invested_krw: int,
max_budget: int = 0, per_position: int = 0,
) -> None:
budget_line = (
f"운용예산: {max_budget:,}원 (포지션당 {per_position:,}원)\n"
if max_budget else ""
)
_send( _send(
f"📈 <b>[매수]</b> {ticker}\n" f"📈 <b>[매수]</b> {ticker}\n"
f"가격: {price:,.0f}\n" f"가격: {price:,.0f}\n"
f"수량: {amount}\n" f"수량: {amount}\n"
f"투자금: {invested_krw:,}" f"투자금: {invested_krw:,}\n"
f"{budget_line}"
) )
def notify_sell(ticker: str, price: float, pnl_pct: float, reason: str) -> None: def notify_sell(
emoji = "" if pnl_pct >= 0 else "🔴" ticker: str, price: float, pnl_pct: float, reason: str,
krw_profit: float = 0.0, fee_krw: float = 0.0,
cum_profit: float = 0.0,
) -> None:
trade_emoji = "" if pnl_pct >= 0 else ""
cum_emoji = "💚" if cum_profit >= 0 else "🔴"
_send( _send(
f"{emoji} <b>[매도]</b> {ticker}\n" f"{trade_emoji} <b>[매도]</b> {ticker}\n"
f"가격: {price:,.0f}\n" f"가격: {price:,.0f}\n"
f"수익률: {pnl_pct:+.1f}%\n" f"수익률: {pnl_pct:+.2f}%\n"
f"실손익: {krw_profit:+,.0f}원 (수수료 {fee_krw:,.0f}원)\n"
f"{cum_emoji} 누적손익: {cum_profit:+,.0f}\n"
f"사유: {reason}" f"사유: {reason}"
) )
@@ -51,19 +66,50 @@ def notify_error(message: str) -> None:
_send(f"⚠️ <b>[오류]</b>\n{message}") _send(f"⚠️ <b>[오류]</b>\n{message}")
def notify_status(positions: dict) -> None: def notify_status(
"""1시간마다 포지션 현황 요약 전송.""" positions: dict,
max_budget: int = 0,
per_position: int = 0,
cum_profit: float = 0.0,
) -> None:
"""정각마다 시장 레짐 + 1시간 이상 보유 포지션 현황 전송."""
from datetime import datetime from datetime import datetime
import pyupbit import pyupbit
from .market_regime import get_regime
now = datetime.now().strftime("%H:%M") now = datetime.now().strftime("%H:%M")
cum_sign = "+" if cum_profit >= 0 else ""
if not positions: # 시장 레짐
_send(f"📊 <b>[{now} 현황]</b>\n보유 포지션 없음 — 매수 신호 대기 중") regime = get_regime()
regime_line = (
f"{regime['emoji']} 시장: {regime['name'].upper()} "
f"(score {regime['score']:+.2f}%) "
f"| 조건 TREND≥{regime['trend_pct']}% / VOL≥{regime['vol_mult']}x\n"
)
# 1시간 이상 보유 포지션만 필터
long_positions = {
ticker: pos for ticker, pos in positions.items()
if (datetime.now() - pos["entry_time"]).total_seconds() >= 3600
}
cum_emoji = "💚" if cum_profit >= 0 else "🔴"
budget_info = (
f"💰 운용예산: {max_budget:,}원 | 포지션당: {per_position:,}\n"
f"{cum_emoji} 누적손익: {cum_sign}{cum_profit:,.0f}\n"
if max_budget else ""
)
# 포지션 없어도 레짐 정보는 전송
header = f"📊 <b>[{now} 현황]</b>\n{regime_line}{budget_info}"
if not long_positions:
_send(header + "1h+ 보유 포지션 없음")
return return
lines = [f"📊 <b>[{now} 현황]</b>"] lines = [header]
for ticker, pos in positions.items(): for ticker, pos in long_positions.items():
current = pyupbit.get_current_price(ticker) current = pyupbit.get_current_price(ticker)
if not current: if not current:
continue continue
@@ -73,9 +119,9 @@ def notify_status(positions: dict) -> None:
elapsed = (datetime.now() - pos["entry_time"]).total_seconds() / 3600 elapsed = (datetime.now() - pos["entry_time"]).total_seconds() / 3600
emoji = "📈" if pnl >= 0 else "📉" emoji = "📈" if pnl >= 0 else "📉"
lines.append( lines.append(
f"\n{emoji} <b>{ticker}</b>\n" f"{emoji} <b>{ticker}</b>\n"
f" 현재가: {current:,.0f}\n" f" 현재가: {current:,.0f}\n"
f" 수익률: {pnl:+.1f}%\n" f" 수익률: {pnl:+.2f}%\n"
f" 최고가 대비: -{drop:.1f}%\n" f" 최고가 대비: -{drop:.1f}%\n"
f" 보유: {elapsed:.1f}h" f" 보유: {elapsed:.1f}h"
) )

View File

@@ -5,10 +5,12 @@ from __future__ import annotations
import logging import logging
import time import time
import pyupbit
import requests import requests
from .market import get_top_tickers from .market import get_top_tickers
from .price_db import cleanup_old_prices, insert_prices from .market_regime import LEADERS
from .price_db import cleanup_old_prices, insert_prices, insert_prices_with_time
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -16,6 +18,42 @@ COLLECT_INTERVAL = 600 # 10분 (초)
CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리 CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리
def backfill_prices(hours: int = 48) -> None:
"""시작 시 과거 N시간치 1시간봉 종가를 DB에 백필.
price_history에 데이터가 없으면 추세 판단이 불가능하므로
봇 시작 직후 한 번 호출해 과거 데이터를 채운다.
"""
tickers = get_top_tickers()
if not tickers:
logger.warning("[백필] 종목 목록 없음, 스킵")
return
# 대장 코인 항상 포함
for leader in LEADERS:
if leader not in tickers:
tickers = tickers + [leader]
count = hours + 2 # 여유 있게 요청
total_rows = 0
for ticker in tickers:
try:
df = pyupbit.get_ohlcv(ticker, interval="minute60", count=count)
if df is None or df.empty:
continue
rows = [
(ticker, float(row["close"]), ts.to_pydatetime())
for ts, row in df.iterrows()
]
insert_prices_with_time(rows)
total_rows += len(rows)
time.sleep(0.1)
except Exception as e:
logger.error(f"[백필] {ticker} 오류: {e}")
logger.info(f"[백필] 완료 — {len(tickers)}개 종목 / {total_rows}개 레코드 저장")
def run_collector(interval: int = COLLECT_INTERVAL) -> None: def run_collector(interval: int = COLLECT_INTERVAL) -> None:
"""가격 수집 루프.""" """가격 수집 루프."""
logger.info(f"가격 수집기 시작 (주기={interval//60}분)") logger.info(f"가격 수집기 시작 (주기={interval//60}분)")
@@ -26,6 +64,10 @@ def run_collector(interval: int = COLLECT_INTERVAL) -> None:
tickers = get_top_tickers() tickers = get_top_tickers()
if not tickers: if not tickers:
continue continue
# 대장 코인은 top20 밖이어도 항상 포함
for leader in LEADERS:
if leader not in tickers:
tickers = tickers + [leader]
resp = requests.get( resp = requests.get(
"https://api.upbit.com/v1/ticker", "https://api.upbit.com/v1/ticker",
params={"markets": ",".join(tickers)}, params={"markets": ",".join(tickers)},

View File

@@ -44,7 +44,7 @@ def _conn() -> Generator[oracledb.Connection, None, None]:
def insert_prices(ticker_prices: dict[str, float]) -> None: def insert_prices(ticker_prices: dict[str, float]) -> None:
"""여러 종목의 현재가를 한 번에 저장.""" """여러 종목의 현재가를 한 번에 저장 (recorded_at = 현재 시각)."""
if not ticker_prices: if not ticker_prices:
return return
rows = [(ticker, price) for ticker, price in ticker_prices.items()] rows = [(ticker, price) for ticker, price in ticker_prices.items()]
@@ -53,6 +53,18 @@ def insert_prices(ticker_prices: dict[str, float]) -> None:
conn.cursor().executemany(sql, rows) conn.cursor().executemany(sql, rows)
def insert_prices_with_time(rows: list[tuple]) -> None:
"""(ticker, price, recorded_at) 튜플 리스트를 한 번에 저장 (백필용)."""
if not rows:
return
sql = """
INSERT INTO price_history (ticker, price, recorded_at)
VALUES (:1, :2, :3)
"""
with _conn() as conn:
conn.cursor().executemany(sql, rows)
def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]: def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]:
"""N시간 전 가장 가까운 가격 반환. 데이터 없으면 None.""" """N시간 전 가장 가까운 가격 반환. 데이터 없으면 None."""
sql = """ sql = """
@@ -89,3 +101,208 @@ def cleanup_old_prices(keep_hours: int = 48) -> None:
sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)" sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)"
with _conn() as conn: with _conn() as conn:
conn.cursor().execute(sql) conn.cursor().execute(sql)
# ── 포지션 영구 저장 (재시작 후 실제 매수가 복원용) ──────────────────────────
def upsert_position(
ticker: str,
buy_price: float,
peak_price: float,
amount: float,
invested_krw: int,
entry_time: str, # ISO 포맷 문자열
trade_id: str = "",
) -> None:
"""포지션 저장 또는 갱신 (MERGE)."""
sql = """
MERGE INTO positions p
USING (SELECT :ticker AS ticker FROM dual) s
ON (p.ticker = s.ticker)
WHEN MATCHED THEN
UPDATE SET peak_price = :peak_price,
amount = :amount,
invested_krw = :invested_krw,
updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
INSERT (ticker, buy_price, peak_price, amount, invested_krw, entry_time, trade_id)
VALUES (:ticker, :buy_price, :peak_price, :amount, :invested_krw,
TO_TIMESTAMP(:entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6'), :trade_id)
"""
with _conn() as conn:
conn.cursor().execute(sql, {
"ticker": ticker,
"buy_price": buy_price,
"peak_price": peak_price,
"amount": amount,
"invested_krw": invested_krw,
"entry_time": entry_time,
"trade_id": trade_id,
})
def delete_position(ticker: str) -> None:
"""포지션 삭제 (매도 완료 시)."""
with _conn() as conn:
conn.cursor().execute(
"DELETE FROM positions WHERE ticker = :ticker", {"ticker": ticker}
)
# ── Walk-forward 거래 이력 ────────────────────────────────────────────────────
def ensure_trade_results_table() -> None:
"""trade_results 테이블이 없으면 생성."""
ddl = """
CREATE TABLE trade_results (
id NUMBER GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
ticker VARCHAR2(20) NOT NULL,
is_win NUMBER(1) NOT NULL,
pnl_pct NUMBER(10,4),
traded_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""
idx = "CREATE INDEX idx_tr_ticker ON trade_results (ticker, traded_at DESC)"
with _conn() as conn:
for sql in (ddl, idx):
try:
conn.cursor().execute(sql)
except oracledb.DatabaseError as e:
if e.args[0].code not in (955, 1408):
raise
def record_trade(
ticker: str,
is_win: bool,
pnl_pct: float,
fee_krw: float = 0.0,
krw_profit: float = 0.0,
trade_id: str = "",
buy_price: float = 0.0,
sell_price: float = 0.0,
invested_krw: int = 0,
sell_reason: str = "",
) -> None:
"""거래 결과 저장 (수수료·KRW 손익·trade_id 포함)."""
with _conn() as conn:
conn.cursor().execute(
"INSERT INTO trade_results "
"(ticker, is_win, pnl_pct, fee_krw, krw_profit, "
" trade_id, buy_price, sell_price, invested_krw, sell_reason) "
"VALUES (:t, :w, :p, :f, :k, :tid, :bp, :sp, :ikrw, :sr)",
{
"t": ticker,
"w": 1 if is_win else 0,
"p": round(pnl_pct, 4),
"f": round(fee_krw, 2),
"k": round(krw_profit, 2),
"tid": trade_id,
"bp": buy_price,
"sp": sell_price,
"ikrw": invested_krw,
"sr": sell_reason,
},
)
def get_cumulative_krw_profit() -> float:
"""전체 거래 누적 KRW 손익 반환 (수수료 차감 후). 데이터 없으면 0."""
with _conn() as conn:
cur = conn.cursor()
cur.execute("SELECT SUM(krw_profit) FROM trade_results WHERE krw_profit IS NOT NULL")
row = cur.fetchone()
return float(row[0]) if row and row[0] is not None else 0.0
def load_recent_wins(ticker: str, n: int = 5) -> list[bool]:
"""직전 N건 거래의 승/패 리스트 반환 (오래된 순). 없으면 빈 리스트."""
sql = """
SELECT is_win FROM (
SELECT is_win FROM trade_results
WHERE ticker = :t
ORDER BY traded_at DESC
FETCH FIRST :n ROWS ONLY
) ORDER BY ROWNUM DESC
"""
with _conn() as conn:
cur = conn.cursor()
cur.execute(sql, {"t": ticker, "n": n})
rows = cur.fetchall()
return [bool(r[0]) for r in rows]
# ── 직전 매도가 영구 저장 (재시작 후 재매수 차단 유지용) ──────────────────────
def ensure_sell_prices_table() -> None:
"""sell_prices 테이블이 없으면 생성."""
ddl = """
CREATE TABLE sell_prices (
ticker VARCHAR2(20) NOT NULL PRIMARY KEY,
price NUMBER(20,8) NOT NULL,
updated_at TIMESTAMP DEFAULT SYSTIMESTAMP NOT NULL
)
"""
with _conn() as conn:
try:
conn.cursor().execute(ddl)
except oracledb.DatabaseError as e:
if e.args[0].code != 955: # ORA-00955: 이미 존재
raise
def upsert_sell_price(ticker: str, price: float) -> None:
"""직전 매도가 저장 또는 갱신."""
sql = """
MERGE INTO sell_prices s
USING (SELECT :ticker AS ticker FROM dual) d
ON (s.ticker = d.ticker)
WHEN MATCHED THEN
UPDATE SET price = :price, updated_at = SYSTIMESTAMP
WHEN NOT MATCHED THEN
INSERT (ticker, price) VALUES (:ticker, :price)
"""
with _conn() as conn:
conn.cursor().execute(sql, {"ticker": ticker, "price": price})
def load_sell_prices() -> dict[str, float]:
"""저장된 직전 매도가 전체 로드."""
with _conn() as conn:
cur = conn.cursor()
cur.execute("SELECT ticker, price FROM sell_prices")
return {r[0]: float(r[1]) for r in cur.fetchall()}
def delete_sell_price(ticker: str) -> None:
"""매도가 기록 삭제 (더 이상 필요 없을 때)."""
with _conn() as conn:
conn.cursor().execute(
"DELETE FROM sell_prices WHERE ticker = :ticker", {"ticker": ticker}
)
def load_positions() -> list[dict]:
"""저장된 전체 포지션 로드."""
sql = """
SELECT ticker, buy_price, peak_price, amount, invested_krw,
TO_CHAR(entry_time, 'YYYY-MM-DD"T"HH24:MI:SS.FF6') AS entry_time,
trade_id
FROM positions
"""
with _conn() as conn:
cursor = conn.cursor()
cursor.execute(sql)
rows = cursor.fetchall()
return [
{
"ticker": r[0],
"buy_price": float(r[1]),
"peak_price": float(r[2]),
"amount": float(r[3]),
"invested_krw": int(r[4]),
"entry_time": r[5],
"trade_id": r[6] or "",
}
for r in rows
]

View File

@@ -1,29 +1,32 @@
"""Strategy C: 실시간 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호.""" """Strategy C: 현재 기준 N시간 전 대비 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호."""
from __future__ import annotations from __future__ import annotations
import logging import logging
import os import os
import pyupbit
from .market import get_current_price, get_ohlcv from .market import get_current_price, get_ohlcv
from .market_regime import get_regime
from .price_db import get_price_n_hours_ago from .price_db import get_price_n_hours_ago
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# 추세 판단: N시간 전 대비 +M% 이상이면 상승 중 # 추세 판단: 현재 기준 N시간 전 DB 가격 대비 +M% 이상이면 상승 중
TREND_HOURS = float(os.getenv("TREND_HOURS", "1")) TREND_HOURS = float(os.getenv("TREND_HOURS", "12"))
TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "3")) TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "5")) # 레짐이 없을 때 기본값
# 모멘텀: MA 기간, 거래량 급증 배수 # 모멘텀: MA 기간, 거래량 급증 배수
MA_PERIOD = 20 MA_PERIOD = 20
VOLUME_MULTIPLIER = 2.0 VOLUME_MULTIPLIER = float(os.getenv("VOLUME_MULTIPLIER", "2.0")) # 레짐이 없을 때 기본값
LOCAL_VOL_HOURS = 5 # 로컬 기준 시간 (h)
def check_trend(ticker: str) -> bool: def check_trend(ticker: str, min_gain_pct: float) -> bool:
"""상승 추세 조건: 현재가가 N시간 전 대비 +M% 이상.""" """상승 추세 조건: 현재가가 DB에 저장된 N시간 전 가격 대비 +min_gain_pct% 이상."""
past_price = get_price_n_hours_ago(ticker, TREND_HOURS) past_price = get_price_n_hours_ago(ticker, TREND_HOURS)
if past_price is None: if past_price is None:
logger.debug(f"[추세] {ticker} 과거 가격 없음 (데이터 수집 중)") logger.debug(f"[추세] {ticker} {TREND_HOURS:.0f}h 전 가격 없음 (수집 중)")
return False return False
current = get_current_price(ticker) current = get_current_price(ticker)
@@ -31,48 +34,74 @@ def check_trend(ticker: str) -> bool:
return False return False
gain_pct = (current - past_price) / past_price * 100 gain_pct = (current - past_price) / past_price * 100
result = gain_pct >= TREND_MIN_GAIN_PCT result = gain_pct >= min_gain_pct
if result: if result:
logger.info( logger.info(
f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.0f} " f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.2f} "
f"현재={current:,.0f} (+{gain_pct:.1f}%)" f"현재={current:,.2f} (+{gain_pct:.1f}%{min_gain_pct}%)"
) )
else: else:
logger.debug( logger.debug(
f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={TREND_MIN_GAIN_PCT:+.0f}%)" f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={min_gain_pct:+.0f}%)"
) )
return result return result
def check_momentum(ticker: str) -> bool: def check_momentum(ticker: str, vol_mult: float) -> bool:
"""모멘텀 조건: 현재가 > MA20 AND 오늘 거래량 > 20일 평균 × 2.""" """모멘텀 조건: 현재가 > MA20(일봉) AND 최근 1h 거래량 > 로컬 5h 평균 × vol_mult.
df = get_ohlcv(ticker, count=MA_PERIOD + 1)
if df is None or len(df) < MA_PERIOD + 1: 23h 평균은 낮 시간대 고거래량이 포함돼 새벽에 항상 미달하므로,
로컬 5h 평균(같은 시간대 컨텍스트)과 비교한다.
"""
# MA20: 일봉 기준
df_daily = get_ohlcv(ticker, count=MA_PERIOD + 1)
if df_daily is None or len(df_daily) < MA_PERIOD + 1:
return False return False
ma = df["close"].iloc[-MA_PERIOD:].mean() ma = df_daily["close"].iloc[-MA_PERIOD:].mean()
avg_vol = df["volume"].iloc[:-1].mean()
today_vol = df["volume"].iloc[-1]
current = get_current_price(ticker) current = get_current_price(ticker)
if current is None: if current is None:
return False return False
price_ok = current > ma price_ok = current > ma
vol_ok = today_vol > avg_vol * VOLUME_MULTIPLIER if not price_ok:
result = price_ok and vol_ok logger.debug(f"[모멘텀✗] {ticker} 현재={current:,.0f} < MA20={ma:,.0f} (가격 기준 미달)")
return False
if result: # 거래량: 60분봉 기준 (최근 1h vs 이전 LOCAL_VOL_HOURS h 로컬 평균)
logger.debug( fetch_count = LOCAL_VOL_HOURS + 3 # 여유 있게 fetch
f"[모멘텀] {ticker} 현재={current:,.0f} MA20={ma:,.0f} " try:
f"거래량={today_vol:.0f} 평균={avg_vol:.0f}" df_hour = pyupbit.get_ohlcv(ticker, interval="minute60", count=fetch_count)
except Exception:
return False
if df_hour is None or len(df_hour) < LOCAL_VOL_HOURS + 1:
return False
recent_vol = df_hour["volume"].iloc[-2] # 직전 완성된 1h 봉
local_avg = df_hour["volume"].iloc[-(LOCAL_VOL_HOURS + 1):-2].mean() # 이전 LOCAL_VOL_HOURS h 평균
vol_ok = local_avg > 0 and recent_vol >= local_avg * vol_mult
ratio = recent_vol / local_avg if local_avg > 0 else 0
if vol_ok:
logger.info(
f"[모멘텀↑] {ticker} 현재={current:,.0f} MA20={ma:,.0f} "
f"1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} ({ratio:.2f}x ≥ {vol_mult}x)"
) )
return result else:
logger.debug(
f"[모멘텀✗] {ticker} 1h거래량={recent_vol:.0f} 로컬{LOCAL_VOL_HOURS}h평균={local_avg:.0f} "
f"({ratio:.2f}x < {vol_mult}x)"
)
return vol_ok
def should_buy(ticker: str) -> bool: def should_buy(ticker: str) -> bool:
"""Strategy C: 실시간 상승 추세 AND 거래량 모멘텀 모두 충족 시 True.""" """Strategy C + 시장 레짐: 레짐별 동적 임계값으로 추세 AND 모멘텀 판단."""
if not check_trend(ticker): regime = get_regime()
trend_pct = regime["trend_pct"]
vol_mult = regime["vol_mult"]
if not check_trend(ticker, trend_pct):
return False return False
return check_momentum(ticker) return check_momentum(ticker, vol_mult)

View File

@@ -6,25 +6,65 @@ import logging
import os import os
import threading import threading
import time import time
import uuid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
import pyupbit import pyupbit
from dotenv import load_dotenv from dotenv import load_dotenv
from .notify import notify_buy, notify_sell, notify_error from .notify import notify_buy, notify_sell, notify_error
from .price_db import (
delete_position, load_positions, upsert_position,
ensure_trade_results_table, record_trade, load_recent_wins,
ensure_sell_prices_table, upsert_sell_price, load_sell_prices,
get_cumulative_krw_profit,
)
load_dotenv() load_dotenv()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
MAX_BUDGET = 1_000_000 # 총 운용 한도: 100만원 INITIAL_BUDGET = int(os.getenv("MAX_BUDGET", "10000000")) # 초기 원금 (고정)
MAX_POSITIONS = 3 # 최대 동시 보유 종목 수 MAX_POSITIONS = int(os.getenv("MAX_POSITIONS", "3")) # 최대 동시 보유 종목 수
PER_POSITION = MAX_BUDGET // MAX_POSITIONS # 종목당 33만3천원
# 복리 적용 예산 (매도 후 재계산) — 수익 발생 시만 증가, 손실 시 원금 유지
MAX_BUDGET = INITIAL_BUDGET
PER_POSITION = INITIAL_BUDGET // MAX_POSITIONS
def _recalc_compound_budget() -> None:
"""누적 수익을 반영해 MAX_BUDGET / PER_POSITION 재계산.
수익이 발생한 만큼만 예산에 더함 (손실 시 원금 아래로 내려가지 않음).
매도 완료 후 호출.
"""
global MAX_BUDGET, PER_POSITION
try:
cum_profit = get_cumulative_krw_profit()
effective = INITIAL_BUDGET + max(int(cum_profit), 0)
MAX_BUDGET = effective
PER_POSITION = effective // MAX_POSITIONS
logger.info(
f"[복리] 누적수익={cum_profit:+,.0f}원 | "
f"운용예산={MAX_BUDGET:,}원 | 포지션당={PER_POSITION:,}"
)
except Exception as e:
logger.warning(f"[복리] 예산 재계산 실패 (이전 값 유지): {e}")
# Walk-forward 필터 설정
WF_WINDOW = int(float(os.getenv("WF_WINDOW", "5"))) # 이력 윈도우 크기
WF_MIN_WIN_RATE = float(os.getenv("WF_MIN_WIN_RATE", "0.40")) # 최소 승률 임계값
_lock = threading.Lock() _lock = threading.Lock()
_positions: dict = {} _positions: dict = {}
# 구조: { ticker: { buy_price, peak_price, amount, invested_krw, entry_time } } # 구조: { ticker: { buy_price, peak_price, amount, invested_krw, entry_time } }
_last_sell_prices: dict[str, float] = {}
# 직전 매도가 기록 — 재매수 시 이 가격 이상일 때만 진입 허용
_trade_history: dict[str, list[bool]] = {}
# walk-forward 이력: { ticker: [True/False, ...] } (True=수익)
_upbit: Optional[pyupbit.Upbit] = None _upbit: Optional[pyupbit.Upbit] = None
@@ -35,14 +75,107 @@ def _get_upbit() -> pyupbit.Upbit:
return _upbit return _upbit
def _get_history(ticker: str) -> list[bool]:
"""in-memory 이력 반환. 없으면 DB에서 초기 로드."""
if ticker not in _trade_history:
try:
_trade_history[ticker] = load_recent_wins(ticker, WF_WINDOW)
except Exception:
_trade_history[ticker] = []
return _trade_history[ticker]
def _update_history(
ticker: str, is_win: bool, pnl_pct: float,
fee_krw: float = 0.0, krw_profit: float = 0.0,
trade_id: str = "", buy_price: float = 0.0,
sell_price: float = 0.0, invested_krw: int = 0,
sell_reason: str = "",
) -> None:
"""매도 후 in-memory 이력 갱신 + DB 기록."""
hist = _trade_history.setdefault(ticker, [])
hist.append(is_win)
# 윈도우 초과분 제거 (메모리 절약)
if len(hist) > WF_WINDOW * 2:
_trade_history[ticker] = hist[-WF_WINDOW:]
try:
record_trade(
ticker, is_win, pnl_pct, fee_krw, krw_profit,
trade_id, buy_price, sell_price, invested_krw, sell_reason,
)
except Exception as e:
logger.error(f"거래 이력 저장 실패 {ticker}: {e}")
def _db_upsert(ticker: str, pos: dict) -> None:
"""포지션을 Oracle DB에 저장 (실패해도 거래는 계속)."""
try:
upsert_position(
ticker=ticker,
buy_price=pos["buy_price"],
peak_price=pos["peak_price"],
amount=pos["amount"],
invested_krw=pos["invested_krw"],
entry_time=pos["entry_time"].isoformat(),
trade_id=pos.get("trade_id", ""),
)
except Exception as e:
logger.error(f"포지션 DB 저장 실패 {ticker}: {e}")
def get_positions() -> dict: def get_positions() -> dict:
return _positions return _positions
def get_budget_info() -> dict:
"""현재 복리 예산 정보 반환 (main.py 등 외부에서 동적 조회용)."""
return {
"max_budget": MAX_BUDGET,
"per_position": PER_POSITION,
"initial": INITIAL_BUDGET,
}
def restore_positions() -> None: def restore_positions() -> None:
"""시작 시 Upbit 실제 잔고를 읽어 포지션 복원 (재시작 이중 매수 방지).""" """시작 시 Oracle DB + Upbit 잔고를 교차 확인하여 포지션 복원.
trade_results 테이블도 이 시점에 생성 (없으면).
DB에 저장된 실제 매수가를 복원하고, Upbit 잔고에 없으면 DB에서도 삭제한다.
"""
# trade_results / sell_prices 테이블 초기화
try:
ensure_trade_results_table()
except Exception as e:
logger.warning(f"trade_results 테이블 생성 실패 (무시): {e}")
# 시작 시 복리 예산 복원 (이전 세션 수익 반영)
_recalc_compound_budget()
try:
ensure_sell_prices_table()
except Exception as e:
logger.warning(f"sell_prices 테이블 생성 실패 (무시): {e}")
# 직전 매도가 복원 (재매수 차단 기준 유지)
try:
loaded = load_sell_prices()
_last_sell_prices.update(loaded)
if loaded:
logger.info(f"[복원] 직전 매도가 {len(loaded)}건 복원: {list(loaded.keys())}")
except Exception as e:
logger.warning(f"직전 매도가 복원 실패 (무시): {e}")
# DB에서 저장된 포지션 로드
try:
saved = {row["ticker"]: row for row in load_positions()}
except Exception as e:
logger.error(f"DB 포지션 로드 실패: {e}")
saved = {}
upbit = _get_upbit() upbit = _get_upbit()
balances = upbit.get_balances() balances = upbit.get_balances()
upbit_tickers = set()
for b in balances: for b in balances:
currency = b["currency"] currency = b["currency"]
if currency == "KRW": if currency == "KRW":
@@ -57,18 +190,52 @@ def restore_positions() -> None:
invested_krw = int(amount * current) invested_krw = int(amount * current)
if invested_krw < 1_000: # 소액 잔고 무시 if invested_krw < 1_000: # 소액 잔고 무시
continue continue
with _lock:
_positions[ticker] = { upbit_tickers.add(ticker)
"buy_price": current, # 정확한 매수가 불명 → 현재가로 초기화
"peak_price": current, if ticker in saved:
"amount": amount, # DB에 저장된 실제 매수가 복원
"invested_krw": min(invested_krw, PER_POSITION), s = saved[ticker]
"entry_time": datetime.now(), peak = max(s["peak_price"], current) # 재시작 중 올랐을 수 있으므로 높은 쪽
} entry_time = datetime.fromisoformat(s["entry_time"]) if isinstance(s["entry_time"], str) else s["entry_time"]
logger.info( with _lock:
f"[복원] {ticker} 수량={amount} | 현재가={current:,.0f}" _positions[ticker] = {
f"(재시작 시 복원, 매수가 불명으로 현재가 기준)" "buy_price": s["buy_price"],
) "peak_price": peak,
"amount": amount,
"invested_krw": s["invested_krw"],
"entry_time": entry_time,
"trade_id": s.get("trade_id", ""),
}
logger.info(
f"[복원] {ticker} 매수가={s['buy_price']:,.0f}원 | 현재가={current:,.0f}"
f"| 수량={amount} (DB 복원)"
)
else:
# DB에 없음 → 현재가로 초기화 후 DB에 저장
entry_time = datetime.now()
with _lock:
_positions[ticker] = {
"buy_price": current,
"peak_price": current,
"amount": amount,
"invested_krw": min(invested_krw, PER_POSITION),
"entry_time": entry_time,
}
_db_upsert(ticker, _positions[ticker])
logger.warning(
f"[복원] {ticker} 현재가={current:,.0f}원 | 수량={amount} "
f"(DB 기록 없음 → 현재가로 초기화)"
)
# Upbit 잔고에 없는데 DB에 남아있는 항목 정리
for ticker in saved:
if ticker not in upbit_tickers:
try:
delete_position(ticker)
logger.info(f"[정리] {ticker} Upbit 잔고 없음 → DB 포지션 삭제")
except Exception as e:
logger.error(f"DB 포지션 삭제 실패 {ticker}: {e}")
def buy(ticker: str) -> bool: def buy(ticker: str) -> bool:
@@ -78,6 +245,34 @@ def buy(ticker: str) -> bool:
logger.debug(f"{ticker} 이미 보유 중") logger.debug(f"{ticker} 이미 보유 중")
return False return False
# 직전 매도가 +1% 이상일 때만 재진입 (손절 직후 역방향 재매수 방지)
# 단, 직전 거래가 수익(승)이었으면 이 필터 스킵 — 다시 상승 시 재진입 허용
if ticker in _last_sell_prices:
hist = _get_history(ticker)
last_was_win = bool(hist[-1]) if hist else False
if not last_was_win:
current_check = pyupbit.get_current_price(ticker)
last_sell = _last_sell_prices[ticker]
threshold = last_sell * 1.01
if current_check and current_check < threshold:
logger.info(
f"[재매수 차단] {ticker} 현재={current_check:,.2f} < "
f"직전매도+1%={threshold:,.2f} → 상승 흐름 미확인"
)
return False
# Walk-forward 필터: 직전 WF_WINDOW건 승률이 낮으면 진입 차단
if WF_MIN_WIN_RATE > 0:
hist = _get_history(ticker)
if len(hist) >= WF_WINDOW:
recent_wr = sum(hist[-WF_WINDOW:]) / WF_WINDOW
if recent_wr < WF_MIN_WIN_RATE:
logger.info(
f"[WF차단] {ticker} 직전{WF_WINDOW}건 승률={recent_wr*100:.0f}%"
f" < {WF_MIN_WIN_RATE*100:.0f}% → 진입 차단"
)
return False
if len(_positions) >= MAX_POSITIONS: if len(_positions) >= MAX_POSITIONS:
logger.info(f"최대 포지션 도달({MAX_POSITIONS}), {ticker} 패스") logger.info(f"최대 포지션 도달({MAX_POSITIONS}), {ticker} 패스")
return False return False
@@ -92,7 +287,6 @@ def buy(ticker: str) -> bool:
upbit = _get_upbit() upbit = _get_upbit()
try: try:
current = pyupbit.get_current_price(ticker)
result = upbit.buy_market_order(ticker, order_krw) result = upbit.buy_market_order(ticker, order_krw)
if not result or "error" in str(result): if not result or "error" in str(result):
logger.error(f"매수 실패: {result}") logger.error(f"매수 실패: {result}")
@@ -102,18 +296,26 @@ def buy(ticker: str) -> bool:
currency = ticker.split("-")[1] currency = ticker.split("-")[1]
amount = float(upbit.get_balance(currency) or 0) amount = float(upbit.get_balance(currency) or 0)
# 실제 체결가 = 투자금 / 수량 (시장가 주문 슬리피지 반영)
actual_price = order_krw / amount if amount > 0 else pyupbit.get_current_price(ticker)
entry_time = datetime.now()
trade_id = str(uuid.uuid4())
_positions[ticker] = { _positions[ticker] = {
"buy_price": current, "buy_price": actual_price,
"peak_price": current, "peak_price": actual_price,
"amount": amount, "amount": amount,
"invested_krw": order_krw, "invested_krw": order_krw,
"entry_time": datetime.now(), "entry_time": entry_time,
"trade_id": trade_id,
} }
_db_upsert(ticker, _positions[ticker])
logger.info( logger.info(
f"[매수] {ticker} @ {current:,.0f}원 | " f"[매수] {ticker} @ {actual_price:,.0f} (실체결가) | "
f"수량={amount} | 투자금={order_krw:,}" f"수량={amount} | 투자금={order_krw:,} | trade_id={trade_id[:8]}"
) )
notify_buy(ticker, current, amount, order_krw) notify_buy(ticker, actual_price, amount, order_krw,
max_budget=MAX_BUDGET, per_position=PER_POSITION)
return True return True
except Exception as e: except Exception as e:
logger.error(f"매수 예외 {ticker}: {e}") logger.error(f"매수 예외 {ticker}: {e}")
@@ -121,6 +323,59 @@ def buy(ticker: str) -> bool:
return False return False
def _get_avg_fill_price(
upbit: pyupbit.Upbit,
order_uuid: str,
ticker: str,
fallback: float,
) -> tuple[float, float | None]:
"""주문 UUID로 실제 체결 내역을 조회해 가중평균 체결가와 실수수료를 반환.
분할 체결(여러 fills)이면 합산 평균가 계산.
조회 실패 시 (fallback_price, None) 반환.
"""
if not order_uuid:
return fallback, None
try:
import hashlib
import jwt as _jwt
import requests as _req
query_str = f"uuid={order_uuid}"
payload = {
"access_key": upbit.access_key,
"nonce": str(uuid.uuid4()),
"query_hash": hashlib.sha512(query_str.encode()).hexdigest(),
"query_hash_alg": "SHA512",
}
token = _jwt.encode(payload, upbit.secret_key, algorithm="HS256")
resp = _req.get(
"https://api.upbit.com/v1/order",
params={"uuid": order_uuid},
headers={"Authorization": f"Bearer {token}"},
timeout=5,
)
data = resp.json()
trades = data.get("trades", [])
if not trades:
return fallback, None
total_vol = sum(float(t["volume"]) for t in trades)
total_krw = sum(float(t["price"]) * float(t["volume"]) for t in trades)
avg_price = total_krw / total_vol if total_vol > 0 else fallback
paid_fee = float(data.get("paid_fee", 0))
if len(trades) > 1:
logger.info(
f"[분할체결] {ticker} {len(trades)}건 → "
f"평균={avg_price:,.4f}원 (수수료={paid_fee:,.0f}원)"
)
return avg_price, paid_fee
except Exception as e:
logger.debug(f"[체결조회 실패] {ticker} uuid={order_uuid[:8]}: {e}")
return fallback, None
def sell(ticker: str, reason: str = "") -> bool: def sell(ticker: str, reason: str = "") -> bool:
"""시장가 전량 매도.""" """시장가 전량 매도."""
with _lock: with _lock:
@@ -130,19 +385,71 @@ def sell(ticker: str, reason: str = "") -> bool:
pos = _positions[ticker] pos = _positions[ticker]
upbit = _get_upbit() upbit = _get_upbit()
try: try:
result = upbit.sell_market_order(ticker, pos["amount"]) currency = ticker.split("-")[1]
# 실제 잔고 확인 (재시작 후 이미 매도된 경우 대비)
actual_amount = float(upbit.get_balance(currency) or 0)
if actual_amount < 0.00001:
logger.warning(f"[매도] {ticker} 실제 잔고 없음 → 포지션 정리 (이미 매도됨)")
del _positions[ticker]
return True
result = upbit.sell_market_order(ticker, actual_amount)
if not result or "error" in str(result): if not result or "error" in str(result):
logger.error(f"매도 실패: {result}") logger.error(f"매도 실패: {result}")
# 실패 후에도 잔고 재확인 → 0이면 실제로는 매도됨
actual_amount2 = float(upbit.get_balance(currency) or 0)
if actual_amount2 < 0.00001:
logger.warning(f"[매도] {ticker} 잔고 소진 확인 → 포지션 정리")
del _positions[ticker]
return True
return False return False
current = pyupbit.get_current_price(ticker) time.sleep(0.5) # 체결 완료 대기
pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100
logger.info( # 실제 체결 내역으로 가중평균 매도가 계산 (분할 체결 대응)
f"[매도] {ticker} @ {current:,.0f}원 | " order_uuid = result.get("uuid", "") if isinstance(result, dict) else ""
f"수익률={pnl:+.1f}% | 사유={reason}" fallback_price = pyupbit.get_current_price(ticker) or pos["buy_price"]
actual_sell_price, actual_fee_from_order = _get_avg_fill_price(
upbit, order_uuid, ticker, fallback_price
)
pnl = (actual_sell_price - pos["buy_price"]) / pos["buy_price"] * 100
sell_value = actual_sell_price * actual_amount
# 수수료: 주문 조회 성공 시 실제값, 아니면 추정값 (0.05% 양방향)
fee = actual_fee_from_order if actual_fee_from_order is not None \
else (pos["invested_krw"] * 0.0005 + sell_value * 0.0005)
krw_profit = sell_value - pos["invested_krw"] - fee
logger.info(
f"[매도] {ticker} @ {actual_sell_price:,.4f}원 | "
f"수익률={pnl:+.1f}% | 순익={krw_profit:+,.0f}원 (수수료 {fee:,.0f}원) | 사유={reason}"
)
try:
cum = get_cumulative_krw_profit() + krw_profit
except Exception:
cum = 0.0
notify_sell(ticker, actual_sell_price, pnl, reason,
krw_profit=krw_profit, fee_krw=fee, cum_profit=cum)
_last_sell_prices[ticker] = actual_sell_price
try:
upsert_sell_price(ticker, actual_sell_price)
except Exception as e:
logger.error(f"직전 매도가 DB 저장 실패 {ticker}: {e}")
_update_history(
ticker, pnl > 0, pnl, fee, krw_profit,
trade_id=pos.get("trade_id", ""),
buy_price=pos["buy_price"],
sell_price=actual_sell_price,
invested_krw=pos["invested_krw"],
sell_reason=reason,
) )
notify_sell(ticker, current, pnl, reason)
del _positions[ticker] del _positions[ticker]
try:
delete_position(ticker)
except Exception as e:
logger.error(f"포지션 DB 삭제 실패 {ticker}: {e}")
# 복리 예산 재계산: 수익 발생분만 다음 투자에 반영
_recalc_compound_budget()
return True return True
except Exception as e: except Exception as e:
logger.error(f"매도 예외 {ticker}: {e}") logger.error(f"매도 예외 {ticker}: {e}")
@@ -156,3 +463,4 @@ def update_peak(ticker: str, current_price: float) -> None:
if ticker in _positions: if ticker in _positions:
if current_price > _positions[ticker]["peak_price"]: if current_price > _positions[ticker]["peak_price"]:
_positions[ticker]["peak_price"] = current_price _positions[ticker]["peak_price"] = current_price
_db_upsert(ticker, _positions[ticker])

37
main.py
View File

@@ -19,24 +19,33 @@ logging.basicConfig(
from core.monitor import run_monitor from core.monitor import run_monitor
from core.notify import notify_error, notify_status from core.notify import notify_error, notify_status
from core.price_collector import run_collector from core.price_collector import backfill_prices, run_collector
from core.trader import get_positions, restore_positions from core.price_db import get_cumulative_krw_profit
from core.trader import get_positions, get_budget_info, restore_positions
from daemon.runner import run_scanner from daemon.runner import run_scanner
STATUS_INTERVAL = 3600 # 1시간마다 요약 전송
def run_status_reporter() -> None:
def run_status_reporter(interval: int = STATUS_INTERVAL) -> None: """매 정각마다 1시간 이상 보유 포지션 현황 전송."""
"""주기적으로 포지션 현황을 Telegram으로 전송.""" import datetime as _dt
logger = logging.getLogger("status") logger = logging.getLogger("status")
logger.info(f"상태 리포터 시작 (주기={interval//60})") logger.info("상태 리포터 시작 (매 정각 트리거)")
time.sleep(interval) # 첫 전송은 1시간 후
while True: while True:
now = _dt.datetime.now()
# 다음 정각까지 대기
secs_to_next_hour = (60 - now.minute) * 60 - now.second
time.sleep(secs_to_next_hour)
try: try:
notify_status(dict(get_positions())) budget = get_budget_info()
cum = get_cumulative_krw_profit()
notify_status(
dict(get_positions()),
max_budget=budget["max_budget"],
per_position=budget["per_position"],
cum_profit=cum,
)
except Exception as e: except Exception as e:
logger.error(f"상태 리포트 오류: {e}") logger.error(f"상태 리포트 오류: {e}")
time.sleep(interval)
def main() -> None: def main() -> None:
@@ -45,19 +54,23 @@ def main() -> None:
# 재시작 시 기존 잔고 복원 (이중 매수 방지) # 재시작 시 기존 잔고 복원 (이중 매수 방지)
restore_positions() restore_positions()
# 과거 가격 백필 (추세 판단용 DB 데이터가 없는 경우 채움)
logger.info("과거 가격 백필 시작 (48시간)...")
backfill_prices(hours=48)
# 트레일링 스탑 감시 스레드 (10초 주기) # 트레일링 스탑 감시 스레드 (10초 주기)
monitor_thread = threading.Thread( monitor_thread = threading.Thread(
target=run_monitor, args=(10,), daemon=True, name="monitor" target=run_monitor, args=(10,), daemon=True, name="monitor"
) )
monitor_thread.start() monitor_thread.start()
# 1시간 주기 상태 리포트 스레드 # 매 정각 상태 리포트 스레드 (1시간 이상 보유 포지션만)
status_thread = threading.Thread( status_thread = threading.Thread(
target=run_status_reporter, daemon=True, name="status" target=run_status_reporter, daemon=True, name="status"
) )
status_thread.start() status_thread.start()
# 10분 주기 가격 수집 스레드 (추세 판단용 DB 저장) # 가격 수집 스레드 (10분 주기 → Oracle DB price_history 저장, 추세 판단용)
collector_thread = threading.Thread( collector_thread = threading.Thread(
target=run_collector, daemon=True, name="collector" target=run_collector, daemon=True, name="collector"
) )