아이스버그 주문 실행 전략

아이스버그 주문이란?

아이스버그 주문(Iceberg Order)은 대량 주문을 여러 개의 소량 주문으로 분할하여 순차적으로 실행하는 알고리즘 주문 기법입니다. 빙산처럼 전체 주문의 일부만 호가창에 노출되고 나머지는 숨겨져 있다는 의미에서 이름이 유래했습니다.

대규모 매수·매도 주문을 한 번에 넣으면 시장 충격(Market Impact)이 발생하여 불리한 가격에 체결됩니다. 아이스버그 주문은 이 충격을 최소화하면서도 원하는 수량을 완전히 체결하는 것이 목표입니다. 이 글에서는 파이썬으로 아이스버그 주문 엔진을 구현하는 전체 과정을 다룹니다.

시장 충격과 슬리피지

왜 주문 분할이 필요한지 이해하려면 시장 충격 모델을 알아야 합니다.

import numpy as np

def estimate_market_impact(order_size, avg_volume, spread, volatility):
    """
    시장 충격 추정 (Square-Root 모델)
    - Almgren-Chriss 모델 간소화 버전
    """
    participation_rate = order_size / avg_volume
    
    # 영구적 충격: 주문 크기에 비례
    permanent_impact = spread * np.sqrt(participation_rate) * volatility * 100
    
    # 일시적 충격: 즉시 체결 시 추가 비용
    temporary_impact = spread * (participation_rate ** 0.6)
    
    total_impact_bps = (permanent_impact + temporary_impact) * 10000
    
    return {
        'participation_rate': round(participation_rate, 4),
        'permanent_impact_bps': round(permanent_impact * 10000, 2),
        'temporary_impact_bps': round(temporary_impact * 10000, 2),
        'total_impact_bps': round(total_impact_bps, 2)
    }

# 예시: 일 거래량의 5% 주문
# result = estimate_market_impact(
#     order_size=50000,
#     avg_volume=1000000,
#     spread=0.001,
#     volatility=0.02
# )
# → total_impact_bps ≈ 15~30 bps 추가 비용

일 거래량의 5%만 주문해도 15~30bp(0.15~0.3%)의 추가 비용이 발생할 수 있습니다. 이를 줄이는 것이 알고리즘 주문의 핵심입니다.

기본 아이스버그 엔진 구현

전체 주문을 균등 분할하고 순차적으로 실행하는 기본 아이스버그 엔진입니다.

import asyncio
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional, List
from enum import Enum

class OrderSide(Enum):
    BUY = "buy"
    SELL = "sell"

class OrderStatus(Enum):
    PENDING = "pending"
    PARTIAL = "partial"
    FILLED = "filled"
    CANCELLED = "cancelled"

@dataclass
class ChildOrder:
    order_id: str = ""
    size: float = 0
    filled: float = 0
    price: Optional[float] = None
    status: OrderStatus = OrderStatus.PENDING
    created_at: Optional[datetime] = None
    filled_at: Optional[datetime] = None

@dataclass
class IcebergOrder:
    symbol: str
    side: OrderSide
    total_size: float
    slice_size: float
    filled: float = 0
    children: List[ChildOrder] = field(default_factory=list)
    status: OrderStatus = OrderStatus.PENDING
    avg_price: float = 0

    @property
    def remaining(self):
        return self.total_size - self.filled

    @property
    def progress(self):
        return self.filled / self.total_size * 100 if self.total_size > 0 else 0
class IcebergEngine:
    def __init__(self, exchange_client, config=None):
        self.exchange = exchange_client
        self.config = config or {
            'min_interval_sec': 3,
            'max_interval_sec': 15,
            'price_tolerance_bps': 10,
            'max_retries': 3,
            'use_limit_orders': True
        }

    async def execute(self, order: IcebergOrder):
        """아이스버그 주문 실행 메인 루프"""
        order.status = OrderStatus.PARTIAL
        total_cost = 0
        total_filled = 0

        while order.remaining > 0:
            # 현재 슬라이스 크기 결정
            current_slice = min(order.slice_size, order.remaining)

            # 랜덤 지연 (패턴 감지 방지)
            delay = np.random.uniform(
                self.config['min_interval_sec'],
                self.config['max_interval_sec']
            )
            await asyncio.sleep(delay)

            # 현재 시장가 확인
            ticker = await self.exchange.get_ticker(order.symbol)
            price = self._calculate_price(ticker, order.side)

            # 자식 주문 생성 및 실행
            child = ChildOrder(
                size=current_slice,
                price=price,
                created_at=datetime.now()
            )

            try:
                result = await self._place_order(
                    order.symbol, order.side, current_slice, price
                )
                child.order_id = result['order_id']
                child.filled = result['filled']
                child.status = OrderStatus.FILLED
                child.filled_at = datetime.now()

                total_cost += result['filled'] * result['avg_price']
                total_filled += result['filled']
                order.filled = total_filled

                print(f"[SLICE] {total_filled:.4f}/{order.total_size:.4f} "
                      f"({order.progress:.1f}%) @ {result['avg_price']:.2f}")

            except Exception as e:
                child.status = OrderStatus.CANCELLED
                print(f"[ERROR] 슬라이스 실패: {e}")

            order.children.append(child)

        order.status = OrderStatus.FILLED
        order.avg_price = total_cost / total_filled if total_filled > 0 else 0

        return order

    def _calculate_price(self, ticker, side):
        """지정가 계산: 스프레드 내에서 유리한 가격"""
        if not self.config['use_limit_orders']:
            return None  # 시장가

        tolerance = self.config['price_tolerance_bps'] / 10000
        if side == OrderSide.BUY:
            return ticker['ask'] * (1 + tolerance)
        else:
            return ticker['bid'] * (1 - tolerance)

    async def _place_order(self, symbol, side, size, price):
        """거래소 API로 실제 주문 전송"""
        if self.config['use_limit_orders'] and price:
            return await self.exchange.limit_order(
                symbol, side.value, size, price
            )
        else:
            return await self.exchange.market_order(
                symbol, side.value, size
            )

핵심은 랜덤 지연(Random Delay)입니다. 일정한 간격으로 주문하면 다른 트레이더가 패턴을 감지하여 프론트런닝할 수 있습니다.

적응형 슬라이스 크기 조절

시장 상황에 따라 슬라이스 크기를 동적으로 조절하면 더 효율적입니다.

class AdaptiveIcebergEngine(IcebergEngine):
    def __init__(self, exchange_client, config=None):
        super().__init__(exchange_client, config)
        self.volatility_window = []

    def _adaptive_slice_size(self, base_size, orderbook, recent_trades):
        """시장 상황에 따른 슬라이스 크기 조절"""
        multiplier = 1.0

        # 1. 호가창 깊이 기반 조절
        best_depth = self._get_top_depth(orderbook, levels=5)
        if best_depth > base_size * 10:
            multiplier *= 1.3  # 유동성 풍부 → 크게
        elif best_depth < base_size * 3:
            multiplier *= 0.6  # 유동성 부족 → 작게

        # 2. 최근 거래량 기반 조절
        recent_volume = sum(t['size'] for t in recent_trades[-20:])
        avg_trade_size = recent_volume / max(len(recent_trades[-20:]), 1)
        
        # 평균 거래 크기의 2~5배 이내로 제한
        max_slice = avg_trade_size * 5
        if base_size * multiplier > max_slice:
            multiplier = max_slice / base_size

        # 3. 변동성 기반 조절
        prices = [t['price'] for t in recent_trades[-50:]]
        if len(prices) > 10:
            vol = np.std(prices) / np.mean(prices)
            if vol > 0.005:
                multiplier *= 0.7  # 고변동성 → 보수적
            elif vol < 0.001:
                multiplier *= 1.2  # 저변동성 → 적극적

        # 최소/최대 제한
        adjusted = base_size * multiplier
        return max(adjusted, base_size * 0.3), min(adjusted, base_size * 2.0)

    def _get_top_depth(self, orderbook, levels=5):
        """상위 N호가 물량 합산"""
        bids = sum(b[1] for b in orderbook['bids'][:levels])
        asks = sum(a[1] for a in orderbook['asks'][:levels])
        return min(bids, asks)

호가창이 두터우면 슬라이스를 키워 빠르게 체결하고, 유동성이 부족하면 줄여서 시장 충격을 억제합니다. 이 방식은 거래소 API의 실시간 호가 데이터를 활용합니다.

TWAP/VWAP 하이브리드 전략

아이스버그를 단독으로 사용하기보다 TWAP(시간 가중)이나 VWAP(거래량 가중)과 결합하면 더 효과적입니다.

class HybridExecutionEngine:
    def __init__(self, exchange_client, total_duration_min=60):
        self.exchange = exchange_client
        self.total_duration = total_duration_min * 60  # 초 단위

    def create_vwap_schedule(self, total_size, volume_profile):
        """
        거래량 프로파일 기반 VWAP 스케줄 생성
        volume_profile: 시간대별 상대 거래량 (예: [0.05, 0.08, ...])
        """
        total_weight = sum(volume_profile)
        schedule = []

        for i, weight in enumerate(volume_profile):
            slice_size = total_size * (weight / total_weight)
            schedule.append({
                'slot': i,
                'target_size': round(slice_size, 6),
                'weight': round(weight / total_weight, 4)
            })

        return schedule

    async def execute_vwap_iceberg(self, symbol, side, total_size, 
                                     volume_profile, slices_per_slot=3):
        """VWAP 스케줄에 따라 아이스버그 주문 실행"""
        schedule = self.create_vwap_schedule(total_size, volume_profile)
        slot_duration = self.total_duration / len(schedule)
        
        results = []

        for slot in schedule:
            target = slot['target_size']
            slice_size = target / slices_per_slot
            filled_in_slot = 0

            for _ in range(slices_per_slot):
                if filled_in_slot >= target:
                    break

                current_slice = min(slice_size, target - filled_in_slot)
                delay = slot_duration / slices_per_slot * np.random.uniform(0.5, 1.5)
                await asyncio.sleep(delay)

                result = await self.exchange.market_order(
                    symbol, side, current_slice
                )
                filled_in_slot += result['filled']
                results.append(result)

            print(f"[SLOT {slot['slot']}] {filled_in_slot:.4f} / "
                  f"{target:.4f} (가중치: {slot['weight']:.2%})")

        return results

VWAP 방식은 거래량이 많은 시간대에 더 많이 체결하여 평균 체결가를 시장 VWAP에 근접시킵니다.

체결 성과 분석

알고리즘 주문의 성과를 객관적으로 평가하는 지표를 구현합니다.

class ExecutionAnalyzer:
    def analyze(self, iceberg_order, market_vwap, arrival_price):
        """주문 실행 성과 분석"""
        avg_price = iceberg_order.avg_price

        # Implementation Shortfall (IS)
        # 의사결정 시점 가격 대비 실제 체결 비용
        if iceberg_order.side == OrderSide.BUY:
            is_bps = (avg_price - arrival_price) / arrival_price * 10000
        else:
            is_bps = (arrival_price - avg_price) / arrival_price * 10000

        # VWAP 대비 슬리피지
        if iceberg_order.side == OrderSide.BUY:
            vwap_slip = (avg_price - market_vwap) / market_vwap * 10000
        else:
            vwap_slip = (market_vwap - avg_price) / market_vwap * 10000

        # 체결률
        fill_rate = iceberg_order.filled / iceberg_order.total_size * 100

        # 슬라이스 통계
        fill_times = []
        for child in iceberg_order.children:
            if child.filled_at and child.created_at:
                dt = (child.filled_at - child.created_at).total_seconds()
                fill_times.append(dt)

        return {
            'avg_price': round(avg_price, 4),
            'arrival_price': round(arrival_price, 4),
            'market_vwap': round(market_vwap, 4),
            'impl_shortfall_bps': round(is_bps, 2),
            'vwap_slippage_bps': round(vwap_slip, 2),
            'fill_rate_pct': round(fill_rate, 2),
            'total_slices': len(iceberg_order.children),
            'avg_fill_time_sec': round(np.mean(fill_times), 2) if fill_times else 0
        }
지표 의미 목표
Implementation Shortfall 의사결정 시점 대비 실제 비용 < 10 bps
VWAP Slippage 시장 VWAP 대비 체결가 차이 < 5 bps
Fill Rate 목표 수량 대비 실제 체결 비율 > 95%
Participation Rate 시장 거래량 대비 주문 비율 < 10%

프론트런닝 방어

아이스버그 주문의 최대 위협은 프론트런닝(Front-Running)입니다. 다른 알고리즘이 패턴을 감지하여 먼저 매수/매도하는 것을 방어해야 합니다.

  • 랜덤 슬라이스 크기: 고정 크기 대신 ±30% 범위 랜덤화
  • 랜덤 간격: 주문 간 대기 시간을 불규칙하게 설정
  • 페이크 주문: 가끔 반대 방향 소량 주문으로 의도 은폐 (주의: 규제 확인 필요)
  • 다중 거래소 분산: 여러 거래소에 동시 분산 실행
  • 시장가/지정가 혼합: 예측 불가한 주문 유형 교차 사용
def randomize_slice(base_size, variance=0.3):
    """프론트런닝 방지를 위한 슬라이스 크기 랜덤화"""
    min_size = base_size * (1 - variance)
    max_size = base_size * (1 + variance)
    return np.random.uniform(min_size, max_size)

실전 적용 팁

  • 슬라이스 크기: 평균 체결 건당 거래량의 2~5배 이내로 설정
  • 실행 시간: 긴급하지 않으면 1~4시간에 걸쳐 분산 실행
  • 모니터링: 체결 진행률, 시장 충격, VWAP 슬리피지를 실시간 추적
  • 중단 조건: 변동성 급등, 유동성 고갈 시 자동 중단 로직 필수
  • 리스크 관리와 연계하여 최대 참여율(Participation Rate) 제한

마무리

아이스버그 주문은 대량 거래에서 시장 충격과 슬리피지를 최소화하는 핵심 알고리즘입니다. 적응형 슬라이스 조절, VWAP 하이브리드 전략, 프론트런닝 방어까지 갖추면 기관급 주문 실행 품질을 확보할 수 있습니다.

특히 암호화폐 시장에서는 호가창이 얕은 경우가 많아, 아이스버그 주문의 효용이 더욱 큽니다. 자동매매 시스템에 통합하여 거래소 API와 연결하면 실전 트레이딩의 체결 품질을 한 단계 끌어올릴 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux