TypeORM Subscriber 트랜잭션 훅

TypeORM Subscriber 트랜잭션 훅이란?

TypeORM의 EntitySubscriber는 엔티티 변경 이벤트를 가로채는 옵저버 패턴입니다. 단순 @BeforeInsert/@AfterInsert 데코레이터와 달리, Subscriber는 트랜잭션 커밋/롤백 시점을 정확히 감지할 수 있어 감사 로그, 이벤트 발행, 캐시 무효화 등 부수 효과를 안전하게 처리할 수 있습니다.

1. Entity Listener vs Subscriber

두 메커니즘의 차이를 이해하는 것이 핵심입니다.

특성 Entity Listener (@BeforeInsert 등) Subscriber (EntitySubscriberInterface)
등록 위치 엔티티 클래스 내부 별도 클래스
DI 지원 불가 (NestJS DI 접근 불가) 가능 (NestJS Injectable)
트랜잭션 훅 없음 afterTransactionCommit/Rollback
QueryBuilder 이벤트 미발화 미발화 (save/remove만)
여러 엔티티 감시 각 엔티티에 개별 선언 listenTo()로 타겟 지정 또는 전체

2. 기본 Subscriber 구현

// subscribers/audit.subscriber.ts
import {
  EntitySubscriberInterface,
  EventSubscriber,
  InsertEvent,
  UpdateEvent,
  RemoveEvent,
} from 'typeorm';
import { Order } from '../entities/order.entity';

@EventSubscriber()
export class OrderSubscriber implements EntitySubscriberInterface<Order> {

  // 이 Subscriber가 감시할 엔티티 (생략하면 모든 엔티티)
  listenTo() {
    return Order;
  }

  // INSERT 전: 기본값 세팅
  beforeInsert(event: InsertEvent<Order>) {
    const order = event.entity;
    if (!order.orderNumber) {
      order.orderNumber = `ORD-${Date.now()}`;
    }
    order.version = 1;
  }

  // INSERT 후: ID가 할당된 상태
  afterInsert(event: InsertEvent<Order>) {
    console.log(`Order created: ${event.entity.id}`);
  }

  // UPDATE 전: 변경 필드 감지
  beforeUpdate(event: UpdateEvent<Order>) {
    // event.databaseEntity = DB의 현재 값
    // event.entity = 업데이트할 새 값 (partial일 수 있음)
    if (event.entity) {
      event.entity.updatedAt = new Date();
    }
  }

  // DELETE 후
  afterRemove(event: RemoveEvent<Order>) {
    console.log(`Order removed: ${event.entityId}`);
  }
}

3. 트랜잭션 훅: 커밋 후 안전한 부수 효과

가장 중요한 패턴: 이벤트 발행이나 외부 API 호출은 반드시 afterTransactionCommit에서 수행해야 합니다. afterInsert에서 하면 트랜잭션이 롤백되어도 이벤트가 이미 발행된 상태가 됩니다.

@EventSubscriber()
export class OrderEventSubscriber implements EntitySubscriberInterface<Order> {

  // 트랜잭션 중 발생한 이벤트를 임시 저장
  private pendingEvents: Map<string, DomainEvent[]> = new Map();

  listenTo() { return Order; }

  afterInsert(event: InsertEvent<Order>) {
    // 트랜잭션 ID로 이벤트 그룹핑
    const txId = this.getTxId(event);
    const events = this.pendingEvents.get(txId) || [];
    events.push({
      type: 'ORDER_CREATED',
      payload: { orderId: event.entity.id, total: event.entity.total },
      timestamp: new Date(),
    });
    this.pendingEvents.set(txId, events);
  }

  afterUpdate(event: UpdateEvent<Order>) {
    if (!event.entity) return;

    const txId = this.getTxId(event);
    const events = this.pendingEvents.get(txId) || [];

    // 상태 변경 감지
    const oldStatus = event.databaseEntity?.status;
    const newStatus = (event.entity as Order).status;

    if (oldStatus && newStatus && oldStatus !== newStatus) {
      events.push({
        type: 'ORDER_STATUS_CHANGED',
        payload: { 
          orderId: event.entity.id,
          from: oldStatus,
          to: newStatus,
        },
        timestamp: new Date(),
      });
    }
    this.pendingEvents.set(txId, events);
  }

  // ✅ 트랜잭션 커밋 후에만 이벤트 발행
  afterTransactionCommit(event: any) {
    const txId = this.getTxId(event);
    const events = this.pendingEvents.get(txId);
    if (events?.length) {
      // 여기서 EventEmitter, 메시지 큐 등으로 발행
      events.forEach(e => this.publishEvent(e));
      this.pendingEvents.delete(txId);
    }
  }

  // ❌ 롤백 시 이벤트 폐기
  afterTransactionRollback(event: any) {
    const txId = this.getTxId(event);
    this.pendingEvents.delete(txId);
  }

  private getTxId(event: any): string {
    return event.queryRunner?.connection?.name || 'default';
  }

  private publishEvent(event: DomainEvent) {
    // EventEmitter2, BullMQ 등으로 전달
    console.log('Publishing:', event.type, event.payload);
  }
}

4. NestJS DI 통합

TypeORM Subscriber에서 NestJS 서비스를 주입받으려면 DataSource에 수동 등록해야 합니다.

// subscribers/audit-log.subscriber.ts
@Injectable()
@EventSubscriber()
export class AuditLogSubscriber implements EntitySubscriberInterface {

  constructor(
    private dataSource: DataSource,
    private eventEmitter: EventEmitter2,
    @InjectRedis() private redis: Redis,
  ) {
    // 핵심: DataSource에 직접 등록
    dataSource.subscribers.push(this);
  }

  // 모든 엔티티 감시 (listenTo 생략)

  afterInsert(event: InsertEvent<any>) {
    this.logChange('INSERT', event);
  }

  afterUpdate(event: UpdateEvent<any>) {
    this.logChange('UPDATE', event);
  }

  afterRemove(event: RemoveEvent<any>) {
    this.logChange('DELETE', event);
  }

  afterTransactionCommit() {
    // 캐시 무효화
    this.redis.del('entity-cache:*');
  }

  private logChange(action: string, event: any) {
    const entityName = event.metadata.tableName;
    const entityId = event.entity?.id || event.entityId;

    this.eventEmitter.emit('audit.log', {
      action,
      entity: entityName,
      entityId,
      timestamp: new Date(),
      changes: action === 'UPDATE' 
        ? this.diffChanges(event.databaseEntity, event.entity)
        : undefined,
    });
  }

  private diffChanges(before: any, after: any): Record<string, any> {
    if (!before || !after) return {};
    const changes: Record<string, any> = {};
    for (const key of Object.keys(after)) {
      if (before[key] !== after[key]) {
        changes[key] = { from: before[key], to: after[key] };
      }
    }
    return changes;
  }
}
// auth.module.ts — Module에 provider로 등록
@Module({
  providers: [
    AuditLogSubscriber,  // NestJS가 DI로 생성
  ],
})
export class AuditModule {}

5. 변경 필드 감지: databaseEntity 활용

UpdateEventdatabaseEntity는 DB에서 읽어온 변경 전 상태입니다. 이를 활용하면 정확한 diff를 추적할 수 있습니다.

@EventSubscriber()
export class PriceChangeSubscriber implements EntitySubscriberInterface<Product> {

  listenTo() { return Product; }

  async beforeUpdate(event: UpdateEvent<Product>) {
    // databaseEntity를 로드하려면 옵션 필요
    // TypeORM 설정: subscribers: true + loadRelationIds: true
  }

  async afterUpdate(event: UpdateEvent<Product>) {
    const before = event.databaseEntity;
    const after = event.entity as Product;

    if (!before || !after) return;

    // 가격 변경 감지
    if (before.price !== after.price) {
      const changePercent = 
        ((after.price - before.price) / before.price) * 100;

      // 같은 트랜잭션의 QueryRunner로 이력 저장
      await event.queryRunner.query(
        `INSERT INTO price_history 
         (product_id, old_price, new_price, change_percent, changed_at)
         VALUES ($1, $2, $3, $4, NOW())`,
        [after.id, before.price, after.price, changePercent]
      );
    }
  }
}

주의: event.queryRunner.query()를 사용하면 같은 트랜잭션 안에서 추가 쿼리를 실행할 수 있어, 이력 저장이 원본 업데이트와 함께 커밋/롤백됩니다.

6. Soft Delete 자동 처리

@EventSubscriber()
export class SoftDeleteSubscriber implements EntitySubscriberInterface {

  constructor(private dataSource: DataSource) {
    dataSource.subscribers.push(this);
  }

  beforeUpdate(event: UpdateEvent<any>) {
    const entity = event.entity;
    if (!entity) return;

    // deletedAt이 설정되면 soft delete로 간주
    if (entity.deletedAt && !event.databaseEntity?.deletedAt) {
      entity.deletedBy = this.getCurrentUserId(event);
    }
  }

  // Soft delete된 엔티티의 관련 데이터도 cascade soft delete
  async afterUpdate(event: UpdateEvent<any>) {
    const entity = event.entity;
    if (!entity?.deletedAt || event.databaseEntity?.deletedAt) return;

    const metadata = event.metadata;
    
    // cascade soft delete 대상 관계 찾기
    for (const relation of metadata.relations) {
      if (relation.onDelete === 'CASCADE') {
        const relatedRepo = event.queryRunner.manager
          .getRepository(relation.inverseEntityMetadata.target);
        
        await relatedRepo
          .createQueryBuilder()
          .update()
          .set({ deletedAt: new Date() })
          .where(`${relation.inverseSidePropertyPath} = :id`, 
            { id: entity.id })
          .execute();
      }
    }
  }

  private getCurrentUserId(event: any): string | null {
    // AsyncLocalStorage 등에서 현재 사용자 조회
    return event.queryRunner?.data?.userId || null;
  }
}

7. 성능 주의사항

함정 문제 해결
Subscriber에서 DB 조회 N+1 발생 (매 엔티티마다) event.queryRunner 재사용, batch 처리
전체 엔티티 감시 불필요한 이벤트 처리 listenTo()로 대상 한정
동기 외부 호출 트랜잭션 지연 afterTransactionCommit + 비동기 큐
QueryBuilder 이벤트 미발화 createQueryBuilder().update() 시 Subscriber 무시 save() 사용 또는 수동 이벤트 발행
pendingEvents 메모리 누수 롤백 미처리 시 Map 증가 afterTransactionRollback에서 정리 필수

8. 테스트 전략

describe('OrderEventSubscriber', () => {
  let dataSource: DataSource;
  let orderRepo: Repository<Order>;
  let publishedEvents: DomainEvent[] = [];

  beforeAll(async () => {
    const module = await Test.createTestingModule({
      imports: [
        TypeOrmModule.forRoot({
          type: 'sqlite',
          database: ':memory:',
          entities: [Order],
          synchronize: true,
          // Subscriber 등록
          subscribers: [OrderEventSubscriber],
        }),
        TypeOrmModule.forFeature([Order]),
      ],
    }).compile();

    dataSource = module.get(DataSource);
    orderRepo = module.get(getRepositoryToken(Order));
  });

  it('커밋 후에만 ORDER_CREATED 이벤트 발행', async () => {
    const order = orderRepo.create({ total: 10000 });
    await orderRepo.save(order);

    // afterTransactionCommit에서 발행된 이벤트 확인
    expect(publishedEvents).toContainEqual(
      expect.objectContaining({ type: 'ORDER_CREATED' })
    );
  });

  it('롤백 시 이벤트 미발행', async () => {
    publishedEvents = [];
    const qr = dataSource.createQueryRunner();
    await qr.startTransaction();

    try {
      const order = qr.manager.create(Order, { total: -1 });
      await qr.manager.save(order);
      throw new Error('강제 롤백');
    } catch {
      await qr.rollbackTransaction();
    } finally {
      await qr.release();
    }

    expect(publishedEvents).toHaveLength(0);
  });
});

마무리

TypeORM Subscriber의 afterTransactionCommit/afterTransactionRollback 훅은 도메인 이벤트 발행의 정합성을 보장하는 핵심 메커니즘입니다. NestJS DI와 통합하려면 dataSource.subscribers.push(this) 패턴을 사용하고, event.queryRunner로 같은 트랜잭션 안에서 추가 쿼리를 실행할 수 있습니다. TypeORM Subscriber 이벤트 훅 기초를 이해한 뒤, NestJS EventEmitter2 이벤트 설계와 결합하여 안전한 이벤트 기반 아키텍처를 구축하는 것을 권장합니다.

위로 스크롤
WordPress Appliance - Powered by TurnKey Linux