왜 자동매매 모니터링이 필요한가
자동매매 봇은 24시간 무인으로 동작합니다. 하지만 봇이 돌아간다고 돈을 버는 것은 아닙니다. API 장애, 슬리피지 급증, 전략 드로다운 등 예상치 못한 상황이 언제든 발생할 수 있습니다. 체계적인 로그 수집과 실시간 모니터링 시스템이 없다면, 문제를 인지했을 때는 이미 큰 손실이 발생한 뒤입니다.
이 글에서는 파이썬 자동매매 봇에 구조화된 로그 수집 → 중앙 저장 → 실시간 알림 → 대시보드까지 전체 모니터링 파이프라인을 구축하는 방법을 다룹니다.
1단계: 구조화된 로그 설계
자동매매 로그는 단순 텍스트가 아닌 JSON 구조화 로그로 남겨야 합니다. 이후 검색, 필터링, 대시보드 시각화가 가능해집니다.
import logging
import json
from datetime import datetime, timezone
class StructuredFormatter(logging.Formatter):
"""자동매매용 JSON 구조화 로그 포맷터"""
def format(self, record):
log_entry = {
"ts": datetime.now(timezone.utc).isoformat(),
"level": record.levelname,
"module": record.module,
"msg": record.getMessage(),
}
# 추가 필드 (주문, 포지션 등)
if hasattr(record, 'extra_data'):
log_entry.update(record.extra_data)
return json.dumps(log_entry, ensure_ascii=False)
def setup_logger(name: str, log_file: str) -> logging.Logger:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
# 파일 핸들러 (JSON)
fh = logging.FileHandler(log_file)
fh.setFormatter(StructuredFormatter())
logger.addHandler(fh)
# 콘솔 핸들러 (사람용)
ch = logging.StreamHandler()
ch.setFormatter(logging.Formatter(
'%(asctime)s %(levelname)s %(message)s'
))
logger.addHandler(ch)
return logger
핵심 로그 이벤트 정의
자동매매 시스템에서 반드시 기록해야 할 이벤트를 체계적으로 분류합니다.
from enum import Enum
from dataclasses import dataclass, asdict
class EventType(Enum):
ORDER_PLACED = "order_placed"
ORDER_FILLED = "order_filled"
ORDER_FAILED = "order_failed"
POSITION_OPENED = "position_opened"
POSITION_CLOSED = "position_closed"
SIGNAL_GENERATED = "signal_generated"
RISK_ALERT = "risk_alert"
API_ERROR = "api_error"
BALANCE_UPDATE = "balance_update"
HEARTBEAT = "heartbeat"
@dataclass
class TradeLog:
event: EventType
symbol: str
side: str = ""
price: float = 0.0
amount: float = 0.0
pnl: float = 0.0
pnl_pct: float = 0.0
balance: float = 0.0
latency_ms: float = 0.0
error: str = ""
def to_dict(self) -> dict:
d = asdict(self)
d['event'] = self.event.value
return d
class TradingLogger:
"""자동매매 전용 로거"""
def __init__(self, bot_name: str):
self.logger = setup_logger(
bot_name, f"/var/log/trading/{bot_name}.jsonl"
)
self.bot_name = bot_name
def log_trade(self, trade: TradeLog):
record = self.logger.makeRecord(
self.bot_name, logging.INFO, "", 0,
f"{trade.event.value}: {trade.symbol}",
None, None
)
record.extra_data = trade.to_dict()
self.logger.handle(record)
def log_order(self, symbol, side, price, amount,
latency_ms=0):
self.log_trade(TradeLog(
event=EventType.ORDER_FILLED,
symbol=symbol, side=side,
price=price, amount=amount,
latency_ms=latency_ms
))
def log_pnl(self, symbol, pnl, pnl_pct, balance):
self.log_trade(TradeLog(
event=EventType.POSITION_CLOSED,
symbol=symbol, pnl=pnl,
pnl_pct=pnl_pct, balance=balance
))
def log_error(self, symbol, error_msg):
self.log_trade(TradeLog(
event=EventType.API_ERROR,
symbol=symbol, error=error_msg
))
2단계: 실시간 메트릭 수집
로그 외에 시계열 메트릭을 수집하면 성과 추이와 시스템 상태를 실시간으로 파악할 수 있습니다. Prometheus 호환 메트릭 수집기를 구현합니다.
from collections import defaultdict
import time
import threading
class TradingMetrics:
"""자동매매 실시간 메트릭 수집기"""
def __init__(self):
self._lock = threading.Lock()
self.counters = defaultdict(int)
self.gauges = defaultdict(float)
self.histograms = defaultdict(list)
self._start_time = time.time()
def inc_counter(self, name: str, value: int = 1):
with self._lock:
self.counters[name] += value
def set_gauge(self, name: str, value: float):
with self._lock:
self.gauges[name] = value
def observe(self, name: str, value: float):
with self._lock:
self.histograms[name].append(value)
# 최근 1000개만 유지
if len(self.histograms[name]) > 1000:
self.histograms[name] =
self.histograms[name][-1000:]
def get_summary(self) -> dict:
"""현재 메트릭 요약"""
import numpy as np
with self._lock:
summary = {
'uptime_hours': (
time.time() - self._start_time
) / 3600,
'counters': dict(self.counters),
'gauges': dict(self.gauges),
}
for name, values in self.histograms.items():
if values:
arr = np.array(values)
summary[f'{name}_p50'] = float(
np.percentile(arr, 50)
)
summary[f'{name}_p95'] = float(
np.percentile(arr, 95)
)
summary[f'{name}_p99'] = float(
np.percentile(arr, 99)
)
return summary
# 사용 예시
metrics = TradingMetrics()
# 주문 체결 시
metrics.inc_counter('orders_total')
metrics.inc_counter('orders_buy')
metrics.observe('order_latency_ms', 45.2)
# 잔고 갱신
metrics.set_gauge('balance_usdt', 10523.50)
metrics.set_gauge('unrealized_pnl', -120.30)
metrics.set_gauge('open_positions', 3)
3단계: 알림 시스템 구축
이상 상황을 즉시 감지하고 알림을 보내는 시스템입니다. 자동매매 봇 설계에서 알림은 필수 컴포넌트입니다.
import aiohttp
import asyncio
from dataclasses import dataclass
from typing import Optional
@dataclass
class AlertRule:
name: str
condition: str # 'mdd_exceed', 'api_error_spike' 등
threshold: float
cooldown_sec: int = 300 # 동일 알림 최소 간격
last_fired: float = 0
class AlertManager:
"""자동매매 알림 매니저"""
def __init__(self, telegram_token: str, chat_id: str,
discord_webhook: Optional[str] = None):
self.telegram_token = telegram_token
self.chat_id = chat_id
self.discord_webhook = discord_webhook
self.rules: list[AlertRule] = []
self._setup_default_rules()
def _setup_default_rules(self):
self.rules = [
AlertRule('MDD 초과', 'mdd_exceed', -0.05, 600),
AlertRule('API 에러 급증', 'api_error_spike', 5, 300),
AlertRule('잔고 급감', 'balance_drop', -0.03, 600),
AlertRule('주문 지연', 'latency_spike', 3000, 300),
AlertRule('봇 무응답', 'heartbeat_miss', 180, 120),
]
def check_alerts(self, metrics: TradingMetrics,
current_time: float):
"""메트릭 기반 알림 조건 확인"""
summary = metrics.get_summary()
for rule in self.rules:
if current_time - rule.last_fired < rule.cooldown_sec:
continue
triggered = False
value = 0.0
if rule.condition == 'mdd_exceed':
balance = summary['gauges'].get(
'balance_usdt', 0
)
peak = summary['gauges'].get(
'peak_balance', balance
)
if peak > 0:
mdd = (balance - peak) / peak
if mdd < rule.threshold:
triggered = True
value = mdd
elif rule.condition == 'latency_spike':
p95 = summary.get('order_latency_ms_p95', 0)
if p95 > rule.threshold:
triggered = True
value = p95
elif rule.condition == 'api_error_spike':
errors = summary['counters'].get(
'api_errors_1min', 0
)
if errors > rule.threshold:
triggered = True
value = errors
if triggered:
rule.last_fired = current_time
asyncio.create_task(
self._send_alert(rule, value)
)
async def _send_alert(self, rule: AlertRule, value: float):
"""텔레그램 + Discord 동시 발송"""
msg = f"🚨 {rule.name}n값: {value:.4f}n임계치: {rule.threshold}"
async with aiohttp.ClientSession() as session:
# 텔레그램
url = (f"https://api.telegram.org/"
f"bot{self.telegram_token}/sendMessage")
await session.post(url, json={
'chat_id': self.chat_id,
'text': msg,
'parse_mode': 'HTML'
})
# Discord (선택)
if self.discord_webhook:
await session.post(
self.discord_webhook,
json={'content': msg}
)
4단계: 모니터링 대시보드
수집한 메트릭을 웹 대시보드로 시각화합니다. Flask 기반 경량 대시보드를 구현합니다.
from flask import Flask, jsonify, render_template_string
import threading
DASHBOARD_HTML = """
<!DOCTYPE html>
<html>
<head><title>Trading Monitor</title>
<meta http-equiv="refresh" content="10">
<style>
body { font-family: monospace; background: #1a1a2e;
color: #eee; padding: 20px; }
.card { background: #16213e; border-radius: 8px;
padding: 15px; margin: 10px;
display: inline-block; min-width: 200px; }
.positive { color: #00d2ff; }
.negative { color: #ff6b6b; }
h2 { color: #e94560; }
</style>
</head>
<body>
<h2>📊 Trading Bot Monitor</h2>
<div class="card">
<h3>잔고</h3>
<p class="{{ 'positive' if pnl >= 0 else 'negative' }}">
${{ "%.2f"|format(balance) }}
</p>
</div>
<div class="card">
<h3>오늘 PnL</h3>
<p class="{{ 'positive' if pnl >= 0 else 'negative' }}">
${{ "%.2f"|format(pnl) }} ({{ "%.2f"|format(pnl_pct) }}%)
</p>
</div>
<div class="card">
<h3>체결 건수</h3>
<p>{{ orders_total }}</p>
</div>
<div class="card">
<h3>지연시간 P95</h3>
<p>{{ "%.1f"|format(latency_p95) }}ms</p>
</div>
</body>
</html>
"""
def create_dashboard(metrics: TradingMetrics,
port: int = 8080):
app = Flask(__name__)
@app.route('/api/metrics')
def api_metrics():
return jsonify(metrics.get_summary())
@app.route('/')
def dashboard():
s = metrics.get_summary()
return render_template_string(DASHBOARD_HTML,
balance=s['gauges'].get('balance_usdt', 0),
pnl=s['gauges'].get('daily_pnl', 0),
pnl_pct=s['gauges'].get('daily_pnl_pct', 0),
orders_total=s['counters'].get('orders_total', 0),
latency_p95=s.get('order_latency_ms_p95', 0),
)
thread = threading.Thread(
target=lambda: app.run(
host='0.0.0.0', port=port, debug=False
),
daemon=True
)
thread.start()
return app
5단계: 헬스체크와 자동 복구
봇이 멈추거나 비정상 상태에 빠졌을 때 자동으로 감지하고 복구하는 watchdog을 구현합니다.
import subprocess
import os
import signal
class BotWatchdog:
"""봇 헬스체크 및 자동 재시작"""
def __init__(self, bot_name: str,
max_restart: int = 3,
heartbeat_timeout: int = 180):
self.bot_name = bot_name
self.max_restart = max_restart
self.heartbeat_timeout = heartbeat_timeout
self.restart_count = 0
self.last_heartbeat = time.time()
def receive_heartbeat(self):
"""봇에서 주기적으로 호출"""
self.last_heartbeat = time.time()
self.restart_count = 0 # 정상 동작 시 카운트 리셋
def check_health(self) -> bool:
elapsed = time.time() - self.last_heartbeat
if elapsed > self.heartbeat_timeout:
return False
return True
def restart_bot(self, bot_process):
"""봇 프로세스 재시작"""
if self.restart_count >= self.max_restart:
# 최대 재시작 초과 → 긴급 알림만
raise RuntimeError(
f"{self.bot_name} 재시작 {self.max_restart}회 초과"
)
self.restart_count += 1
# 기존 프로세스 종료
if bot_process and bot_process.poll() is None:
os.kill(bot_process.pid, signal.SIGTERM)
bot_process.wait(timeout=10)
# 재시작
new_process = subprocess.Popen(
['python3', f'/opt/trading/{self.bot_name}/main.py'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE
)
self.last_heartbeat = time.time()
return new_process
모니터링 체크리스트
운영 환경에서 빠뜨리기 쉬운 모니터링 항목을 정리합니다. 슬리피지 관리와 함께 적용하면 리스크를 크게 줄일 수 있습니다.
| 항목 | 메트릭 | 알림 임계치 | 확인 주기 |
|---|---|---|---|
| 잔고 | balance_usdt | 전일 대비 -3% | 1분 |
| MDD | max_drawdown | -5% 초과 | 1분 |
| 주문 지연 | order_latency_p95 | 3초 초과 | 10초 |
| API 에러율 | api_errors_1min | 분당 5회 초과 | 1분 |
| 봇 헬스 | heartbeat_age | 3분 무응답 | 30초 |
| 슬리피지 | slippage_bps_p95 | 10bps 초과 | 체결마다 |
정리
자동매매 모니터링 시스템은 구조화 로그 → 메트릭 수집 → 알림 → 대시보드 → 자동 복구 5단계로 구축합니다. 전략 개발만큼이나 모니터링 인프라가 중요합니다. 봇이 돈을 벌어주는 동안 문제를 즉시 감지하고 대응할 수 있는 시스템이 갖춰져야 안정적인 자동매매 운영이 가능합니다. 코드를 먼저 짜고 로그는 나중에 하겠다는 생각은 버리세요. 로그가 곧 전략의 눈입니다.