파이썬 칼만 필터 자동매매

칼만 필터란? 노이즈 속 진짜 가격 찾기

칼만 필터(Kalman Filter)는 노이즈가 섞인 관측값에서 숨겨진 진짜 상태를 추정하는 재귀적 알고리즘입니다. 원래 항공우주 분야에서 개발되었지만, 금융에서는 시장 노이즈를 제거하고 진정한 가격 추세를 추출하는 데 강력한 도구입니다.

이동평균(MA)과 비교했을 때 칼만 필터의 장점:

  • 적응적 평활: 시장 상태에 따라 자동으로 반응 속도 조절
  • 지연 최소화: MA보다 가격 변화에 빠르게 반응
  • 불확실성 정량화: 추정의 신뢰도를 수치로 제공
  • 다변량 확장: 여러 자산의 관계를 동시에 모델링 가능

이 글에서는 파이썬으로 칼만 필터를 구현하고, 실전 자동매매 전략까지 완성합니다.

칼만 필터 수학적 원리

칼만 필터는 예측(Predict)업데이트(Update) 두 단계를 반복합니다.

# 칼만 필터 핵심 수식
# 1단계: 예측 (Predict)
# x̂(k|k-1) = F · x̂(k-1|k-1)     상태 예측
# P(k|k-1)  = F · P(k-1|k-1) · F' + Q   공분산 예측

# 2단계: 업데이트 (Update)
# K(k)      = P(k|k-1) · H' / (H · P(k|k-1) · H' + R)   칼만 이득
# x̂(k|k)   = x̂(k|k-1) + K(k) · (z(k) - H · x̂(k|k-1))  상태 보정
# P(k|k)    = (I - K(k) · H) · P(k|k-1)                    공분산 보정

# F: 상태 전이 행렬, H: 관측 행렬
# Q: 프로세스 노이즈, R: 관측 노이즈
# K: 칼만 이득 (예측과 관측의 가중치 결정)

칼만 이득(K)이 핵심입니다. K가 크면 관측값(시장 가격)을 더 신뢰하고, K가 작으면 모델 예측을 더 신뢰합니다. 시장이 안정적일 때는 K가 작아져 노이즈를 걸러내고, 급변할 때는 K가 커져 빠르게 적응합니다.

파이썬 칼만 필터 구현

금융 시계열에 특화된 1D 칼만 필터를 처음부터 구현합니다.

import numpy as np

class KalmanFilter1D:
    """1차원 칼만 필터 — 금융 시계열 평활화"""

    def __init__(self, process_noise=1e-5, measurement_noise=1e-3,
                 initial_estimate=0, initial_error=1.0):
        """
        process_noise (Q): 가격의 실제 변동 분산 — 작을수록 평활
        measurement_noise (R): 관측 노이즈 분산 — 클수록 평활
        """
        self.Q = process_noise
        self.R = measurement_noise
        self.x = initial_estimate   # 상태 추정값
        self.P = initial_error      # 추정 오차 공분산
        self.K = 0                  # 칼만 이득
        self.history = []

    def update(self, measurement):
        """한 스텝 예측 + 업데이트"""
        # 예측 단계
        x_pred = self.x
        P_pred = self.P + self.Q

        # 업데이트 단계
        self.K = P_pred / (P_pred + self.R)
        self.x = x_pred + self.K * (measurement - x_pred)
        self.P = (1 - self.K) * P_pred

        self.history.append({
            'measurement': measurement,
            'estimate': self.x,
            'kalman_gain': self.K,
            'uncertainty': np.sqrt(self.P)
        })
        return self.x

    def filter_series(self, prices):
        """전체 시계열 필터링"""
        self.x = prices[0]
        estimates = []
        for p in prices:
            estimates.append(self.update(p))
        return np.array(estimates)

    def get_uncertainty_band(self, sigma=2):
        """신뢰 구간 밴드 반환"""
        estimates = [h['estimate'] for h in self.history]
        uncertainties = [h['uncertainty'] for h in self.history]
        upper = [e + sigma * u for e, u in zip(estimates, uncertainties)]
        lower = [e - sigma * u for e, u in zip(estimates, uncertainties)]
        return np.array(upper), np.array(lower)

# 사용 예시
np.random.seed(42)
true_price = np.cumsum(np.random.normal(0.01, 0.5, 500)) + 100
noisy_price = true_price + np.random.normal(0, 2, 500)

kf = KalmanFilter1D(process_noise=0.01, measurement_noise=4.0)
filtered = kf.filter_series(noisy_price)
print(f"노이즈 RMSE: {np.sqrt(np.mean((noisy_price - true_price)**2)):.4f}")
print(f"필터 RMSE: {np.sqrt(np.mean((filtered - true_price)**2)):.4f}")

칼만 필터 기반 자동매매 전략

칼만 필터로 추출한 평활 가격실제 가격의 괴리를 이용한 평균회귀 전략입니다.

신호 조건 해석
매수 실제가격 < 칼만추정 – 2σ 과매도 — 평균 회귀 기대
매도 실제가격 > 칼만추정 + 2σ 과매수 — 되돌림 기대
청산 실제가격 ≈ 칼만추정 (±0.5σ) 공정가치 복귀 — 이익 실현
class KalmanTradingStrategy:
    """칼만 필터 기반 평균회귀 자동매매"""

    def __init__(self, process_noise=0.001, measurement_noise=1.0,
                 entry_sigma=2.0, exit_sigma=0.5):
        self.kf = KalmanFilter1D(process_noise, measurement_noise)
        self.entry_sigma = entry_sigma
        self.exit_sigma = exit_sigma
        self.position = 0  # 1: 롱, -1: 숏, 0: 없음
        self.trades = []

    def generate_signal(self, price):
        """매매 신호 생성"""
        estimate = self.kf.update(price)
        uncertainty = np.sqrt(self.kf.P)
        deviation = (price - estimate) / max(uncertainty, 1e-8)

        signal = 0
        if self.position == 0:
            if deviation < -self.entry_sigma:
                signal = 1   # 매수 진입
            elif deviation > self.entry_sigma:
                signal = -1  # 매도 진입
        else:
            if abs(deviation) < self.exit_sigma:
                signal = -self.position  # 청산

        return signal, estimate, deviation

    def backtest(self, prices):
        """백테스트 실행"""
        self.kf.x = prices[0]
        equity = [0]
        entry_price = 0

        for i, price in enumerate(prices):
            signal, estimate, dev = self.generate_signal(price)

            if signal != 0 and self.position == 0:
                # 신규 진입
                self.position = signal
                entry_price = price
                self.trades.append({
                    'entry_idx': i, 'entry_price': price,
                    'side': 'long' if signal > 0 else 'short'
                })

            elif signal != 0 and self.position != 0:
                # 청산
                pnl = (price - entry_price) * self.position
                self.trades[-1].update({
                    'exit_idx': i, 'exit_price': price, 'pnl': pnl
                })
                self.position = 0

            unrealized = (price - entry_price) * self.position 
                         if self.position != 0 else 0
            equity.append(equity[-1] + unrealized - equity[-1]
                          if self.position == 0 else equity[-1])

        return self._calc_metrics(prices)

    def _calc_metrics(self, prices):
        """성과 지표 계산"""
        closed = [t for t in self.trades if 'pnl' in t]
        if not closed:
            return {}
        pnls = [t['pnl'] for t in closed]
        wins = [p for p in pnls if p > 0]
        return {
            'total_trades': len(closed),
            'win_rate': len(wins) / len(closed) * 100,
            'total_pnl': sum(pnls),
            'avg_pnl': np.mean(pnls),
            'max_win': max(pnls),
            'max_loss': min(pnls),
            'profit_factor': sum(wins) / abs(sum(p for p in pnls if p < 0))
                             if any(p < 0 for p in pnls) else float('inf')
        }

# 백테스트 실행
strategy = KalmanTradingStrategy(
    process_noise=0.005,
    measurement_noise=2.0,
    entry_sigma=2.0,
    exit_sigma=0.3
)
results = strategy.backtest(noisy_price)
for k, v in results.items():
    print(f"{k}: {v:.2f}" if isinstance(v, float) else f"{k}: {v}")

평균회귀 자동매매 전략과 핵심 원리는 같지만, 칼만 필터는 Z-Score보다 적응적이라는 차이가 있습니다. 시장 변동성이 변할 때 자동으로 민감도가 조절됩니다.

다변량 칼만 필터: 스프레드 추적

2개 자산의 동적 헤지 비율을 칼만 필터로 추정하면, 정적 회귀분석보다 훨씬 정확한 페어트레이딩이 가능합니다.

class KalmanPairTrader:
    """칼만 필터 동적 헤지 비율 추정"""

    def __init__(self, delta=1e-4, Ve=0.001):
        """
        delta: 상태 전이 공분산 스케일
        Ve: 관측 노이즈 분산
        """
        self.delta = delta
        self.Ve = Ve
        self.beta = np.zeros(2)   # [intercept, slope]
        self.P = np.eye(2)        # 공분산 행렬
        self.R = None

    def update(self, x, y):
        """
        x: 독립 자산 가격
        y: 종속 자산 가격
        헤지 비율(beta[1])을 동적으로 추정
        """
        F = np.array([1.0, x])  # 관측 벡터

        # 예측
        if self.R is None:
            self.R = np.eye(2)
        self.R = self.P + self.delta * np.eye(2)

        # 칼만 이득
        y_pred = F @ self.beta
        residual = y - y_pred
        S = F @ self.R @ F + self.Ve
        K = (self.R @ F) / S

        # 업데이트
        self.beta = self.beta + K * residual
        self.P = self.R - np.outer(K, F) @ self.R

        spread = residual / np.sqrt(S)
        return self.beta[1], self.beta[0], spread

    def run_pair(self, prices_x, prices_y, entry_z=2.0, exit_z=0.5):
        """페어트레이딩 시뮬레이션"""
        hedge_ratios = []
        spreads = []
        signals = []
        position = 0

        for x, y in zip(prices_x, prices_y):
            ratio, intercept, z = self.update(x, y)
            hedge_ratios.append(ratio)
            spreads.append(z)

            if position == 0:
                if z < -entry_z:
                    position = 1   # 스프레드 매수
                elif z > entry_z:
                    position = -1  # 스프레드 매도
            else:
                if abs(z) < exit_z:
                    position = 0   # 청산

            signals.append(position)

        return {
            'hedge_ratios': np.array(hedge_ratios),
            'spreads': np.array(spreads),
            'signals': np.array(signals)
        }

정적 OLS 회귀 대비 칼만 필터의 동적 헤지 비율은 구조적 변화(structural break)에 자동으로 적응합니다. 페어트레이딩 전략에서 공적분 기반 접근의 한계를 보완하는 강력한 방법입니다.

Q와 R 파라미터 최적화

칼만 필터 성능은 Q(프로세스 노이즈)R(관측 노이즈) 설정에 크게 의존합니다.

from scipy.optimize import minimize

def optimize_kalman_params(prices, train_ratio=0.7):
    """Q, R 파라미터 최적화 (로그 우도 최대화)"""
    n_train = int(len(prices) * train_ratio)
    train = prices[:n_train]

    def neg_log_likelihood(params):
        Q, R = np.exp(params)  # 양수 보장
        kf = KalmanFilter1D(Q, R)
        kf.x = train[0]
        ll = 0
        for p in train[1:]:
            P_pred = kf.P + Q
            innovation = p - kf.x
            S = P_pred + R
            ll += -0.5 * (np.log(2 * np.pi * S) + innovation**2 / S)
            kf.update(p)
        return -ll

    result = minimize(neg_log_likelihood, x0=[np.log(0.01), np.log(1.0)],
                      method='Nelder-Mead')
    Q_opt, R_opt = np.exp(result.x)
    return Q_opt, R_opt

Q_opt, R_opt = optimize_kalman_params(noisy_price)
print(f"최적 Q: {Q_opt:.6f}, R: {R_opt:.6f}")
print(f"Q/R 비율: {Q_opt/R_opt:.6f} (작을수록 평활)")
Q/R 비율 필터 특성 적합한 시장
높음 (>0.1) 빠른 반응, 노이즈에 민감 추세 시장, 고변동성
중간 (0.01~0.1) 균형잡힌 평활 일반적 시장 조건
낮음 (<0.01) 강한 평활, 느린 반응 횡보 시장, 저변동성

실전 적용 시 주의사항

  • 레짐 변화 대응: 시장 구조가 급변하면 칼만 필터가 적응하는 데 시간이 걸립니다. Q를 동적으로 조절하거나 HMM과 결합하세요.
  • 오버피팅 주의: Q, R을 인샘플에서 최적화하면 아웃샘플 성능이 떨어질 수 있습니다. 워크포워드 검증이 필수입니다.
  • 거래비용 반영: 칼만 필터의 빈번한 신호 변화는 높은 거래비용을 유발합니다. 임계값을 충분히 크게 설정하세요.
  • 다른 필터와 비교: EMA는 구현이 간단하고, 파티클 필터는 비선형 시스템에 유리합니다. 전략 복잡도에 맞는 도구를 선택하세요.
  • pykalman 활용: 실전에서는 pykalman 라이브러리의 EM 알고리즘으로 파라미터를 자동 추정할 수 있습니다.

칼만 필터는 단순 이동평균의 한계를 넘어, 적응적이고 확률론적인 가격 추정을 가능하게 합니다. 평균회귀 전략, 페어트레이딩, 추세 추종 모두에 활용할 수 있는 퀀트 트레이더의 핵심 도구입니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux