10강 55분

이벤트 소싱과 CQRS 심화

Event Sourcing과 CQRS 패턴을 깊이 있게 학습하고 실전 프로젝트에 적용하는 방법을 배웁니다.

이벤트 소싱과 CQRS 심화

학습 목표

  • Event Sourcing의 핵심 개념과 장점
  • CQRS 패턴 심화
  • Event Store 구현
  • Projection과 Read Model 관리

Event Sourcing 심화

핵심 아이디어

상태가 아닌 상태 변경 이벤트를 저장

기존 방식 (State-oriented):

UPDATE accounts SET balance = 500 WHERE id = 'acc-123';

→ 잔액이 어떻게 500원이 되었는지 알 수 없음

Event Sourcing:

const events = [
  { type: 'AccountOpened', balance: 0 },
  { type: 'MoneyDeposited', amount: 1000 },
  { type: 'MoneyWithdrawn', amount: 300 },
  { type: 'MoneyWithdrawn', amount: 200 },
];
// 현재 잔액 = 0 + 1000 - 300 - 200 = 500

→ 모든 변경 history 추적 가능

Event Store 구현

Event 정의

interface DomainEvent {
  id: string;
  aggregateId: string;
  aggregateType: string;
  eventType: string;
  data: any;
  metadata: {
    userId?: string;
    timestamp: Date;
    version: number;
  };
}

// 주문 이벤트 예제
interface OrderCreatedEvent extends DomainEvent {
  eventType: 'OrderCreated';
  data: {
    customerId: string;
    items: OrderItem[];
    totalAmount: number;
  };
}

interface OrderConfirmedEvent extends DomainEvent {
  eventType: 'OrderConfirmed';
  data: {
    confirmedAt: Date;
  };
}

interface OrderCancelledEvent extends DomainEvent {
  eventType: 'OrderCancelled';
  data: {
    reason: string;
    cancelledAt: Date;
  };
}

Event Store 구현 (PostgreSQL)

-- Event Store 테이블
CREATE TABLE events (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_id VARCHAR(255) NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  event_type VARCHAR(100) NOT NULL,
  data JSONB NOT NULL,
  metadata JSONB,
  version INT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW(),

  -- Optimistic Locking을 위한 유니크 제약
  UNIQUE(aggregate_id, version)
);

-- 인덱스
CREATE INDEX idx_aggregate_id ON events(aggregate_id);
CREATE INDEX idx_aggregate_type ON events(aggregate_type);
CREATE INDEX idx_event_type ON events(event_type);
CREATE INDEX idx_created_at ON events(created_at);

Event Store 클래스:

class EventStore {
  async append(event: DomainEvent): Promise<void> {
    await db.query(`
      INSERT INTO events (
        aggregate_id, aggregate_type, event_type,
        data, metadata, version
      ) VALUES ($1, $2, $3, $4, $5, $6)
    `, [
      event.aggregateId,
      event.aggregateType,
      event.eventType,
      JSON.stringify(event.data),
      JSON.stringify(event.metadata),
      event.metadata.version,
    ]);
  }

  async getEvents(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<DomainEvent[]> {
    const result = await db.query(`
      SELECT * FROM events
      WHERE aggregate_id = $1 AND version > $2
      ORDER BY version ASC
    `, [aggregateId, fromVersion]);

    return result.rows.map(row => ({
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.data,
      metadata: row.metadata,
    }));
  }

  async getAllEvents(
    afterTimestamp?: Date
  ): Promise<DomainEvent[]> {
    const query = afterTimestamp
      ? `SELECT * FROM events WHERE created_at > $1 ORDER BY created_at ASC`
      : `SELECT * FROM events ORDER BY created_at ASC`;

    const result = await db.query(query, afterTimestamp ? [afterTimestamp] : []);
    return result.rows;
  }
}

Aggregate with Event Sourcing

Order Aggregate 구현

class Order {
  private id: string;
  private customerId: string;
  private items: OrderItem[] = [];
  private status: OrderStatus = 'DRAFT';
  private version: number = 0;
  private uncommittedEvents: DomainEvent[] = [];

  // Event Sourcing: 이벤트로부터 상태 재구성
  static reconstruct(events: DomainEvent[]): Order {
    const order = new Order();

    for (const event of events) {
      order.applyEvent(event, false); // 재구성 시에는 이벤트 저장 안 함
    }

    return order;
  }

  // Command: 주문 생성
  create(data: { customerId: string; items: OrderItem[] }) {
    if (this.id) {
      throw new Error('Order already created');
    }

    const event: OrderCreatedEvent = {
      id: uuid(),
      aggregateId: uuid(),
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      data: {
        customerId: data.customerId,
        items: data.items,
        totalAmount: this.calculateTotal(data.items),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };

    this.applyEvent(event);
  }

  // Command: 주문 확정
  confirm() {
    if (this.status !== 'PENDING') {
      throw new Error('Order must be PENDING to confirm');
    }

    const event: OrderConfirmedEvent = {
      id: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderConfirmed',
      data: {
        confirmedAt: new Date(),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };

    this.applyEvent(event);
  }

  // Command: 주문 취소
  cancel(reason: string) {
    if (this.status === 'CANCELLED') {
      throw new Error('Order already cancelled');
    }

    const event: OrderCancelledEvent = {
      id: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderCancelled',
      data: {
        reason,
        cancelledAt: new Date(),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };

    this.applyEvent(event);
  }

  // Event Application (상태 변경)
  private applyEvent(event: DomainEvent, isNew: boolean = true) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.id = event.aggregateId;
        this.customerId = event.data.customerId;
        this.items = event.data.items;
        this.status = 'PENDING';
        break;

      case 'OrderConfirmed':
        this.status = 'CONFIRMED';
        break;

      case 'OrderCancelled':
        this.status = 'CANCELLED';
        break;
    }

    this.version = event.metadata.version;

    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }

  // Uncommitted Events 가져오기
  getUncommittedEvents(): DomainEvent[] {
    return this.uncommittedEvents;
  }

  // Events 커밋 후 clear
  markEventsAsCommitted() {
    this.uncommittedEvents = [];
  }

  private calculateTotal(items: OrderItem[]): number {
    return items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }
}

Repository 구현

class OrderRepository {
  constructor(private eventStore: EventStore) {}

  async findById(orderId: string): Promise<Order | null> {
    // Event Store에서 이벤트 조회
    const events = await this.eventStore.getEvents(orderId);

    if (events.length === 0) {
      return null;
    }

    // 이벤트로부터 Aggregate 재구성
    return Order.reconstruct(events);
  }

  async save(order: Order): Promise<void> {
    // Uncommitted Events 가져오기
    const events = order.getUncommittedEvents();

    // Event Store에 저장
    for (const event of events) {
      await this.eventStore.append(event);
    }

    // Events 커밋 완료
    order.markEventsAsCommitted();
  }
}

Application Service

class OrderService {
  constructor(
    private orderRepo: OrderRepository,
    private eventBus: EventBus
  ) {}

  async createOrder(data: CreateOrderData): Promise<string> {
    // 1. Aggregate 생성
    const order = new Order();
    order.create({
      customerId: data.customerId,
      items: data.items,
    });

    // 2. 저장 (Event Store에)
    await this.orderRepo.save(order);

    // 3. 이벤트 발행 (다른 서비스 통지)
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event.eventType, event);
    }

    return order.id;
  }

  async confirmOrder(orderId: string): Promise<void> {
    // 1. 재구성
    const order = await this.orderRepo.findById(orderId);

    if (!order) {
      throw new Error('Order not found');
    }

    // 2. Command 실행
    order.confirm();

    // 3. 저장
    await this.orderRepo.save(order);

    // 4. 이벤트 발행
    for (const event of order.getUncommittedEvents()) {
      await this.eventBus.publish(event.eventType, event);
    }
  }
}

CQRS 심화

Read Model (Projection)

Query용 최적화된 Read Model:

// Denormalized Read Model
interface OrderReadModel {
  orderId: string;
  customerId: string;
  customerName: string;
  customerEmail: string;
  items: {
    productId: string;
    productName: string;
    productImage: string;
    quantity: number;
    unitPrice: number;
  }[];
  totalAmount: number;
  status: string;
  createdAt: Date;
  confirmedAt?: Date;
}

// Projection Handler
class OrderProjection {
  constructor(
    private readDb: Database,
    private customerService: CustomerService,
    private productService: ProductService
  ) {}

  async handleOrderCreated(event: OrderCreatedEvent) {
    // 외부 서비스에서 상세 정보 조회
    const customer = await this.customerService.getCustomer(
      event.data.customerId
    );

    const products = await this.productService.getProducts(
      event.data.items.map(i => i.productId)
    );

    // Read Model 생성
    await this.readDb.orderReadModels.insert({
      orderId: event.aggregateId,
      customerId: event.data.customerId,
      customerName: customer.name,
      customerEmail: customer.email,
      items: event.data.items.map((item, i) => ({
        productId: item.productId,
        productName: products[i].name,
        productImage: products[i].images[0],
        quantity: item.quantity,
        unitPrice: item.unitPrice,
      })),
      totalAmount: event.data.totalAmount,
      status: 'PENDING',
      createdAt: event.metadata.timestamp,
    });
  }

  async handleOrderConfirmed(event: OrderConfirmedEvent) {
    await this.readDb.orderReadModels.update(
      { orderId: event.aggregateId },
      {
        $set: {
          status: 'CONFIRMED',
          confirmedAt: event.data.confirmedAt,
        },
      }
    );
  }

  async handleOrderCancelled(event: OrderCancelledEvent) {
    await this.readDb.orderReadModels.update(
      { orderId: event.aggregateId },
      {
        $set: {
          status: 'CANCELLED',
          cancelledAt: event.data.cancelledAt,
          cancelReason: event.data.reason,
        },
      }
    );
  }
}

// Event Subscription
eventBus.subscribe('OrderCreated', (event) => projection.handleOrderCreated(event));
eventBus.subscribe('OrderConfirmed', (event) => projection.handleOrderConfirmed(event));
eventBus.subscribe('OrderCancelled', (event) => projection.handleOrderCancelled(event));

Query Service

class OrderQueryService {
  constructor(private readDb: Database) {}

  async getOrderById(orderId: string): Promise<OrderReadModel> {
    // Read Model에서 빠르게 조회
    return await this.readDb.orderReadModels.findOne({ orderId });
  }

  async getOrdersByCustomer(
    customerId: string,
    page: number = 1,
    limit: number = 20
  ): Promise<OrderReadModel[]> {
    return await this.readDb.orderReadModels
      .find({ customerId })
      .sort({ createdAt: -1 })
      .skip((page - 1) * limit)
      .limit(limit)
      .toArray();
  }

  async searchOrders(query: string): Promise<OrderReadModel[]> {
    // Full-text search (Elasticsearch 사용)
    return await this.readDb.orderReadModels
      .find({
        $or: [
          { customerName: { $regex: query, $options: 'i' } },
          { 'items.productName': { $regex: query, $options: 'i' } },
        ],
      })
      .toArray();
  }
}

Snapshot 패턴

문제: 이벤트가 너무 많을 때

주문에 이벤트가 10,000개 → 재구성에 시간 오래 걸림

해결: Snapshot 저장

interface Snapshot {
  aggregateId: string;
  aggregateType: string;
  version: number;
  state: any;
  createdAt: Date;
}

class SnapshotStore {
  async save(aggregateId: string, version: number, state: any) {
    await db.snapshots.insert({
      aggregateId,
      aggregateType: 'Order',
      version,
      state,
      createdAt: new Date(),
    });
  }

  async getLatest(aggregateId: string): Promise<Snapshot | null> {
    return await db.snapshots.findOne(
      { aggregateId },
      { sort: { version: -1 } }
    );
  }
}

class OrderRepository {
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore
  ) {}

  async findById(orderId: string): Promise<Order | null> {
    // 1. Snapshot 조회
    const snapshot = await this.snapshotStore.getLatest(orderId);

    let order: Order;
    let fromVersion = 0;

    if (snapshot) {
      // Snapshot에서 복원
      order = Order.fromSnapshot(snapshot.state);
      fromVersion = snapshot.version;
    } else {
      order = new Order();
    }

    // 2. Snapshot 이후 이벤트만 조회
    const events = await this.eventStore.getEvents(orderId, fromVersion);

    // 3. 재구성
    for (const event of events) {
      order.applyEvent(event, false);
    }

    return order;
  }

  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();

    for (const event of events) {
      await this.eventStore.append(event);
    }

    // 100개 이벤트마다 Snapshot 저장
    if (order.getVersion() % 100 === 0) {
      await this.snapshotStore.save(
        order.id,
        order.getVersion(),
        order.toSnapshot()
      );
    }

    order.markEventsAsCommitted();
  }
}

장단점 정리

Event Sourcing 장점

✅ 완전한 감사 로그 (Audit Log) ✅ 시간 여행 (과거 상태 재현) ✅ 디버깅 용이 ✅ Event를 여러 Read Model로 변환 가능

Event Sourcing 단점

❌ Learning Curve 높음 ❌ 이벤트 스키마 변경 어려움 ❌ Snapshot 없으면 성능 저하 ❌ 저장 공간 많이 사용

CQRS 장점

✅ 읽기/쓰기 성능 독립 최적화 ✅ 복잡한 쿼리 간단히 처리 ✅ 확장성 (Read Replica 여러 개)

CQRS 단점

❌ 복잡도 증가 ❌ Eventual Consistency ❌ Read Model 동기화 관리 필요

핵심 정리

  • Event Sourcing: 상태 대신 이벤트 저장
  • Event Store가 Single Source of Truth
  • CQRS로 읽기/쓰기 분리
  • Projection으로 Query 최적화
  • Snapshot으로 성능 개선
  • Event Replay로 Read Model 재구성 가능
  • 복잡도 증가하므로 신중히 선택