PostgreSQL LISTEN/NOTIFY란?
PostgreSQL의 LISTEN/NOTIFY는 별도의 메시지 브로커 없이 데이터베이스 레벨에서 실시간 이벤트를 발행·구독할 수 있는 내장 메커니즘입니다. Redis Pub/Sub이나 Kafka 없이도 가벼운 실시간 알림을 구현할 수 있어, 마이크로서비스 간 통신이나 캐시 무효화에 매우 유용합니다.
이 글에서는 LISTEN/NOTIFY의 내부 동작 원리, Trigger 기반 자동 알림, NestJS·Spring 통합, 그리고 프로덕션 운영 시 주의할 점까지 심층적으로 다룹니다.
기본 동작 원리
LISTEN/NOTIFY는 PostgreSQL의 비동기 알림 시스템입니다. 핵심 구조는 단순합니다:
-- 세션 A: 채널 구독
LISTEN order_events;
-- 세션 B: 이벤트 발행 (payload는 8000바이트 제한)
NOTIFY order_events, '{"id":42,"action":"created","total":15000}';
-- 세션 A: 알림 수신
-- Asynchronous notification "order_events" with payload
-- "{"id":42,"action":"created","total":15000}" received from server process with PID 1234.
핵심 특성을 정리하면:
- 트랜잭션 안전: NOTIFY는 트랜잭션이
COMMIT될 때만 전달됩니다. ROLLBACK되면 알림도 사라집니다. - Payload 제한: 최대 8,000바이트. 큰 데이터는 ID만 전달하고 수신 측에서 조회하는 패턴을 씁니다.
- 중복 제거: 같은 트랜잭션 내 동일 채널·동일 payload의 NOTIFY는 자동으로 하나로 합쳐집니다.
- 메모리 큐: 알림은 디스크에 저장되지 않습니다. 수신자가 없으면 버려지고, 연결이 끊기면 유실됩니다.
Trigger 기반 자동 알림 패턴
가장 강력한 패턴은 Trigger + NOTIFY 조합입니다. 테이블 변경 시 자동으로 이벤트를 발행합니다:
-- 범용 알림 함수
CREATE OR REPLACE FUNCTION notify_table_change()
RETURNS TRIGGER AS $$
DECLARE
payload JSON;
record RECORD;
BEGIN
-- INSERT/UPDATE는 NEW, DELETE는 OLD
IF (TG_OP = 'DELETE') THEN
record = OLD;
ELSE
record = NEW;
END IF;
payload = json_build_object(
'table', TG_TABLE_NAME,
'action', TG_OP,
'id', record.id,
'timestamp', NOW()
);
-- 채널명은 테이블명_changes
PERFORM pg_notify(TG_TABLE_NAME || '_changes', payload::TEXT);
RETURN record;
END;
$$ LANGUAGE plpgsql;
-- orders 테이블에 트리거 연결
CREATE TRIGGER orders_notify
AFTER INSERT OR UPDATE OR DELETE ON orders
FOR EACH ROW EXECUTE FUNCTION notify_table_change();
이제 orders 테이블에 INSERT/UPDATE/DELETE가 발생하면 orders_changes 채널로 자동 알림이 전송됩니다.
NestJS 통합: pg 드라이버 직접 연결
NestJS에서 LISTEN/NOTIFY를 사용하려면 ORM과 별도의 전용 연결이 필요합니다. ORM의 커넥션 풀은 LISTEN 상태를 유지할 수 없기 때문입니다:
// pg-listener.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy, Logger } from '@nestjs/common';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Client } from 'pg';
@Injectable()
export class PgListenerService implements OnModuleInit, OnModuleDestroy {
private client: Client;
private readonly logger = new Logger(PgListenerService.name);
private reconnectTimer: NodeJS.Timeout;
constructor(private readonly eventEmitter: EventEmitter2) {}
async onModuleInit() {
await this.connect();
}
private async connect() {
this.client = new Client({
connectionString: process.env.DATABASE_URL,
// LISTEN 전용 연결 — 쿼리 실행 금지
application_name: 'pg-listener',
});
this.client.on('notification', (msg) => {
try {
const payload = JSON.parse(msg.payload);
// NestJS EventEmitter로 내부 전파
this.eventEmitter.emit(`db.${msg.channel}`, payload);
this.logger.debug(`[${msg.channel}] ${msg.payload}`);
} catch (e) {
this.logger.error(`Invalid payload: ${msg.payload}`);
}
});
this.client.on('error', (err) => {
this.logger.error(`PG listener error: ${err.message}`);
this.scheduleReconnect();
});
await this.client.connect();
// 구독할 채널 등록
await this.client.query('LISTEN orders_changes');
await this.client.query('LISTEN payments_changes');
await this.client.query('LISTEN inventory_changes');
this.logger.log('PG LISTEN channels registered');
}
private scheduleReconnect() {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = setTimeout(async () => {
this.logger.warn('Reconnecting PG listener...');
try {
await this.client?.end().catch(() => {});
await this.connect();
} catch (e) {
this.logger.error(`Reconnect failed: ${e.message}`);
this.scheduleReconnect(); // 재시도
}
}, 5000);
}
async onModuleDestroy() {
clearTimeout(this.reconnectTimer);
await this.client?.end();
}
}
이벤트를 소비하는 서비스는 @OnEvent 데코레이터로 깔끔하게 처리합니다:
// order-sync.service.ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
@Injectable()
export class OrderSyncService {
@OnEvent('db.orders_changes')
async handleOrderChange(payload: {
table: string;
action: string;
id: number;
timestamp: string;
}) {
switch (payload.action) {
case 'INSERT':
await this.syncNewOrder(payload.id);
break;
case 'UPDATE':
await this.invalidateCache(payload.id);
break;
case 'DELETE':
await this.removeFromSearch(payload.id);
break;
}
}
}
Spring Boot 통합: R2DBC 리액티브 구독
Spring에서는 R2DBC의 리액티브 스트림으로 LISTEN/NOTIFY를 자연스럽게 처리할 수 있습니다:
// PgNotificationListener.java
@Component
@RequiredArgsConstructor
public class PgNotificationListener implements SmartLifecycle {
private final ConnectionFactory connectionFactory;
private final ApplicationEventPublisher publisher;
private Disposable subscription;
private volatile boolean running = false;
@Override
public void start() {
this.running = true;
this.subscription = Mono.from(connectionFactory.create())
.flatMapMany(conn -> {
PostgresqlConnection pgConn = (PostgresqlConnection) conn;
return pgConn.createStatement("LISTEN orders_changes")
.execute()
.thenMany(pgConn.getNotifications());
})
.retryWhen(Retry.backoff(Long.MAX_VALUE, Duration.ofSeconds(5))
.maxBackoff(Duration.ofMinutes(1)))
.subscribe(notification -> {
String payload = notification.getParameter();
publisher.publishEvent(
new DbChangeEvent(notification.getName(), payload)
);
});
}
@Override
public void stop() {
this.running = false;
if (subscription != null) subscription.dispose();
}
@Override
public boolean isRunning() { return running; }
}
// 이벤트 수신
@EventListener
public void onDbChange(DbChangeEvent event) {
log.info("Channel: {}, Payload: {}", event.channel(), event.payload());
}
PgBouncer 환경에서의 함정
LISTEN/NOTIFY는 PgBouncer의 transaction 모드와 호환되지 않습니다. 이것은 프로덕션에서 가장 흔한 실수입니다:
| PgBouncer 모드 | LISTEN 지원 | 설명 |
|---|---|---|
session |
✅ 지원 | 세션이 고정되므로 LISTEN 상태 유지 가능 |
transaction |
❌ 불가 | 트랜잭션 종료 시 연결이 반환되어 LISTEN 해제 |
statement |
❌ 불가 | 문장 단위 전환 — LISTEN 유지 불가 |
해결책: LISTEN 전용 연결은 PgBouncer를 우회하여 PostgreSQL에 직접 연결합니다. 일반 쿼리용 풀과 분리하세요:
# 일반 쿼리: PgBouncer 경유 (transaction mode)
DATABASE_URL=postgresql://pgbouncer:6432/mydb
# LISTEN 전용: PostgreSQL 직접 연결
LISTEN_DATABASE_URL=postgresql://postgres:5432/mydb
캐시 무효화 패턴
LISTEN/NOTIFY의 가장 실용적인 활용은 분산 캐시 무효화입니다. 여러 앱 인스턴스가 동일 DB를 바라볼 때, 한 인스턴스의 변경을 다른 인스턴스에 즉시 전파합니다:
-- 캐시 무효화 전용 트리거
CREATE OR REPLACE FUNCTION invalidate_cache()
RETURNS TRIGGER AS $$
BEGIN
PERFORM pg_notify('cache_invalidate', json_build_object(
'cache_key', TG_TABLE_NAME || ':' || NEW.id,
'region', TG_TABLE_NAME
)::TEXT);
RETURN NEW;
END;
$$ LANGUAGE plpgsql;
-- 캐시 대상 테이블에 적용
CREATE TRIGGER products_cache_invalidate
AFTER UPDATE ON products
FOR EACH ROW EXECUTE FUNCTION invalidate_cache();
// NestJS 캐시 무효화 핸들러
@OnEvent('db.cache_invalidate')
async handleCacheInvalidate(payload: { cache_key: string; region: string }) {
await this.cacheManager.del(payload.cache_key);
this.logger.debug(`Cache invalidated: ${payload.cache_key}`);
}
LISTEN/NOTIFY vs 다른 실시간 솔루션
| 기준 | PG LISTEN/NOTIFY | Redis Pub/Sub | Kafka |
|---|---|---|---|
| 추가 인프라 | ❌ 불필요 | Redis 필요 | Kafka 클러스터 필요 |
| 메시지 영속성 | ❌ 없음 | ❌ 없음 | ✅ 디스크 저장 |
| 트랜잭션 연동 | ✅ 네이티브 | ❌ 별도 구현 | ❌ Outbox 패턴 |
| 처리량 | ~수천/초 | ~수십만/초 | ~수백만/초 |
| 적합 용도 | 캐시 무효화, 소규모 알림 | 실시간 채팅, 이벤트 브로드캐스트 | 이벤트 소싱, 대규모 스트리밍 |
프로덕션 운영 체크리스트
LISTEN/NOTIFY를 프로덕션에 도입할 때 반드시 확인해야 할 사항입니다:
- 전용 연결 분리: LISTEN 연결은 커넥션 풀과 별도로 관리합니다. 풀의 연결은 LISTEN 상태를 유지할 수 없습니다.
- 자동 재연결: 네트워크 단절, PostgreSQL 재시작 시 자동으로 재연결하고 LISTEN을 재등록해야 합니다.
- Payload 크기 감시: 8KB 제한을 초과하면 에러가 발생합니다. 큰 데이터는 ID만 전달하세요.
- 큐 오버플로:
pg_notification_queue_usage()함수로 알림 큐 사용량을 모니터링합니다. 기본 큐 크기는 8GB이지만, 소비자가 느리면 가득 찰 수 있습니다. - 멱등성 보장: 재연결 시 알림이 유실될 수 있으므로, 주기적인 폴링과 병행하는 하이브리드 패턴을 권장합니다.
-- 큐 사용량 모니터링
SELECT pg_notification_queue_usage();
-- 0.000123 → 0.01% 사용 중 (정상)
-- 알림 큐 관련 설정 확인
SHOW max_notify_queue_pages; -- PostgreSQL 15+
하이브리드 패턴: LISTEN + Polling
알림 유실에 대비한 안전한 아키텍처는 LISTEN/NOTIFY를 즉시 알림으로, Polling을 보험으로 함께 사용합니다:
// 하이브리드 이벤트 소비자
@Injectable()
export class HybridEventConsumer implements OnModuleInit {
private lastProcessedAt = new Date();
// 1) 실시간 알림 (LISTEN/NOTIFY)
@OnEvent('db.orders_changes')
async onRealtime(payload: { id: number; action: string }) {
await this.processEvent(payload);
this.lastProcessedAt = new Date();
}
// 2) 보험 폴링 (30초마다)
@Cron('*/30 * * * * *')
async pollMissedEvents() {
const missed = await this.orderRepo.find({
where: { updatedAt: MoreThan(this.lastProcessedAt) },
});
for (const order of missed) {
await this.processEvent({
id: order.id,
action: 'POLL_CATCHUP',
});
}
this.lastProcessedAt = new Date();
}
private async processEvent(payload: { id: number; action: string }) {
// 멱등성 키로 중복 처리 방지
const idempotencyKey = `order:${payload.id}:${payload.action}`;
if (await this.cache.has(idempotencyKey)) return;
await this.cache.set(idempotencyKey, true, 60);
// 실제 비즈니스 로직
await this.syncOrder(payload.id);
}
}
마무리
PostgreSQL LISTEN/NOTIFY는 인프라 추가 없이 실시간 이벤트를 처리할 수 있는 강력한 도구입니다. 특히 캐시 무효화, 소규모 알림, DB 변경 감지에 적합하며, 트랜잭션과 네이티브로 연동되는 점이 가장 큰 장점입니다.
다만 메시지 영속성이 없고 처리량에 한계가 있으므로, 미션 크리티컬한 이벤트에는 Kafka나 Outbox 패턴을 병행하세요. LISTEN/NOTIFY + Polling 하이브리드 패턴이 실무에서 가장 안정적인 선택입니다.
관련 글로 PostgreSQL Advisory Lock 심화와 Redis Pub/Sub 실시간 메시징도 함께 참고하세요.