Prisma Pulse·Accelerate 실전

Prisma Pulse란?

Prisma Pulse는 데이터베이스의 변경 사항을 실시간 이벤트 스트림으로 수신하는 기능이다. PostgreSQL의 CDC(Change Data Capture)를 추상화해서, prisma.model.stream() 한 줄로 INSERT/UPDATE/DELETE 이벤트를 구독할 수 있다. 폴링이나 트리거 없이 리액티브한 백엔드를 구축할 수 있다.

Prisma Accelerate는 Prisma Client의 쿼리를 엣지 캐시 레이어로 감싸는 서비스다. 글로벌 CDN에서 DB 쿼리 결과를 캐싱하고, 커넥션 풀링까지 제공한다. Serverless 환경에서 콜드 스타트 시 DB 연결 지연 문제를 근본적으로 해결한다.

Pulse 설정: 실시간 DB 이벤트 구독

Step 1: Prisma Client 확장

// schema.prisma
generator client {
  provider = "prisma-client-js"
}

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model Order {
  id        Int      @id @default(autoincrement())
  product   String
  amount    Int
  status    OrderStatus @default(PENDING)
  userId    String
  createdAt DateTime @default(now())
  updatedAt DateTime @updatedAt
}

enum OrderStatus {
  PENDING
  CONFIRMED
  SHIPPED
  DELIVERED
  CANCELLED
}
// prisma 확장 설치
// npm install @prisma/extension-pulse

import { PrismaClient } from '@prisma/client';
import { withPulse } from '@prisma/extension-pulse';

const prisma = new PrismaClient().$extends(
  withPulse({
    apiKey: process.env.PULSE_API_KEY,
  })
);

Step 2: 이벤트 스트림 구독

// order-stream.service.ts
import { Injectable, OnModuleInit, OnModuleDestroy } from '@nestjs/common';

@Injectable()
export class OrderStreamService implements OnModuleInit, OnModuleDestroy {
  private subscription: any;

  constructor(
    private readonly prisma: ExtendedPrismaService,
    private readonly notificationService: NotificationService,
    private readonly analyticsService: AnalyticsService,
  ) {}

  async onModuleInit() {
    await this.startStreaming();
  }

  async startStreaming() {
    // 모든 Order 변경 이벤트 구독
    this.subscription = await this.prisma.order.stream();

    for await (const event of this.subscription) {
      switch (event.action) {
        case 'create':
          await this.handleOrderCreated(event.created);
          break;
        case 'update':
          await this.handleOrderUpdated(event.after);
          break;
        case 'delete':
          await this.handleOrderDeleted(event.deleted);
          break;
      }
    }
  }

  private async handleOrderCreated(order: Order) {
    // 주문 생성 시 실시간 알림
    await this.notificationService.sendPush(
      order.userId,
      `주문 #${order.id}이 접수되었습니다`
    );
    // 분석 이벤트 발행
    await this.analyticsService.track('order_created', {
      orderId: order.id,
      amount: order.amount,
    });
  }

  private async handleOrderUpdated(order: Order) {
    if (order.status === 'SHIPPED') {
      await this.notificationService.sendPush(
        order.userId,
        `주문 #${order.id}이 배송 시작되었습니다`
      );
    }
  }

  async onModuleDestroy() {
    this.subscription?.stop();
  }
}

필터링: 특정 조건의 이벤트만 구독

// 특정 상태 변경만 구독
const stream = await prisma.order.stream({
  create: {
    status: { equals: 'CONFIRMED' },  // CONFIRMED 생성만
  },
  update: {
    after: {
      status: { in: ['SHIPPED', 'DELIVERED'] },  // 배송 관련만
    },
  },
});

// 특정 사용자의 변경만 구독
const userStream = await prisma.order.stream({
  create: { userId: { equals: 'user_abc' } },
  update: { after: { userId: { equals: 'user_abc' } } },
});

Pulse 실전 패턴: SSE 실시간 알림

// order.controller.ts
@Controller('orders')
export class OrderController {

  @Sse('stream/:userId')
  async streamOrders(
    @Param('userId') userId: string,
  ): Promise<Observable<MessageEvent>> {
    const stream = await this.prisma.order.stream({
      create: { userId: { equals: userId } },
      update: { after: { userId: { equals: userId } } },
    });

    return new Observable((subscriber) => {
      (async () => {
        for await (const event of stream) {
          subscriber.next({
            data: JSON.stringify({
              action: event.action,
              order: event.action === 'create'
                ? event.created
                : event.after,
            }),
          });
        }
      })();

      return () => stream.stop();
    });
  }
}

// 프론트엔드
const eventSource = new EventSource('/orders/stream/user_abc');
eventSource.onmessage = (event) => {
  const { action, order } = JSON.parse(event.data);
  if (action === 'update' && order.status === 'SHIPPED') {
    showToast(`주문 #${order.id} 배송 시작!`);
  }
};

Accelerate: 엣지 캐싱 + 커넥션 풀링

설정

// npm install @prisma/extension-accelerate

import { PrismaClient } from '@prisma/client';
import { withAccelerate } from '@prisma/extension-accelerate';

const prisma = new PrismaClient().$extends(withAccelerate());

// .env
// DATABASE_URL="prisma://accelerate.prisma-data.net/?api_key=xxx"
// DIRECT_DATABASE_URL="postgresql://user:pass@db:5432/mydb"

캐시 전략

@Injectable()
export class ProductService {
  constructor(private readonly prisma: PrismaService) {}

  // TTL 캐시: 60초 동안 캐시 유지
  async getProducts() {
    return this.prisma.product.findMany({
      cacheStrategy: {
        ttl: 60,  // 60초 TTL
      },
    });
  }

  // SWR(Stale-While-Revalidate): 60초간 캐시 사용,
  // 이후 백그라운드에서 갱신하면서 기존 캐시 반환
  async getPopularProducts() {
    return this.prisma.product.findMany({
      where: { featured: true },
      orderBy: { salesCount: 'desc' },
      take: 20,
      cacheStrategy: {
        ttl: 60,
        swr: 300,  // 5분간 stale 데이터 허용
      },
    });
  }

  // 캐시 없이 직접 쿼리 (쓰기 후 읽기 일관성)
  async getOrderById(id: number) {
    return this.prisma.order.findUnique({
      where: { id },
      // cacheStrategy 생략 → 캐시 미사용
    });
  }
}

캐시 전략 선택 가이드

데이터 유형 TTL SWR 이유
상품 목록 60s 300s 변경 드묾, 약간의 지연 허용
사용자 프로필 30s 120s 수정 가능하지만 빈번하지 않음
설정/메타데이터 300s 600s 거의 변경 안 됨
주문 상세 0 0 실시간 정확성 필요
재고 수량 0 0 동시성 이슈, 캐시 위험

Pulse + Accelerate 조합: 캐시 무효화

Pulse로 변경을 감지하고, Accelerate 캐시를 즉시 무효화하는 패턴이다.

@Injectable()
export class CacheInvalidationService implements OnModuleInit {

  constructor(
    private readonly prisma: ExtendedPrismaService,
    private readonly cacheManager: Cache,
  ) {}

  async onModuleInit() {
    // 상품 변경 시 관련 캐시 즉시 무효화
    const productStream = await this.prisma.product.stream();

    for await (const event of productStream) {
      const productId = event.action === 'delete'
        ? event.deleted.id
        : (event.action === 'create' ? event.created.id : event.after.id);

      // 로컬 캐시 무효화
      await this.cacheManager.del(`product:${productId}`);
      await this.cacheManager.del('products:featured');
      await this.cacheManager.del('products:list');

      console.log(
        `Cache invalidated for product ${productId} (${event.action})`
      );
    }
  }
}

Serverless 환경: 커넥션 풀링

Accelerate의 또 다른 핵심 가치는 커넥션 풀링이다. AWS Lambda, Vercel, Cloudflare Workers 등에서 매 요청마다 DB 커넥션을 열면 커넥션 고갈이 발생한다.

// 기존 Serverless 문제
// Lambda 100개 동시 실행 → DB 커넥션 100개 → max_connections 초과

// Accelerate 사용 시
// Lambda 100개 → Accelerate 프록시 → DB 커넥션 10개 (풀링)

// schema.prisma
datasource db {
  provider  = "postgresql"
  url       = env("DATABASE_URL")       // Accelerate 프록시 URL
  directUrl = env("DIRECT_DATABASE_URL") // 마이그레이션용 직접 연결
}

// 마이그레이션은 직접 연결로
// npx prisma migrate deploy (DIRECT_DATABASE_URL 사용)

// 런타임 쿼리는 Accelerate 프록시로
// prisma.user.findMany() → Accelerate → DB

Self-hosted 대안: pg_notify + Prisma

Prisma Pulse는 유료 서비스다. Self-hosted 환경에서는 PostgreSQL의 LISTEN/NOTIFY로 유사한 기능을 구현할 수 있다.

// pg-notify.service.ts
import { Injectable, OnModuleInit } from '@nestjs/common';
import { Client } from 'pg';

@Injectable()
export class PgNotifyService implements OnModuleInit {
  private client: Client;

  async onModuleInit() {
    this.client = new Client(process.env.DIRECT_DATABASE_URL);
    await this.client.connect();

    // 채널 구독
    await this.client.query('LISTEN order_changes');

    this.client.on('notification', (msg) => {
      const payload = JSON.parse(msg.payload);
      this.handleChange(payload);
    });
  }

  private handleChange(payload: {
    action: string;
    table: string;
    data: any;
  }) {
    console.log(`${payload.action} on ${payload.table}:`, payload.data);
  }
}

// PostgreSQL 트리거 생성 (마이그레이션)
-- CREATE OR REPLACE FUNCTION notify_order_change()
-- RETURNS TRIGGER AS $$
-- BEGIN
--   PERFORM pg_notify('order_changes', json_build_object(
--     'action', TG_OP,
--     'table', TG_TABLE_NAME,
--     'data', CASE TG_OP
--       WHEN 'DELETE' THEN row_to_json(OLD)
--       ELSE row_to_json(NEW)
--     END
--   )::text);
--   RETURN NEW;
-- END;
-- $$ LANGUAGE plpgsql;
--
-- CREATE TRIGGER order_notify_trigger
-- AFTER INSERT OR UPDATE OR DELETE ON "Order"
-- FOR EACH ROW EXECUTE FUNCTION notify_order_change();

Accelerate 대안: Redis 캐시 레이어

// Prisma Client Extension으로 캐시 레이어 구현
import { Prisma } from '@prisma/client';

const cacheExtension = Prisma.defineExtension({
  name: 'cache',
  query: {
    $allModels: {
      async findMany({ model, args, query }) {
        const cacheKey = `${model}:findMany:${JSON.stringify(args)}`;
        
        // Redis에서 캐시 조회
        const cached = await redis.get(cacheKey);
        if (cached) return JSON.parse(cached);

        // 캐시 미스: DB 쿼리 실행
        const result = await query(args);
        
        // 결과 캐싱 (TTL 60초)
        await redis.setex(cacheKey, 60, JSON.stringify(result));
        
        return result;
      },
      async findUnique({ model, args, query }) {
        const cacheKey = `${model}:findUnique:${JSON.stringify(args.where)}`;
        const cached = await redis.get(cacheKey);
        if (cached) return JSON.parse(cached);

        const result = await query(args);
        if (result) {
          await redis.setex(cacheKey, 60, JSON.stringify(result));
        }
        return result;
      },
    },
  },
});

const prisma = new PrismaClient().$extends(cacheExtension);

Prisma Pulse와 Accelerate는 데이터베이스의 실시간성과 성능을 동시에 잡는 조합이다. Pulse로 변경을 즉시 감지하고, Accelerate로 읽기 성능을 극대화한다. SaaS 서비스라면 Prisma 플랫폼을, Self-hosted 환경이라면 pg_notify + Redis 캐시 조합으로 동일한 아키텍처를 구현할 수 있다.

관련 글: Prisma Client Extension 심화 | NestJS SSE 실시간 스트리밍

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux