주문흐름 불균형 매매 전략

주문흐름 불균형이란?

주문흐름 불균형(Order Flow Imbalance, OFI)은 매수 주문과 매도 주문의 압력 차이를 수치화한 지표입니다. 호가창에서 매수세가 매도세보다 강하면 양의 불균형, 반대면 음의 불균형이 발생하며, 이 신호는 단기 가격 방향을 예측하는 데 매우 효과적입니다.

고빈도 트레이딩(HFT) 업체들이 핵심 알파 소스로 사용하는 전략이며, 개인 트레이더도 거래소 API의 실시간 호가 데이터를 활용하면 파이썬으로 구현할 수 있습니다.

OFI 계산 원리

OFI는 호가창의 최우선 호가(BBO) 변화를 기반으로 매수·매도 압력을 측정합니다. 핵심 아이디어는 단순합니다.

상황 해석 OFI 기여
매수호가 상승 또는 매수잔량 증가 매수 압력 강화 양(+)
매수호가 하락 또는 매수잔량 감소 매수 압력 약화 음(−)
매도호가 상승 또는 매도잔량 증가 매도 압력 약화 양(+)
매도호가 하락 또는 매도잔량 감소 매도 압력 강화 음(−)

파이썬 OFI 계산 구현

실시간 호가 스냅샷에서 OFI를 계산하는 핵심 클래스입니다. 이전 스냅샷과 비교하여 매수·매도 압력 변화를 추적합니다.

import numpy as np
from dataclasses import dataclass
from typing import Optional

@dataclass
class BookSnapshot:
    bid_price: float
    bid_size: float
    ask_price: float
    ask_size: float
    timestamp: float

class OFICalculator:
    def __init__(self, window_size=50):
        self.prev: Optional[BookSnapshot] = None
        self.ofi_history = []
        self.window_size = window_size

    def update(self, snapshot: BookSnapshot) -> float:
        """새 호가 스냅샷으로 OFI 업데이트"""
        if self.prev is None:
            self.prev = snapshot
            return 0.0

        # 매수 측 기여
        if snapshot.bid_price > self.prev.bid_price:
            bid_contrib = snapshot.bid_size
        elif snapshot.bid_price == self.prev.bid_price:
            bid_contrib = snapshot.bid_size - self.prev.bid_size
        else:
            bid_contrib = -self.prev.bid_size

        # 매도 측 기여
        if snapshot.ask_price < self.prev.ask_price:
            ask_contrib = -snapshot.ask_size
        elif snapshot.ask_price == self.prev.ask_price:
            ask_contrib = -(snapshot.ask_size - self.prev.ask_size)
        else:
            ask_contrib = self.prev.ask_size

        ofi = bid_contrib + ask_contrib
        self.ofi_history.append(ofi)

        # 윈도우 크기 유지
        if len(self.ofi_history) > self.window_size * 10:
            self.ofi_history = self.ofi_history[-self.window_size * 5:]

        self.prev = snapshot
        return ofi

    def cumulative_ofi(self, window=None):
        """누적 OFI (윈도우 기간)"""
        w = window or self.window_size
        recent = self.ofi_history[-w:]
        return sum(recent)

    def normalized_ofi(self, window=None):
        """정규화된 OFI (-1 ~ 1)"""
        w = window or self.window_size
        recent = self.ofi_history[-w:]
        if not recent:
            return 0.0
        cum = sum(recent)
        abs_sum = sum(abs(x) for x in recent)
        if abs_sum == 0:
            return 0.0
        return cum / abs_sum

OFI 기반 자동매매 봇

아래는 실시간 웹소켓으로 호가 데이터를 수신하고, OFI 신호에 따라 자동 매매하는 전략 봇입니다.

import ccxt
import time
from datetime import datetime

class OFITradingBot:
    def __init__(self, exchange, symbol, capital):
        self.exchange = exchange
        self.symbol = symbol
        self.capital = capital
        self.ofi_calc = OFICalculator(window_size=50)
        self.position = 0       # 현재 포지션
        self.entry_price = 0
        self.trades = []

        # 전략 파라미터
        self.entry_threshold = 0.6   # 진입 임계값
        self.exit_threshold = 0.1    # 청산 임계값
        self.position_pct = 0.1      # 자본의 10%씩 진입
        self.stop_loss_pct = 0.5     # 0.5% 손절
        self.take_profit_pct = 1.0   # 1.0% 익절

    def fetch_book_snapshot(self):
        """호가 스냅샷 가져오기"""
        book = self.exchange.fetch_order_book(
            self.symbol, limit=5
        )
        return BookSnapshot(
            bid_price=book['bids'][0][0],
            bid_size=book['bids'][0][1],
            ask_price=book['asks'][0][0],
            ask_size=book['asks'][0][1],
            timestamp=time.time()
        )

    def check_stop_conditions(self, current_price):
        """손절/익절 체크"""
        if self.position == 0:
            return None

        pnl_pct = ((current_price - self.entry_price)
                   / self.entry_price * 100)
        if self.position < 0:
            pnl_pct = -pnl_pct

        if pnl_pct <= -self.stop_loss_pct:
            return 'stop_loss'
        if pnl_pct >= self.take_profit_pct:
            return 'take_profit'
        return None

    def execute_signal(self, signal, price):
        """매매 신호 실행"""
        qty = (self.capital * self.position_pct) / price

        if signal == 'buy' and self.position <= 0:
            if self.position < 0:
                # 숏 청산
                self.exchange.create_market_order(
                    self.symbol, 'buy', abs(self.position)
                )
            self.exchange.create_market_order(
                self.symbol, 'buy', qty
            )
            self.position = qty
            self.entry_price = price
            self._log_trade('BUY', qty, price)

        elif signal == 'sell' and self.position >= 0:
            if self.position > 0:
                self.exchange.create_market_order(
                    self.symbol, 'sell', self.position
                )
            self.exchange.create_market_order(
                self.symbol, 'sell', qty
            )
            self.position = -qty
            self.entry_price = price
            self._log_trade('SELL', qty, price)

        elif signal == 'close' and self.position != 0:
            side = 'sell' if self.position > 0 else 'buy'
            self.exchange.create_market_order(
                self.symbol, side, abs(self.position)
            )
            self._log_trade('CLOSE', abs(self.position), price)
            self.position = 0
            self.entry_price = 0

    def run(self, interval=1, max_minutes=60):
        """OFI 매매 루프"""
        end_time = time.time() + max_minutes * 60
        print(f"[OFI 봇 시작] {self.symbol} | "
              f"임계값: ±{self.entry_threshold}")

        while time.time() < end_time:
            try:
                snap = self.fetch_book_snapshot()
                self.ofi_calc.update(snap)
                nofi = self.ofi_calc.normalized_ofi()
                mid = (snap.bid_price + snap.ask_price) / 2

                # 손절/익절 체크
                stop = self.check_stop_conditions(mid)
                if stop:
                    self.execute_signal('close', mid)
                    print(f"[{stop.upper()}] @ {mid}")
                    time.sleep(interval)
                    continue

                # OFI 신호 기반 매매
                if nofi > self.entry_threshold:
                    self.execute_signal('buy', mid)
                    print(f"[매수] OFI={nofi:.3f} @ {mid}")
                elif nofi < -self.entry_threshold:
                    self.execute_signal('sell', mid)
                    print(f"[매도] OFI={nofi:.3f} @ {mid}")
                elif abs(nofi) < self.exit_threshold 
                        and self.position != 0:
                    self.execute_signal('close', mid)
                    print(f"[청산] OFI 중립 @ {mid}")

            except Exception as e:
                print(f"[오류] {e}")

            time.sleep(interval)

    def _log_trade(self, action, qty, price):
        self.trades.append({
            'time': datetime.now().isoformat(),
            'action': action,
            'qty': qty,
            'price': price
        })

멀티 레벨 OFI 확장

기본 OFI는 최우선 호가만 사용하지만, 여러 호가 단계의 변화를 가중 합산하면 더 강력한 신호를 얻을 수 있습니다.

def multi_level_ofi(self, book, prev_book, levels=5):
    """다단계 호가 OFI 계산"""
    total_ofi = 0
    weights = [1.0, 0.5, 0.25, 0.125, 0.0625]

    for i in range(min(levels, len(book['bids']),
                       len(prev_book['bids']))):
        w = weights[i] if i < len(weights) else weights[-1]

        bid_delta = (book['bids'][i][1] - prev_book['bids'][i][1]
                     if book['bids'][i][0] == prev_book['bids'][i][0]
                     else book['bids'][i][1])
        ask_delta = (book['asks'][i][1] - prev_book['asks'][i][1]
                     if book['asks'][i][0] == prev_book['asks'][i][0]
                     else -book['asks'][i][1])

        total_ofi += w * (bid_delta + ask_delta)

    return total_ofi

OFI 신호 필터링 기법

원시 OFI 신호에는 노이즈가 많습니다. 아래 필터링 기법으로 신호 품질을 높일 수 있습니다.

  • EMA 스무딩 — 누적 OFI에 지수이동평균을 적용하여 노이즈 감소
  • 거래량 필터 — 거래량이 일정 수준 이상일 때만 OFI 신호 유효
  • 스프레드 필터 — 호가 스프레드가 비정상적으로 넓으면 신호 무시 (유동성 부족)
  • 시간대 필터 — 장 시작/마감 직전의 비정상적 OFI 패턴 제외
  • 연속 확인 — N틱 연속 같은 방향의 OFI가 나올 때만 진입

백테스트 시 주의사항

함정 설명 해결 방법
미래 참조 편향 현재 틱의 OFI로 같은 틱에 매매 신호 1틱 지연 적용
틱 데이터 품질 호가 스냅샷 누락/지연 거래소 직접 수집 데이터 사용
슬리피지 과소평가 OFI가 강한 순간은 체결이 어려움 보수적 슬리피지 모델 적용
시장 미시구조 변화 거래소 규칙·수수료 변경 기간별 분리 백테스트

관련 글

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux