왜 주문 큐가 필요한가
자동매매 봇에서 전략 시그널이 동시에 여러 개 발생하면 어떻게 될까요? 거래소 API에 주문을 동시에 쏟아내면 레이트 리밋 초과, 주문 실패, 중복 체결 같은 문제가 발생합니다. 주문 큐(Order Queue)는 이런 혼란을 방지하고, 주문을 체계적으로 관리하는 핵심 인프라입니다.
실전 자동매매에서는 시그널 생성과 주문 실행을 분리해야 합니다. 전략 로직은 “무엇을 사고팔지” 결정하고, 주문 큐는 “언제, 어떤 순서로, 어떻게 실행할지”를 담당합니다. 이 관심사 분리(Separation of Concerns)가 안정적인 봇 운영의 기본입니다.
주문 큐 아키텍처
주문 큐 시스템은 4개 계층으로 구성됩니다.
| 계층 | 역할 | 핵심 기능 |
|---|---|---|
| Producer | 전략 시그널 → 주문 요청 생성 | 중복 방지, 유효성 검증 |
| Queue | 우선순위 기반 대기열 | 정렬, TTL, 크기 제한 |
| Consumer | 큐에서 꺼내 API 호출 | 레이트 리밋, 재시도 |
| Monitor | 체결 확인 및 후처리 | 상태 추적, 알림 |
주문 모델 설계
먼저 주문 요청을 표현하는 데이터 모델을 정의합니다.
from dataclasses import dataclass, field
from enum import IntEnum, Enum
from typing import Optional
import time
import uuid
class OrderPriority(IntEnum):
"""주문 우선순위 (낮을수록 높은 우선순위)"""
EMERGENCY = 0 # 긴급 청산, 스탑로스
HIGH = 1 # 손절/익절 주문
NORMAL = 2 # 일반 진입 주문
LOW = 3 # 리밸런싱, DCA
class OrderStatus(Enum):
PENDING = "pending"
QUEUED = "queued"
SUBMITTING = "submitting"
SUBMITTED = "submitted"
FILLED = "filled"
PARTIALLY_FILLED = "partially_filled"
FAILED = "failed"
CANCELLED = "cancelled"
EXPIRED = "expired"
@dataclass(order=True)
class OrderRequest:
priority: OrderPriority = field(compare=True)
created_at: float = field(compare=True, default_factory=time.time)
# 주문 정보 (비교 대상 아님)
order_id: str = field(compare=False,
default_factory=lambda: str(uuid.uuid4())[:8])
symbol: str = field(compare=False, default="")
side: str = field(compare=False, default="") # buy / sell
order_type: str = field(compare=False, default="market")
amount: float = field(compare=False, default=0.0)
price: Optional[float] = field(compare=False, default=None)
# 실행 제어
ttl_sec: float = field(compare=False, default=30.0)
max_retries: int = field(compare=False, default=3)
retry_count: int = field(compare=False, default=0)
status: OrderStatus = field(compare=False,
default=OrderStatus.PENDING)
# 원본 전략 추적
strategy_name: str = field(compare=False, default="")
signal_id: str = field(compare=False, default="")
@property
def is_expired(self) -> bool:
return time.time() - self.created_at > self.ttl_sec
@property
def can_retry(self) -> bool:
return self.retry_count < self.max_retries
우선순위 큐 구현
파이썬 heapq를 활용한 우선순위 큐입니다. 긴급 주문(손절)이 일반 주문보다 항상 먼저 처리됩니다.
import heapq
import threading
from typing import List, Optional
import logging
logger = logging.getLogger(__name__)
class OrderQueue:
"""우선순위 기반 주문 큐"""
def __init__(self, max_size: int = 100):
self._heap: List[OrderRequest] = []
self._lock = threading.Lock()
self._event = threading.Event()
self.max_size = max_size
self._dedup_set: set = set() # 중복 방지
def push(self, order: OrderRequest) -> bool:
"""주문 추가. 중복/만료/가득 차면 False"""
with self._lock:
# 중복 체크 (같은 전략+심볼+방향)
dedup_key = (order.strategy_name,
order.symbol, order.side)
if dedup_key in self._dedup_set:
logger.warning(
f"중복 주문 거부: {dedup_key}"
)
return False
if order.is_expired:
order.status = OrderStatus.EXPIRED
return False
if len(self._heap) >= self.max_size:
logger.warning("큐 가득 참, 주문 거부")
return False
order.status = OrderStatus.QUEUED
heapq.heappush(self._heap, order)
self._dedup_set.add(dedup_key)
self._event.set()
logger.info(
f"큐 추가: {order.order_id} "
f"{order.symbol} {order.side} "
f"우선순위={order.priority.name}"
)
return True
def pop(self, timeout: float = 1.0) -> Optional[OrderRequest]:
"""가장 높은 우선순위 주문 꺼내기"""
self._event.wait(timeout=timeout)
with self._lock:
while self._heap:
order = heapq.heappop(self._heap)
# 만료 체크
if order.is_expired:
order.status = OrderStatus.EXPIRED
self._remove_dedup(order)
logger.info(
f"만료 제거: {order.order_id}"
)
continue
order.status = OrderStatus.SUBMITTING
return order
self._event.clear()
return None
def _remove_dedup(self, order: OrderRequest):
key = (order.strategy_name,
order.symbol, order.side)
self._dedup_set.discard(key)
def cancel_by_strategy(self, strategy_name: str) -> int:
"""특정 전략의 모든 대기 주문 취소"""
with self._lock:
cancelled = 0
new_heap = []
for order in self._heap:
if order.strategy_name == strategy_name:
order.status = OrderStatus.CANCELLED
self._remove_dedup(order)
cancelled += 1
else:
new_heap.append(order)
self._heap = new_heap
heapq.heapify(self._heap)
return cancelled
@property
def size(self) -> int:
with self._lock:
return len(self._heap)
레이트 리밋 + 재시도 Consumer
거래소 API의 레이트 리밋을 준수하면서 주문을 실행하는 Consumer입니다. 자동매매 봇 설계에서 가장 중요한 컴포넌트 중 하나입니다.
import asyncio
from collections import deque
class RateLimiter:
"""슬라이딩 윈도우 레이트 리미터"""
def __init__(self, max_calls: int, window_sec: float):
self.max_calls = max_calls
self.window_sec = window_sec
self.calls: deque = deque()
async def acquire(self):
"""호출 가능할 때까지 대기"""
while True:
now = time.time()
# 윈도우 밖의 기록 제거
while self.calls and self.calls[0] < now - self.window_sec:
self.calls.popleft()
if len(self.calls) < self.max_calls:
self.calls.append(now)
return
# 다음 슬롯까지 대기
wait = self.calls[0] + self.window_sec - now
await asyncio.sleep(wait + 0.01)
class OrderConsumer:
"""주문 큐 Consumer - 레이트 리밋 준수 실행"""
def __init__(self, queue: OrderQueue, exchange,
rate_limit: int = 10, window: float = 1.0):
self.queue = queue
self.exchange = exchange
self.limiter = RateLimiter(rate_limit, window)
self.running = False
self._results = {}
async def start(self):
"""Consumer 루프 시작"""
self.running = True
logger.info("OrderConsumer 시작")
while self.running:
order = self.queue.pop(timeout=0.5)
if order is None:
await asyncio.sleep(0.1)
continue
await self._execute(order)
async def _execute(self, order: OrderRequest):
"""단일 주문 실행 (재시도 포함)"""
await self.limiter.acquire()
try:
result = await self._submit_order(order)
order.status = OrderStatus.FILLED
self._results[order.order_id] = result
logger.info(
f"체결 완료: {order.order_id} "
f"{order.symbol} {order.side} "
f"@ {result.get('average', 'N/A')}"
)
except Exception as e:
error_msg = str(e)
order.retry_count += 1
if self._is_retryable(error_msg) and order.can_retry:
# 재시도: 지수 백오프 후 큐에 재삽입
backoff = min(2 ** order.retry_count, 10)
logger.warning(
f"재시도 {order.retry_count}/"
f"{order.max_retries}: {order.order_id} "
f"({backoff}s 후)"
)
await asyncio.sleep(backoff)
if not order.is_expired:
self.queue.push(order)
else:
order.status = OrderStatus.EXPIRED
else:
order.status = OrderStatus.FAILED
logger.error(
f"주문 실패: {order.order_id} - {error_msg}"
)
async def _submit_order(self, order: OrderRequest) -> dict:
"""거래소 API 주문 제출"""
params = {}
if order.order_type == "market":
return await asyncio.to_thread(
self.exchange.create_market_order,
order.symbol, order.side, order.amount,
params
)
else:
return await asyncio.to_thread(
self.exchange.create_limit_order,
order.symbol, order.side, order.amount,
order.price, params
)
@staticmethod
def _is_retryable(error: str) -> bool:
"""재시도 가능한 에러인지 판단"""
retryable = [
'rate limit', 'timeout', 'ECONNRESET',
'network', '502', '503', '429',
'temporary', 'overloaded'
]
error_lower = error.lower()
return any(r in error_lower for r in retryable)
def stop(self):
self.running = False
전략과 큐 연동
전략 시그널을 주문 큐에 넣는 Producer 패턴입니다.
class StrategyProducer:
"""전략 시그널 → 주문 큐 Producer"""
def __init__(self, queue: OrderQueue,
strategy_name: str):
self.queue = queue
self.strategy_name = strategy_name
def on_signal(self, symbol: str, side: str,
amount: float, signal_type: str = "entry"):
"""전략 시그널 수신 시 주문 생성"""
# 시그널 유형에 따라 우선순위 결정
priority_map = {
"stop_loss": OrderPriority.EMERGENCY,
"take_profit": OrderPriority.HIGH,
"entry": OrderPriority.NORMAL,
"rebalance": OrderPriority.LOW,
}
priority = priority_map.get(
signal_type, OrderPriority.NORMAL
)
# TTL도 우선순위에 따라 다르게
ttl_map = {
OrderPriority.EMERGENCY: 10, # 긴급: 10초
OrderPriority.HIGH: 20,
OrderPriority.NORMAL: 30,
OrderPriority.LOW: 60,
}
order = OrderRequest(
priority=priority,
symbol=symbol,
side=side,
amount=amount,
order_type="market",
ttl_sec=ttl_map[priority],
strategy_name=self.strategy_name,
)
success = self.queue.push(order)
if success:
logger.info(
f"[{self.strategy_name}] 주문 큐 추가: "
f"{symbol} {side} {amount}"
)
return success
# 사용 예시
queue = OrderQueue(max_size=200)
momentum = StrategyProducer(queue, "momentum_v2")
mean_rev = StrategyProducer(queue, "mean_reversion")
# 모멘텀 전략 시그널
momentum.on_signal("BTC/USDT", "buy", 0.01, "entry")
# 평균회귀 전략 손절 (우선순위 높음)
mean_rev.on_signal("ETH/USDT", "sell", 0.5, "stop_loss")
긴급 청산 바이패스
극단적 상황에서는 큐를 건너뛰고 즉시 실행해야 합니다. 슬리피지 관리와 함께 긴급 청산 로직을 구현합니다.
class EmergencyExecutor:
"""큐 바이패스 긴급 실행기"""
def __init__(self, exchange, queue: OrderQueue):
self.exchange = exchange
self.queue = queue
async def emergency_close_all(self, reason: str):
"""모든 포지션 즉시 청산"""
logger.critical(f"긴급 전체 청산: {reason}")
# 1. 큐의 모든 대기 주문 취소
with self.queue._lock:
for order in self.queue._heap:
order.status = OrderStatus.CANCELLED
self.queue._heap.clear()
self.queue._dedup_set.clear()
# 2. 미체결 주문 전체 취소
open_orders = await asyncio.to_thread(
self.exchange.fetch_open_orders
)
for oo in open_orders:
await asyncio.to_thread(
self.exchange.cancel_order,
oo['id'], oo['symbol']
)
# 3. 보유 포지션 시장가 청산
positions = await asyncio.to_thread(
self.exchange.fetch_positions
)
for pos in positions:
amt = abs(float(pos['contracts']))
if amt > 0:
side = 'sell' if pos['side'] == 'long' else 'buy'
await asyncio.to_thread(
self.exchange.create_market_order,
pos['symbol'], side, amt,
{'reduceOnly': True}
)
logger.critical(
f"긴급 청산: {pos['symbol']} "
f"{side} {amt}"
)
주문 큐 성능 비교
| 방식 | 동시 주문 처리 | 레이트 리밋 준수 | 장애 복원 | 복잡도 |
|---|---|---|---|---|
| 직접 API 호출 | ❌ 충돌 | ❌ 초과 빈번 | ❌ 유실 | 낮음 |
| 단순 FIFO 큐 | ✅ 순차 처리 | ✅ 제어 가능 | △ 재시도 없음 | 중간 |
| 우선순위 큐 | ✅ 우선순위 기반 | ✅ 슬라이딩 윈도우 | ✅ 지수 백오프 | 높음 |
운영 팁
- TTL은 짧게: 시장가 주문의 TTL은 10~30초면 충분합니다. 오래된 시그널로 주문하면 불리한 가격에 체결됩니다.
- 중복 방지 필수: 같은 전략이 같은 심볼에 같은 방향으로 중복 주문하는 것을 dedup으로 막아야 합니다.
- 긴급 바이패스: 손절은 큐를 거치지 않는 바이패스 경로를 반드시 마련하세요.
- 큐 크기 모니터링: 큐가 계속 쌓이면 Consumer가 느리거나 API 장애입니다. 큐 크기가 50을 넘으면 알림을 보내세요.
- 재시도 제한: 무한 재시도는 금물입니다. 3회 실패하면 포기하고 알림을 보내는 것이 안전합니다.
정리
주문 큐는 자동매매 봇의 안정성과 확장성을 결정하는 핵심 인프라입니다. 전략이 아무리 좋아도 주문이 제대로 실행되지 않으면 의미가 없습니다. 우선순위 큐 + 레이트 리미터 + 지수 백오프 재시도 + 긴급 바이패스 조합으로 프로덕션 수준의 주문 실행 시스템을 구축하세요. 전략은 두뇌, 주문 큐는 손발입니다.