"""10초 주기로 Upbit 잔고/미체결 주문을 조회하여 position_sync 테이블 동기화. 상태: PENDING_BUY — 매수 주문 제출됨 (미체결) HOLDING — 보유 중 (매도 주문 없음) PENDING_SELL — 매도 주문 제출됨 (미체결) IDLE — 아무 것도 없음 (행 삭제) tick_trader는 이 테이블을 읽어서 positions/pending_buys를 복구한다. 실행: .venv/bin/python3 daemons/state_sync.py 로그: /tmp/state_sync.log """ import sys, os, time, logging from datetime import datetime sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from dotenv import load_dotenv load_dotenv(os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), '.env')) import pyupbit import oracledb TICKERS = [ 'KRW-XRP', 'KRW-BTC', 'KRW-ETH', 'KRW-SOL', 'KRW-DOGE', 'KRW-ADA', 'KRW-SUI', 'KRW-NEAR', 'KRW-KAVA', 'KRW-SXP', 'KRW-AKT', 'KRW-SONIC', 'KRW-IP', 'KRW-ORBS', 'KRW-VIRTUAL', 'KRW-BARD', 'KRW-XPL', 'KRW-KITE', 'KRW-ENSO', 'KRW-0G', 'KRW-MANTRA', 'KRW-EDGE', 'KRW-CFG', 'KRW-ARDR', 'KRW-SIGN', 'KRW-AZTEC', 'KRW-ATH', 'KRW-HOLO', 'KRW-BREV', 'KRW-SHIB', ] INTERVAL = 10 # 초 logging.basicConfig( level=logging.INFO, format='%(asctime)s %(levelname)s %(message)s', handlers=[ logging.FileHandler('/tmp/state_sync.log'), logging.StreamHandler(sys.stdout), ] ) log = logging.getLogger(__name__) upbit = pyupbit.Upbit(os.environ['ACCESS_KEY'], os.environ['SECRET_KEY']) def get_conn(): kwargs = dict(user=os.environ["ORACLE_USER"], password=os.environ["ORACLE_PASSWORD"], dsn=os.environ["ORACLE_DSN"]) if w := os.environ.get("ORACLE_WALLET"): kwargs["config_dir"] = w return oracledb.connect(**kwargs) def sync_once(conn): """Upbit 실제 상태를 조회하여 position_sync 테이블 갱신.""" cur = conn.cursor() now = datetime.now() # 1. 잔고 조회 → 보유 종목 파악 balances = upbit.get_balances() or [] held = {} # ticker → {qty, avg_price} for b in balances: currency = b.get('currency', '') if currency == 'KRW': continue ticker = f'KRW-{currency}' if ticker not in TICKERS: continue bal = float(b.get('balance', 0)) locked = float(b.get('locked', 0)) total = bal + locked avg = float(b.get('avg_buy_price', 0)) if total > 0 and avg > 0: held[ticker] = {'qty': total, 'avg_price': avg, 'invested': int(total * avg)} # 2. 미체결 주문 조회 → 매수/매도 대기 파악 pending_buys = {} # ticker → {uuid, price, qty} pending_sells = {} # ticker → {uuid, price, qty} for ticker in TICKERS: try: orders = upbit.get_order(ticker, state='wait') or [] if not isinstance(orders, list): continue for o in orders: side = o.get('side') uuid = o.get('uuid') price = float(o.get('price', 0)) rem = float(o.get('remaining_volume', 0)) if price <= 0 or rem <= 0: continue if side == 'bid': pending_buys[ticker] = {'uuid': uuid, 'price': price, 'qty': rem} elif side == 'ask': pending_sells[ticker] = {'uuid': uuid, 'price': price, 'qty': rem} except Exception: pass # 3. 상태 결정 및 DB 반영 active_tickers = set(held.keys()) | set(pending_buys.keys()) | set(pending_sells.keys()) for ticker in active_tickers: if ticker in pending_buys and ticker not in held: state = 'PENDING_BUY' pb = pending_buys[ticker] buy_price = pb['price'] sell_price = None qty = pb['qty'] order_uuid = pb['uuid'] invested = int(qty * buy_price) elif ticker in held and ticker in pending_sells: state = 'PENDING_SELL' h = held[ticker] ps = pending_sells[ticker] buy_price = h['avg_price'] sell_price = ps['price'] qty = h['qty'] order_uuid = ps['uuid'] invested = h['invested'] elif ticker in held: state = 'HOLDING' h = held[ticker] buy_price = h['avg_price'] sell_price = None qty = h['qty'] order_uuid = None invested = h['invested'] else: continue cur.execute( """MERGE INTO position_sync ps USING (SELECT :1 AS ticker FROM dual) src ON (ps.ticker = src.ticker) WHEN MATCHED THEN UPDATE SET state = :2, buy_price = :3, sell_price = :4, qty = :5, order_uuid = :6, invested_krw = :7, updated_at = :8 WHEN NOT MATCHED THEN INSERT (ticker, state, buy_price, sell_price, qty, order_uuid, invested_krw, updated_at) VALUES (:9, :10, :11, :12, :13, :14, :15, :16)""", [ticker, state, buy_price, sell_price, qty, order_uuid, invested, now, ticker, state, buy_price, sell_price, qty, order_uuid, invested, now] ) # 4. 이제 없는 종목은 삭제 if active_tickers: placeholders = ','.join(f"'{t}'" for t in active_tickers) cur.execute(f"DELETE FROM position_sync WHERE ticker NOT IN ({placeholders})") else: cur.execute("DELETE FROM position_sync") conn.commit() if active_tickers: summary = ', '.join(f"{t.split('-')[1]}={cur.execute('SELECT state FROM position_sync WHERE ticker=:1',[t]).fetchone()[0]}" for t in sorted(active_tickers)) log.info(f"[동기화] {summary}") def main(): log.info(f"=== state_sync 시작 (주기 {INTERVAL}초) ===") conn = get_conn() fail_count = 0 while True: try: sync_once(conn) fail_count = 0 except Exception as e: fail_count += 1 log.error(f"[동기화 오류] {e}", exc_info=(fail_count <= 3)) try: conn.close() except Exception: pass try: conn = get_conn() except Exception: pass time.sleep(INTERVAL) if __name__ == '__main__': main()