PostgreSQL LISTEN/NOTIFY 실시간 이벤트

PostgreSQL LISTEN/NOTIFY란?

PostgreSQL의 LISTEN/NOTIFY는 별도의 메시지 브로커 없이 데이터베이스 레벨에서 실시간 이벤트를 발행·구독할 수 있는 내장 메커니즘입니다. Redis Pub/Sub이나 Kafka 없이도 가벼운 실시간 알림을 구현할 수 있어, 마이크로서비스 간 통신이나 캐시 무효화에 매우 유용합니다.

이 글에서는 LISTEN/NOTIFY의 내부 동작 원리, Trigger 기반 자동 알림, NestJS·Spring 통합, 그리고 프로덕션 운영 시 주의할 점까지 심층적으로 다룹니다.

기본 동작 원리

LISTEN/NOTIFY는 PostgreSQL의 비동기 알림 시스템입니다. 핵심 구조는 단순합니다:

-- 세션 A: 채널 구독
LISTEN order_events;

-- 세션 B: 이벤트 발행 (payload는 8000바이트 제한)
NOTIFY order_events, '{"id":42,"action":"created","total":15000}';

-- 세션 A: 알림 수신
-- Asynchronous notification "order_events" with payload
-- "{"id":42,"action":"created","total":15000}" received from server process with PID 1234.

핵심 특성을 정리하면:

  • 트랜잭션 안전: NOTIFY는 트랜잭션이 COMMIT될 때만 전달됩니다. ROLLBACK되면 알림도 사라집니다.
  • Payload 제한: 최대 8,000바이트. 큰 데이터는 ID만 전달하고 수신 측에서 조회하는 패턴을 씁니다.
  • 중복 제거: 같은 트랜잭션 내 동일 채널·동일 payload의 NOTIFY는 자동으로 하나로 합쳐집니다.
  • 메모리 큐: 알림은 디스크에 저장되지 않습니다. 수신자가 없으면 버려지고, 연결이 끊기면 유실됩니다.

Trigger 기반 자동 알림 패턴

가장 강력한 패턴은 Trigger + NOTIFY 조합입니다. 테이블 변경 시 자동으로 이벤트를 발행합니다:

-- 범용 알림 함수
CREATE OR REPLACE FUNCTION notify_table_change()
RETURNS TRIGGER AS $$
DECLARE
  payload JSON;
  record RECORD;
BEGIN
  -- INSERT/UPDATE는 NEW, DELETE는 OLD
  IF (TG_OP = 'DELETE') THEN
    record = OLD;
  ELSE
    record = NEW;
  END IF;

  payload = json_build_object(
    'table', TG_TABLE_NAME,
    'action', TG_OP,
    'id', record.id,
    'timestamp', NOW()
  );

  -- 채널명은 테이블명_changes
  PERFORM pg_notify(TG_TABLE_NAME || '_changes', payload::TEXT);

  RETURN record;
END;
$$ LANGUAGE plpgsql;

-- orders 테이블에 트리거 연결
CREATE TRIGGER orders_notify
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_table_change();

이제 orders 테이블에 INSERT/UPDATE/DELETE가 발생하면 orders_changes 채널로 자동 알림이 전송됩니다.

NestJS 통합: pg 드라이버 직접 연결

NestJS에서 LISTEN/NOTIFY를 사용하려면 ORM과 별도의 전용 연결이 필요합니다. ORM의 커넥션 풀은 LISTEN 상태를 유지할 수 없기 때문입니다:

// pg-listener.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Client } from 'pg';

@Injectable()
export class PgListenerService implements OnModuleInit, OnModuleDestroy {
  private client: Client;
  private readonly logger = new Logger(PgListenerService.name);
  private reconnectTimer: NodeJS.Timeout;

  constructor(private readonly eventEmitter: EventEmitter2) {}

  async onModuleInit() {
    await this.connect();
  }

  private async connect() {
    this.client = new Client({
      connectionString: process.env.DATABASE_URL,
      // LISTEN 전용 연결 — 쿼리 실행 금지
      application_name: 'pg-listener',
    });

    this.client.on('notification', (msg) => {
      try {
        const payload = JSON.parse(msg.payload);
        // NestJS EventEmitter로 내부 전파
        this.eventEmitter.emit(`db.${msg.channel}`, payload);
        this.logger.debug(`[${msg.channel}] ${msg.payload}`);
      } catch (e) {
        this.logger.error(`Invalid payload: ${msg.payload}`);
      }
    });

    this.client.on('error', (err) => {
      this.logger.error(`PG listener error: ${err.message}`);
      this.scheduleReconnect();
    });

    await this.client.connect();

    // 구독할 채널 등록
    await this.client.query('LISTEN orders_changes');
    await this.client.query('LISTEN payments_changes');
    await this.client.query('LISTEN inventory_changes');

    this.logger.log('PG LISTEN channels registered');
  }

  private scheduleReconnect() {
    clearTimeout(this.reconnectTimer);
    this.reconnectTimer = setTimeout(async () => {
      this.logger.warn('Reconnecting PG listener...');
      try {
        await this.client?.end().catch(() => {});
        await this.connect();
      } catch (e) {
        this.logger.error(`Reconnect failed: ${e.message}`);
        this.scheduleReconnect(); // 재시도
      }
    }, 5000);
  }

  async onModuleDestroy() {
    clearTimeout(this.reconnectTimer);
    await this.client?.end();
  }
}

이벤트를 소비하는 서비스는 @OnEvent 데코레이터로 깔끔하게 처리합니다:

// order-sync.service.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';

@Injectable()
export class OrderSyncService {
  @OnEvent('db.orders_changes')
  async handleOrderChange(payload: {
    table: string;
    action: string;
    id: number;
    timestamp: string;
  }) {
    switch (payload.action) {
      case 'INSERT':
        await this.syncNewOrder(payload.id);
        break;
      case 'UPDATE':
        await this.invalidateCache(payload.id);
        break;
      case 'DELETE':
        await this.removeFromSearch(payload.id);
        break;
    }
  }
}

Spring Boot 통합: R2DBC 리액티브 구독

Spring에서는 R2DBC의 리액티브 스트림으로 LISTEN/NOTIFY를 자연스럽게 처리할 수 있습니다:

// PgNotificationListener.java
@Component
@RequiredArgsConstructor
public class PgNotificationListener implements SmartLifecycle {

    private final ConnectionFactory connectionFactory;
    private final ApplicationEventPublisher publisher;
    private Disposable subscription;
    private volatile boolean running = false;

    @Override
    public void start() {
        this.running = true;
        this.subscription = Mono.from(connectionFactory.create())
            .flatMapMany(conn -> {
                PostgresqlConnection pgConn = (PostgresqlConnection) conn;
                return pgConn.createStatement("LISTEN orders_changes")
                    .execute()
                    .thenMany(pgConn.getNotifications());
            })
            .retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))
                .maxBackoff(Duration.ofMinutes(1)))
            .subscribe(notification -> {
                String payload = notification.getParameter();
                publisher.publishEvent(
                    new DbChangeEvent(notification.getName(), payload)
                );
            });
    }

    @Override
    public void stop() {
        this.running = false;
        if (subscription != null) subscription.dispose();
    }

    @Override
    public boolean isRunning() { return running; }
}

// 이벤트 수신
@EventListener
public void onDbChange(DbChangeEvent event) {
    log.info("Channel: {}, Payload: {}", event.channel(), event.payload());
}

PgBouncer 환경에서의 함정

LISTEN/NOTIFY는 PgBouncer의 transaction 모드와 호환되지 않습니다. 이것은 프로덕션에서 가장 흔한 실수입니다:

PgBouncer 모드 LISTEN 지원 설명
session ✅ 지원 세션이 고정되므로 LISTEN 상태 유지 가능
transaction ❌ 불가 트랜잭션 종료 시 연결이 반환되어 LISTEN 해제
statement ❌ 불가 문장 단위 전환 — LISTEN 유지 불가

해결책: LISTEN 전용 연결은 PgBouncer를 우회하여 PostgreSQL에 직접 연결합니다. 일반 쿼리용 풀과 분리하세요:

# 일반 쿼리: PgBouncer 경유 (transaction mode)
DATABASE_URL=postgresql://pgbouncer:6432/mydb

# LISTEN 전용: PostgreSQL 직접 연결
LISTEN_DATABASE_URL=postgresql://postgres:5432/mydb

캐시 무효화 패턴

LISTEN/NOTIFY의 가장 실용적인 활용은 분산 캐시 무효화입니다. 여러 앱 인스턴스가 동일 DB를 바라볼 때, 한 인스턴스의 변경을 다른 인스턴스에 즉시 전파합니다:

-- 캐시 무효화 전용 트리거
CREATE OR REPLACE FUNCTION invalidate_cache()
RETURNS TRIGGER AS $$
BEGIN
  PERFORM pg_notify('cache_invalidate', json_build_object(
    'cache_key', TG_TABLE_NAME || ':' || NEW.id,
    'region', TG_TABLE_NAME
  )::TEXT);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- 캐시 대상 테이블에 적용
CREATE TRIGGER products_cache_invalidate
AFTER UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION invalidate_cache();
// NestJS 캐시 무효화 핸들러
@OnEvent('db.cache_invalidate')
async handleCacheInvalidate(payload: { cache_key: string; region: string }) {
  await this.cacheManager.del(payload.cache_key);
  this.logger.debug(`Cache invalidated: ${payload.cache_key}`);
}

LISTEN/NOTIFY vs 다른 실시간 솔루션

기준 PG LISTEN/NOTIFY Redis Pub/Sub Kafka
추가 인프라 ❌ 불필요 Redis 필요 Kafka 클러스터 필요
메시지 영속성 ❌ 없음 ❌ 없음 ✅ 디스크 저장
트랜잭션 연동 ✅ 네이티브 ❌ 별도 구현 ❌ Outbox 패턴
처리량 ~수천/초 ~수십만/초 ~수백만/초
적합 용도 캐시 무효화, 소규모 알림 실시간 채팅, 이벤트 브로드캐스트 이벤트 소싱, 대규모 스트리밍

프로덕션 운영 체크리스트

LISTEN/NOTIFY를 프로덕션에 도입할 때 반드시 확인해야 할 사항입니다:

  • 전용 연결 분리: LISTEN 연결은 커넥션 풀과 별도로 관리합니다. 풀의 연결은 LISTEN 상태를 유지할 수 없습니다.
  • 자동 재연결: 네트워크 단절, PostgreSQL 재시작 시 자동으로 재연결하고 LISTEN을 재등록해야 합니다.
  • Payload 크기 감시: 8KB 제한을 초과하면 에러가 발생합니다. 큰 데이터는 ID만 전달하세요.
  • 큐 오버플로: pg_notification_queue_usage() 함수로 알림 큐 사용량을 모니터링합니다. 기본 큐 크기는 8GB이지만, 소비자가 느리면 가득 찰 수 있습니다.
  • 멱등성 보장: 재연결 시 알림이 유실될 수 있으므로, 주기적인 폴링과 병행하는 하이브리드 패턴을 권장합니다.
-- 큐 사용량 모니터링
SELECT pg_notification_queue_usage();
-- 0.000123  → 0.01% 사용 중 (정상)

-- 알림 큐 관련 설정 확인
SHOW max_notify_queue_pages;  -- PostgreSQL 15+

하이브리드 패턴: LISTEN + Polling

알림 유실에 대비한 안전한 아키텍처는 LISTEN/NOTIFY를 즉시 알림으로, Polling을 보험으로 함께 사용합니다:

// 하이브리드 이벤트 소비자
@Injectable()
export class HybridEventConsumer implements OnModuleInit {
  private lastProcessedAt = new Date();

  // 1) 실시간 알림 (LISTEN/NOTIFY)
  @OnEvent('db.orders_changes')
  async onRealtime(payload: { id: number; action: string }) {
    await this.processEvent(payload);
    this.lastProcessedAt = new Date();
  }

  // 2) 보험 폴링 (30초마다)
  @Cron('*/30 * * * * *')
  async pollMissedEvents() {
    const missed = await this.orderRepo.find({
      where: { updatedAt: MoreThan(this.lastProcessedAt) },
    });
    for (const order of missed) {
      await this.processEvent({
        id: order.id,
        action: 'POLL_CATCHUP',
      });
    }
    this.lastProcessedAt = new Date();
  }

  private async processEvent(payload: { id: number; action: string }) {
    // 멱등성 키로 중복 처리 방지
    const idempotencyKey = `order:${payload.id}:${payload.action}`;
    if (await this.cache.has(idempotencyKey)) return;
    
    await this.cache.set(idempotencyKey, true, 60);
    // 실제 비즈니스 로직
    await this.syncOrder(payload.id);
  }
}

마무리

PostgreSQL LISTEN/NOTIFY는 인프라 추가 없이 실시간 이벤트를 처리할 수 있는 강력한 도구입니다. 특히 캐시 무효화, 소규모 알림, DB 변경 감지에 적합하며, 트랜잭션과 네이티브로 연동되는 점이 가장 큰 장점입니다.

다만 메시지 영속성이 없고 처리량에 한계가 있으므로, 미션 크리티컬한 이벤트에는 Kafka나 Outbox 패턴을 병행하세요. LISTEN/NOTIFY + Polling 하이브리드 패턴이 실무에서 가장 안정적인 선택입니다.

관련 글로 PostgreSQL Advisory Lock 심화Redis Pub/Sub 실시간 메시징도 함께 참고하세요.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux