diff --git a/core/llm_advisor.py b/core/llm_advisor.py new file mode 100644 index 0000000..d036345 --- /dev/null +++ b/core/llm_advisor.py @@ -0,0 +1,439 @@ +"""OpenRouter LLM 기반 매도 목표가 어드바이저. + +1분 주기로 보유 포지션의 OHLCV 흐름을 분석해 최적 지정가 매도 목표가를 반환. +LLM이 주(primary), 기존 cascade 규칙이 fallback. + +프롬프트에 포함되는 시장 데이터: + - 오늘 일봉 (고가/저가/거래량) + - 최근 4시간 1시간봉 (가격/볼륨 흐름) + - 최근 20봉 20초봉 (단기 패턴) + +LLM에게 제공하는 DB Tool (OpenAI function calling): + - get_price_ticks(ticker, minutes): Oracle price_tick 테이블 (최근 N분 가격 틱) + - get_ohlcv(ticker, limit): Oracle backtest_ohlcv 1분봉 (지지/저항 파악용) + - get_ticker_context(ticker): 종목 평판 정보 (가격 변동, 뉴스) + +반환값: + float → 새 지정가 목표가 + None → hold (현재 주문 유지) 또는 오류 +""" +from __future__ import annotations + +import json +import logging +import os +from datetime import datetime +from typing import Optional + +log = logging.getLogger(__name__) + +# 프롬프트에 포함할 봉 수 +INPUT_BARS = 20 + + +# ── Oracle DB Tools ────────────────────────────────────────────────────────── + +def _get_conn(): + """Oracle ADB 연결 (.env 기반).""" + import oracledb + kwargs = dict( + user = os.environ['ORACLE_USER'], + password = os.environ['ORACLE_PASSWORD'], + dsn = os.environ['ORACLE_DSN'], + ) + if w := os.environ.get('ORACLE_WALLET'): + kwargs['config_dir'] = w + return oracledb.connect(**kwargs) + + +def _tool_get_price_ticks(ticker: str, minutes: int = 10) -> str: + """Oracle price_tick 테이블에서 최근 N분 가격 틱 조회.""" + try: + conn = _get_conn() + cur = conn.cursor() + cur.execute( + """SELECT ts, price + FROM price_tick + WHERE ticker = :t + AND ts >= SYSTIMESTAMP - NUMTODSINTERVAL(:m, 'MINUTE') + ORDER BY ts DESC + FETCH FIRST 100 ROWS ONLY""", + {'t': ticker, 'm': minutes}, + ) + rows = cur.fetchall() + conn.close() + if not rows: + return f"{ticker} 최근 {minutes}분 틱 데이터 없음" + lines = [f" {r[0].strftime('%H:%M:%S')} {float(r[1]):>12,.2f}원" for r in rows] + return f"{ticker} 최근 {minutes}분 가격 틱 ({len(rows)}건):\n" + "\n".join(lines) + except Exception as e: + return f"DB 오류: {e}" + + +def _tool_get_ohlcv(ticker: str, limit: int = 30) -> str: + """Oracle backtest_ohlcv 1분봉 최근 N개 조회 (지지/저항 파악용).""" + try: + conn = _get_conn() + cur = conn.cursor() + cur.execute( + """SELECT ts, open_p, high_p, low_p, close_p, volume_p + FROM backtest_ohlcv + WHERE ticker = :t + AND interval_cd = 'minute1' + ORDER BY ts DESC + FETCH FIRST :n ROWS ONLY""", + {'t': ticker, 'n': limit}, + ) + rows = cur.fetchall() + conn.close() + if not rows: + return f"{ticker} 1분봉 데이터 없음" + lines = [ + f" {r[0].strftime('%H:%M')} O{float(r[1]):>10,.0f} H{float(r[2]):>10,.0f}" + f" L{float(r[3]):>10,.0f} C{float(r[4]):>10,.0f} V{float(r[5]):.0f}" + for r in reversed(rows) + ] + return f"{ticker} 1분봉 최근 {len(rows)}개:\n" + "\n".join(lines) + except Exception as e: + return f"DB 오류: {e}" + + +# ── 종목 컨텍스트 조회 ──────────────────────────────────────────────────────── + +def _tool_get_context(ticker: str) -> str: + """ticker_context 테이블에서 종목의 가격 통계 + 뉴스 조회.""" + try: + conn = _get_conn() + cur = conn.cursor() + cur.execute( + "SELECT context_type, content FROM ticker_context WHERE ticker = :t", + {'t': ticker}, + ) + rows = cur.fetchall() + conn.close() + if not rows: + return f"{ticker} 컨텍스트 데이터 없음" + parts = [] + for ctx_type, content in rows: + parts.append(f"[{ctx_type}]\n{content}") + return f"{ticker} 종목 컨텍스트:\n" + "\n\n".join(parts) + except Exception as e: + return f"DB 오류: {e}" + + +# ── Tool 정의 (OpenAI function calling 형식) ───────────────────────────────── + +_TOOLS = [ + { + 'type': 'function', + 'function': { + 'name': 'get_price_ticks', + 'description': ( + 'Oracle DB에서 특정 종목의 최근 N분간 가격 틱 데이터를 조회합니다. ' + '단기 가격 흐름과 지지/저항 수준 파악에 사용하세요.' + ), + 'parameters': { + 'type': 'object', + 'properties': { + 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, + 'minutes': {'type': 'integer', 'description': '최근 N분 데이터 (기본 10)'}, + }, + 'required': ['ticker'], + }, + }, + }, + { + 'type': 'function', + 'function': { + 'name': 'get_ohlcv', + 'description': ( + 'Oracle DB에서 특정 종목의 1분봉 OHLCV 데이터를 조회합니다. ' + '지지선/저항선 파악과 추세 분석에 사용하세요.' + ), + 'parameters': { + 'type': 'object', + 'properties': { + 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, + 'limit': {'type': 'integer', 'description': '조회할 봉 수 (기본 30)'}, + }, + 'required': ['ticker'], + }, + }, + }, + { + 'type': 'function', + 'function': { + 'name': 'get_ticker_context', + 'description': ( + '종목의 평판 정보를 조회합니다. 24h/7d 가격 변동률, 거래량 추이, ' + '최근 뉴스 등 중장기 컨텍스트를 제공합니다. ' + '매도 판단 시 시장 분위기와 종목 상황을 파악하는 데 사용하세요.' + ), + 'parameters': { + 'type': 'object', + 'properties': { + 'ticker': {'type': 'string', 'description': '종목 코드 (예: KRW-XRP)'}, + }, + 'required': ['ticker'], + }, + }, + }, +] + + +def _execute_tool(tool_name: str, tool_input: dict) -> str: + """LLM이 요청한 tool을 실행하고 결과를 반환.""" + if tool_name == 'get_price_ticks': + return _tool_get_price_ticks( + ticker = tool_input['ticker'], + minutes = tool_input.get('minutes', 10), + ) + if tool_name == 'get_ohlcv': + return _tool_get_ohlcv( + ticker = tool_input['ticker'], + limit = tool_input.get('limit', 30), + ) + if tool_name == 'get_ticker_context': + return _tool_get_context(ticker=tool_input['ticker']) + return f'알 수 없는 tool: {tool_name}' + + +# ── 시장 컨텍스트 수집 ─────────────────────────────────────────────────────── + +def _get_market_context(ticker: str) -> str: + """오늘 일봉 요약 + 최근 4시간 1시간봉을 LLM 프롬프트용 텍스트로 반환. + + pyupbit API 호출 실패 시 빈 문자열 반환 (graceful degradation). + """ + try: + import pyupbit + + lines: list[str] = [] + + # 오늘 일봉 (오늘 + 전일 2개) + day_df = pyupbit.get_ohlcv(ticker, interval='day', count=2) + if day_df is not None and not day_df.empty: + today = day_df.iloc[-1] + prev = day_df.iloc[-2] if len(day_df) > 1 else None + vol_note = '' + if prev is not None and prev['volume'] > 0: + vol_note = f' (전일 대비 {today["volume"] / prev["volume"]:.1f}x)' + lines.append('[오늘 일봉]') + lines.append(f' 고가 {today["high"]:>12,.0f}원 저가 {today["low"]:>12,.0f}원') + lines.append(f' 시가 {today["open"]:>12,.0f}원 거래량 {today["volume"]:,.0f}{vol_note}') + + # 최근 4시간 1시간봉 + h1_df = pyupbit.get_ohlcv(ticker, interval='minute60', count=4) + if h1_df is not None and not h1_df.empty: + lines.append(f'[최근 {len(h1_df)}시간봉]') + for ts, row in h1_df.iterrows(): + lines.append( + f' {ts.strftime("%H:%M")} ' + f'고{row["high"]:>10,.0f} 저{row["low"]:>10,.0f} ' + f'종{row["close"]:>10,.0f} 거래량{row["volume"]:,.0f}' + ) + + return '\n'.join(lines) + + except Exception as e: + log.debug(f'[LLM] 시장 컨텍스트 조회 실패: {e}') + return '' + + +# ── 프롬프트 빌더 ───────────────────────────────────────────────────────────── + +def _describe_bars(bar_list: list[dict], current_price: float) -> str: + """봉 데이터를 LLM이 읽기 쉬운 텍스트로 변환.""" + recent = bar_list[-INPUT_BARS:] if len(bar_list) >= INPUT_BARS else bar_list + if not recent: + return '봉 데이터 없음' + + lines = [] + for b in recent: + ts_str = b['ts'].strftime('%H:%M:%S') if isinstance(b.get('ts'), datetime) else '' + lines.append( + f' {ts_str} 종가{b["close"]:>10,.0f} 고가{b["high"]:>10,.0f}' + f' 저가{b["low"]:>10,.0f}' + ) + + highs5 = [b['high'] for b in recent[-5:]] + closes5 = [b['close'] for b in recent[-5:]] + period_high = max(b['high'] for b in recent) + from_peak = (current_price - period_high) / period_high * 100 + + trend_h = '하락▼' if highs5[-1] < highs5[0] else '상승▲' if highs5[-1] > highs5[0] else '횡보─' + trend_c = '하락▼' if closes5[-1] < closes5[0] else '상승▲' if closes5[-1] > closes5[0] else '횡보─' + + summary = ( + f'\n[패턴 요약]\n' + f'- 고가 추세: {trend_h} ({highs5[0]:,.0f}→{highs5[-1]:,.0f})\n' + f'- 종가 추세: {trend_c} ({closes5[0]:,.0f}→{closes5[-1]:,.0f})\n' + f'- 구간 최고가: {period_high:,.0f}원 현재가 대비: {from_peak:+.2f}%' + ) + return '\n'.join(lines) + summary + + +def _build_prompt( + ticker: str, + entry_price: float, + current_price: float, + elapsed_min: float, + current_target: float, + bar_desc: str, + market_context: str = '', +) -> str: + pnl_pct = (current_price - entry_price) / entry_price * 100 + target_gap = (current_target - current_price) / current_price * 100 + + market_section = f'\n{market_context}\n' if market_context else '' + + return f"""당신은 암호화폐 단기 트레이더입니다. +아래 포지션과 가격 흐름을 분석해 **지정가 매도 목표가**를 판단하세요. +필요하면 제공된 DB tool을 호출해 추가 데이터를 조회하세요. +특히 get_ticker_context로 종목의 24h/7d 가격 변동, 거래량 추이, 최근 뉴스를 확인하세요. + +[현재 포지션] +종목 : {ticker} +진입가 : {entry_price:,.0f}원 +현재가 : {current_price:,.0f}원 ({pnl_pct:+.2f}%) +보유시간: {elapsed_min:.0f}분 +현재 지정가: {current_target:,.0f}원 (현재가 대비 {target_gap:+.2f}%, 미체결) +{market_section} +[최근 {INPUT_BARS}봉 (20초봉)] +{bar_desc} + +[운용 정책 참고 — 최종 판단은 당신이 결정] +- 단기 거래량 가속 신호 진입 후 cascade 청산 전략 (지정가 단계적 조정) +- 수익 목표: 진입가 대비 +0.5% ~ +2% 구간 +- 체결 가능성이 낮으면 현실적인 목표가로 조정 권장 +- 상승 여력이 있으면 hold 권장 + +반드시 아래 JSON 형식으로만 응답하세요. 설명이나 다른 텍스트를 절대 포함하지 마세요. + +매도 지정가를 설정할 경우: +{{"action": "sell", "price": 숫자, "confidence": "high|medium|low", "reason": "판단 근거 한줄 요약", "market_status": "상승|하락|횡보|급등|급락", "watch_needed": false}} + +현재 주문을 유지할 경우: +{{"action": "hold", "reason": "유지 근거 한줄 요약", "market_status": "상승|하락|횡보|급등|급락", "watch_needed": true/false}} + +watch_needed: 관망이 필요한 상황이면 true (급변동 예상, 불확실성 높음 등)""" + + +# ── 메인 함수 ───────────────────────────────────────────────────────────────── + +def get_exit_price( + ticker: str, + pos: dict, + bar_list: list[dict], + current_price: float, +) -> Optional[float]: + """LLM에게 매도 목표가를 물어본다. (DB tool 사용 가능) + + Args: + ticker: 종목 코드 (예: 'KRW-XRP') + pos: positions[ticker] 딕셔너리 + bar_list: list(bars[ticker]) — 최신봉이 마지막 + current_price: 현재 틱 가격 + + Returns: + float → 새 지정가 (현재 주문가와 MIN_CHANGE_R 이상 차이) + None → hold 또는 오류 + """ + import requests as _req + + api_key = os.environ.get('OPENROUTER_API_KEY', '') + if not api_key: + log.debug('[LLM] OPENROUTER_API_KEY 없음 → cascade fallback') + return None + + model = os.environ.get('LLM_MODEL', 'anthropic/claude-haiku-4-5-20251001') + + try: + entry_price = pos['entry_price'] + elapsed_min = (datetime.now() - pos['entry_ts']).total_seconds() / 60 + current_target = pos.get('sell_price') or entry_price * 1.005 + + bar_desc = _describe_bars(bar_list, current_price) + mkt_ctx = _get_market_context(ticker) + prompt = _build_prompt( + ticker, entry_price, current_price, + elapsed_min, current_target, bar_desc, + market_context=mkt_ctx, + ) + + headers = { + 'Authorization': f'Bearer {api_key}', + 'Content-Type': 'application/json', + } + messages = [{'role': 'user', 'content': prompt}] + + # Tool use 루프: LLM이 tool을 요청하면 실행 후 결과 전달 + for _ in range(5): # 최대 5회 tool 호출 + body = { + 'model': model, + 'max_tokens': 512, + 'tools': _TOOLS, + 'messages': messages, + 'response_format': {'type': 'json_object'}, + } + resp = _req.post( + 'https://openrouter.ai/api/v1/chat/completions', + headers=headers, json=body, timeout=30, + ) + resp.raise_for_status() + result = resp.json() + + choice = result['choices'][0] + message = choice['message'] + + # tool_calls가 있으면 실행 + tool_calls = message.get('tool_calls') + if tool_calls: + messages.append(message) # assistant 메시지 추가 + for tc in tool_calls: + fn_name = tc['function']['name'] + fn_args = json.loads(tc['function']['arguments']) + fn_result = _execute_tool(fn_name, fn_args) + log.info(f'[LLM-Tool] {ticker} {fn_name}({fn_args}) 호출') + messages.append({ + 'role': 'tool', + 'tool_call_id': tc['id'], + 'content': fn_result, + }) + continue # 다시 LLM에게 결과 전달 + + # 최종 텍스트 응답 + raw = (message.get('content') or '').strip() + if not raw: + log.warning(f'[LLM] {ticker} 빈 응답 → cascade fallback') + return None + + # JSON 추출 (```json 블록이나 순수 JSON 모두 처리) + if '```' in raw: + import re + m = re.search(r'```(?:json)?\s*(.*?)\s*```', raw, re.DOTALL) + raw = m.group(1) if m else raw + data = json.loads(raw) + break + else: + log.warning(f'[LLM] {ticker} tool 루프 초과 → cascade fallback') + return None + + reason = data.get('reason', '') + status = data.get('market_status', '') + + if data.get('action') == 'hold': + log.info(f'[LLM] {ticker} → hold | {status} | {reason}') + return None + + suggested = float(data['price']) + confidence = data.get('confidence', '?') + log.info(f'[LLM] {ticker} 지정가 교체: {current_target:,.0f} → {suggested:,.0f}원 | {confidence} | {status} | {reason}') + return suggested + + except json.JSONDecodeError as e: + log.warning(f'[LLM] {ticker} JSON 파싱 실패: {e} raw={raw[:100]} → cascade fallback') + return None + except Exception as e: + log.warning(f'[LLM] {ticker} 오류: {e} → cascade fallback') + return None diff --git a/daemons/context_collector.py b/daemons/context_collector.py new file mode 100644 index 0000000..21cdfd8 --- /dev/null +++ b/daemons/context_collector.py @@ -0,0 +1,226 @@ +"""종목 컨텍스트 수집 데몬. + +1시간마다 각 종목의 평판 정보를 수집해 Oracle ticker_context 테이블에 저장. +LLM 어드바이저가 매도 판단 시 참조. + +수집 항목: + - price_stats: 24h/7d 가격 변동률, 거래량 추이 + - news: SearXNG 웹 검색 뉴스 요약 + +실행: + .venv/bin/python3 daemons/context_collector.py +로그: + /tmp/context_collector.log +""" +import sys, os, time, logging, json, requests +from datetime import datetime + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from dotenv import load_dotenv +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) + +import pyupbit +import oracledb + +TICKERS = [ + 'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE', + 'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP', + 'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL', + 'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G', + 'KRW-MANTRA', 'KRW-EDGE', 'KRW-CFG', 'KRW-ARDR', 'KRW-SIGN', + 'KRW-AZTEC', 'KRW-ATH', 'KRW-HOLO', 'KRW-BREV', 'KRW-SHIB', +] + +COLLECT_INTERVAL = 3600 # 1시간 +SEARXNG_URL = 'https://searxng.cloud-handson.com/search' + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.FileHandler('/tmp/context_collector.log'), + logging.StreamHandler(sys.stdout), + ] +) +log = logging.getLogger(__name__) + +# 코인명 매핑 (검색 키워드용) +COIN_NAMES = { + 'KRW-BTC': 'Bitcoin', 'KRW-ETH': 'Ethereum', 'KRW-XRP': 'Ripple XRP', + 'KRW-SOL': 'Solana', 'KRW-DOGE': 'Dogecoin', 'KRW-ADA': 'Cardano', + 'KRW-SUI': 'SUI', 'KRW-NEAR': 'NEAR Protocol', 'KRW-KAVA': 'KAVA', + 'KRW-SXP': 'Solar SXP', 'KRW-AKT': 'Akash', 'KRW-SONIC': 'Sonic', + 'KRW-IP': 'Story IP', 'KRW-ORBS': 'ORBS', 'KRW-VIRTUAL': 'Virtuals Protocol', + 'KRW-BARD': 'BARD', 'KRW-XPL': 'XPL', 'KRW-KITE': 'KITE', + 'KRW-ENSO': 'ENSO', 'KRW-0G': '0G', 'KRW-MANTRA': 'MANTRA OM', + 'KRW-EDGE': 'EDGE', 'KRW-CFG': 'Centrifuge', 'KRW-ARDR': 'Ardor', + 'KRW-SIGN': 'SIGN', 'KRW-AZTEC': 'AZTEC', 'KRW-ATH': 'Aethir', + 'KRW-HOLO': 'Holochain', 'KRW-BREV': 'BREV', 'KRW-SHIB': 'Shiba Inu', +} + + +def get_conn(): + kwargs = dict(user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"]) + if w := os.environ.get("ORACLE_WALLET"): + kwargs["config_dir"] = w + return oracledb.connect(**kwargs) + + +def upsert_context(conn, ticker: str, context_type: str, content: str): + cur = conn.cursor() + cur.execute( + """MERGE INTO ticker_context tc + USING (SELECT :t AS ticker, :ct AS context_type FROM dual) s + ON (tc.ticker = s.ticker AND tc.context_type = s.context_type) + WHEN MATCHED THEN + UPDATE SET content = :c, updated_at = SYSTIMESTAMP + WHEN NOT MATCHED THEN + INSERT (ticker, context_type, content) + VALUES (:t, :ct, :c)""", + {'t': ticker, 'ct': context_type, 'c': content}, + ) + + +def collect_price_stats(conn): + """각 종목의 24h/7d 가격 변동률 + 거래량 추이를 수집.""" + log.info("[price_stats] 수집 시작") + count = 0 + for ticker in TICKERS: + try: + # 일봉 7개 (7일치) + df_day = pyupbit.get_ohlcv(ticker, interval='day', count=8) + if df_day is None or len(df_day) < 2: + continue + + today = df_day.iloc[-1] + yesterday = df_day.iloc[-2] + + # 24h 변동률 + chg_24h = (today['close'] - yesterday['close']) / yesterday['close'] * 100 + + # 7d 변동률 + if len(df_day) >= 8: + week_ago = df_day.iloc[-8] + chg_7d = (today['close'] - week_ago['close']) / week_ago['close'] * 100 + else: + chg_7d = None + + # 거래량 추이 (최근 3일 vs 이전 3일) + if len(df_day) >= 7: + recent_vol = df_day['volume'].iloc[-3:].mean() + prev_vol = df_day['volume'].iloc[-6:-3].mean() + vol_trend = recent_vol / prev_vol if prev_vol > 0 else 1.0 + else: + vol_trend = None + + # 1시간봉 최근 24개 (24시간) + df_h1 = pyupbit.get_ohlcv(ticker, interval='minute60', count=24) + h1_high = h1_low = None + if df_h1 is not None and not df_h1.empty: + h1_high = float(df_h1['high'].max()) + h1_low = float(df_h1['low'].min()) + + stats = { + 'price': float(today['close']), + 'chg_24h_pct': round(chg_24h, 2), + 'chg_7d_pct': round(chg_7d, 2) if chg_7d is not None else None, + 'vol_today': float(today['volume']), + 'vol_trend_3d': round(vol_trend, 2) if vol_trend is not None else None, + 'h24_high': h1_high, + 'h24_low': h1_low, + 'updated': datetime.now().strftime('%Y-%m-%d %H:%M'), + } + upsert_context(conn, ticker, 'price_stats', json.dumps(stats, ensure_ascii=False)) + count += 1 + time.sleep(0.2) + except Exception as e: + log.warning(f"[price_stats] {ticker} 오류: {e}") + + conn.commit() + log.info(f"[price_stats] 완료 {count}/{len(TICKERS)} 종목") + + +def search_news(coin_name: str, max_results: int = 5) -> list[dict]: + """SearXNG로 코인 뉴스 검색.""" + try: + resp = requests.get( + SEARXNG_URL, + params={ + 'q': f'{coin_name} crypto news', + 'format': 'json', + 'categories': 'news', + 'language': 'ko-KR', + 'time_range': 'week', + }, + timeout=15, + ) + resp.raise_for_status() + data = resp.json() + results = [] + for r in data.get('results', [])[:max_results]: + results.append({ + 'title': r.get('title', ''), + 'url': r.get('url', ''), + 'content': r.get('content', '')[:200], + 'date': r.get('publishedDate', ''), + }) + return results + except Exception as e: + log.warning(f"[news] {coin_name} 검색 오류: {e}") + return [] + + +def collect_news(conn): + """각 종목의 최근 뉴스를 수집.""" + log.info("[news] 수집 시작") + count = 0 + for ticker in TICKERS: + coin_name = COIN_NAMES.get(ticker, ticker.split('-')[1]) + try: + articles = search_news(coin_name) + if not articles: + continue + + news_data = { + 'coin': coin_name, + 'articles': articles, + 'updated': datetime.now().strftime('%Y-%m-%d %H:%M'), + } + upsert_context(conn, ticker, 'news', json.dumps(news_data, ensure_ascii=False)) + count += 1 + time.sleep(1) # 검색 API rate limit 배려 + except Exception as e: + log.warning(f"[news] {ticker} 오류: {e}") + + conn.commit() + log.info(f"[news] 완료 {count}/{len(TICKERS)} 종목") + + +def main(): + log.info("=== context_collector 시작 (1시간 간격) ===") + log.info(f"대상: {len(TICKERS)}개 종목") + conn = get_conn() + + while True: + t0 = time.time() + try: + collect_price_stats(conn) + collect_news(conn) + except oracledb.DatabaseError as e: + log.error(f"DB 오류: {e} — 재연결") + try: + conn.close() + except: + pass + conn = get_conn() + except Exception as e: + log.error(f"오류: {e}") + + elapsed = time.time() - t0 + log.info(f"[완료] {elapsed:.0f}초 소요, {COLLECT_INTERVAL}초 후 다음 수집") + time.sleep(max(60, COLLECT_INTERVAL - elapsed)) + + +if __name__ == '__main__': + main() diff --git a/daemons/fetch_1min_history.py b/daemons/fetch_1min_history.py new file mode 100644 index 0000000..14a3b7c --- /dev/null +++ b/daemons/fetch_1min_history.py @@ -0,0 +1,169 @@ +"""1분봉 장기 히스토리 fetch 데몬. + +주요 5종목(BTC/ETH/XRP/SOL/DOGE)의 1분봉을 2년치까지 소급 수집. +백그라운드에서 조용히 실행 — API 딜레이 충분히 줘서 다른 작업 방해 안 함. +재시작 시 DB에 이미 있는 범위는 건너뜀. + +실행: + .venv/bin/python3 daemons/fetch_1min_history.py [--tickers BTC ETH] [--days 730] +로그: + /tmp/fetch_1min_history.log +""" +import sys, os, time, argparse, logging +from datetime import datetime, timedelta +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from dotenv import load_dotenv +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) + +import pyupbit +import oracledb + +# ── 설정 ────────────────────────────────────────────────────────────────────── +DEFAULT_TICKERS = ['KRW-BTC', 'KRW-ETH', 'KRW-XRP', 'KRW-SOL', 'KRW-DOGE'] +BATCH = 200 # API 1회 요청 봉수 (Upbit 최대 200) +DELAY = 0.4 # API 호출 간격 (초) — 넉넉히 줘서 rate limit 회피 +RETRY_WAIT = 5.0 # 오류 시 대기 (초) + +# ── 로깅 설정 ───────────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.FileHandler('/tmp/fetch_1min_history.log'), + logging.StreamHandler(sys.stdout), + ] +) +log = logging.getLogger(__name__) + + +def _get_conn(): + kwargs = dict(user=os.environ["ORACLE_USER"], + password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"]) + wallet = os.environ.get("ORACLE_WALLET") + if wallet: + kwargs["config_dir"] = wallet + return oracledb.connect(**kwargs) + + +def get_oldest_ts(conn, ticker: str): + """DB에 있는 해당 ticker 1분봉의 가장 오래된 ts 반환. 없으면 None.""" + cur = conn.cursor() + cur.execute( + "SELECT MIN(ts) FROM backtest_ohlcv " + "WHERE ticker=:t AND interval_cd='minute1'", + {"t": ticker} + ) + row = cur.fetchone() + return row[0] if row and row[0] else None + + +def insert_batch(conn, ticker: str, rows: list) -> int: + """rows: [(ts, open, high, low, close, volume), ...] — bulk insert, 중복 무시.""" + if not rows: + return 0 + cur = conn.cursor() + cur.executemany( + "INSERT INTO backtest_ohlcv " + "(ticker,interval_cd,ts,open_p,high_p,low_p,close_p,volume_p) " + "VALUES (:1,'minute1',:2,:3,:4,:5,:6,:7)", + [(ticker, ts, o, h, l, c, v) for ts, o, h, l, c, v in rows], + batcherrors=True, + ) + errors = cur.getbatcherrors() + conn.commit() + return len(rows) - len(errors) + + +def fetch_ticker(conn, ticker: str, cutoff: datetime) -> int: + """ticker의 cutoff까지 1분봉 소급 fetch. + DB에 이미 있는 범위는 batcherrors로 자동 스킵. + """ + oldest_in_db = get_oldest_ts(conn, ticker) + if oldest_in_db and oldest_in_db <= cutoff: + log.info(f"{ticker}: DB에 이미 {oldest_in_db.date()} 까지 있음 → 스킵") + return 0 + + # datetime.now()에서 시작해 cutoff까지 역방향 fetch + # 중복은 DB unique constraint + batcherrors가 처리 + to_dt = datetime.now() + total = 0 + batch_n = 0 + + if oldest_in_db: + log.info(f"{ticker}: DB 최솟값={oldest_in_db.date()}, {cutoff.date()} 까지 소급 시작") + else: + log.info(f"{ticker}: DB에 데이터 없음, {cutoff.date()} 까지 전체 fetch 시작") + + while to_dt > cutoff: + # pyupbit의 to 파라미터는 UTC로 해석됨 — KST에서 9시간 빼서 전달 + to_utc = to_dt - timedelta(hours=9) + to_str = to_utc.strftime('%Y-%m-%d %H:%M:%S') + try: + df = pyupbit.get_ohlcv(ticker, interval='minute1', count=BATCH, to=to_str) + time.sleep(DELAY) + except Exception as e: + log.warning(f"{ticker} API 오류: {e} → {RETRY_WAIT}s 후 재시도") + time.sleep(RETRY_WAIT) + continue + + if df is None or len(df) == 0: + log.info(f"{ticker}: API 데이터 소진 ({to_str})") + break + + rows = [ + (ts.to_pydatetime(), float(r['open']), float(r['high']), + float(r['low']), float(r['close']), float(r['volume'])) + for ts, r in df.iterrows() + ] + n = insert_batch(conn, ticker, rows) + total += n + batch_n += 1 + oldest = df.index[0].to_pydatetime() + + if batch_n % 50 == 0: + log.info(f" {ticker} 배치{batch_n:04d}: {oldest.date()} | 신규 누적 {total:,}행") + + to_dt = oldest - timedelta(minutes=1) + + if oldest <= cutoff: + break + + log.info(f"{ticker}: 완료 — 신규 {total:,}행 (배치 {batch_n}회)") + return total + + +def main(): + parser = argparse.ArgumentParser() + parser.add_argument('--tickers', nargs='+', default=DEFAULT_TICKERS, + help='수집 티커 (예: KRW-BTC KRW-ETH)') + parser.add_argument('--days', type=int, default=730, + help='소급 일수 (기본 730일 = 2년)') + args = parser.parse_args() + + cutoff = datetime.now() - timedelta(days=args.days) + log.info(f"=== 1분봉 히스토리 데몬 시작 ===") + log.info(f"대상: {args.tickers}") + log.info(f"목표: {cutoff.date()} ({args.days}일) 까지 소급") + + conn = _get_conn() + grand_total = 0 + t_start = time.time() + + for ticker in args.tickers: + t0 = time.time() + try: + n = fetch_ticker(conn, ticker, cutoff) + grand_total += n + elapsed = time.time() - t0 + log.info(f"{ticker}: {n:,}행 저장 ({elapsed/60:.1f}분)") + except Exception as e: + log.error(f"{ticker}: 오류 — {e}") + + conn.close() + total_min = (time.time() - t_start) / 60 + log.info(f"=== 완료: 총 {grand_total:,}행 / {total_min:.0f}분 ===") + + +if __name__ == '__main__': + main() diff --git a/daemons/live_trader.py b/daemons/live_trader.py new file mode 100644 index 0000000..4ce1eef --- /dev/null +++ b/daemons/live_trader.py @@ -0,0 +1,363 @@ +"""실시간 1분봉 볼륨 가속 트레이더. + +4봉 연속 가격+볼륨 가속 시그널(VOL≥8x) 감지 후 실제 매수/매도 + Telegram 알림. + +실행: + .venv/bin/python3 daemons/live_trader.py +로그: + /tmp/live_trader.log +""" +import sys, os, time, logging, requests +from datetime import datetime +from typing import Optional + +import pandas as pd + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from dotenv import load_dotenv +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) + +import pyupbit + +# ── 전략 파라미터 ────────────────────────────────────────────────────────────── +TICKERS = [ + 'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE', + 'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP', + 'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL', + 'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G', +] +VOL_LOOKBACK = 61 +ATR_LOOKBACK = 28 +FETCH_BARS = 100 +VOL_MIN = 8.0 +ATR_MULT = 1.0 +ATR_MIN_R = 0.030 # 3.0% (ATR 계산용, ⑤ trail에서는 미사용) +ATR_MAX_R = 0.050 # 5.0% + +# ── Cascade 청산 파라미터 ────────────────────────────────────────────────────── +# (시작분, 종료분, limit 수익률) +CASCADE_STAGES = [ + (0, 2, 0.020), # ① 0~ 2분: 현재가 >= 진입가×1.020 → 청산 + (2, 5, 0.010), # ② 2~ 5분: 현재가 >= 진입가×1.010 → 청산 + (5, 35, 0.005), # ③ 5~35분: 현재가 >= 진입가×1.005 → 청산 + (35, 155, 0.001), # ④ 35~155분: 현재가 >= 진입가×1.001 → 청산 (본전) +] +TRAIL_STOP_R = 0.004 # ⑤ 155분~: Trail Stop 0.4% + +MAX_POS = int(os.environ.get('MAX_POSITIONS', 3)) +PER_POS = int(os.environ.get('MAX_BUDGET', 15_000_000)) // MAX_POS +FEE = 0.0005 + +POLL_SEC = 65 +API_DELAY = 0.12 +TIMEOUT_BARS = 240 # 4시간: ⑤ Trail 구간에서 본전 이하 시 청산 + +SIM_MODE = os.environ.get('SIMULATION_MODE', 'true').lower() == 'true' + +# ── Upbit 클라이언트 ─────────────────────────────────────────────────────────── +upbit = pyupbit.Upbit(os.environ['ACCESS_KEY'], os.environ['SECRET_KEY']) + +# ── Telegram ────────────────────────────────────────────────────────────────── +TG_TOKEN = os.environ.get('TELEGRAM_TRADE_TOKEN', '') +TG_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '') + +def tg(msg: str) -> None: + if not TG_TOKEN or not TG_CHAT_ID: + return + try: + requests.post( + f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage', + json={'chat_id': TG_CHAT_ID, 'text': msg, 'parse_mode': 'HTML'}, + timeout=5, + ) + except Exception as e: + log.warning(f'Telegram 전송 실패: {e}') + +# ── 로깅 ────────────────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.FileHandler('/tmp/live_trader.log'), + logging.StreamHandler(sys.stdout), + ] +) +log = logging.getLogger(__name__) + + +# ── 지표 계산 ───────────────────────────────────────────────────────────────── +def compute_indicators(df: pd.DataFrame) -> pd.DataFrame: + vol_ma = df['volume'].rolling(VOL_LOOKBACK, min_periods=30).mean().shift(2) + df = df.copy() + df['vr'] = df['volume'] / vol_ma + + prev_close = df['close'].shift(1) + tr = pd.concat([ + df['high'] - df['low'], + (df['high'] - prev_close).abs(), + (df['low'] - prev_close).abs(), + ], axis=1).max(axis=1) + df['atr_raw'] = tr.rolling(ATR_LOOKBACK, min_periods=10).mean() / prev_close + return df + + +def check_signal(df: pd.DataFrame) -> Optional[dict]: + """마지막 3봉(완성봉) 가격+볼륨 가속 조건 체크.""" + if len(df) < VOL_LOOKBACK + 10: + return None + + b = df.iloc[-4:-1] # 완성된 마지막 3봉 + if len(b) < 3: + return None + + c = b['close'].values + o = b['open'].values + vr = b['vr'].values + + if not all(c[i] > o[i] for i in range(3)): return None # 양봉 + if not (c[2] > c[1] > c[0]): return None # 가격 가속 + if not (vr[2] > vr[1] > vr[0]): return None # 볼륨 가속 + if vr[2] < VOL_MIN: return None # VOL 임계값 + + atr_raw = float(b['atr_raw'].iloc[-1]) if not pd.isna(b['atr_raw'].iloc[-1]) else 0.0 + return { + 'sig_ts': b.index[-1], + 'sig_price': float(c[2]), + 'vr': list(vr), + 'prices': list(c), + 'atr_raw': atr_raw, + } + + +# ── 주문 ────────────────────────────────────────────────────────────────────── +def do_buy(ticker: str, krw_amount: int) -> Optional[float]: + """시장가 매수. 실제 체결 수량 반환. 실패 시 None.""" + if SIM_MODE: + current = pyupbit.get_current_price(ticker) + qty = krw_amount * (1 - FEE) / current + log.info(f"[SIM 매수] {ticker} {krw_amount:,}원 → {qty:.6f}개 @ {current:,.0f}") + return qty + + try: + krw_bal = upbit.get_balance("KRW") + if krw_bal is None or krw_bal < krw_amount: + log.warning(f"KRW 잔고 부족: {krw_bal:,.0f}원 < {krw_amount:,}원") + return None + + order = upbit.buy_market_order(ticker, krw_amount) + if not order or 'error' in str(order): + log.error(f"매수 주문 실패: {order}") + return None + + # 체결 대기 후 실제 보유량 조회 + time.sleep(1.5) + coin = ticker.split('-')[1] + qty = upbit.get_balance(coin) + log.info(f"[매수 완료] {ticker} {krw_amount:,}원 → {qty:.6f}개 uuid={order.get('uuid','')[:8]}") + return qty if qty and qty > 0 else None + + except Exception as e: + log.error(f"매수 오류 {ticker}: {e}") + return None + + +def do_sell(ticker: str, qty: float) -> Optional[float]: + """시장가 매도. 체결가(추정) 반환. 실패 시 None.""" + if SIM_MODE: + current = pyupbit.get_current_price(ticker) + log.info(f"[SIM 매도] {ticker} {qty:.6f}개 @ {current:,.0f}") + return current + + try: + order = upbit.sell_market_order(ticker, qty) + if not order or 'error' in str(order): + log.error(f"매도 주문 실패: {order}") + return None + + time.sleep(1.5) + current = pyupbit.get_current_price(ticker) + log.info(f"[매도 완료] {ticker} {qty:.6f}개 uuid={order.get('uuid','')[:8]}") + return current + + except Exception as e: + log.error(f"매도 오류 {ticker}: {e}") + return None + + +# ── 포지션 관리 ─────────────────────────────────────────────────────────────── +positions: dict = {} + + +def enter_position(ticker: str, sig: dict, entry_price: float) -> None: + ar = sig['atr_raw'] + atr_stop = max(ATR_MIN_R, min(ATR_MAX_R, ar * ATR_MULT)) if ar > 0 else ATR_MAX_R + + qty = do_buy(ticker, PER_POS) + if qty is None: + log.warning(f"[진입 실패] {ticker} — 매수 주문 오류") + return + + positions[ticker] = { + 'entry_price': entry_price, + 'entry_ts': datetime.now(), + 'running_peak': entry_price, + 'qty': qty, + 'vr': sig['vr'], + 'prices': sig['prices'], + } + + log.info(f"[진입] {ticker} {entry_price:,.0f}원 vol {sig['vr'][2]:.1f}x cascade①2%②1%③0.5%④0.1%⑤trail0.4%") + tg( + f"🟢 매수 완료 {ticker}\n" + f"체결가: {entry_price:,.0f}원 수량: {qty:.6f}\n" + f"전략: ①2% ②1% ③0.5% ④0.1%(본전) ⑤Trail0.4%\n" + f"{'[시뮬]' if SIM_MODE else '[실거래]'}" + ) + + +def _do_exit(ticker: str, current_price: float, reason: str) -> bool: + """공통 청산 처리. reason: 'trail' | 'timeout'""" + pos = positions[ticker] + exit_price = do_sell(ticker, pos['qty']) + if exit_price is None: + exit_price = current_price + + pnl = (exit_price - pos['entry_price']) / pos['entry_price'] * 100 + krw = PER_POS * (pnl / 100) - PER_POS * FEE * 2 + held = int((datetime.now() - pos['entry_ts']).total_seconds() / 60) + + icon = "✅" if pnl > 0 else "🔴" + reason_tag = { + '①2%': '① +2.0% 익절', + '②1%': '② +1.0% 익절', + '③0.5%': '③ +0.5% 익절', + '④0.1%': '④ +0.1% 본전', + '⑤trail': '⑤ 트레일스탑', + 'timeout': '⑤ 타임아웃', + }.get(reason, reason) + msg = ( + f"{icon} 청산 {ticker} [{reason_tag}]\n" + f"진입: {pos['entry_price']:,.0f}원\n" + f"고점: {pos['running_peak']:,.0f}원 ({(pos['running_peak']/pos['entry_price']-1)*100:+.2f}%)\n" + f"청산: {exit_price:,.0f}원\n" + f"PNL: {pnl:+.2f}% ({krw:+,.0f}원) {held}분 보유\n" + f"{'[시뮬]' if SIM_MODE else '[실거래]'}" + ) + log.info( + f"[청산/{reason}] {ticker} {exit_price:,.0f}원 " + f"PNL {pnl:+.2f}% {krw:+,.0f}원 {held}분 보유" + ) + tg(msg) + del positions[ticker] + return True + + +def update_position(ticker: str, current_price: float) -> bool: + """Cascade 청산 체크. 청산 시 True 반환. + + ① 0~ 2분: +2.0% limit + ② 2~ 5분: +1.0% limit + ③ 5~35분: +0.5% limit + ④ 35~155분: +0.1% limit (본전) + ⑤ 155분~: Trail Stop 0.4% + """ + pos = positions[ticker] + ep = pos['entry_price'] + held = int((datetime.now() - pos['entry_ts']).total_seconds() / 60) + + # 항상 고점 갱신 (⑤ trail 진입 시 정확한 고점 기준) + pos['running_peak'] = max(pos['running_peak'], current_price) + + # ①②③④: cascade limit 단계 + stage_labels = {0: '①2%', 2: '②1%', 5: '③0.5%', 35: '④0.1%'} + for start, end, lr in CASCADE_STAGES: + if start <= held < end: + if current_price >= ep * (1 + lr): + return _do_exit(ticker, current_price, stage_labels[start]) + return False + + # ⑤: Trail Stop 0.4% + drop = (pos['running_peak'] - current_price) / pos['running_peak'] + if drop >= TRAIL_STOP_R: + return _do_exit(ticker, current_price, '⑤trail') + + # 타임아웃: 4시간 경과 + 본전 이하 + if held >= TIMEOUT_BARS and current_price <= ep: + return _do_exit(ticker, current_price, 'timeout') + + return False + + +# ── 메인 루프 ───────────────────────────────────────────────────────────────── +def run_once() -> None: + for ticker in TICKERS: + try: + df = pyupbit.get_ohlcv(ticker, interval='minute1', count=FETCH_BARS) + time.sleep(API_DELAY) + except Exception as e: + log.warning(f"{ticker} API 오류: {e}") + continue + + if df is None or len(df) < 30: + continue + + df = compute_indicators(df) + current_price = float(df['close'].iloc[-1]) + + # 열린 포지션: trail stop 체크 + if ticker in positions: + update_position(ticker, current_price) + continue + + # 신규 진입: 시그널 체크 (슬롯 여부와 관계없이 항상 탐지) + sig = check_signal(df) + if sig: + vr = sig['vr'] + pr = sig['prices'] + slot_tag = f"→ 매수 진행" if len(positions) < MAX_POS else f"⚠️ 슬롯 {len(positions)}/{MAX_POS} 꽉 참" + # 시그널 감지 즉시 알림 + tg( + f"🔔 시그널 {ticker}\n" + f"가격: {pr[0]:,.0f}→{pr[1]:,.0f}→{pr[2]:,.0f}\n" + f"볼륨: {vr[0]:.1f}x→{vr[1]:.1f}x→{vr[2]:.1f}x\n" + f"현재가: {current_price:,.0f}원 ATR: {sig['atr_raw']*100:.2f}%\n" + f"{slot_tag}" + ) + log.info(f"[시그널] {ticker} {current_price:,.0f}원 vol {vr[2]:.1f}x {slot_tag}") + if len(positions) < MAX_POS: + enter_position(ticker, sig, current_price) + + pos_str = ', '.join( + f"{t}({p['entry_price']:,.0f}→{p['running_peak']:,.0f}, {((p['running_peak']/p['entry_price'])-1)*100:+.1f}%)" + for t, p in positions.items() + ) or "없음" + log.info(f"[상태] 포지션 {len(positions)}/{MAX_POS}: {pos_str}") + + +def main(): + mode = "🔴 실거래" if not SIM_MODE else "🟡 시뮬레이션" + log.info(f"=== 실시간 트레이더 시작 ({mode}) ===") + log.info(f"전략: 3봉 vol가속 VOL≥{VOL_MIN}x, cascade①2%②1%③0.5%④0.1%⑤Trail{TRAIL_STOP_R*100:.1f}%") + log.info(f"종목: {len(TICKERS)}개 포지션당 {PER_POS:,}원 최대 {MAX_POS}개") + + tg( + f"🚀 트레이더 시작 ({mode})\n" + f"3봉 VOL≥{VOL_MIN}x cascade①2%②1%③0.5%④0.1%⑤Trail{TRAIL_STOP_R*100:.1f}%\n" + f"종목 {len(TICKERS)}개 포지션당 {PER_POS:,}원 최대 {MAX_POS}개" + ) + + while True: + t0 = time.time() + try: + run_once() + except Exception as e: + log.error(f"루프 오류: {e}") + + elapsed = time.time() - t0 + sleep = max(1.0, POLL_SEC - elapsed) + log.info(f"[대기] {sleep:.0f}초 후 다음 체크") + time.sleep(sleep) + + +if __name__ == '__main__': + main() diff --git a/daemons/tick_collector.py b/daemons/tick_collector.py new file mode 100644 index 0000000..cf93cc7 --- /dev/null +++ b/daemons/tick_collector.py @@ -0,0 +1,118 @@ +"""30초마다 전 종목 현재가를 Oracle price_tick 테이블에 적재. ++ 60초마다 backtest_ohlcv 1분봉 최신 데이터 갱신. +""" +import sys, os, time, logging +from datetime import datetime + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from dotenv import load_dotenv +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) + +import pyupbit +import oracledb + +TICKERS = [ + 'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE', + 'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP', + 'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL', + 'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G', + 'KRW-MANTRA', 'KRW-EDGE', 'KRW-CFG', 'KRW-ARDR', 'KRW-SIGN', + 'KRW-AZTEC', 'KRW-ATH', 'KRW-HOLO', 'KRW-BREV', 'KRW-SHIB', +] +INTERVAL = 30 # 초 +OHLCV_INTERVAL = 60 # 1분봉 갱신 주기 (초) + +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.FileHandler('/tmp/tick_collector.log'), + logging.StreamHandler(sys.stdout), + ] +) +log = logging.getLogger(__name__) + + +def get_conn(): + kwargs = dict(user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], + dsn=os.environ["ORACLE_DSN"]) + if w := os.environ.get("ORACLE_WALLET"): + kwargs["config_dir"] = w + return oracledb.connect(**kwargs) + + +def collect(conn): + prices = pyupbit.get_current_price(TICKERS) + if not prices: + log.warning("현재가 조회 실패") + return + + ts = datetime.now().replace(microsecond=0) + rows = [(t, ts, p) for t, p in prices.items() if p] + + cur = conn.cursor() + cur.executemany( + "INSERT INTO price_tick (ticker, ts, price) VALUES (:1, :2, :3)", + rows + ) + conn.commit() + log.info(f"적재 {len(rows)}건 ts={ts}") + + +def refresh_ohlcv(conn): + """backtest_ohlcv 1분봉을 최근 5개씩 갱신 (중복 무시).""" + total = 0 + for ticker in TICKERS: + try: + df = pyupbit.get_ohlcv(ticker, interval='minute1', count=5) + if df is None or df.empty: + continue + rows = [ + (ticker, 'minute1', ts.to_pydatetime(), + float(r['open']), float(r['high']), float(r['low']), + float(r['close']), float(r['volume'])) + for ts, r in df.iterrows() + ] + cur = conn.cursor() + cur.executemany( + "INSERT INTO backtest_ohlcv " + "(ticker,interval_cd,ts,open_p,high_p,low_p,close_p,volume_p) " + "VALUES (:1,:2,:3,:4,:5,:6,:7,:8)", + rows, batcherrors=True, + ) + inserted = len(rows) - len(cur.getbatcherrors()) + total += inserted + time.sleep(0.15) + except Exception as e: + log.warning(f"[ohlcv] {ticker} 오류: {e}") + conn.commit() + if total > 0: + log.info(f"[ohlcv] 1분봉 갱신 {total}건") + + +def main(): + log.info("=== tick_collector 시작 (30초 간격 + 1분봉 갱신) ===") + conn = get_conn() + last_ohlcv = 0 + while True: + t0 = time.time() + try: + collect(conn) + # 1분봉 갱신 (OHLCV_INTERVAL마다) + if t0 - last_ohlcv >= OHLCV_INTERVAL: + refresh_ohlcv(conn) + last_ohlcv = t0 + except oracledb.DatabaseError as e: + log.error(f"DB 오류: {e} — 재연결") + try: conn.close() + except: pass + conn = get_conn() + except Exception as e: + log.error(f"오류: {e}") + + elapsed = time.time() - t0 + time.sleep(max(1.0, INTERVAL - elapsed)) + + +if __name__ == '__main__': + main() diff --git a/daemons/tick_trader.py b/daemons/tick_trader.py new file mode 100644 index 0000000..b163811 --- /dev/null +++ b/daemons/tick_trader.py @@ -0,0 +1,591 @@ +"""WebSocket 기반 20초봉 트레이더. + +구조: + WebSocket → trade tick 수신 → 20초봉 집계 → 3봉 가속 시그널(VOL≥8x) → cascade 청산 + +cascade (초 기준): + ① 0~ 40초: +2.0% 지정가 + ② 40~ 100초: +1.0% 지정가 + ③ 100~ 300초: +0.5% 지정가 + ④ 300~3500초: +0.1% 지정가 + ⑤ 3500초~: Trail Stop 0.8% 시장가 + +실행: + .venv/bin/python3 daemons/tick_trader.py +로그: + /tmp/tick_trader.log +""" +import sys, os, time, logging, threading, requests, math +from datetime import datetime, timedelta +from collections import deque, defaultdict +from typing import Optional + +sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) +from dotenv import load_dotenv +load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) + +from core.llm_advisor import get_exit_price + +import pyupbit + +# ── 전략 파라미터 ────────────────────────────────────────────────────────────── +TICKERS = [ + 'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE', + 'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP', + 'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL', + 'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G', + 'KRW-MANTRA', 'KRW-EDGE', 'KRW-CFG', 'KRW-ARDR', 'KRW-SIGN', + 'KRW-AZTEC', 'KRW-ATH', 'KRW-HOLO', 'KRW-BREV', 'KRW-SHIB', +] + +BAR_SEC = 20 # 봉 주기 (초) +VOL_LOOKBACK = 61 # 거래량 평균 기준 봉 수 +ATR_LOOKBACK = 28 # ATR 계산 봉 수 +VOL_MIN = 8.0 # 거래량 배수 임계값 + +MAX_POS = int(os.environ.get('MAX_POSITIONS', 3)) +PER_POS = int(os.environ.get('MAX_BUDGET', 15_000_000)) // MAX_POS +FEE = 0.0005 + +# cascade 청산 (초 기준) — 지정가 매도 +CASCADE_STAGES = [ + (0, 40, 0.020, '①'), # 2봉 + (40, 100, 0.010, '②'), # 3봉 + (100, 300, 0.005, '③'), # 10봉 + (300, 3500, 0.001, '④'), # 160봉 +] +TRAIL_STOP_R = 0.008 +TIMEOUT_SECS = 14400 # 4시간 +LLM_INTERVAL = 60 # LLM 호출 간격 (초) +LLM_MIN_ELAPSED = 60 # 진입 후 최소 N초 이후부터 LLM 활성 + +SIM_MODE = os.environ.get('SIMULATION_MODE', 'true').lower() == 'true' + +upbit_client = pyupbit.Upbit(os.environ['ACCESS_KEY'], os.environ['SECRET_KEY']) + +TG_TOKEN = os.environ.get('TELEGRAM_TRADE_TOKEN', '') +TG_CHAT_ID = os.environ.get('TELEGRAM_CHAT_ID', '') + +# ── 로깅 ────────────────────────────────────────────────────────────────────── +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s %(levelname)s %(message)s', + handlers=[ + logging.FileHandler('/tmp/tick_trader.log'), + ] +) +log = logging.getLogger(__name__) + + +def tg(msg: str) -> None: + if not TG_TOKEN or not TG_CHAT_ID: + return + try: + requests.post( + f'https://api.telegram.org/bot{TG_TOKEN}/sendMessage', + json={'chat_id': TG_CHAT_ID, 'text': msg, 'parse_mode': 'HTML'}, + timeout=5, + ) + except Exception as e: + log.warning(f'Telegram 전송 실패: {e}') + + +# ── 20초봉 집계 ─────────────────────────────────────────────────────────────── +bars: dict = defaultdict(lambda: deque(maxlen=VOL_LOOKBACK + 10)) +cur_bar: dict = {} +bar_lock = threading.Lock() + + +def _new_bar(price: float, volume: float, ts: datetime) -> dict: + return {'open': price, 'high': price, 'low': price, + 'close': price, 'volume': volume, 'ts': ts} + + +def on_tick(ticker: str, price: float, volume: float) -> None: + with bar_lock: + if ticker not in cur_bar: + cur_bar[ticker] = _new_bar(price, volume, datetime.now()) + return + b = cur_bar[ticker] + b['high'] = max(b['high'], price) + b['low'] = min(b['low'], price) + b['close'] = price + b['volume'] += volume + + +def finalize_bars() -> None: + """BAR_SEC마다 봉 확정 + 지정가 체결 확인.""" + while True: + time.sleep(BAR_SEC) + now = datetime.now() + with bar_lock: + for ticker in list(cur_bar.keys()): + b = cur_bar[ticker] + if b['volume'] == 0: + continue + bars[ticker].append(b) + cur_bar[ticker] = _new_bar(b['close'], 0, now) + check_and_enter(ticker) + # 봉 확정 후 지정가 체결 확인 (bar_lock 밖에서) + check_filled_positions() + + +# ── 지표 계산 ───────────────────────────────────────────────────────────────── +def calc_vr(bar_list: list, idx: int) -> float: + start = max(0, idx - VOL_LOOKBACK) + end = max(0, idx - 2) + baseline = [bar_list[i]['volume'] for i in range(start, end)] + if not baseline: + return 0.0 + avg = sum(baseline) / len(baseline) + return bar_list[idx]['volume'] / avg if avg > 0 else 0.0 + + +def calc_atr(bar_list: list) -> float: + if len(bar_list) < ATR_LOOKBACK + 2: + return 0.0 + trs = [] + for i in range(-ATR_LOOKBACK - 1, -1): + b = bar_list[i] + bp = bar_list[i - 1] + tr = max(b['high'] - b['low'], + abs(b['high'] - bp['close']), + abs(b['low'] - bp['close'])) + trs.append(tr) + prev_close = bar_list[-2]['close'] + return (sum(trs) / len(trs)) / prev_close if prev_close > 0 else 0.0 + + +# ── 시그널 감지 ─────────────────────────────────────────────────────────────── +def check_and_enter(ticker: str) -> None: + bar_list = list(bars[ticker]) + n = len(bar_list) + + if n < VOL_LOOKBACK + 5: + return + if ticker in positions: + return + if len(positions) >= MAX_POS: + return + + b0, b1, b2 = bar_list[-3], bar_list[-2], bar_list[-1] + + if not all(b['close'] > b['open'] for b in [b0, b1, b2]): + return + if not (b2['close'] > b1['close'] > b0['close']): + return + + vr2 = calc_vr(bar_list, n - 1) + vr1 = calc_vr(bar_list, n - 2) + vr0 = calc_vr(bar_list, n - 3) + + if vr2 < VOL_MIN or not (vr2 > vr1 > vr0): + return + + atr_raw = calc_atr(bar_list) + entry_price = b2['close'] + + log.info(f"[시그널] {ticker} {entry_price:,.0f}원 vol {vr2:.1f}x") + tg( + f"🔔 시그널 {ticker}\n" + f"가격: {b0['close']:,.0f}→{b1['close']:,.0f}→{b2['close']:,.0f}\n" + f"볼륨: {vr0:.1f}x→{vr1:.1f}x→{vr2:.1f}x" + ) + enter_position(ticker, entry_price, atr_raw, [vr0, vr1, vr2]) + + +# ── 주문 ────────────────────────────────────────────────────────────────────── +def do_buy(ticker: str) -> tuple: + """시장가 매수. Returns (qty, avg_price).""" + if SIM_MODE: + price = pyupbit.get_current_price(ticker) + qty = PER_POS * (1 - FEE) / price + log.info(f"[SIM 매수] {ticker} {PER_POS:,}원 → {qty:.6f}개 @ {price:,.0f}") + return qty, price + try: + order = upbit_client.buy_market_order(ticker, PER_POS) + if not order or 'error' in str(order): + log.error(f"매수 실패: {order}") + return None, None + uuid = order.get('uuid') + time.sleep(1.5) + qty = upbit_client.get_balance(ticker.split('-')[1]) + avg_price = _avg_price_from_order(uuid) if uuid else None + if not avg_price: + avg_price = pyupbit.get_current_price(ticker) + return (qty if qty and qty > 0 else None), avg_price + except Exception as e: + log.error(f"매수 오류 {ticker}: {e}") + return None, None + + +def _round_price(price: float) -> float: + """Upbit 주문가격 단위로 내림 처리 (invalid_price_ask 방지).""" + if price >= 2_000_000: unit = 1000 + elif price >= 1_000_000: unit = 500 + elif price >= 500_000: unit = 100 + elif price >= 100_000: unit = 50 + elif price >= 10_000: unit = 10 + elif price >= 1_000: unit = 5 + elif price >= 100: unit = 1 + elif price >= 10: unit = 0.1 + else: unit = 0.01 + return math.floor(price / unit) * unit + + +def submit_limit_sell(ticker: str, qty: float, price: float) -> Optional[str]: + """지정가 매도 주문. Returns UUID.""" + price = _round_price(price) + if SIM_MODE: + return f"sim-{ticker}" + try: + order = upbit_client.sell_limit_order(ticker, price, qty) + if not order or 'error' in str(order): + log.error(f"지정가 매도 제출 실패: {order}") + return None + return order.get('uuid') + except Exception as e: + log.error(f"지정가 매도 오류 {ticker}: {e}") + return None + + +def cancel_order_safe(uuid: Optional[str]) -> None: + if SIM_MODE or not uuid or uuid.startswith('sim-'): + return + try: + upbit_client.cancel_order(uuid) + except Exception as e: + log.warning(f"주문 취소 실패 {uuid}: {e}") + + +def check_order_state(uuid: str) -> tuple: + """Returns (state, avg_price). state: 'done'|'wait'|'cancel'|None""" + try: + detail = upbit_client.get_order(uuid) + if not detail: + return None, None + state = detail.get('state') + avg_price = float(detail.get('avg_price') or 0) or None + return state, avg_price + except Exception as e: + log.warning(f"주문 조회 실패 {uuid}: {e}") + return None, None + + +def _avg_price_from_order(uuid: str) -> Optional[float]: + try: + detail = upbit_client.get_order(uuid) + if not detail: + return None + trades = detail.get('trades', []) + if trades: + total_funds = sum(float(t['funds']) for t in trades) + total_vol = sum(float(t['volume']) for t in trades) + return total_funds / total_vol if total_vol > 0 else None + avg = detail.get('avg_price') + return float(avg) if avg else None + except Exception as e: + log.warning(f"체결가 조회 실패 {uuid}: {e}") + return None + + +def do_sell_market(ticker: str, qty: float) -> Optional[float]: + """Trail Stop / Timeout용 시장가 매도.""" + if SIM_MODE: + price = pyupbit.get_current_price(ticker) + log.info(f"[SIM 시장가매도] {ticker} {qty:.6f}개 @ {price:,.0f}") + return price + try: + order = upbit_client.sell_market_order(ticker, qty) + if not order or 'error' in str(order): + log.error(f"시장가 매도 실패: {order}") + return None + uuid = order.get('uuid') + time.sleep(1.5) + avg_price = _avg_price_from_order(uuid) if uuid else None + return avg_price or pyupbit.get_current_price(ticker) + except Exception as e: + log.error(f"시장가 매도 오류 {ticker}: {e}") + return None + + +# ── 포지션 관리 ─────────────────────────────────────────────────────────────── +positions: dict = {} + + +def enter_position(ticker: str, entry_price: float, atr_raw: float, vr: list) -> None: + qty, actual_price = do_buy(ticker) + if qty is None: + log.warning(f"[진입 실패] {ticker}") + return + + entry_price = actual_price or entry_price + + # ① 지정가 매도 즉시 제출 + _, _, lr, tag = CASCADE_STAGES[0] + target = entry_price * (1 + lr) + sell_uuid = submit_limit_sell(ticker, qty, target) + + positions[ticker] = { + 'entry_price': entry_price, + 'entry_ts': datetime.now(), + 'running_peak': entry_price, + 'qty': qty, + 'stage': 0, + 'sell_uuid': sell_uuid, + 'sell_price': target, + 'llm_last_ts': None, # LLM 마지막 호출 시각 + } + log.info(f"[진입] {ticker} {entry_price:,.0f}원 vol {vr[2]:.1f}x " + f"지정가 {tag} {target:,.0f}원") + tg( + f"🟢 매수 {ticker}\n" + f"체결가: {entry_price:,.0f}원 수량: {qty:.6f}\n" + f"지정가 매도 제출: {tag} {target:,.0f}원 (+{lr*100:.1f}%)\n" + f"{'[시뮬]' if SIM_MODE else '[실거래]'}" + ) + + +def _advance_stage(ticker: str) -> None: + """다음 cascade 단계로 전환. 기존 지정가 취소 후 재주문.""" + pos = positions[ticker] + cancel_order_safe(pos.get('sell_uuid')) + next_stage = pos['stage'] + 1 + pos['stage'] = next_stage + + if next_stage < len(CASCADE_STAGES): + _, _, lr, tag = CASCADE_STAGES[next_stage] + target = pos['entry_price'] * (1 + lr) + uuid = submit_limit_sell(ticker, pos['qty'], target) + pos['sell_uuid'] = uuid + pos['sell_price'] = target + log.info(f"[단계전환] {ticker} → {tag} 목표가 {target:,.0f}원") + else: + pos['sell_uuid'] = None + pos['sell_price'] = None + log.info(f"[단계전환] {ticker} → ⑤ Trail Stop") + + +def _record_exit(ticker: str, exit_price: float, tag: str) -> None: + """체결 완료 후 포지션 종료 처리.""" + pos = positions[ticker] + pnl = (exit_price - pos['entry_price']) / pos['entry_price'] * 100 + krw = PER_POS * (pnl / 100) - PER_POS * FEE * 2 + held = int((datetime.now() - pos['entry_ts']).total_seconds()) + + reason_tag = { + '①': '① +2.0% 익절', '②': '② +1.0% 익절', + '③': '③ +0.5% 익절', '④': '④ +0.1% 본전', + 'trail': '⑤ 트레일스탑', 'timeout': '⑤ 타임아웃', + }.get(tag, tag) + + icon = "✅" if pnl > 0 else "🔴" + log.info(f"[청산/{tag}] {ticker} {exit_price:,.0f}원 PNL {pnl:+.2f}% {krw:+,.0f}원 {held}초 보유") + tg( + f"{icon} 청산 {ticker} [{reason_tag}]\n" + f"진입: {pos['entry_price']:,.0f}원\n" + f"청산: {exit_price:,.0f}원\n" + f"PNL: {pnl:+.2f}% ({krw:+,.0f}원) {held}초 보유\n" + f"{'[시뮬]' if SIM_MODE else '[실거래]'}" + ) + del positions[ticker] + + +def _should_call_llm(pos: dict, elapsed: float) -> bool: + """LLM 호출 조건: 진입 후 LLM_MIN_ELAPSED 초 경과 + LLM_INTERVAL 간격.""" + if elapsed < LLM_MIN_ELAPSED: + return False + last = pos.get('llm_last_ts') + if last is None: + return True + return (datetime.now() - last).total_seconds() >= LLM_INTERVAL + + +def check_filled_positions() -> None: + """20초마다 지정가 체결 확인. + + 흐름: + 1. 체결 완료 확인 + 2. LLM 어드바이저 호출 (1분 주기) → 목표가 반환 시 주문 교체 + 3. LLM hold/오류 시 cascade fallback (단계 시간 초과 → 다음 단계) + """ + for ticker in list(positions.keys()): + if ticker not in positions: + continue + pos = positions[ticker] + uuid = pos.get('sell_uuid') + elapsed = (datetime.now() - pos['entry_ts']).total_seconds() + + if uuid is None: + # Trail Stop 구간 — update_positions(tick)에서 처리 + continue + + stage = pos['stage'] + _, end, _, tag = CASCADE_STAGES[stage] + bar_list = list(bars.get(ticker, [])) + + if SIM_MODE: + # SIM: 최근 봉 고가가 목표가 이상이면 체결 + if bar_list and bar_list[-1]['high'] >= pos['sell_price']: + _record_exit(ticker, pos['sell_price'], tag) + continue + else: + # 실거래: API로 체결 확인 + state, avg_price = check_order_state(uuid) + if state == 'done': + _record_exit(ticker, avg_price or pos['sell_price'], tag) + continue + if state in ('cancel', None): + _advance_stage(ticker) + continue + + # ── LLM 어드바이저 (primary) ────────────────────────────────────── + if _should_call_llm(pos, elapsed): + pos['llm_last_ts'] = datetime.now() + current_price = bar_list[-1]['close'] if bar_list else pos['sell_price'] + new_price = get_exit_price(ticker, pos, bar_list, current_price) + if new_price is not None: + cancel_order_safe(uuid) + new_uuid = submit_limit_sell(ticker, pos['qty'], new_price) + pos['sell_uuid'] = new_uuid + pos['sell_price'] = new_price + pos['llm_active'] = True + continue + else: + pos['llm_active'] = False + + # ── Cascade fallback: LLM 실패 시에만 단계 전환 ────────────────── + if not pos.get('llm_active') and elapsed >= end: + _advance_stage(ticker) + + +def update_positions(current_prices: dict) -> None: + """tick마다 Trail Stop / Timeout 체크 — ③ 종료(300s) 이후에만 동작.""" + stage3_end = CASCADE_STAGES[2][1] # 300초 + + for ticker in list(positions.keys()): + if ticker not in current_prices: + continue + pos = positions[ticker] + price = current_prices[ticker] + elapsed = (datetime.now() - pos['entry_ts']).total_seconds() + + # ③ 이전: peak 추적 안 함, Trail Stop 비활성 + if elapsed < stage3_end: + continue + + # ③ 종료 직후 첫 틱: peak을 현재가로 초기화 (진입가 기준 제거) + if not pos.get('trail_peak_set'): + pos['running_peak'] = price + pos['trail_peak_set'] = True + else: + pos['running_peak'] = max(pos['running_peak'], price) + + # 지정가 주문 중이면 Trail Stop 비활성 + if pos.get('sell_uuid') is not None: + continue + + drop = (pos['running_peak'] - price) / pos['running_peak'] + + if drop >= TRAIL_STOP_R: + exit_price = do_sell_market(ticker, pos['qty']) or price + _record_exit(ticker, exit_price, 'trail') + elif elapsed >= TIMEOUT_SECS and price <= pos['entry_price']: + exit_price = do_sell_market(ticker, pos['qty']) or price + _record_exit(ticker, exit_price, 'timeout') + + +# ── 메인 ────────────────────────────────────────────────────────────────────── +def preload_bars() -> None: + need_min = (VOL_LOOKBACK + 10) // 3 + 1 + log.info(f"[사전적재] REST API 1분봉 {need_min}개로 bars[] 초기화 중...") + loaded = 0 + for ticker in TICKERS: + for attempt in range(3): + try: + df = pyupbit.get_ohlcv(ticker, interval='minute1', count=need_min) + if df is None or df.empty: + time.sleep(0.5) + continue + with bar_lock: + for _, row in df.iterrows(): + o, h, l, c = float(row['open']), float(row['high']), float(row['low']), float(row['close']) + v3 = float(row['volume']) / 3 + ts = row.name.to_pydatetime() + for _ in range(3): + bars[ticker].append({'open': o, 'high': h, 'low': l, 'close': c, 'volume': v3, 'ts': ts}) + loaded += 1 + break + except Exception as e: + log.warning(f"[사전적재] {ticker} 시도{attempt+1} 실패: {e}") + time.sleep(1) + time.sleep(0.2) + log.info(f"[사전적재] 완료 {loaded}/{len(TICKERS)} 티커") + + +def main(): + mode = "🔴 실거래" if not SIM_MODE else "🟡 시뮬레이션" + log.info(f"=== tick_trader 시작 ({mode}) ===") + log.info(f"봉주기: 20초 | VOL >= {VOL_MIN}x | 포지션 최대 {MAX_POS}개 | 1개당 {PER_POS:,}원") + stage_nums = ['①','②','③','④','⑤','⑥'] + stage_desc = ' → '.join( + f"{stage_nums[i]} {s[1]}초 +{s[2]*100:.1f}%" for i, s in enumerate(CASCADE_STAGES) + ) + log.info(f"청산: {stage_desc} → {stage_nums[len(CASCADE_STAGES)]} Trail -{TRAIL_STOP_R*100:.1f}% (지정가→시장가)") + tg( + f"🚀 tick_trader 시작 ({mode})\n" + f"봉주기 20초 | VOL ≥ {VOL_MIN}x | 최대 {MAX_POS}포지션\n" + f"① 40초 +2.0% 지정가\n" + f"② 100초 +1.0% 지정가\n" + f"③ 700초 +0.5% 지정가\n" + f"④ 3100초 +0.1% 지정가\n" + f"⑤ Trail -{TRAIL_STOP_R*100:.1f}% 시장가" + ) + + preload_bars() + + t = threading.Thread(target=finalize_bars, daemon=True) + t.start() + + ws = pyupbit.WebSocketManager("trade", TICKERS) + log.info("WebSocket 연결됨") + + last_pos_log = time.time() + + while True: + try: + data = ws.get() + if data is None: + continue + + ticker = data.get('code') + price = data.get('trade_price') + volume = data.get('trade_volume') + + if not ticker or price is None or volume is None: + continue + + on_tick(ticker, float(price), float(volume)) + + if positions: + update_positions({ticker: float(price)}) + + if time.time() - last_pos_log > 60: + warmed = sum(1 for t in TICKERS if len(bars[t]) >= VOL_LOOKBACK + 5) + if positions: + pos_lines = ' '.join( + f"{t.split('-')[1]} {p['entry_price']:,.0f}→{p['running_peak']:,.0f} [{CASCADE_STAGES[p['stage']][3] if p['stage'] < len(CASCADE_STAGES) else '⑤'}]" + for t, p in positions.items() + ) + log.info(f"[상태] 포지션 {len(positions)}/{MAX_POS} {pos_lines}") + else: + log.info(f"[상태] 포지션 없음 ({warmed}/{len(TICKERS)} 준비완료)") + last_pos_log = time.time() + + except Exception as e: + log.error(f"루프 오류: {e}") + time.sleep(1) + + +if __name__ == '__main__': + main() diff --git a/ecosystem.config.js b/ecosystem.config.js index 25f1ff4..52810f2 100644 --- a/ecosystem.config.js +++ b/ecosystem.config.js @@ -11,5 +11,38 @@ module.exports = { autorestart: true, watch: false, }, + { + name: "tick-collector", + script: "daemons/tick_collector.py", + interpreter: ".venv/bin/python3", + cwd: "/Users/joungmin/workspaces/upbit-trader", + out_file: "logs/tick-collector.log", + error_file: "logs/tick-collector-error.log", + log_date_format: "YYYY-MM-DD HH:mm:ss", + autorestart: true, + watch: false, + }, + { + name: "tick-trader", + script: "daemons/tick_trader.py", + interpreter: ".venv/bin/python3", + cwd: "/Users/joungmin/workspaces/upbit-trader", + out_file: "logs/tick-trader.log", + error_file: "logs/tick-trader-error.log", + log_date_format: "YYYY-MM-DD HH:mm:ss", + autorestart: true, + watch: false, + }, + { + name: "context-collector", + script: "daemons/context_collector.py", + interpreter: ".venv/bin/python3", + cwd: "/Users/joungmin/workspaces/upbit-trader", + out_file: "logs/context-collector.log", + error_file: "logs/context-collector-error.log", + log_date_format: "YYYY-MM-DD HH:mm:ss", + autorestart: true, + watch: false, + }, ], };