NestJS Temporal 워크플로 심화

Temporal이란?

Temporal은 장기 실행 워크플로, 분산 트랜잭션, 비동기 작업 오케스트레이션을 위한 오픈소스 플랫폼입니다. 마이크로서비스 간의 복잡한 비즈니스 프로세스를 내구성 있는 코드로 작성할 수 있게 해줍니다. 네트워크 장애, 서비스 크래시, 타임아웃 등을 프레임워크 레벨에서 처리하므로, 개발자는 비즈니스 로직에만 집중할 수 있습니다.

구분 일반 큐 기반 Temporal
상태 관리 직접 DB에 저장 자동 (이벤트 소싱)
재시도 수동 구현 선언적 정책
보상 트랜잭션 수동 롤백 로직 Saga 패턴 내장
타이머/스케줄 cron + 상태 테이블 sleep/timer 내장
가시성 로그 추적 Web UI 실시간 모니터링

의존성 설정

NestJS에서 Temporal TypeScript SDK를 사용합니다.

# 핵심 패키지
npm install @temporalio/client @temporalio/worker @temporalio/workflow @temporalio/activity

# NestJS 통합
npm install @temporalio/common
// temporal.config.ts
export const TEMPORAL_CONFIG = {
  address: process.env.TEMPORAL_ADDRESS || 'localhost:7233',
  namespace: 'default',
  taskQueue: 'order-processing',
};

Workflow 정의

Workflow는 비즈니스 프로세스의 전체 흐름을 정의합니다. Temporal의 핵심 규칙: Workflow 코드는 결정론적(deterministic)이어야 합니다. 네트워크 호출, 랜덤, 현재 시간 등은 Activity로 분리합니다.

// workflows/order-processing.workflow.ts
import { proxyActivities, sleep, ApplicationFailure } from '@temporalio/workflow';
import type { OrderActivities } from '../activities/order.activities';

const {
  validateOrder,
  reserveInventory,
  processPayment,
  sendConfirmation,
  releaseInventory,
  refundPayment,
} = proxyActivities<OrderActivities>({
  startToCloseTimeout: '30s',
  retry: {
    maximumAttempts: 3,
    initialInterval: '1s',
    backoffCoefficient: 2,
    maximumInterval: '30s',
    nonRetryableErrorTypes: ['InvalidOrderError', 'FraudDetectedError'],
  },
});

export async function orderProcessingWorkflow(
  orderId: string,
  customerId: string,
  items: OrderItem[],
): Promise<OrderResult> {
  // 1. 주문 유효성 검증
  const validation = await validateOrder(orderId, items);
  if (!validation.valid) {
    throw ApplicationFailure.nonRetryable(
      `유효성 검증 실패: ${validation.reason}`,
      'InvalidOrderError',
    );
  }

  // 2. 재고 예약 (보상 트랜잭션 시작점)
  const reservationId = await reserveInventory(orderId, items);

  try {
    // 3. 결제 처리
    const paymentId = await processPayment(orderId, customerId, items);

    // 4. 확인 메일 발송
    await sendConfirmation(orderId, customerId);

    return {
      status: 'COMPLETED',
      orderId,
      paymentId,
      reservationId,
    };
  } catch (error) {
    // 보상: 결제 실패 시 재고 해제
    await releaseInventory(reservationId);

    // 결제까지 됐다면 환불
    if (error.paymentId) {
      await refundPayment(error.paymentId);
    }

    throw error;
  }
}

Activity 구현

Activity는 실제 부수 효과가 있는 작업입니다. DB 쿼리, API 호출, 파일 I/O 등을 수행합니다. NestJS 서비스를 Activity로 래핑합니다.

// activities/order.activities.ts
export interface OrderActivities {
  validateOrder(orderId: string, items: OrderItem[]): Promise<ValidationResult>;
  reserveInventory(orderId: string, items: OrderItem[]): Promise<string>;
  processPayment(orderId: string, customerId: string, items: OrderItem[]): Promise<string>;
  sendConfirmation(orderId: string, customerId: string): Promise<void>;
  releaseInventory(reservationId: string): Promise<void>;
  refundPayment(paymentId: string): Promise<void>;
}

// NestJS 서비스를 Activity로 변환
export function createOrderActivities(
  orderService: OrderService,
  inventoryService: InventoryService,
  paymentService: PaymentService,
  notificationService: NotificationService,
): OrderActivities {
  return {
    async validateOrder(orderId, items) {
      return orderService.validate(orderId, items);
    },

    async reserveInventory(orderId, items) {
      const reservation = await inventoryService.reserve(orderId, items);
      return reservation.id;
    },

    async processPayment(orderId, customerId, items) {
      const total = items.reduce((sum, i) => sum + i.price * i.quantity, 0);
      const payment = await paymentService.charge(customerId, total);
      return payment.transactionId;
    },

    async sendConfirmation(orderId, customerId) {
      await notificationService.sendOrderConfirmation(orderId, customerId);
    },

    async releaseInventory(reservationId) {
      await inventoryService.release(reservationId);
    },

    async refundPayment(paymentId) {
      await paymentService.refund(paymentId);
    },
  };
}

NestJS 모듈 통합

Worker와 Client를 NestJS 모듈로 통합하여 라이프사이클을 관리합니다.

// temporal/temporal.module.ts
@Module({
  providers: [
    TemporalWorkerService,
    TemporalClientService,
  ],
  exports: [TemporalClientService],
})
export class TemporalModule {}
// temporal/temporal-worker.service.ts
@Injectable()
export class TemporalWorkerService implements OnModuleInit, OnModuleDestroy {
  private worker: Worker;

  constructor(
    private readonly orderService: OrderService,
    private readonly inventoryService: InventoryService,
    private readonly paymentService: PaymentService,
    private readonly notificationService: NotificationService,
  ) {}

  async onModuleInit() {
    const connection = await NativeConnection.connect({
      address: TEMPORAL_CONFIG.address,
    });

    this.worker = await Worker.create({
      connection,
      namespace: TEMPORAL_CONFIG.namespace,
      taskQueue: TEMPORAL_CONFIG.taskQueue,
      workflowsPath: require.resolve('../workflows'),
      activities: createOrderActivities(
        this.orderService,
        this.inventoryService,
        this.paymentService,
        this.notificationService,
      ),
    });

    // Worker 시작 (논블로킹)
    this.worker.run().catch((err) => {
      console.error('Temporal Worker 에러:', err);
    });
  }

  async onModuleDestroy() {
    this.worker?.shutdown();
  }
}
// temporal/temporal-client.service.ts
@Injectable()
export class TemporalClientService implements OnModuleInit {
  private client: Client;

  async onModuleInit() {
    const connection = await Connection.connect({
      address: TEMPORAL_CONFIG.address,
    });
    this.client = new Client({ connection });
  }

  async startOrderWorkflow(
    orderId: string,
    customerId: string,
    items: OrderItem[],
  ): Promise<string> {
    const handle = await this.client.workflow.start(orderProcessingWorkflow, {
      args: [orderId, customerId, items],
      taskQueue: TEMPORAL_CONFIG.taskQueue,
      workflowId: `order-${orderId}`,
      // 워크플로 실행 제한 시간
      workflowExecutionTimeout: '1h',
    });

    return handle.workflowId;
  }

  async getWorkflowStatus(workflowId: string) {
    const handle = this.client.workflow.getHandle(workflowId);
    const description = await handle.describe();
    return {
      status: description.status.name,
      startTime: description.startTime,
      executionTime: description.executionTime,
    };
  }

  async cancelWorkflow(workflowId: string) {
    const handle = this.client.workflow.getHandle(workflowId);
    await handle.cancel();
  }
}

Saga 패턴: 분산 보상 트랜잭션

복잡한 분산 트랜잭션에서 Saga 패턴을 구조적으로 구현할 수 있습니다. 각 단계의 실행과 보상을 쌍으로 관리합니다.

// workflows/saga-order.workflow.ts
export async function sagaOrderWorkflow(order: OrderRequest): Promise<OrderResult> {
  const compensations: Array<() => Promise<void>> = [];

  try {
    // Step 1: 재고 예약
    const reservationId = await reserveInventory(order.id, order.items);
    compensations.push(() => releaseInventory(reservationId));

    // Step 2: 결제
    const paymentId = await processPayment(order.id, order.total);
    compensations.push(() => refundPayment(paymentId));

    // Step 3: 배송 요청
    const shippingId = await createShipment(order.id, order.address);
    compensations.push(() => cancelShipment(shippingId));

    // Step 4: 포인트 적립
    await addLoyaltyPoints(order.customerId, order.total);
    compensations.push(() => deductLoyaltyPoints(order.customerId, order.total));

    return { status: 'SUCCESS', paymentId, shippingId };
  } catch (error) {
    // 역순으로 보상 실행
    for (const compensate of compensations.reverse()) {
      try {
        await compensate();
      } catch (compError) {
        console.error('보상 트랜잭션 실패:', compError);
        // 보상 실패도 Temporal이 재시도 처리
      }
    }
    throw error;
  }
}

장기 실행 워크플로: Timer와 Signal

Temporal의 sleep은 일반 setTimeout과 달리 내구성이 있습니다. 서버가 재시작되어도 정확한 시점에 깨어납니다.

// workflows/subscription-renewal.workflow.ts
import { defineSignal, setHandler, sleep, condition } from '@temporalio/workflow';

// 외부에서 워크플로에 신호를 보내는 Signal 정의
export const cancelSubscriptionSignal = defineSignal('cancelSubscription');
export const upgradeSignal = defineSignal<[string]>('upgradePlan');

export async function subscriptionWorkflow(
  customerId: string,
  plan: string,
): Promise<void> {
  let cancelled = false;
  let currentPlan = plan;

  // Signal 핸들러 등록
  setHandler(cancelSubscriptionSignal, () => {
    cancelled = true;
  });

  setHandler(upgradeSignal, (newPlan: string) => {
    currentPlan = newPlan;
  });

  while (!cancelled) {
    // 결제 처리
    await chargeSubscription(customerId, currentPlan);
    await sendReceipt(customerId, currentPlan);

    // 30일 대기 (서버 재시작에도 안전)
    const timedOut = await condition(() => cancelled, '30 days');
    if (timedOut) break;
  }

  // 구독 취소 처리
  await processUnsubscription(customerId);
}
// 컨트롤러에서 Signal 전송
@Post('subscriptions/:id/cancel')
async cancelSubscription(@Param('id') subscriptionId: string) {
  const handle = this.temporalClient.workflow.getHandle(
    `subscription-${subscriptionId}`,
  );
  await handle.signal(cancelSubscriptionSignal);
  return { message: '구독 취소 요청됨' };
}

테스트

Temporal은 테스트 환경을 내장 제공하여 실제 서버 없이 워크플로를 검증할 수 있습니다.

import { TestWorkflowEnvironment } from '@temporalio/testing';
import { Worker } from '@temporalio/worker';

describe('OrderProcessingWorkflow', () => {
  let testEnv: TestWorkflowEnvironment;

  beforeAll(async () => {
    testEnv = await TestWorkflowEnvironment.createLocal();
  });

  afterAll(async () => {
    await testEnv?.teardown();
  });

  it('정상 주문 처리 흐름', async () => {
    const mockActivities = {
      validateOrder: async () => ({ valid: true }),
      reserveInventory: async () => 'res-001',
      processPayment: async () => 'pay-001',
      sendConfirmation: async () => {},
      releaseInventory: async () => {},
      refundPayment: async () => {},
    };

    const worker = await Worker.create({
      connection: testEnv.nativeConnection,
      taskQueue: 'test',
      workflowsPath: require.resolve('../workflows'),
      activities: mockActivities,
    });

    const result = await worker.runUntil(
      testEnv.client.workflow.execute(orderProcessingWorkflow, {
        args: ['ord-1', 'cust-1', [{ productId: 'p1', quantity: 1, price: 10000 }]],
        taskQueue: 'test',
        workflowId: 'test-order-1',
      }),
    );

    expect(result.status).toBe('COMPLETED');
    expect(result.paymentId).toBe('pay-001');
  });
});

실전 팁

  • Workflow ID 설계: 비즈니스 키를 사용하여 멱등성을 보장합니다. order-${orderId}처럼 설계하면 같은 주문에 대해 중복 워크플로가 생성되지 않습니다
  • Activity 타임아웃: startToCloseTimeout(실행 시간), scheduleToCloseTimeout(대기+실행), heartbeatTimeout(장기 작업 생존 확인)을 적절히 설정합니다
  • Workflow 버전 관리: 프로덕션에서 실행 중인 워크플로가 있으면 코드 변경 시 patched() API로 버전을 관리합니다
  • 헥사고날 아키텍처와 결합: Activity를 아웃바운드 어댑터로 취급하면 도메인 로직과 인프라를 깔끔하게 분리할 수 있습니다
  • K8s 배포: Temporal Server는 Helm Chart로 배포하고, Worker는 Deployment로 스케일링합니다

마무리

Temporal은 분산 시스템에서 가장 까다로운 문제 — 장애 복구, 보상 트랜잭션, 장기 실행 프로세스 — 를 코드 레벨에서 해결합니다. NestJS의 DI와 모듈 시스템을 활용하면 Worker와 Client를 자연스럽게 통합할 수 있으며, 복잡한 비즈니스 프로세스를 읽기 쉬운 순차 코드로 표현할 수 있습니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux