Prisma Pulse란?
Prisma Pulse는 데이터베이스의 변경 사항을 실시간 이벤트 스트림으로 수신하는 기능이다. PostgreSQL의 CDC(Change Data Capture)를 추상화해서, prisma.model.stream() 한 줄로 INSERT/UPDATE/DELETE 이벤트를 구독할 수 있다. 폴링이나 트리거 없이 리액티브한 백엔드를 구축할 수 있다.
Prisma Accelerate는 Prisma Client의 쿼리를 엣지 캐시 레이어로 감싸는 서비스다. 글로벌 CDN에서 DB 쿼리 결과를 캐싱하고, 커넥션 풀링까지 제공한다. Serverless 환경에서 콜드 스타트 시 DB 연결 지연 문제를 근본적으로 해결한다.
Pulse 설정: 실시간 DB 이벤트 구독
Step 1: Prisma Client 확장
// schema.prisma
generator client {
provider = "prisma-client-js"
}
datasource db {
provider = "postgresql"
url = env("DATABASE_URL")
}
model Order {
id Int @id @default(autoincrement())
product String
amount Int
status OrderStatus @default(PENDING)
userId String
createdAt DateTime @default(now())
updatedAt DateTime @updatedAt
}
enum OrderStatus {
PENDING
CONFIRMED
SHIPPED
DELIVERED
CANCELLED
}
// prisma 확장 설치
// npm install @prisma/extension-pulse
import { PrismaClient } from '@prisma/client';
import { withPulse } from '@prisma/extension-pulse';
const prisma = new PrismaClient().$extends(
withPulse({
apiKey: process.env.PULSE_API_KEY,
})
);
Step 2: 이벤트 스트림 구독
// order-stream.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';
@Injectable()
export class OrderStreamService implements OnModuleInit, OnModuleDestroy {
private subscription: any;
constructor(
private readonly prisma: ExtendedPrismaService,
private readonly notificationService: NotificationService,
private readonly analyticsService: AnalyticsService,
) {}
async onModuleInit() {
await this.startStreaming();
}
async startStreaming() {
// 모든 Order 변경 이벤트 구독
this.subscription = await this.prisma.order.stream();
for await (const event of this.subscription) {
switch (event.action) {
case 'create':
await this.handleOrderCreated(event.created);
break;
case 'update':
await this.handleOrderUpdated(event.after);
break;
case 'delete':
await this.handleOrderDeleted(event.deleted);
break;
}
}
}
private async handleOrderCreated(order: Order) {
// 주문 생성 시 실시간 알림
await this.notificationService.sendPush(
order.userId,
`주문 #${order.id}이 접수되었습니다`
);
// 분석 이벤트 발행
await this.analyticsService.track('order_created', {
orderId: order.id,
amount: order.amount,
});
}
private async handleOrderUpdated(order: Order) {
if (order.status === 'SHIPPED') {
await this.notificationService.sendPush(
order.userId,
`주문 #${order.id}이 배송 시작되었습니다`
);
}
}
async onModuleDestroy() {
this.subscription?.stop();
}
}
필터링: 특정 조건의 이벤트만 구독
// 특정 상태 변경만 구독
const stream = await prisma.order.stream({
create: {
status: { equals: 'CONFIRMED' }, // CONFIRMED 생성만
},
update: {
after: {
status: { in: ['SHIPPED', 'DELIVERED'] }, // 배송 관련만
},
},
});
// 특정 사용자의 변경만 구독
const userStream = await prisma.order.stream({
create: { userId: { equals: 'user_abc' } },
update: { after: { userId: { equals: 'user_abc' } } },
});
Pulse 실전 패턴: SSE 실시간 알림
// order.controller.ts
@Controller('orders')
export class OrderController {
@Sse('stream/:userId')
async streamOrders(
@Param('userId') userId: string,
): Promise<Observable<MessageEvent>> {
const stream = await this.prisma.order.stream({
create: { userId: { equals: userId } },
update: { after: { userId: { equals: userId } } },
});
return new Observable((subscriber) => {
(async () => {
for await (const event of stream) {
subscriber.next({
data: JSON.stringify({
action: event.action,
order: event.action === 'create'
? event.created
: event.after,
}),
});
}
})();
return () => stream.stop();
});
}
}
// 프론트엔드
const eventSource = new EventSource('/orders/stream/user_abc');
eventSource.onmessage = (event) => {
const { action, order } = JSON.parse(event.data);
if (action === 'update' && order.status === 'SHIPPED') {
showToast(`주문 #${order.id} 배송 시작!`);
}
};
Accelerate: 엣지 캐싱 + 커넥션 풀링
설정
// npm install @prisma/extension-accelerate
import { PrismaClient } from '@prisma/client';
import { withAccelerate } from '@prisma/extension-accelerate';
const prisma = new PrismaClient().$extends(withAccelerate());
// .env
// DATABASE_URL="prisma://accelerate.prisma-data.net/?api_key=xxx"
// DIRECT_DATABASE_URL="postgresql://user:pass@db:5432/mydb"
캐시 전략
@Injectable()
export class ProductService {
constructor(private readonly prisma: PrismaService) {}
// TTL 캐시: 60초 동안 캐시 유지
async getProducts() {
return this.prisma.product.findMany({
cacheStrategy: {
ttl: 60, // 60초 TTL
},
});
}
// SWR(Stale-While-Revalidate): 60초간 캐시 사용,
// 이후 백그라운드에서 갱신하면서 기존 캐시 반환
async getPopularProducts() {
return this.prisma.product.findMany({
where: { featured: true },
orderBy: { salesCount: 'desc' },
take: 20,
cacheStrategy: {
ttl: 60,
swr: 300, // 5분간 stale 데이터 허용
},
});
}
// 캐시 없이 직접 쿼리 (쓰기 후 읽기 일관성)
async getOrderById(id: number) {
return this.prisma.order.findUnique({
where: { id },
// cacheStrategy 생략 → 캐시 미사용
});
}
}
캐시 전략 선택 가이드
| 데이터 유형 | TTL | SWR | 이유 |
|---|---|---|---|
| 상품 목록 | 60s | 300s | 변경 드묾, 약간의 지연 허용 |
| 사용자 프로필 | 30s | 120s | 수정 가능하지만 빈번하지 않음 |
| 설정/메타데이터 | 300s | 600s | 거의 변경 안 됨 |
| 주문 상세 | 0 | 0 | 실시간 정확성 필요 |
| 재고 수량 | 0 | 0 | 동시성 이슈, 캐시 위험 |
Pulse + Accelerate 조합: 캐시 무효화
Pulse로 변경을 감지하고, Accelerate 캐시를 즉시 무효화하는 패턴이다.
@Injectable()
export class CacheInvalidationService implements OnModuleInit {
constructor(
private readonly prisma: ExtendedPrismaService,
private readonly cacheManager: Cache,
) {}
async onModuleInit() {
// 상품 변경 시 관련 캐시 즉시 무효화
const productStream = await this.prisma.product.stream();
for await (const event of productStream) {
const productId = event.action === 'delete'
? event.deleted.id
: (event.action === 'create' ? event.created.id : event.after.id);
// 로컬 캐시 무효화
await this.cacheManager.del(`product:${productId}`);
await this.cacheManager.del('products:featured');
await this.cacheManager.del('products:list');
console.log(
`Cache invalidated for product ${productId} (${event.action})`
);
}
}
}
Serverless 환경: 커넥션 풀링
Accelerate의 또 다른 핵심 가치는 커넥션 풀링이다. AWS Lambda, Vercel, Cloudflare Workers 등에서 매 요청마다 DB 커넥션을 열면 커넥션 고갈이 발생한다.
// 기존 Serverless 문제
// Lambda 100개 동시 실행 → DB 커넥션 100개 → max_connections 초과
// Accelerate 사용 시
// Lambda 100개 → Accelerate 프록시 → DB 커넥션 10개 (풀링)
// schema.prisma
datasource db {
provider = "postgresql"
url = env("DATABASE_URL") // Accelerate 프록시 URL
directUrl = env("DIRECT_DATABASE_URL") // 마이그레이션용 직접 연결
}
// 마이그레이션은 직접 연결로
// npx prisma migrate deploy (DIRECT_DATABASE_URL 사용)
// 런타임 쿼리는 Accelerate 프록시로
// prisma.user.findMany() → Accelerate → DB
Self-hosted 대안: pg_notify + Prisma
Prisma Pulse는 유료 서비스다. Self-hosted 환경에서는 PostgreSQL의 LISTEN/NOTIFY로 유사한 기능을 구현할 수 있다.
// pg-notify.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Client } from 'pg';
@Injectable()
export class PgNotifyService implements OnModuleInit {
private client: Client;
async onModuleInit() {
this.client = new Client(process.env.DIRECT_DATABASE_URL);
await this.client.connect();
// 채널 구독
await this.client.query('LISTEN order_changes');
this.client.on('notification', (msg) => {
const payload = JSON.parse(msg.payload);
this.handleChange(payload);
});
}
private handleChange(payload: {
action: string;
table: string;
data: any;
}) {
console.log(`${payload.action} on ${payload.table}:`, payload.data);
}
}
// PostgreSQL 트리거 생성 (마이그레이션)
-- CREATE OR REPLACE FUNCTION notify_order_change()
-- RETURNS TRIGGER AS $$
-- BEGIN
-- PERFORM pg_notify('order_changes', json_build_object(
-- 'action', TG_OP,
-- 'table', TG_TABLE_NAME,
-- 'data', CASE TG_OP
-- WHEN 'DELETE' THEN row_to_json(OLD)
-- ELSE row_to_json(NEW)
-- END
-- )::text);
-- RETURN NEW;
-- END;
-- $$ LANGUAGE plpgsql;
--
-- CREATE TRIGGER order_notify_trigger
-- AFTER INSERT OR UPDATE OR DELETE ON "Order"
-- FOR EACH ROW EXECUTE FUNCTION notify_order_change();
Accelerate 대안: Redis 캐시 레이어
// Prisma Client Extension으로 캐시 레이어 구현
import { Prisma } from '@prisma/client';
const cacheExtension = Prisma.defineExtension({
name: 'cache',
query: {
$allModels: {
async findMany({ model, args, query }) {
const cacheKey = `${model}:findMany:${JSON.stringify(args)}`;
// Redis에서 캐시 조회
const cached = await redis.get(cacheKey);
if (cached) return JSON.parse(cached);
// 캐시 미스: DB 쿼리 실행
const result = await query(args);
// 결과 캐싱 (TTL 60초)
await redis.setex(cacheKey, 60, JSON.stringify(result));
return result;
},
async findUnique({ model, args, query }) {
const cacheKey = `${model}:findUnique:${JSON.stringify(args.where)}`;
const cached = await redis.get(cacheKey);
if (cached) return JSON.parse(cached);
const result = await query(args);
if (result) {
await redis.setex(cacheKey, 60, JSON.stringify(result));
}
return result;
},
},
},
});
const prisma = new PrismaClient().$extends(cacheExtension);
Prisma Pulse와 Accelerate는 데이터베이스의 실시간성과 성능을 동시에 잡는 조합이다. Pulse로 변경을 즉시 감지하고, Accelerate로 읽기 성능을 극대화한다. SaaS 서비스라면 Prisma 플랫폼을, Self-hosted 환경이라면 pg_notify + Redis 캐시 조합으로 동일한 아키텍처를 구현할 수 있다.