자동매매 멀티 계좌 동기화

왜 멀티 계좌로 운영하는가

자동매매를 본격적으로 운영하면 단일 거래소, 단일 계좌의 한계에 부딪힙니다. 거래소 점검 시 매매가 중단되고, 특정 거래소의 유동성이 부족하면 슬리피지가 커지며, 한 곳에 자금을 집중하면 거래소 리스크에 노출됩니다. 이 글에서는 멀티 계좌 자동매매 시스템을 설계하고 계좌 간 상태를 동기화하는 방법을 다룹니다.

멀티 계좌 아키텍처 설계

멀티 계좌 시스템의 핵심은 전략 레이어와 실행 레이어를 분리하는 것입니다. 전략은 하나지만, 실행은 여러 거래소에서 동시에 이루어집니다.

from abc import ABC, abstractmethod
from typing import Dict, List
from dataclasses import dataclass

@dataclass
class AccountConfig:
    name: str
    exchange: str
    api_key: str
    api_secret: str
    weight: float  # 자금 배분 비율 (0.0 ~ 1.0)
    max_position_usd: float
    enabled: bool = True

class ExchangeAdapter(ABC):
    """거래소별 어댑터 인터페이스"""
    @abstractmethod
    def get_balance(self) -> float: ...
    
    @abstractmethod
    def place_order(self, symbol: str, side: str, qty: float, 
                    order_type: str = "market") -> dict: ...
    
    @abstractmethod
    def get_positions(self) -> List[dict]: ...
    
    @abstractmethod
    def cancel_all(self, symbol: str) -> int: ...

class BinanceAdapter(ExchangeAdapter):
    def __init__(self, config: AccountConfig):
        self.config = config
        # Binance API 클라이언트 초기화
    
    def get_balance(self) -> float:
        # 구현
        pass

class BybitAdapter(ExchangeAdapter):
    def __init__(self, config: AccountConfig):
        self.config = config
        # Bybit API 클라이언트 초기화
    
    def get_balance(self) -> float:
        # 구현
        pass

ExchangeAdapter 추상 클래스로 거래소별 차이를 캡슐화합니다. 새 거래소를 추가할 때 어댑터만 구현하면 전략 코드를 수정할 필요가 없습니다.

주문 분배기: 가중치 기반 배분

전략이 “BTC 매수” 시그널을 생성하면, 주문 분배기(Order Dispatcher)가 각 계좌의 가중치에 따라 주문 수량을 나눠 보냅니다.

class OrderDispatcher:
    def __init__(self, accounts: Dict[str, ExchangeAdapter], 
                 configs: Dict[str, AccountConfig]):
        self.accounts = accounts
        self.configs = configs
    
    def dispatch(self, symbol: str, side: str, total_usd: float) -> List[dict]:
        """가중치에 따라 주문 분배"""
        results = []
        active = {k: v for k, v in self.configs.items() if v.enabled}
        total_weight = sum(c.weight for c in active.values())
        
        for name, config in active.items():
            allocated_usd = total_usd * (config.weight / total_weight)
            allocated_usd = min(allocated_usd, config.max_position_usd)
            
            if allocated_usd < 10:  # 최소 주문 금액
                continue
            
            try:
                adapter = self.accounts[name]
                ticker_price = self._get_price(adapter, symbol)
                qty = allocated_usd / ticker_price
                
                result = adapter.place_order(symbol, side, qty)
                result['account'] = name
                result['allocated_usd'] = allocated_usd
                results.append(result)
                
            except Exception as e:
                results.append({
                    'account': name, 'status': 'error',
                    'error': str(e)
                })
        
        return results
    
    def _get_price(self, adapter, symbol):
        # 현재가 조회 로직
        pass

max_position_usd로 계좌별 최대 포지션을 제한합니다. 한 거래소에 과도한 포지션이 쌓이는 것을 방지하는 안전장치입니다.

상태 동기화: 계좌 간 포지션 정합성

멀티 계좌에서 가장 까다로운 문제는 포지션 정합성입니다. A 거래소에서는 체결되었지만 B 거래소에서는 실패한 경우, 전체 포지션이 의도와 달라집니다.

import time

class PositionSynchronizer:
    def __init__(self, accounts: Dict[str, ExchangeAdapter],
                 configs: Dict[str, AccountConfig]):
        self.accounts = accounts
        self.configs = configs
        self.target_positions: Dict[str, Dict[str, float]] = {}
    
    def set_target(self, symbol: str, total_qty: float):
        """전략이 원하는 전체 목표 포지션 설정"""
        active = {k: v for k, v in self.configs.items() if v.enabled}
        total_weight = sum(c.weight for c in active.values())
        
        self.target_positions[symbol] = {}
        for name, config in active.items():
            self.target_positions[symbol][name] = 
                total_qty * (config.weight / total_weight)
    
    def sync(self, symbol: str) -> List[dict]:
        """실제 포지션과 목표 포지션의 차이를 조정"""
        if symbol not in self.target_positions:
            return []
        
        adjustments = []
        for name, target_qty in self.target_positions[symbol].items():
            adapter = self.accounts[name]
            actual = self._get_actual_qty(adapter, symbol)
            diff = target_qty - actual
            
            if abs(diff) < 0.0001:
                continue
            
            side = "BUY" if diff > 0 else "SELL"
            qty = abs(diff)
            
            try:
                result = adapter.place_order(symbol, side, qty)
                adjustments.append({
                    'account': name, 'action': side,
                    'qty': qty, 'status': 'filled'
                })
            except Exception as e:
                adjustments.append({
                    'account': name, 'action': side,
                    'qty': qty, 'status': 'error', 'error': str(e)
                })
        
        return adjustments
    
    def _get_actual_qty(self, adapter, symbol):
        positions = adapter.get_positions()
        for p in positions:
            if p['symbol'] == symbol:
                return p['qty']
        return 0.0

sync 메서드를 주기적으로 호출(예: 1분마다)하면 부분 체결이나 에러로 발생한 포지션 불일치를 자동으로 보정합니다. 이 패턴은 자동매매 장애 복구 설계에서 다룬 포지션 동기화의 확장입니다.

계좌 헬스체크와 자동 전환

특정 거래소에 장애가 발생하면 해당 계좌를 비활성화하고 나머지 계좌로 가중치를 재분배해야 합니다.

class AccountHealthChecker:
    def __init__(self, accounts: Dict[str, ExchangeAdapter],
                 configs: Dict[str, AccountConfig]):
        self.accounts = accounts
        self.configs = configs
        self.failure_counts: Dict[str, int] = {}
        self.last_success: Dict[str, float] = {}
    
    def check_all(self) -> Dict[str, bool]:
        """전 계좌 헬스체크"""
        results = {}
        for name, adapter in self.accounts.items():
            try:
                balance = adapter.get_balance()
                self.failure_counts[name] = 0
                self.last_success[name] = time.time()
                results[name] = True
            except Exception:
                self.failure_counts[name] = 
                    self.failure_counts.get(name, 0) + 1
                results[name] = False
                
                # 3회 연속 실패 시 비활성화
                if self.failure_counts[name] >= 3:
                    self.configs[name].enabled = False
                    print(f"🔴 {name} 비활성화 (연속 {self.failure_counts[name]}회 실패)")
        
        return results
    
    def try_recover(self):
        """비활성 계좌 복구 시도"""
        for name, config in self.configs.items():
            if config.enabled:
                continue
            try:
                self.accounts[name].get_balance()
                config.enabled = True
                self.failure_counts[name] = 0
                print(f"🟢 {name} 복구 완료")
            except Exception:
                pass

3회 연속 실패 시 자동 비활성화, 이후 주기적으로 복구를 시도합니다. 장애 전파를 차단하면서도 복구 기회를 놓치지 않는 설계입니다.

자금 리밸런싱

장기 운영 시 계좌 간 잔고 편차가 커질 수 있습니다. 수익이 나는 거래소에 자금이 쏠리고, 손실이 나는 거래소는 자금이 부족해집니다.

def check_rebalance_needed(configs: Dict[str, AccountConfig],
                           balances: Dict[str, float],
                           threshold: float = 0.15) -> List[dict]:
    """리밸런싱 필요 여부 확인"""
    total = sum(balances.values())
    if total == 0:
        return []
    
    transfers = []
    for name, config in configs.items():
        if not config.enabled:
            continue
        actual_ratio = balances.get(name, 0) / total
        target_ratio = config.weight
        deviation = actual_ratio - target_ratio
        
        if abs(deviation) > threshold:
            transfers.append({
                'account': name,
                'actual_ratio': f"{actual_ratio:.1%}",
                'target_ratio': f"{target_ratio:.1%}",
                'deviation': f"{deviation:+.1%}",
                'action': '출금 필요' if deviation > 0 else '입금 필요',
                'amount_usd': abs(deviation) * total
            })
    
    return transfers

자동 송금까지 구현하는 것은 보안 리스크가 크므로, 리밸런싱이 필요한 상황을 알림으로 보내고 수동으로 처리하는 것을 권장합니다. 실시간 자산 추적은 자동매매 실시간 PnL 추적 구현과 연계하면 효과적입니다.

통합 리포트

여러 계좌의 PnL을 합산하여 전체 성과를 한눈에 파악하는 리포트가 필요합니다.

def generate_multi_account_report(accounts: Dict[str, ExchangeAdapter],
                                   configs: Dict[str, AccountConfig]) -> str:
    lines = ["📊 멀티 계좌 리포트", "━" * 20]
    total_balance = 0
    total_positions = 0
    
    for name, adapter in accounts.items():
        if not configs[name].enabled:
            lines.append(f"🔴 {name}: 비활성")
            continue
        
        try:
            balance = adapter.get_balance()
            positions = adapter.get_positions()
            active = len([p for p in positions if p.get('qty', 0) > 0])
            total_balance += balance
            total_positions += active
            lines.append(
                f"🟢 {name}: ${balance:,.2f} | "
                f"포지션 {active}개 | 비중 {configs[name].weight:.0%}"
            )
        except Exception as e:
            lines.append(f"⚠️ {name}: 조회 실패 ({e})")
    
    lines.append("━" * 20)
    lines.append(f"총 자산: ${total_balance:,.2f}")
    lines.append(f"총 포지션: {total_positions}개")
    
    return "n".join(lines)

마무리

멀티 계좌 자동매매는 분산 투자의 원칙을 시스템 레벨에서 구현하는 것입니다. 거래소 리스크 분산, 유동성 확보, 장애 대응력 향상 등 단일 계좌로는 얻을 수 없는 이점이 있습니다. 핵심은 어댑터 패턴으로 거래소 차이를 추상화하고, 주기적 포지션 동기화로 정합성을 유지하며, 헬스체크로 장애를 자동 격리하는 것입니다. 처음에는 2개 거래소, 동일 전략으로 시작하여 점진적으로 확장하는 것을 권장합니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux