From 80ab004ebaa5f9c5d7ec65376e29096498de610e Mon Sep 17 00:00:00 2001 From: joungmin Date: Sat, 28 Feb 2026 11:26:26 +0900 Subject: [PATCH] feat: replace volatility breakout with DB-backed real-time trend check - price_history table on Oracle ADB stores prices every 10 minutes - check_trend(): current price vs N hours ago (default 1h, +3% threshold) - check_momentum(): unchanged (MA20 + 2x volume still applies) - Ticker list cached 5 minutes to avoid 429 rate limits - Collector starts 30s after boot to avoid simultaneous API calls - Configurable: TREND_HOURS, TREND_MIN_GAIN_PCT in .env Co-Authored-By: Claude Sonnet 4.6 --- .env.example | 10 +++++ core/market.py | 15 ++++++- core/price_collector.py | 52 +++++++++++++++++++++++ core/price_db.py | 91 +++++++++++++++++++++++++++++++++++++++++ core/strategy.py | 57 +++++++++++++++----------- main.py | 7 ++++ 6 files changed, 206 insertions(+), 26 deletions(-) create mode 100644 core/price_collector.py create mode 100644 core/price_db.py diff --git a/.env.example b/.env.example index 894f8de..985107a 100644 --- a/.env.example +++ b/.env.example @@ -1,6 +1,16 @@ ACCESS_KEY= SECRET_KEY= +# Oracle ADB (price_history 저장) +ORACLE_USER= +ORACLE_PASSWORD= +ORACLE_DSN= +ORACLE_WALLET= + +# 추세 판단: N시간 전 대비 +M% 이상이면 상승 중으로 판단 +TREND_HOURS=1 +TREND_MIN_GAIN_PCT=3 + # 트레일링 스탑: 최고가 대비 -N% 도달 시 청산 STOP_LOSS_PCT=5 diff --git a/core/market.py b/core/market.py index 334d7c9..db4b15c 100644 --- a/core/market.py +++ b/core/market.py @@ -3,6 +3,7 @@ from __future__ import annotations import logging +import time import pyupbit import requests @@ -12,9 +13,17 @@ logger = logging.getLogger(__name__) TOP_N = 20 # 거래량 상위 N개 종목만 스캔 _TICKER_URL = "https://api.upbit.com/v1/ticker" +# 티커 목록 캐시 (5분 TTL — API rate limit 방지) +_ticker_cache: list[str] = [] +_ticker_cache_time: float = 0.0 +_TICKER_CACHE_TTL = 300 # 5분 + def get_top_tickers() -> list[str]: - """24시간 거래대금 상위 KRW 마켓 티커 반환.""" + """24시간 거래대금 상위 KRW 마켓 티커 반환 (5분 캐시).""" + global _ticker_cache, _ticker_cache_time + if _ticker_cache and (time.time() - _ticker_cache_time) < _TICKER_CACHE_TTL: + return _ticker_cache try: all_tickers = pyupbit.get_tickers(fiat="KRW") if not all_tickers: @@ -37,11 +46,13 @@ def get_top_tickers() -> list[str]: # 24h 거래대금 기준 정렬 ticker_data.sort(key=lambda x: x.get("acc_trade_price_24h", 0), reverse=True) top = [t["market"] for t in ticker_data[:TOP_N]] + _ticker_cache = top + _ticker_cache_time = time.time() logger.debug(f"상위 {TOP_N}개: {top[:5]}...") return top except Exception as e: logger.error(f"get_top_tickers 실패: {e}") - return [] + return _ticker_cache # 실패 시 이전 캐시 반환 def get_ohlcv(ticker: str, count: int = 21): diff --git a/core/price_collector.py b/core/price_collector.py new file mode 100644 index 0000000..425ceed --- /dev/null +++ b/core/price_collector.py @@ -0,0 +1,52 @@ +"""10분마다 상위 종목 현재가를 Oracle DB에 저장하는 수집기.""" + +from __future__ import annotations + +import logging +import time + +import requests + +from .market import get_top_tickers +from .price_db import cleanup_old_prices, insert_prices + +logger = logging.getLogger(__name__) + +COLLECT_INTERVAL = 600 # 10분 (초) +CLEANUP_EVERY = 6 # 1시간(10분 × 6)마다 오래된 데이터 정리 + + +def run_collector(interval: int = COLLECT_INTERVAL) -> None: + """가격 수집 루프.""" + logger.info(f"가격 수집기 시작 (주기={interval//60}분)") + time.sleep(30) # 스캐너와 동시 API 호출 방지 + cycle = 0 + while True: + try: + tickers = get_top_tickers() + if not tickers: + continue + resp = requests.get( + "https://api.upbit.com/v1/ticker", + params={"markets": ",".join(tickers)}, + timeout=5, + ) + resp.raise_for_status() + data = resp.json() + valid = { + item["market"]: item["trade_price"] + for item in data + if item.get("trade_price") + } + insert_prices(valid) + logger.info(f"[수집] {len(valid)}개 종목 가격 저장") + + cycle += 1 + if cycle % CLEANUP_EVERY == 0: + cleanup_old_prices(keep_hours=48) + logger.info("오래된 가격 데이터 정리 완료") + + except Exception as e: + logger.error(f"가격 수집 오류: {e}") + + time.sleep(interval) diff --git a/core/price_db.py b/core/price_db.py new file mode 100644 index 0000000..b762124 --- /dev/null +++ b/core/price_db.py @@ -0,0 +1,91 @@ +"""Oracle ADB price_history CRUD.""" + +from __future__ import annotations + +import os +from contextlib import contextmanager +from typing import Generator, Optional + +import oracledb + +_pool: Optional[oracledb.ConnectionPool] = None + + +def _get_pool() -> oracledb.ConnectionPool: + global _pool + if _pool is None: + kwargs: dict = dict( + user=os.environ["ORACLE_USER"], + password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"], + min=1, + max=3, + increment=1, + ) + wallet = os.environ.get("ORACLE_WALLET") + if wallet: + kwargs["config_dir"] = wallet + _pool = oracledb.create_pool(**kwargs) + return _pool + + +@contextmanager +def _conn() -> Generator[oracledb.Connection, None, None]: + pool = _get_pool() + conn = pool.acquire() + try: + yield conn + conn.commit() + except Exception: + conn.rollback() + raise + finally: + pool.release(conn) + + +def insert_prices(ticker_prices: dict[str, float]) -> None: + """여러 종목의 현재가를 한 번에 저장.""" + if not ticker_prices: + return + rows = [(ticker, price) for ticker, price in ticker_prices.items()] + sql = "INSERT INTO price_history (ticker, price) VALUES (:1, :2)" + with _conn() as conn: + conn.cursor().executemany(sql, rows) + + +def get_price_n_hours_ago(ticker: str, hours: float) -> Optional[float]: + """N시간 전 가장 가까운 가격 반환. 데이터 없으면 None.""" + sql = """ + SELECT price FROM price_history + WHERE ticker = :ticker + AND recorded_at BETWEEN + SYSTIMESTAMP - INTERVAL ':h' HOUR - INTERVAL '10' MINUTE + AND SYSTIMESTAMP - INTERVAL ':h' HOUR + INTERVAL '10' MINUTE + ORDER BY ABS(CAST(recorded_at AS DATE) - + CAST(SYSTIMESTAMP - INTERVAL ':h' HOUR AS DATE)) + FETCH FIRST 1 ROWS ONLY + """ + # Oracle INTERVAL bind param 미지원으로 직접 포맷 + h = int(hours) + sql = f""" + SELECT price FROM price_history + WHERE ticker = :ticker + AND recorded_at BETWEEN + SYSTIMESTAMP - ({h}/24) - (10/1440) + AND SYSTIMESTAMP - ({h}/24) + (10/1440) + ORDER BY ABS(CAST(recorded_at AS DATE) - + CAST(SYSTIMESTAMP - ({h}/24) AS DATE)) + FETCH FIRST 1 ROWS ONLY + """ + with _conn() as conn: + cursor = conn.cursor() + cursor.execute(sql, {"ticker": ticker}) + row = cursor.fetchone() + return float(row[0]) if row else None + + +def cleanup_old_prices(keep_hours: int = 48) -> None: + """N시간 이상 오래된 데이터 삭제 (DB 용량 관리).""" + sql = f"DELETE FROM price_history WHERE recorded_at < SYSTIMESTAMP - ({keep_hours}/24)" + with _conn() as conn: + conn.cursor().execute(sql) diff --git a/core/strategy.py b/core/strategy.py index a005430..ea6a1cb 100644 --- a/core/strategy.py +++ b/core/strategy.py @@ -1,36 +1,47 @@ -"""Strategy C: 변동성 돌파 AND 모멘텀 동시 충족 시 매수 신호.""" +"""Strategy C: 실시간 상승 추세(DB) AND 거래량 모멘텀 동시 충족 시 매수 신호.""" + +from __future__ import annotations import logging +import os from .market import get_current_price, get_ohlcv +from .price_db import get_price_n_hours_ago logger = logging.getLogger(__name__) -# 변동성 돌파 계수 (래리 윌리엄스 기본값) -BREAKOUT_K = 0.5 -# 모멘텀 이동평균 기간 +# 추세 판단: N시간 전 대비 +M% 이상이면 상승 중 +TREND_HOURS = float(os.getenv("TREND_HOURS", "1")) +TREND_MIN_GAIN_PCT = float(os.getenv("TREND_MIN_GAIN_PCT", "3")) + +# 모멘텀: MA 기간, 거래량 급증 배수 MA_PERIOD = 20 -# 거래량 급증 배수 VOLUME_MULTIPLIER = 2.0 -def check_volatility_breakout(ticker: str) -> bool: - """변동성 돌파 조건: 현재가 > 오늘 시가 + 전일 변동폭 × K.""" - df = get_ohlcv(ticker, count=2) - if df is None or len(df) < 2: +def check_trend(ticker: str) -> bool: + """상승 추세 조건: 현재가가 N시간 전 대비 +M% 이상.""" + past_price = get_price_n_hours_ago(ticker, TREND_HOURS) + if past_price is None: + logger.debug(f"[추세] {ticker} 과거 가격 없음 (데이터 수집 중)") return False - prev = df.iloc[-2] - today = df.iloc[-1] - target = today["open"] + (prev["high"] - prev["low"]) * BREAKOUT_K current = get_current_price(ticker) - - if current is None: + if not current: return False - result = current > target + gain_pct = (current - past_price) / past_price * 100 + result = gain_pct >= TREND_MIN_GAIN_PCT + if result: - logger.debug(f"[변동성돌파] {ticker} 현재가={current:,.0f} 목표가={target:,.0f}") + logger.info( + f"[추세↑] {ticker} {TREND_HOURS:.0f}h 전={past_price:,.0f} " + f"현재={current:,.0f} (+{gain_pct:.1f}%)" + ) + else: + logger.debug( + f"[추세✗] {ticker} {gain_pct:+.1f}% (기준={TREND_MIN_GAIN_PCT:+.0f}%)" + ) return result @@ -41,7 +52,7 @@ def check_momentum(ticker: str) -> bool: return False ma = df["close"].iloc[-MA_PERIOD:].mean() - avg_vol = df["volume"].iloc[:-1].mean() # 오늘 제외한 20일 평균 + avg_vol = df["volume"].iloc[:-1].mean() today_vol = df["volume"].iloc[-1] current = get_current_price(ticker) @@ -54,16 +65,14 @@ def check_momentum(ticker: str) -> bool: if result: logger.debug( - f"[모멘텀] {ticker} 현재가={current:,.0f} MA20={ma:,.0f} " - f"오늘거래량={today_vol:.1f} 평균={avg_vol:.1f}" + f"[모멘텀] {ticker} 현재={current:,.0f} MA20={ma:,.0f} " + f"거래량={today_vol:.0f} 평균={avg_vol:.0f}" ) return result def should_buy(ticker: str) -> bool: - """Strategy C: 변동성 돌파 AND 모멘텀 모두 충족 시 True.""" - vb = check_volatility_breakout(ticker) - if not vb: + """Strategy C: 실시간 상승 추세 AND 거래량 모멘텀 모두 충족 시 True.""" + if not check_trend(ticker): return False - mo = check_momentum(ticker) - return vb and mo + return check_momentum(ticker) diff --git a/main.py b/main.py index 0f1b848..d2053c5 100644 --- a/main.py +++ b/main.py @@ -19,6 +19,7 @@ logging.basicConfig( from core.monitor import run_monitor from core.notify import notify_error, notify_status +from core.price_collector import run_collector from core.trader import get_positions, restore_positions from daemon.runner import run_scanner @@ -56,6 +57,12 @@ def main() -> None: ) status_thread.start() + # 10분 주기 가격 수집 스레드 (추세 판단용 DB 저장) + collector_thread = threading.Thread( + target=run_collector, daemon=True, name="collector" + ) + collector_thread.start() + # 매수 스캔 루프 (60초 주기, 메인 스레드) — 예외 발생 시 Telegram 알림 후 재시작 while True: try: