자동매매 로그 모니터링 구축

왜 자동매매 모니터링이 필요한가

자동매매 봇은 24시간 무인으로 동작합니다. 하지만 봇이 돌아간다고 돈을 버는 것은 아닙니다. API 장애, 슬리피지 급증, 전략 드로다운 등 예상치 못한 상황이 언제든 발생할 수 있습니다. 체계적인 로그 수집과 실시간 모니터링 시스템이 없다면, 문제를 인지했을 때는 이미 큰 손실이 발생한 뒤입니다.

이 글에서는 파이썬 자동매매 봇에 구조화된 로그 수집 → 중앙 저장 → 실시간 알림 → 대시보드까지 전체 모니터링 파이프라인을 구축하는 방법을 다룹니다.

1단계: 구조화된 로그 설계

자동매매 로그는 단순 텍스트가 아닌 JSON 구조화 로그로 남겨야 합니다. 이후 검색, 필터링, 대시보드 시각화가 가능해집니다.

import logging
import json
from datetime import datetime, timezone

class StructuredFormatter(logging.Formatter):
    """자동매매용 JSON 구조화 로그 포맷터"""
    
    def format(self, record):
        log_entry = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "module": record.module,
            "msg": record.getMessage(),
        }
        # 추가 필드 (주문, 포지션 등)
        if hasattr(record, 'extra_data'):
            log_entry.update(record.extra_data)
        return json.dumps(log_entry, ensure_ascii=False)

def setup_logger(name: str, log_file: str) -> logging.Logger:
    logger = logging.getLogger(name)
    logger.setLevel(logging.DEBUG)
    
    # 파일 핸들러 (JSON)
    fh = logging.FileHandler(log_file)
    fh.setFormatter(StructuredFormatter())
    logger.addHandler(fh)
    
    # 콘솔 핸들러 (사람용)
    ch = logging.StreamHandler()
    ch.setFormatter(logging.Formatter(
        '%(asctime)s %(levelname)s %(message)s'
    ))
    logger.addHandler(ch)
    
    return logger

핵심 로그 이벤트 정의

자동매매 시스템에서 반드시 기록해야 할 이벤트를 체계적으로 분류합니다.

from enum import Enum
from dataclasses import dataclass, asdict

class EventType(Enum):
    ORDER_PLACED = "order_placed"
    ORDER_FILLED = "order_filled"
    ORDER_FAILED = "order_failed"
    POSITION_OPENED = "position_opened"
    POSITION_CLOSED = "position_closed"
    SIGNAL_GENERATED = "signal_generated"
    RISK_ALERT = "risk_alert"
    API_ERROR = "api_error"
    BALANCE_UPDATE = "balance_update"
    HEARTBEAT = "heartbeat"

@dataclass
class TradeLog:
    event: EventType
    symbol: str
    side: str = ""
    price: float = 0.0
    amount: float = 0.0
    pnl: float = 0.0
    pnl_pct: float = 0.0
    balance: float = 0.0
    latency_ms: float = 0.0
    error: str = ""
    
    def to_dict(self) -> dict:
        d = asdict(self)
        d['event'] = self.event.value
        return d

class TradingLogger:
    """자동매매 전용 로거"""
    
    def __init__(self, bot_name: str):
        self.logger = setup_logger(
            bot_name, f"/var/log/trading/{bot_name}.jsonl"
        )
        self.bot_name = bot_name
    
    def log_trade(self, trade: TradeLog):
        record = self.logger.makeRecord(
            self.bot_name, logging.INFO, "", 0,
            f"{trade.event.value}: {trade.symbol}", 
            None, None
        )
        record.extra_data = trade.to_dict()
        self.logger.handle(record)
    
    def log_order(self, symbol, side, price, amount, 
                  latency_ms=0):
        self.log_trade(TradeLog(
            event=EventType.ORDER_FILLED,
            symbol=symbol, side=side,
            price=price, amount=amount,
            latency_ms=latency_ms
        ))
    
    def log_pnl(self, symbol, pnl, pnl_pct, balance):
        self.log_trade(TradeLog(
            event=EventType.POSITION_CLOSED,
            symbol=symbol, pnl=pnl,
            pnl_pct=pnl_pct, balance=balance
        ))
    
    def log_error(self, symbol, error_msg):
        self.log_trade(TradeLog(
            event=EventType.API_ERROR,
            symbol=symbol, error=error_msg
        ))

2단계: 실시간 메트릭 수집

로그 외에 시계열 메트릭을 수집하면 성과 추이와 시스템 상태를 실시간으로 파악할 수 있습니다. Prometheus 호환 메트릭 수집기를 구현합니다.

from collections import defaultdict
import time
import threading

class TradingMetrics:
    """자동매매 실시간 메트릭 수집기"""
    
    def __init__(self):
        self._lock = threading.Lock()
        self.counters = defaultdict(int)
        self.gauges = defaultdict(float)
        self.histograms = defaultdict(list)
        self._start_time = time.time()
    
    def inc_counter(self, name: str, value: int = 1):
        with self._lock:
            self.counters[name] += value
    
    def set_gauge(self, name: str, value: float):
        with self._lock:
            self.gauges[name] = value
    
    def observe(self, name: str, value: float):
        with self._lock:
            self.histograms[name].append(value)
            # 최근 1000개만 유지
            if len(self.histograms[name]) > 1000:
                self.histograms[name] = 
                    self.histograms[name][-1000:]
    
    def get_summary(self) -> dict:
        """현재 메트릭 요약"""
        import numpy as np
        with self._lock:
            summary = {
                'uptime_hours': (
                    time.time() - self._start_time
                ) / 3600,
                'counters': dict(self.counters),
                'gauges': dict(self.gauges),
            }
            for name, values in self.histograms.items():
                if values:
                    arr = np.array(values)
                    summary[f'{name}_p50'] = float(
                        np.percentile(arr, 50)
                    )
                    summary[f'{name}_p95'] = float(
                        np.percentile(arr, 95)
                    )
                    summary[f'{name}_p99'] = float(
                        np.percentile(arr, 99)
                    )
            return summary

# 사용 예시
metrics = TradingMetrics()

# 주문 체결 시
metrics.inc_counter('orders_total')
metrics.inc_counter('orders_buy')
metrics.observe('order_latency_ms', 45.2)

# 잔고 갱신
metrics.set_gauge('balance_usdt', 10523.50)
metrics.set_gauge('unrealized_pnl', -120.30)
metrics.set_gauge('open_positions', 3)

3단계: 알림 시스템 구축

이상 상황을 즉시 감지하고 알림을 보내는 시스템입니다. 자동매매 봇 설계에서 알림은 필수 컴포넌트입니다.

import aiohttp
import asyncio
from dataclasses import dataclass
from typing import Optional

@dataclass
class AlertRule:
    name: str
    condition: str        # 'mdd_exceed', 'api_error_spike' 등
    threshold: float
    cooldown_sec: int = 300  # 동일 알림 최소 간격
    last_fired: float = 0

class AlertManager:
    """자동매매 알림 매니저"""
    
    def __init__(self, telegram_token: str, chat_id: str,
                 discord_webhook: Optional[str] = None):
        self.telegram_token = telegram_token
        self.chat_id = chat_id
        self.discord_webhook = discord_webhook
        self.rules: list[AlertRule] = []
        self._setup_default_rules()
    
    def _setup_default_rules(self):
        self.rules = [
            AlertRule('MDD 초과', 'mdd_exceed', -0.05, 600),
            AlertRule('API 에러 급증', 'api_error_spike', 5, 300),
            AlertRule('잔고 급감', 'balance_drop', -0.03, 600),
            AlertRule('주문 지연', 'latency_spike', 3000, 300),
            AlertRule('봇 무응답', 'heartbeat_miss', 180, 120),
        ]
    
    def check_alerts(self, metrics: TradingMetrics,
                     current_time: float):
        """메트릭 기반 알림 조건 확인"""
        summary = metrics.get_summary()
        
        for rule in self.rules:
            if current_time - rule.last_fired < rule.cooldown_sec:
                continue
            
            triggered = False
            value = 0.0
            
            if rule.condition == 'mdd_exceed':
                balance = summary['gauges'].get(
                    'balance_usdt', 0
                )
                peak = summary['gauges'].get(
                    'peak_balance', balance
                )
                if peak > 0:
                    mdd = (balance - peak) / peak
                    if mdd < rule.threshold:
                        triggered = True
                        value = mdd
            
            elif rule.condition == 'latency_spike':
                p95 = summary.get('order_latency_ms_p95', 0)
                if p95 > rule.threshold:
                    triggered = True
                    value = p95
            
            elif rule.condition == 'api_error_spike':
                errors = summary['counters'].get(
                    'api_errors_1min', 0
                )
                if errors > rule.threshold:
                    triggered = True
                    value = errors
            
            if triggered:
                rule.last_fired = current_time
                asyncio.create_task(
                    self._send_alert(rule, value)
                )
    
    async def _send_alert(self, rule: AlertRule, value: float):
        """텔레그램 + Discord 동시 발송"""
        msg = f"🚨 {rule.name}n값: {value:.4f}n임계치: {rule.threshold}"
        
        async with aiohttp.ClientSession() as session:
            # 텔레그램
            url = (f"https://api.telegram.org/"
                   f"bot{self.telegram_token}/sendMessage")
            await session.post(url, json={
                'chat_id': self.chat_id,
                'text': msg,
                'parse_mode': 'HTML'
            })
            
            # Discord (선택)
            if self.discord_webhook:
                await session.post(
                    self.discord_webhook,
                    json={'content': msg}
                )

4단계: 모니터링 대시보드

수집한 메트릭을 웹 대시보드로 시각화합니다. Flask 기반 경량 대시보드를 구현합니다.

from flask import Flask, jsonify, render_template_string
import threading

DASHBOARD_HTML = """
<!DOCTYPE html>
<html>
<head><title>Trading Monitor</title>
<meta http-equiv="refresh" content="10">
<style>
  body { font-family: monospace; background: #1a1a2e; 
         color: #eee; padding: 20px; }
  .card { background: #16213e; border-radius: 8px; 
          padding: 15px; margin: 10px; 
          display: inline-block; min-width: 200px; }
  .positive { color: #00d2ff; }
  .negative { color: #ff6b6b; }
  h2 { color: #e94560; }
</style>
</head>
<body>
  <h2>📊 Trading Bot Monitor</h2>
  <div class="card">
    <h3>잔고</h3>
    <p class="{{ 'positive' if pnl >= 0 else 'negative' }}">
      ${{ "%.2f"|format(balance) }}
    </p>
  </div>
  <div class="card">
    <h3>오늘 PnL</h3>
    <p class="{{ 'positive' if pnl >= 0 else 'negative' }}">
      ${{ "%.2f"|format(pnl) }} ({{ "%.2f"|format(pnl_pct) }}%)
    </p>
  </div>
  <div class="card">
    <h3>체결 건수</h3>
    <p>{{ orders_total }}</p>
  </div>
  <div class="card">
    <h3>지연시간 P95</h3>
    <p>{{ "%.1f"|format(latency_p95) }}ms</p>
  </div>
</body>
</html>
"""

def create_dashboard(metrics: TradingMetrics, 
                     port: int = 8080):
    app = Flask(__name__)
    
    @app.route('/api/metrics')
    def api_metrics():
        return jsonify(metrics.get_summary())
    
    @app.route('/')
    def dashboard():
        s = metrics.get_summary()
        return render_template_string(DASHBOARD_HTML,
            balance=s['gauges'].get('balance_usdt', 0),
            pnl=s['gauges'].get('daily_pnl', 0),
            pnl_pct=s['gauges'].get('daily_pnl_pct', 0),
            orders_total=s['counters'].get('orders_total', 0),
            latency_p95=s.get('order_latency_ms_p95', 0),
        )
    
    thread = threading.Thread(
        target=lambda: app.run(
            host='0.0.0.0', port=port, debug=False
        ),
        daemon=True
    )
    thread.start()
    return app

5단계: 헬스체크와 자동 복구

봇이 멈추거나 비정상 상태에 빠졌을 때 자동으로 감지하고 복구하는 watchdog을 구현합니다.

import subprocess
import os
import signal

class BotWatchdog:
    """봇 헬스체크 및 자동 재시작"""
    
    def __init__(self, bot_name: str, 
                 max_restart: int = 3,
                 heartbeat_timeout: int = 180):
        self.bot_name = bot_name
        self.max_restart = max_restart
        self.heartbeat_timeout = heartbeat_timeout
        self.restart_count = 0
        self.last_heartbeat = time.time()
    
    def receive_heartbeat(self):
        """봇에서 주기적으로 호출"""
        self.last_heartbeat = time.time()
        self.restart_count = 0  # 정상 동작 시 카운트 리셋
    
    def check_health(self) -> bool:
        elapsed = time.time() - self.last_heartbeat
        if elapsed > self.heartbeat_timeout:
            return False
        return True
    
    def restart_bot(self, bot_process):
        """봇 프로세스 재시작"""
        if self.restart_count >= self.max_restart:
            # 최대 재시작 초과 → 긴급 알림만
            raise RuntimeError(
                f"{self.bot_name} 재시작 {self.max_restart}회 초과"
            )
        
        self.restart_count += 1
        
        # 기존 프로세스 종료
        if bot_process and bot_process.poll() is None:
            os.kill(bot_process.pid, signal.SIGTERM)
            bot_process.wait(timeout=10)
        
        # 재시작
        new_process = subprocess.Popen(
            ['python3', f'/opt/trading/{self.bot_name}/main.py'],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE
        )
        
        self.last_heartbeat = time.time()
        return new_process

모니터링 체크리스트

운영 환경에서 빠뜨리기 쉬운 모니터링 항목을 정리합니다. 슬리피지 관리와 함께 적용하면 리스크를 크게 줄일 수 있습니다.

항목 메트릭 알림 임계치 확인 주기
잔고 balance_usdt 전일 대비 -3% 1분
MDD max_drawdown -5% 초과 1분
주문 지연 order_latency_p95 3초 초과 10초
API 에러율 api_errors_1min 분당 5회 초과 1분
봇 헬스 heartbeat_age 3분 무응답 30초
슬리피지 slippage_bps_p95 10bps 초과 체결마다

정리

자동매매 모니터링 시스템은 구조화 로그 → 메트릭 수집 → 알림 → 대시보드 → 자동 복구 5단계로 구축합니다. 전략 개발만큼이나 모니터링 인프라가 중요합니다. 봇이 돈을 벌어주는 동안 문제를 즉시 감지하고 대응할 수 있는 시스템이 갖춰져야 안정적인 자동매매 운영이 가능합니다. 코드를 먼저 짜고 로그는 나중에 하겠다는 생각은 버리세요. 로그가 곧 전략의 눈입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux