Drizzle ORM 트랜잭션 심화

Drizzle 트랜잭션이란?

Drizzle ORM의 트랜잭션은 TypeScript 타입 시스템과 완벽히 통합된 타입 안전 트랜잭션 API를 제공합니다. 콜백 기반의 자동 커밋/롤백부터 중첩 트랜잭션(Savepoint), Prepared Statement 캐싱까지 — ORM 수준에서 데이터 정합성과 성능을 동시에 확보할 수 있습니다.

기본 트랜잭션

콜백 방식 — 자동 커밋/롤백

import { drizzle } from 'drizzle-orm/node-postgres';
import { orders, orderItems, inventory } from './schema';

const db = drizzle(pool);

// 콜백 내에서 예외 발생 시 자동 롤백
const result = await db.transaction(async (tx) => {
  // 1. 주문 생성
  const [order] = await tx.insert(orders).values({
    customerId: 'cust-1',
    totalAmount: 15000,
    status: 'pending',
  }).returning();

  // 2. 주문 상품 등록
  await tx.insert(orderItems).values([
    { orderId: order.id, productId: 'prod-1', quantity: 2, price: 5000 },
    { orderId: order.id, productId: 'prod-2', quantity: 1, price: 5000 },
  ]);

  // 3. 재고 차감
  const [stock] = await tx
    .select()
    .from(inventory)
    .where(eq(inventory.productId, 'prod-1'))
    .for('update');  // SELECT FOR UPDATE — 비관적 잠금

  if (stock.quantity < 2) {
    throw new Error('Insufficient stock');  // 자동 롤백
  }

  await tx.update(inventory)
    .set({ quantity: stock.quantity - 2 })
    .where(eq(inventory.productId, 'prod-1'));

  return order;  // 자동 커밋 후 반환
});

트랜잭션 설정 — 격리 수준

// PostgreSQL 격리 수준 설정
await db.transaction(async (tx) => {
  // 결제 처리 — Serializable로 최고 수준 격리
  await tx.update(accounts)
    .set({ balance: sql`balance - ${amount}` })
    .where(eq(accounts.id, fromId));

  await tx.update(accounts)
    .set({ balance: sql`balance + ${amount}` })
    .where(eq(accounts.id, toId));
}, {
  isolationLevel: 'serializable',
  accessMode: 'read write',
  deferrable: false,
});

// Read Only 트랜잭션 — 리포트 쿼리에 적합
const report = await db.transaction(async (tx) => {
  const totalSales = await tx
    .select({ total: sql<number>`SUM(total_amount)` })
    .from(orders)
    .where(gte(orders.createdAt, startOfMonth));

  const topProducts = await tx
    .select({
      productId: orderItems.productId,
      count: sql<number>`COUNT(*)`,
    })
    .from(orderItems)
    .groupBy(orderItems.productId)
    .orderBy(desc(sql`COUNT(*)`))
    .limit(10);

  return { totalSales, topProducts };
}, {
  isolationLevel: 'repeatable read',
  accessMode: 'read only',
});

중첩 트랜잭션 — Savepoint

Drizzle은 트랜잭션 내에서 tx.transaction()을 호출하면 자동으로 SAVEPOINT를 사용합니다.

await db.transaction(async (tx) => {
  // 주문 생성 (외부 트랜잭션)
  const [order] = await tx.insert(orders).values({
    customerId: 'cust-1',
    totalAmount: 30000,
    status: 'pending',
  }).returning();

  // 포인트 차감 시도 (중첩 트랜잭션 = SAVEPOINT)
  try {
    await tx.transaction(async (stx) => {
      const [user] = await stx
        .select()
        .from(users)
        .where(eq(users.id, 'cust-1'));

      if (user.points < 1000) {
        throw new Error('Not enough points');
        // ROLLBACK TO SAVEPOINT — 외부 트랜잭션은 유지
      }

      await stx.update(users)
        .set({ points: user.points - 1000 })
        .where(eq(users.id, 'cust-1'));

      await stx.update(orders)
        .set({ pointsUsed: 1000, totalAmount: 29000 })
        .where(eq(orders.id, order.id));
    });
  } catch (e) {
    // 포인트 차감 실패해도 주문은 계속 진행
    console.log('Points deduction failed, continuing without points');
  }

  // 결제 처리 (외부 트랜잭션 계속)
  await tx.insert(payments).values({
    orderId: order.id,
    amount: 30000,
    method: 'card',
  });
});

Prepared Statement

Drizzle의 .prepare()는 쿼리를 미리 컴파일해 반복 실행 성능을 최적화합니다.

import { placeholder } from 'drizzle-orm';

// Prepared Statement 정의
const findOrdersByCustomer = db
  .select()
  .from(orders)
  .where(
    and(
      eq(orders.customerId, placeholder('customerId')),
      eq(orders.status, placeholder('status')),
    )
  )
  .orderBy(desc(orders.createdAt))
  .limit(placeholder('limit'))
  .prepare('find_orders_by_customer');

// 실행 — SQL 파싱 없이 바로 실행
const pendingOrders = await findOrdersByCustomer.execute({
  customerId: 'cust-1',
  status: 'pending',
  limit: 20,
});

const completedOrders = await findOrdersByCustomer.execute({
  customerId: 'cust-1',
  status: 'completed',
  limit: 50,
});

// INSERT Prepared Statement
const insertOrder = db
  .insert(orders)
  .values({
    customerId: placeholder('customerId'),
    totalAmount: placeholder('totalAmount'),
    status: placeholder('status'),
  })
  .returning()
  .prepare('insert_order');

const [newOrder] = await insertOrder.execute({
  customerId: 'cust-2',
  totalAmount: 25000,
  status: 'pending',
});

NestJS 통합 패턴

// drizzle.module.ts
import { Module, Global } from '@nestjs/common';
import { drizzle, NodePgDatabase } from 'drizzle-orm/node-postgres';
import { Pool } from 'pg';
import * as schema from './schema';

export const DRIZZLE = Symbol('DRIZZLE');

@Global()
@Module({
  providers: [
    {
      provide: DRIZZLE,
      useFactory: () => {
        const pool = new Pool({ connectionString: process.env.DATABASE_URL });
        return drizzle(pool, { schema });
      },
    },
  ],
  exports: [DRIZZLE],
})
export class DrizzleModule {}

// order.service.ts
@Injectable()
export class OrderService {
  constructor(@Inject(DRIZZLE) private db: NodePgDatabase<typeof schema>) {}

  async createOrder(cmd: CreateOrderDto) {
    return this.db.transaction(async (tx) => {
      const [order] = await tx.insert(schema.orders)
        .values({
          customerId: cmd.customerId,
          totalAmount: cmd.totalAmount,
          status: 'pending',
        })
        .returning();

      if (cmd.items.length > 0) {
        await tx.insert(schema.orderItems).values(
          cmd.items.map(item => ({
            orderId: order.id,
            productId: item.productId,
            quantity: item.quantity,
            price: item.price,
          })),
        );
      }

      return order;
    });
  }

  // Prepared Statement를 서비스 레벨에서 캐싱
  private findByStatusStmt = this.db
    .select()
    .from(schema.orders)
    .where(eq(schema.orders.status, placeholder('status')))
    .prepare('find_by_status');

  async findByStatus(status: string) {
    return this.findByStatusStmt.execute({ status });
  }
}

배치 처리와 트랜잭션

// 대량 데이터 처리 — 청크 단위 트랜잭션
async function batchUpdatePrices(updates: { id: string; price: number }[]) {
  const CHUNK_SIZE = 500;

  for (let i = 0; i < updates.length; i += CHUNK_SIZE) {
    const chunk = updates.slice(i, i + CHUNK_SIZE);

    await db.transaction(async (tx) => {
      for (const { id, price } of chunk) {
        await tx.update(products)
          .set({ price, updatedAt: new Date() })
          .where(eq(products.id, id));
      }
    });

    console.log(`Processed ${Math.min(i + CHUNK_SIZE, updates.length)}/${updates.length}`);
  }
}

// sql 템플릿으로 벌크 UPSERT
await db.transaction(async (tx) => {
  await tx.insert(products)
    .values(bulkData)
    .onConflictDoUpdate({
      target: products.sku,
      set: {
        price: sql`excluded.price`,
        stock: sql`excluded.stock`,
        updatedAt: new Date(),
      },
    });
});

에러 핸들링 패턴

// 재시도 가능한 트랜잭션 래퍼
async function withRetry<T>(
  fn: () => Promise<T>,
  maxRetries = 3,
): Promise<T> {
  for (let attempt = 1; attempt <= maxRetries; attempt++) {
    try {
      return await fn();
    } catch (error: any) {
      // Serialization failure — 재시도 가능
      if (error.code === '40001' && attempt < maxRetries) {
        const delay = Math.pow(2, attempt) * 100;
        await new Promise(r => setTimeout(r, delay));
        continue;
      }
      // Deadlock detected — 재시도 가능
      if (error.code === '40P01' && attempt < maxRetries) {
        continue;
      }
      throw error;
    }
  }
  throw new Error('Max retries exceeded');
}

// 사용
const order = await withRetry(() =>
  db.transaction(async (tx) => {
    // serializable 격리에서 충돌 시 자동 재시도
    return processPayment(tx, orderId, amount);
  }, { isolationLevel: 'serializable' })
);

운영 팁

  • 트랜잭션 범위 최소화: DB 외 작업(HTTP 호출, 파일 I/O)은 트랜잭션 밖에서 실행
  • Prepared Statement 이름: 고유한 이름으로 충돌 방지 — 커넥션 풀에서 재사용됨
  • 격리 수준 선택: 대부분 read committed(기본값)로 충분, 금융 로직만 serializable
  • SELECT FOR UPDATE: 동시성 제어가 필요한 재고/잔액에 타입 안전 쿼리와 함께 사용
  • 커넥션 풀: 트랜잭션이 커넥션을 점유하므로 풀 크기 튜닝 필수

정리

Drizzle ORM의 트랜잭션은 TypeScript 타입 시스템 위에서 동작하는 안전한 데이터 정합성 도구입니다. 콜백 기반 자동 커밋/롤백, SAVEPOINT 중첩 트랜잭션, 격리 수준 제어, Prepared Statement 캐싱을 조합하면 성능과 안정성을 동시에 확보할 수 있습니다. NestJS와 통합하면 서비스 레벨에서 선언적이고 재사용 가능한 트랜잭션 패턴을 구축할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux