feat: add hourly position status report via Telegram

Run status reporter thread every 60 minutes, sends current price,
PnL, drop from peak, and holding time for each position.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
joungmin
2026-02-28 10:39:30 +09:00
parent a287e48522
commit 2585d47140
2 changed files with 55 additions and 1 deletions

View File

@@ -49,3 +49,34 @@ def notify_sell(ticker: str, price: float, pnl_pct: float, reason: str) -> None:
def notify_error(message: str) -> None: def notify_error(message: str) -> None:
_send(f"⚠️ <b>[오류]</b>\n{message}") _send(f"⚠️ <b>[오류]</b>\n{message}")
def notify_status(positions: dict) -> None:
"""1시간마다 포지션 현황 요약 전송."""
from datetime import datetime
import pyupbit
now = datetime.now().strftime("%H:%M")
if not positions:
_send(f"📊 <b>[{now} 현황]</b>\n보유 포지션 없음 — 매수 신호 대기 중")
return
lines = [f"📊 <b>[{now} 현황]</b>"]
for ticker, pos in positions.items():
current = pyupbit.get_current_price(ticker)
if not current:
continue
pnl = (current - pos["buy_price"]) / pos["buy_price"] * 100
peak = pos["peak_price"]
drop = (peak - current) / peak * 100
elapsed = (datetime.now() - pos["entry_time"]).total_seconds() / 3600
emoji = "📈" if pnl >= 0 else "📉"
lines.append(
f"\n{emoji} <b>{ticker}</b>\n"
f" 현재가: {current:,.0f}\n"
f" 수익률: {pnl:+.1f}%\n"
f" 최고가 대비: -{drop:.1f}%\n"
f" 보유: {elapsed:.1f}h"
)
_send("\n".join(lines))

25
main.py
View File

@@ -2,6 +2,7 @@
import logging import logging
import threading import threading
import time
from dotenv import load_dotenv from dotenv import load_dotenv
@@ -17,9 +18,25 @@ logging.basicConfig(
) )
from core.monitor import run_monitor from core.monitor import run_monitor
from core.trader import restore_positions from core.notify import notify_status
from core.trader import get_positions, restore_positions
from daemon.runner import run_scanner from daemon.runner import run_scanner
STATUS_INTERVAL = 3600 # 1시간마다 요약 전송
def run_status_reporter(interval: int = STATUS_INTERVAL) -> None:
"""주기적으로 포지션 현황을 Telegram으로 전송."""
logger = logging.getLogger("status")
logger.info(f"상태 리포터 시작 (주기={interval//60}분)")
time.sleep(interval) # 첫 전송은 1시간 후
while True:
try:
notify_status(dict(get_positions()))
except Exception as e:
logger.error(f"상태 리포트 오류: {e}")
time.sleep(interval)
def main() -> None: def main() -> None:
# 재시작 시 기존 잔고 복원 (이중 매수 방지) # 재시작 시 기존 잔고 복원 (이중 매수 방지)
@@ -31,6 +48,12 @@ def main() -> None:
) )
monitor_thread.start() monitor_thread.start()
# 1시간 주기 상태 리포트 스레드
status_thread = threading.Thread(
target=run_status_reporter, daemon=True, name="status"
)
status_thread.start()
# 매수 스캔 루프 (60초 주기, 메인 스레드) # 매수 스캔 루프 (60초 주기, 메인 스레드)
run_scanner() run_scanner()