"""OpenRouter LLM 기반 매매 어드바이저. 매수: 시그널 감지 후 LLM이 매수 여부 + 지정가 결정 매도: 1분 주기로 LLM이 매도 목표가 결정 (cascade fallback) LLM에게 제공하는 DB Tool (OpenAI function calling): - get_price_ticks(ticker, minutes): Oracle price_tick 테이블 (최근 N분 가격 틱) - get_ohlcv(ticker, limit): Oracle backtest_ohlcv 1분봉 (지지/저항 파악용) - get_ticker_context(ticker): 종목 평판 정보 (가격 변동, 뉴스) - get_trade_history(ticker): 최근 거래 이력 (승패, 손익) - get_btc_trend(): BTC 최근 동향 (알트 매수 판단용) """ from __future__ import annotations import json import logging import os from datetime import datetime from typing import Optional log = logging.getLogger(__name__) # 프롬프트에 포함할 봉 수 INPUT_BARS = 20 # ── Oracle DB Tools ────────────────────────────────────────────────────────── def _get_conn(): """Oracle ADB 연결 (.env 기반).""" import oracledb kwargs = dict( user = os.environ['ORACLE_USER'], password = os.environ['ORACLE_PASSWORD'], dsn = os.environ['ORACLE_DSN'], ) if w := os.environ.get('ORACLE_WALLET'): kwargs['config_dir'] = w return oracledb.connect(**kwargs) def _tool_get_price_ticks(ticker: str, minutes: int = 10) -> str: """Oracle price_tick 테이블에서 최근 N분 가격 틱 조회.""" try: conn = _get_conn() cur = conn.cursor() cur.execute( """SELECT ts, price FROM price_tick WHERE ticker = :t AND ts >= SYSTIMESTAMP - NUMTODSINTERVAL(:m, 'MINUTE') ORDER BY ts DESC FETCH FIRST 100 ROWS ONLY""", {'t': ticker, 'm': minutes}, ) rows = cur.fetchall() conn.close() if not rows: return f"{ticker} 최근 {minutes}분 틱 데이터 없음" lines = [f" {r[0].strftime('%H:%M:%S')} {float(r[1]):>12,.2f}원" for r in rows] return f"{ticker} 최근 {minutes}분 가격 틱 ({len(rows)}건):\n" + "\n".join(lines) except Exception as e: return f"DB 오류: {e}" def _tool_get_ohlcv(ticker: str, limit: int = 30) -> str: """Oracle backtest_ohlcv 1분봉 최근 N개 조회 (지지/저항 파악용).""" try: conn = _get_conn() cur = conn.cursor() cur.execute( """SELECT ts, open_p, high_p, low_p, close_p, volume_p FROM backtest_ohlcv WHERE ticker = :t AND interval_cd = 'minute1' ORDER BY ts DESC FETCH FIRST :n ROWS ONLY""", {'t': ticker, 'n': limit}, ) rows = cur.fetchall() conn.close() if not rows: return f"{ticker} 1분봉 데이터 없음" lines = [ f" {r[0].strftime('%H:%M')} O{float(r[1]):>10,.0f} H{float(r[2]):>10,.0f}" f" L{float(r[3]):>10,.0f} C{float(r[4]):>10,.0f} V{float(r[5]):.0f}" for r in reversed(rows) ] return f"{ticker} 1분봉 최근 {len(rows)}개:\n" + "\n".join(lines) except Exception as e: return f"DB 오류: {e}" # ── 종목 컨텍스트 조회 ──────────────────────────────────────────────────────── def _tool_get_context(ticker: str) -> str: """ticker_context 테이블에서 종목의 가격 통계 + 뉴스 조회.""" try: conn = _get_conn() cur = conn.cursor() cur.execute( "SELECT context_type, content FROM ticker_context WHERE ticker = :t", {'t': ticker}, ) rows = cur.fetchall() conn.close() if not rows: return f"{ticker} 컨텍스트 데이터 없음" parts = [] for ctx_type, content in rows: parts.append(f"[{ctx_type}]\n{content}") return f"{ticker} 종목 컨텍스트:\n" + "\n\n".join(parts) except Exception as e: return f"DB 오류: {e}" # ── 거래 이력 조회 ──────────────────────────────────────────────────────────── def _tool_get_trade_history(ticker: str, limit: int = 10) -> str: """trade_results 테이블에서 해당 종목 최근 거래 이력 조회.""" try: conn = _get_conn() cur = conn.cursor() cur.execute( """SELECT traded_at, is_win, pnl_pct, buy_price, sell_price, sell_reason FROM trade_results WHERE ticker = :t ORDER BY traded_at DESC FETCH FIRST :n ROWS ONLY""", {'t': ticker, 'n': limit}, ) rows = cur.fetchall() conn.close() if not rows: return f"{ticker} 거래 이력 없음" lines = [] wins = sum(1 for r in rows if r[1]) for r in rows: ts = r[0].strftime('%m/%d %H:%M') if r[0] else '?' wl = '승' if r[1] else '패' pnl = float(r[2]) if r[2] else 0 reason = r[5] or '' lines.append(f" {ts} {wl} {pnl:+.2f}% {reason}") header = f"{ticker} 최근 {len(rows)}건 (승률 {wins}/{len(rows)}={wins/len(rows)*100:.0f}%):" return header + "\n" + "\n".join(lines) except Exception as e: return f"DB 오류: {e}" def _tool_get_btc_trend() -> str: """BTC 최근 동향 (1시간봉 6개 + 일봉).""" try: import pyupbit lines = [] # 일봉 2개 day_df = pyupbit.get_ohlcv('KRW-BTC', interval='day', count=2) if day_df is not None and len(day_df) >= 2: today = day_df.iloc[-1] prev = day_df.iloc[-2] chg = (today['close'] - prev['close']) / prev['close'] * 100 lines.append(f"[BTC 일봉] 현재 {today['close']:,.0f}원 (전일 대비 {chg:+.2f}%)") lines.append(f" 고가 {today['high']:,.0f} 저가 {today['low']:,.0f}") # 1시간봉 6개 h1_df = pyupbit.get_ohlcv('KRW-BTC', interval='minute60', count=6) if h1_df is not None and not h1_df.empty: first_c = float(h1_df['close'].iloc[0]) last_c = float(h1_df['close'].iloc[-1]) h_chg = (last_c - first_c) / first_c * 100 trend = '상승' if h_chg > 0.5 else '하락' if h_chg < -0.5 else '횡보' lines.append(f"[BTC 6시간 추세] {trend} ({h_chg:+.2f}%)") for ts, row in h1_df.iterrows(): lines.append( f" {ts.strftime('%H:%M')} 종{row['close']:>12,.0f} " f"거래량{row['volume']:,.2f}" ) return "\n".join(lines) if lines else "BTC 데이터 조회 실패" except Exception as e: return f"BTC 조회 오류: {e}" # ── Tool 정의 (OpenAI function calling 형식) ───────────────────────────────── _TOOLS = [ { 'type': 'function', 'function': { 'name': 'get_price_ticks', 'description': ( 'Oracle DB에서 특정 종목의 최근 N분간 가격 틱 데이터를 조회합니다. ' '단기 가격 흐름과 지지/저항 수준 파악에 사용하세요.' ), 'parameters': { 'type': 'object', 'properties': { 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, 'minutes': {'type': 'integer', 'description': '최근 N분 데이터 (기본 10)'}, }, 'required': ['ticker'], }, }, }, { 'type': 'function', 'function': { 'name': 'get_ohlcv', 'description': ( 'Oracle DB에서 특정 종목의 1분봉 OHLCV 데이터를 조회합니다. ' '지지선/저항선 파악과 추세 분석에 사용하세요.' ), 'parameters': { 'type': 'object', 'properties': { 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, 'limit': {'type': 'integer', 'description': '조회할 봉 수 (기본 30)'}, }, 'required': ['ticker'], }, }, }, { 'type': 'function', 'function': { 'name': 'get_ticker_context', 'description': ( '종목의 평판 정보를 조회합니다. 24h/7d 가격 변동률, 거래량 추이, ' '최근 뉴스 등 중장기 컨텍스트를 제공합니다. ' '매매 판단 시 시장 분위기와 종목 상황을 파악하는 데 사용하세요.' ), 'parameters': { 'type': 'object', 'properties': { 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, }, 'required': ['ticker'], }, }, }, { 'type': 'function', 'function': { 'name': 'get_trade_history', 'description': ( '특정 종목의 최근 거래 이력을 조회합니다. ' '승률, 손익, 매도 사유 등을 확인해 이 종목의 과거 성과를 파악하세요.' ), 'parameters': { 'type': 'object', 'properties': { 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, 'limit': {'type': 'integer', 'description': '조회 건수 (기본 10)'}, }, 'required': ['ticker'], }, }, }, { 'type': 'function', 'function': { 'name': 'get_btc_trend', 'description': ( 'BTC(비트코인)의 최근 가격 동향을 조회합니다. ' '알트코인 매수 전 BTC 추세를 반드시 확인하세요. ' 'BTC 하락 시 알트코인 동반 하락 위험이 높습니다.' ), 'parameters': { 'type': 'object', 'properties': {}, }, }, }, ] def _execute_tool(tool_name: str, tool_input: dict) -> str: """LLM이 요청한 tool을 실행하고 결과를 반환.""" if tool_name == 'get_price_ticks': return _tool_get_price_ticks( ticker = tool_input['ticker'], minutes = tool_input.get('minutes', 10), ) if tool_name == 'get_ohlcv': return _tool_get_ohlcv( ticker = tool_input['ticker'], limit = tool_input.get('limit', 30), ) if tool_name == 'get_ticker_context': return _tool_get_context(ticker=tool_input['ticker']) if tool_name == 'get_trade_history': return _tool_get_trade_history( ticker=tool_input['ticker'], limit=tool_input.get('limit', 10), ) if tool_name == 'get_btc_trend': return _tool_get_btc_trend() return f'알 수 없는 tool: {tool_name}' # ── 시장 컨텍스트 수집 ─────────────────────────────────────────────────────── def _get_market_context(ticker: str) -> str: """오늘 일봉 요약 + 최근 4시간 1시간봉을 LLM 프롬프트용 텍스트로 반환. pyupbit API 호출 실패 시 빈 문자열 반환 (graceful degradation). """ try: import pyupbit lines: list[str] = [] # 오늘 일봉 (오늘 + 전일 2개) day_df = pyupbit.get_ohlcv(ticker, interval='day', count=2) if day_df is not None and not day_df.empty: today = day_df.iloc[-1] prev = day_df.iloc[-2] if len(day_df) > 1 else None vol_note = '' if prev is not None and prev['volume'] > 0: vol_note = f' (전일 대비 {today["volume"] / prev["volume"]:.1f}x)' lines.append('[오늘 일봉]') lines.append(f' 고가 {today["high"]:>12,.0f}원 저가 {today["low"]:>12,.0f}원') lines.append(f' 시가 {today["open"]:>12,.0f}원 거래량 {today["volume"]:,.0f}{vol_note}') # 최근 4시간 1시간봉 h1_df = pyupbit.get_ohlcv(ticker, interval='minute60', count=4) if h1_df is not None and not h1_df.empty: lines.append(f'[최근 {len(h1_df)}시간봉]') for ts, row in h1_df.iterrows(): lines.append( f' {ts.strftime("%H:%M")} ' f'고{row["high"]:>10,.0f} 저{row["low"]:>10,.0f} ' f'종{row["close"]:>10,.0f} 거래량{row["volume"]:,.0f}' ) return '\n'.join(lines) except Exception as e: log.debug(f'[LLM] 시장 컨텍스트 조회 실패: {e}') return '' # ── 프롬프트 빌더 ───────────────────────────────────────────────────────────── def _describe_bars(bar_list: list[dict], current_price: float) -> str: """봉 데이터를 LLM이 읽기 쉬운 텍스트로 변환.""" recent = bar_list[-INPUT_BARS:] if len(bar_list) >= INPUT_BARS else bar_list if not recent: return '봉 데이터 없음' lines = [] for b in recent: ts_str = b['ts'].strftime('%H:%M:%S') if isinstance(b.get('ts'), datetime) else '' lines.append( f' {ts_str} 종가{b["close"]:>10,.0f} 고가{b["high"]:>10,.0f}' f' 저가{b["low"]:>10,.0f}' ) highs5 = [b['high'] for b in recent[-5:]] closes5 = [b['close'] for b in recent[-5:]] period_high = max(b['high'] for b in recent) from_peak = (current_price - period_high) / period_high * 100 trend_h = '하락▼' if highs5[-1] < highs5[0] else '상승▲' if highs5[-1] > highs5[0] else '횡보─' trend_c = '하락▼' if closes5[-1] < closes5[0] else '상승▲' if closes5[-1] > closes5[0] else '횡보─' summary = ( f'\n[패턴 요약]\n' f'- 고가 추세: {trend_h} ({highs5[0]:,.0f}→{highs5[-1]:,.0f})\n' f'- 종가 추세: {trend_c} ({closes5[0]:,.0f}→{closes5[-1]:,.0f})\n' f'- 구간 최고가: {period_high:,.0f}원 현재가 대비: {from_peak:+.2f}%' ) return '\n'.join(lines) + summary def _build_prompt( ticker: str, entry_price: float, current_price: float, elapsed_min: float, current_target: float, bar_desc: str, market_context: str = '', ) -> str: pnl_pct = (current_price - entry_price) / entry_price * 100 target_gap = (current_target - current_price) / current_price * 100 market_section = f'\n{market_context}\n' if market_context else '' return f"""당신은 암호화폐 단기 트레이더입니다. 아래 포지션과 가격 흐름을 분석해 **지정가 매도 목표가**를 판단하세요. 필요하면 제공된 DB tool을 호출해 추가 데이터를 조회하세요. 특히 get_ticker_context로 종목의 24h/7d 가격 변동, 거래량 추이, 최근 뉴스를 확인하세요. [현재 포지션] 종목 : {ticker} 진입가 : {entry_price:,.0f}원 현재가 : {current_price:,.0f}원 ({pnl_pct:+.2f}%) 보유시간: {elapsed_min:.0f}분 현재 지정가: {current_target:,.0f}원 (현재가 대비 {target_gap:+.2f}%, 미체결) {market_section} [최근 {INPUT_BARS}봉 (20초봉)] {bar_desc} [운용 정책 참고 — 최종 판단은 당신이 결정] - 단기 거래량 가속 신호 진입 후 cascade 청산 전략 (지정가 단계적 조정) - 수익 목표: 진입가 대비 +0.5% ~ +2% 구간 - 체결 가능성이 낮으면 현실적인 목표가로 조정 권장 - 상승 여력이 있으면 hold 권장 반드시 아래 JSON 형식으로만 응답하세요. 설명이나 다른 텍스트를 절대 포함하지 마세요. 매도 지정가를 설정할 경우: {{"action": "sell", "price": 숫자, "confidence": "high|medium|low", "reason": "판단 근거 한줄 요약", "market_status": "상승|하락|횡보|급등|급락", "watch_needed": false}} 현재 주문을 유지할 경우: {{"action": "hold", "reason": "유지 근거 한줄 요약", "market_status": "상승|하락|횡보|급등|급락", "watch_needed": true/false}} watch_needed: 관망이 필요한 상황이면 true (급변동 예상, 불확실성 높음 등)""" # ── 공통 LLM 호출 ──────────────────────────────────────────────────────────── def _call_llm(prompt: str, ticker: str) -> Optional[dict]: """OpenRouter API를 호출하고 JSON 응답을 반환. 실패 시 None.""" import requests as _req import re api_key = os.environ.get('OPENROUTER_API_KEY', '') if not api_key: log.debug('[LLM] OPENROUTER_API_KEY 없음') return None model = os.environ.get('LLM_MODEL', 'anthropic/claude-haiku-4.5') headers = { 'Authorization': f'Bearer {api_key}', 'Content-Type': 'application/json', } messages = [{'role': 'user', 'content': prompt}] try: for _ in range(5): body = { 'model': model, 'max_tokens': 512, 'tools': _TOOLS, 'messages': messages, 'response_format': {'type': 'json_object'}, } resp = _req.post( 'https://openrouter.ai/api/v1/chat/completions', headers=headers, json=body, timeout=30, ) resp.raise_for_status() result = resp.json() choice = result['choices'][0] message = choice['message'] tool_calls = message.get('tool_calls') if tool_calls: messages.append(message) for tc in tool_calls: fn_name = tc['function']['name'] fn_args = json.loads(tc['function']['arguments']) fn_result = _execute_tool(fn_name, fn_args) log.info(f'[LLM-Tool] {ticker} {fn_name}({fn_args}) 호출') messages.append({ 'role': 'tool', 'tool_call_id': tc['id'], 'content': fn_result, }) continue raw = (message.get('content') or '').strip() if not raw: log.warning(f'[LLM] {ticker} 빈 응답') return None # 코드블록 안 JSON 추출 if '```' in raw: m = re.search(r'```(?:json)?\s*(.*?)\s*```', raw, re.DOTALL) if m: raw = m.group(1) # 텍스트에 섞인 JSON 객체 추출 try: return json.loads(raw) except json.JSONDecodeError as e: m = re.search(r'\{[^{}]*"action"\s*:\s*"[^"]+?"[^{}]*\}', raw, re.DOTALL) if m: try: return json.loads(m.group(0)) except json.JSONDecodeError: pass log.warning(f'[LLM] {ticker} JSON 파싱 실패: {e} raw={raw[:200]}') return None else: log.warning(f'[LLM] {ticker} tool 루프 초과') return None except Exception as e: log.warning(f'[LLM] {ticker} 오류: {e}') return None # ── 매수 판단 ──────────────────────────────────────────────────────────────── def get_entry_price( ticker: str, signal: dict, bar_list: list[dict], current_price: float, fng: int = 50, num_positions: int = 0, max_positions: int = 3, ) -> Optional[float]: """LLM에게 매수 여부 + 지정가를 물어본다. Args: ticker: 종목 코드 signal: 시그널 정보 {vol_ratio, prices, ...} bar_list: 최근 20초봉 리스트 current_price: 현재 가격 fng: 현재 F&G 지수 num_positions: 현재 보유 포지션 수 max_positions: 최대 포지션 수 Returns: float → 지정가 매수 가격 None → 매수하지 않음 """ bar_desc = _describe_bars(bar_list, current_price) mkt_ctx = _get_market_context(ticker) vol_ratio = signal.get('vol_ratio', 0) market_section = f'\n{mkt_ctx}\n' if mkt_ctx else '' prompt = f"""당신은 암호화폐 단기 트레이더입니다. 아래 시그널을 분석해 **매수 여부와 지정가 매수 가격**을 판단하세요. 반드시 제공된 DB tool을 호출해 추가 데이터를 조회하세요: - get_btc_trend: BTC 추세 확인 (필수 — BTC 하락 시 알트 매수 위험) - get_ticker_context: 종목 24h/7d 변동, 뉴스 확인 - get_trade_history: 이 종목 과거 거래 성과 확인 - get_ohlcv: 1분봉으로 지지/저항선 확인 [시그널 감지] 종목 : {ticker} 현재가 : {current_price:,.0f}원 거래량비: {vol_ratio:.1f}x (61봉 평균 대비) F&G지수: {fng} ({'공포' if fng <= 40 else '중립' if fng <= 50 else '탐욕'}) 포지션 : {num_positions}/{max_positions} {market_section} [최근 {INPUT_BARS}봉 (20초봉)] {bar_desc} [판단 기준] - 거래량 급증이 진짜 매집 신호인지, 일시적 노이즈인지 구분 - BTC가 하락 중이면 알트코인 매수 자제 - 최근 이 종목에서 연패 중이면 신중하게 - 현재가보다 약간 낮은 지정가를 설정해 유리한 가격에 매수 - 상승 추세가 이미 많이 진행됐으면 진입 자제 반드시 아래 JSON 형식으로만 응답하세요. 설명이나 다른 텍스트를 절대 포함하지 마세요. 매수할 경우: {{"action": "buy", "price": 숫자, "confidence": "high|medium|low", "reason": "판단 근거 한줄 요약", "market_status": "상승|하락|횡보|급등|급락"}} 매수하지 않을 경우: {{"action": "skip", "reason": "매수하지 않는 이유 한줄 요약", "market_status": "상승|하락|횡보|급등|급락"}}""" data = _call_llm(prompt, ticker) if data is None: log.info(f'[LLM매수] {ticker} → LLM 오류/무응답') return None reason = data.get('reason', '') status = data.get('market_status', '') confidence = data.get('confidence', '?') if data.get('action') == 'skip': log.info(f'[LLM매수] {ticker} → skip | {confidence} | {status} | {reason}') return data # action=skip if data.get('action') == 'buy': data['price'] = float(data['price']) log.info(f'[LLM매수] {ticker} → buy {data["price"]:,.0f}원 | {confidence} | {status} | {reason}') return data # action=buy, price=float log.warning(f'[LLM매수] {ticker} 알 수 없는 action: {data}') return None # ── 매도 판단 ──────────────────────────────────────────────────────────────── def get_exit_price( ticker: str, pos: dict, bar_list: list[dict], current_price: float, ) -> Optional[float]: """LLM에게 매도 목표가를 물어본다. (DB tool 사용 가능) Args: ticker: 종목 코드 (예: 'KRW-XRP') pos: positions[ticker] 딕셔너리 bar_list: list(bars[ticker]) — 최신봉이 마지막 current_price: 현재 틱 가격 Returns: float → 새 지정가 (현재 주문가와 MIN_CHANGE_R 이상 차이) None → hold 또는 오류 """ entry_price = pos['entry_price'] elapsed_min = (datetime.now() - pos['entry_ts']).total_seconds() / 60 current_target = pos.get('sell_price') or entry_price * 1.005 bar_desc = _describe_bars(bar_list, current_price) mkt_ctx = _get_market_context(ticker) prompt = _build_prompt( ticker, entry_price, current_price, elapsed_min, current_target, bar_desc, market_context=mkt_ctx, ) data = _call_llm(prompt, ticker) if data is None: log.info(f'[LLM매도] {ticker} → LLM 오류/무응답 → cascade fallback') return None reason = data.get('reason', '') status = data.get('market_status', '') confidence = data.get('confidence', '?') watch = data.get('watch_needed', False) if data.get('action') == 'hold': log.info(f'[LLM매도] {ticker} → hold | {confidence} | {status} | watch={watch} | {reason}') return data # action=hold data['price'] = float(data['price']) pnl_from_entry = (data['price'] - entry_price) / entry_price * 100 log.info( f'[LLM매도] {ticker} 지정가 교체: {current_target:,.0f} → {data["price"]:,.0f}원 ' f'(진입 대비 {pnl_from_entry:+.2f}%) | {confidence} | {status} | watch={watch} | {reason}' ) return data # action=sell, price=float