이벤트 소싱과 CQRS 심화
학습 목표
- Event Sourcing의 핵심 개념과 장점
- CQRS 패턴 심화
- Event Store 구현
- Projection과 Read Model 관리
Event Sourcing 심화
핵심 아이디어
상태가 아닌 상태 변경 이벤트를 저장
기존 방식 (State-oriented):
-- State-oriented write: the stored balance is overwritten in place.
UPDATE accounts SET balance = 500 WHERE id = 'acc-123';
→ 잔액이 어떻게 500원이 되었는지 알 수 없음
Event Sourcing:
// Event log for a single account: each entry records a change, not the resulting state.
const events = [
{ type: 'AccountOpened', balance: 0 },
{ type: 'MoneyDeposited', amount: 1000 },
{ type: 'MoneyWithdrawn', amount: 300 },
{ type: 'MoneyWithdrawn', amount: 200 },
];
// Current balance = 0 + 1000 - 300 - 200 = 500
→ 모든 변경 이력(history)을 추적할 수 있음
Event Store 구현
Event 정의
/**
 * Base shape shared by every domain event.
 * Concrete events extend this interface and narrow `eventType` and `data`.
 */
interface DomainEvent {
// Identifier of this individual event (distinct from the aggregate's id).
id: string;
// Identifier of the aggregate instance this event belongs to.
aggregateId: string;
// Kind of aggregate, e.g. 'Order'.
aggregateType: string;
// Name of the event, e.g. 'OrderCreated'.
eventType: string;
// Event payload; its shape depends on the event type.
data: any;
metadata: {
// Optional id of the user associated with the event.
userId?: string;
// When the event was created.
timestamp: Date;
// Aggregate version this event produces.
version: number;
};
}
// Example order events
/** Event recording that an order was created, with its customer, items and total amount. */
interface OrderCreatedEvent extends DomainEvent {
eventType: 'OrderCreated';
data: {
customerId: string;
items: OrderItem[];
totalAmount: number;
};
}
/** Event recording that an order was confirmed, with the confirmation time. */
interface OrderConfirmedEvent extends DomainEvent {
eventType: 'OrderConfirmed';
data: {
confirmedAt: Date;
};
}
/** Event recording that an order was cancelled, with the reason and cancellation time. */
interface OrderCancelledEvent extends DomainEvent {
eventType: 'OrderCancelled';
data: {
reason: string;
cancelledAt: Date;
};
}
Event Store 구현 (PostgreSQL)
-- Event Store table: append-only log of domain events, one row per event.
CREATE TABLE events (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  aggregate_id VARCHAR(255) NOT NULL,
  aggregate_type VARCHAR(100) NOT NULL,
  event_type VARCHAR(100) NOT NULL,
  data JSONB NOT NULL,
  metadata JSONB,
  version INT NOT NULL,
  -- TIMESTAMPTZ stores an absolute instant, so ordering and filtering on
  -- created_at do not depend on the session time zone. NOT NULL because
  -- readers order and filter on this column.
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  -- Unique constraint used for optimistic locking: two writers cannot both
  -- insert the same version for one aggregate.
  UNIQUE (aggregate_id, version)
);
-- Secondary indexes.
-- No separate index on aggregate_id: the UNIQUE (aggregate_id, version)
-- constraint on events already creates a B-tree index whose leading column is
-- aggregate_id, so a second single-column index would only add write overhead
-- to this append-heavy table.
CREATE INDEX idx_aggregate_type ON events(aggregate_type);
CREATE INDEX idx_event_type ON events(event_type);
CREATE INDEX idx_created_at ON events(created_at);
Event Store 클래스:
/**
 * Append-only store for domain events backed by the `events` table.
 *
 * All queries go through the `db` client that is in scope in this file.
 */
class EventStore {
  /**
   * Inserts one event row.
   *
   * The `version` column is taken from `event.metadata.version`. The event's
   * own `id` is not part of the INSERT column list.
   *
   * NOTE(review): a duplicate (aggregate_id, version) is expected to be
   * rejected by the table's unique constraint and surface here as a rejected
   * promise — confirm how callers should react to that conflict.
   *
   * @param event Event to persist.
   */
  async append(event: DomainEvent): Promise<void> {
    await db.query(
      `
      INSERT INTO events (
        aggregate_id, aggregate_type, event_type,
        data, metadata, version
      ) VALUES ($1, $2, $3, $4, $5, $6)
      `,
      [
        event.aggregateId,
        event.aggregateType,
        event.eventType,
        JSON.stringify(event.data),
        JSON.stringify(event.metadata),
        event.metadata.version,
      ]
    );
  }

  /**
   * Loads the events of one aggregate in ascending version order.
   *
   * @param aggregateId Aggregate whose events are loaded.
   * @param fromVersion Exclusive lower bound: only events with
   *   `version > fromVersion` are returned. Defaults to 0.
   * @returns The matching events mapped to `DomainEvent` shape.
   */
  async getEvents(
    aggregateId: string,
    fromVersion: number = 0
  ): Promise<DomainEvent[]> {
    const result = await db.query(
      `
      SELECT * FROM events
      WHERE aggregate_id = $1 AND version > $2
      ORDER BY version ASC
      `,
      [aggregateId, fromVersion]
    );
    return result.rows.map((row: Record<string, any>) => EventStore.toDomainEvent(row));
  }

  /**
   * Loads events across all aggregates ordered by `created_at`.
   *
   * Rows are mapped to the same `DomainEvent` shape as `getEvents`, so callers
   * never see raw snake_case column names.
   *
   * @param afterTimestamp When given, only events created strictly after this
   *   instant are returned.
   */
  async getAllEvents(afterTimestamp?: Date): Promise<DomainEvent[]> {
    const query = afterTimestamp
      ? `SELECT * FROM events WHERE created_at > $1 ORDER BY created_at ASC`
      : `SELECT * FROM events ORDER BY created_at ASC`;
    const result = await db.query(query, afterTimestamp ? [afterTimestamp] : []);
    return result.rows.map((row: Record<string, any>) => EventStore.toDomainEvent(row));
  }

  /** Converts one `events` table row into the camelCase `DomainEvent` shape. */
  private static toDomainEvent(row: Record<string, any>): DomainEvent {
    return {
      id: row.id,
      aggregateId: row.aggregate_id,
      aggregateType: row.aggregate_type,
      eventType: row.event_type,
      data: row.data,
      // NOTE(review): metadata is returned as stored; if the driver hands back
      // parsed JSON, `timestamp` is presumably a string rather than a Date — confirm.
      metadata: row.metadata,
    };
  }
}
Aggregate with Event Sourcing
Order Aggregate 구현
/**
 * Event-sourced Order aggregate.
 *
 * Commands (`create`, `confirm`, `cancel`) validate the current state, build a
 * domain event and hand it to `applyEvent`, which is the only place state is
 * mutated. Events produced by commands are collected as "uncommitted" until
 * the caller persists them and calls `markEventsAsCommitted()`.
 */
class Order {
  private orderId?: string;
  private customerId?: string;
  private items: OrderItem[] = [];
  private status: OrderStatus = 'DRAFT';
  private version = 0;
  private uncommittedEvents: DomainEvent[] = [];

  /**
   * Rebuilds an order by replaying `events` in the given order.
   * Replayed events are not recorded as uncommitted.
   */
  static reconstruct(events: DomainEvent[]): Order {
    const order = new Order();
    for (const event of events) {
      order.applyEvent(event, false);
    }
    return order;
  }

  /**
   * Restores an order from a state object produced by `toSnapshot()`.
   * The restored order has no uncommitted events.
   */
  static fromSnapshot(state: ReturnType<Order['toSnapshot']>): Order {
    const order = new Order();
    order.orderId = state.id;
    order.customerId = state.customerId;
    order.items = [...(state.items ?? [])];
    order.status = state.status;
    order.version = state.version;
    return order;
  }

  /**
   * Aggregate id assigned by the `OrderCreated` event.
   *
   * @throws Error if the order has not been created yet.
   */
  get id(): string {
    if (!this.orderId) {
      throw new Error('Order not created');
    }
    return this.orderId;
  }

  /** Version set by the last applied event (0 before any event). */
  getVersion(): number {
    return this.version;
  }

  /** Captures the current state as a plain object suitable for `fromSnapshot()`. */
  toSnapshot() {
    return {
      id: this.orderId,
      customerId: this.customerId,
      items: [...this.items],
      status: this.status,
      version: this.version,
    };
  }

  /**
   * Command: create the order.
   *
   * @throws Error if this order already has an id.
   */
  create(data: { customerId: string; items: OrderItem[] }) {
    if (this.orderId) {
      throw new Error('Order already created');
    }
    const event: OrderCreatedEvent = {
      id: uuid(),
      aggregateId: uuid(),
      aggregateType: 'Order',
      eventType: 'OrderCreated',
      data: {
        customerId: data.customerId,
        items: data.items,
        totalAmount: this.calculateTotal(data.items),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };
    this.applyEvent(event);
  }

  /**
   * Command: confirm the order.
   *
   * @throws Error unless the order is currently PENDING.
   */
  confirm() {
    if (this.status !== 'PENDING') {
      throw new Error('Order must be PENDING to confirm');
    }
    const event: OrderConfirmedEvent = {
      id: uuid(),
      aggregateId: this.id,
      aggregateType: 'Order',
      eventType: 'OrderConfirmed',
      data: {
        confirmedAt: new Date(),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };
    this.applyEvent(event);
  }

  /**
   * Command: cancel the order.
   *
   * @param reason Free-text reason stored on the event.
   * @throws Error if the order was never created or is already cancelled.
   */
  cancel(reason: string) {
    // Without this guard a brand-new (DRAFT) order would emit an
    // OrderCancelled event carrying an undefined aggregateId.
    if (!this.orderId) {
      throw new Error('Order not created');
    }
    if (this.status === 'CANCELLED') {
      throw new Error('Order already cancelled');
    }
    const event: OrderCancelledEvent = {
      id: uuid(),
      aggregateId: this.orderId,
      aggregateType: 'Order',
      eventType: 'OrderCancelled',
      data: {
        reason,
        cancelledAt: new Date(),
      },
      metadata: {
        timestamp: new Date(),
        version: this.version + 1,
      },
    };
    this.applyEvent(event);
  }

  /**
   * Applies an event to the aggregate state.
   *
   * Public so that a repository can replay stored events on top of a restored
   * instance; command methods call it with the default `isNew = true`.
   *
   * @param event Event to apply. Unrecognised event types leave the state
   *   untouched but still advance `version`.
   * @param isNew When true the event is also queued as uncommitted; pass
   *   false when replaying already-persisted events.
   */
  applyEvent(event: DomainEvent, isNew: boolean = true) {
    switch (event.eventType) {
      case 'OrderCreated':
        this.orderId = event.aggregateId;
        this.customerId = event.data.customerId;
        this.items = event.data.items;
        this.status = 'PENDING';
        break;
      case 'OrderConfirmed':
        this.status = 'CONFIRMED';
        break;
      case 'OrderCancelled':
        this.status = 'CANCELLED';
        break;
    }
    this.version = event.metadata.version;
    if (isNew) {
      this.uncommittedEvents.push(event);
    }
  }

  /**
   * Returns the events produced since the last commit.
   * A copy is returned so callers cannot mutate the aggregate's internal queue.
   */
  getUncommittedEvents(): DomainEvent[] {
    return [...this.uncommittedEvents];
  }

  /** Clears the uncommitted queue; call after the events have been persisted. */
  markEventsAsCommitted() {
    this.uncommittedEvents = [];
  }

  /**
   * Sums `price * quantity` over all items.
   * NOTE(review): the read-model projection reads `item.unitPrice`, not
   * `item.price` — confirm which field OrderItem actually declares.
   */
  private calculateTotal(items: OrderItem[]): number {
    return items.reduce((sum, item) => sum + item.price * item.quantity, 0);
  }
}
Repository 구현
/**
 * Loads and persists Order aggregates through the event store.
 */
class OrderRepository {
  constructor(private eventStore: EventStore) {}

  /**
   * Rebuilds the order with the given id from its stored events.
   *
   * @returns The reconstructed order, or null when no events exist for the id.
   */
  async findById(orderId: string): Promise<Order | null> {
    const history = await this.eventStore.getEvents(orderId);
    return history.length === 0 ? null : Order.reconstruct(history);
  }

  /**
   * Appends the order's uncommitted events to the event store one by one, in
   * order, and then marks them as committed on the aggregate.
   */
  async save(order: Order): Promise<void> {
    const pending = order.getUncommittedEvents();
    for (let index = 0; index < pending.length; index += 1) {
      await this.eventStore.append(pending[index]);
    }
    order.markEventsAsCommitted();
  }
}
Application Service
/**
 * Application service that runs order commands, persists the resulting events
 * and publishes them on the event bus.
 */
class OrderService {
  constructor(
    private orderRepo: OrderRepository,
    private eventBus: EventBus
  ) {}

  /**
   * Creates a new order, stores its events and publishes them.
   *
   * @returns The id of the created order (the aggregate id of its first event).
   */
  async createOrder(data: CreateOrderData): Promise<string> {
    // 1. Create the aggregate.
    const order = new Order();
    order.create({
      customerId: data.customerId,
      items: data.items,
    });
    // 2-3. Persist, then notify other services.
    const [created] = await this.saveAndPublish(order);
    if (!created) {
      throw new Error('Order creation produced no events');
    }
    return created.aggregateId;
  }

  /**
   * Confirms an existing order, stores the resulting event and publishes it.
   *
   * @throws Error if no order exists for `orderId`.
   */
  async confirmOrder(orderId: string): Promise<void> {
    // 1. Rebuild the aggregate.
    const order = await this.orderRepo.findById(orderId);
    if (!order) {
      throw new Error('Order not found');
    }
    // 2. Run the command.
    order.confirm();
    // 3-4. Persist, then publish.
    await this.saveAndPublish(order);
  }

  /**
   * Saves the order and publishes the events that were saved.
   *
   * The pending events are captured BEFORE saving: the repository marks them
   * as committed during save, which empties the aggregate's uncommitted list,
   * so reading them afterwards would publish nothing.
   */
  private async saveAndPublish(order: Order): Promise<DomainEvent[]> {
    const pending = [...order.getUncommittedEvents()];
    await this.orderRepo.save(order);
    for (const event of pending) {
      await this.eventBus.publish(event.eventType, event);
    }
    return pending;
  }
}
CQRS 심화
Read Model (Projection)
Query용 최적화된 Read Model:
// Denormalized Read Model
/**
 * Query-side view of an order: customer and product details are copied in so
 * that reads need no joins or calls to other services.
 */
interface OrderReadModel {
  orderId: string;
  customerId: string;
  customerName: string;
  customerEmail: string;
  items: {
    productId: string;
    productName: string;
    productImage: string;
    quantity: number;
    unitPrice: number;
  }[];
  totalAmount: number;
  status: string;
  createdAt: Date;
  /** Set when the order is confirmed. */
  confirmedAt?: Date;
  /** Set when the order is cancelled (written by the cancellation projection). */
  cancelledAt?: Date;
  /** Cancellation reason, set together with `cancelledAt`. */
  cancelReason?: string;
}
// Projection Handler
/**
 * Keeps the order read model in sync with order events: one handler per
 * event type, each writing to `readDb.orderReadModels`.
 */
class OrderProjection {
  constructor(
    private readDb: Database,
    private customerService: CustomerService,
    private productService: ProductService
  ) {}

  /**
   * Inserts a new read-model document for a created order, enriched with
   * customer and product details fetched from the other services.
   *
   * NOTE(review): products are matched to items by array position, which
   * presumably relies on getProducts returning one product per requested id
   * in request order — confirm against ProductService.
   */
  async handleOrderCreated(event: OrderCreatedEvent) {
    // Look up display details from the owning services (customer first, then products).
    const buyer = await this.customerService.getCustomer(event.data.customerId);
    const requestedIds = event.data.items.map((line) => line.productId);
    const catalog = await this.productService.getProducts(requestedIds);

    // Build and store the denormalized document.
    await this.readDb.orderReadModels.insert({
      orderId: event.aggregateId,
      customerId: event.data.customerId,
      customerName: buyer.name,
      customerEmail: buyer.email,
      items: event.data.items.map((line, position) => {
        return {
          productId: line.productId,
          productName: catalog[position].name,
          productImage: catalog[position].images[0],
          quantity: line.quantity,
          unitPrice: line.unitPrice,
        };
      }),
      totalAmount: event.data.totalAmount,
      status: 'PENDING',
      createdAt: event.metadata.timestamp,
    });
  }

  /** Marks the matching read-model document as CONFIRMED and records when. */
  async handleOrderConfirmed(event: OrderConfirmedEvent) {
    const selector = { orderId: event.aggregateId };
    const patch = {
      $set: {
        status: 'CONFIRMED',
        confirmedAt: event.data.confirmedAt,
      },
    };
    await this.readDb.orderReadModels.update(selector, patch);
  }

  /** Marks the matching read-model document as CANCELLED with time and reason. */
  async handleOrderCancelled(event: OrderCancelledEvent) {
    const selector = { orderId: event.aggregateId };
    const patch = {
      $set: {
        status: 'CANCELLED',
        cancelledAt: event.data.cancelledAt,
        cancelReason: event.data.reason,
      },
    };
    await this.readDb.orderReadModels.update(selector, patch);
  }
}
// Event Subscription: route each order event type to its projection handler.
eventBus.subscribe('OrderCreated', (created) => projection.handleOrderCreated(created));
eventBus.subscribe('OrderConfirmed', (confirmed) => projection.handleOrderConfirmed(confirmed));
eventBus.subscribe('OrderCancelled', (cancelled) => projection.handleOrderCancelled(cancelled));
Query Service
/**
 * Read-side service: answers order queries from the denormalized read model.
 */
class OrderQueryService {
  constructor(private readDb: Database) {}

  /**
   * Looks up a single order in the read model by id.
   * NOTE(review): presumably resolves to null when nothing matches, which the
   * declared return type does not express — confirm against the Database client.
   */
  async getOrderById(orderId: string): Promise<OrderReadModel> {
    return await this.readDb.orderReadModels.findOne({ orderId });
  }

  /**
   * Lists a customer's orders, newest first, one page at a time.
   *
   * @param customerId Customer whose orders are listed.
   * @param page 1-based page number; values below 1 are treated as the first page.
   * @param limit Page size, passed to the cursor unchanged.
   */
  async getOrdersByCustomer(
    customerId: string,
    page: number = 1,
    limit: number = 20
  ): Promise<OrderReadModel[]> {
    // Never hand the cursor a negative offset (page 0 or a negative page).
    const offset = Math.max(0, (page - 1) * limit);
    return await this.readDb.orderReadModels
      .find({ customerId })
      .sort({ createdAt: -1 })
      .skip(offset)
      .limit(limit)
      .toArray();
  }

  /**
   * Case-insensitive substring search over customer name and product names.
   *
   * The search text is matched literally: regex metacharacters are escaped so
   * user input cannot change the pattern or craft an expensive one. This is a
   * regex scan of the read model, not a full-text index.
   */
  async searchOrders(query: string): Promise<OrderReadModel[]> {
    const literal = OrderQueryService.escapeRegex(query);
    return await this.readDb.orderReadModels
      .find({
        $or: [
          { customerName: { $regex: literal, $options: 'i' } },
          { 'items.productName': { $regex: literal, $options: 'i' } },
        ],
      })
      .toArray();
  }

  /** Escapes every regex metacharacter so `text` matches only itself. */
  private static escapeRegex(text: string): string {
    return text.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
  }
}
Snapshot 패턴
문제: 이벤트가 너무 많을 때
하나의 주문(Aggregate)에 이벤트가 10,000개 쌓이면 → 조회할 때마다 전체 이벤트를 재생해야 하므로 재구성에 시간이 오래 걸림
해결: Snapshot 저장
/** Stored copy of an aggregate's state at a specific version. */
interface Snapshot {
// Identifier of the aggregate the snapshot belongs to.
aggregateId: string;
// Kind of aggregate, e.g. 'Order'.
aggregateType: string;
// Aggregate version captured by this snapshot.
version: number;
// Aggregate state at that version; its shape is defined by the aggregate.
state: any;
// When the snapshot was written.
createdAt: Date;
}
/**
 * Persists and retrieves aggregate snapshots through `db.snapshots`.
 */
class SnapshotStore {
  /**
   * Stores a snapshot of an aggregate's state, stamped with the current time.
   *
   * @param aggregateId Aggregate the snapshot belongs to.
   * @param version Aggregate version the state corresponds to.
   * @param state Serializable aggregate state.
   * @param aggregateType Kind of aggregate; defaults to 'Order' so existing
   *   three-argument callers keep their behaviour.
   */
  async save(
    aggregateId: string,
    version: number,
    state: any,
    aggregateType: string = 'Order'
  ) {
    await db.snapshots.insert({
      aggregateId,
      aggregateType,
      version,
      state,
      createdAt: new Date(),
    });
  }

  /**
   * Returns the snapshot with the highest version for the aggregate.
   * NOTE(review): presumably resolves to null when no snapshot exists —
   * confirm against the database client's findOne contract.
   */
  async getLatest(aggregateId: string): Promise<Snapshot | null> {
    return await db.snapshots.findOne(
      { aggregateId },
      { sort: { version: -1 } }
    );
  }
}
/**
 * Order repository that combines the event store with snapshots so that
 * loading an order replays only the events recorded after its latest snapshot.
 *
 * NOTE(review): this class calls `Order.fromSnapshot()`, `order.applyEvent()`,
 * `order.getVersion()`, `order.toSnapshot()` and reads `order.id`; verify that
 * the Order class in use exposes all of them publicly.
 */
class OrderRepository {
  /**
   * @param eventStore Source of truth for order events.
   * @param snapshotStore Storage for order snapshots.
   * @param snapshotInterval Take a snapshot each time the order's version
   *   passes a multiple of this number. Defaults to 100.
   * @throws RangeError if `snapshotInterval` is not a positive integer.
   */
  constructor(
    private eventStore: EventStore,
    private snapshotStore: SnapshotStore,
    private snapshotInterval: number = 100
  ) {
    if (!Number.isInteger(snapshotInterval) || snapshotInterval < 1) {
      throw new RangeError('snapshotInterval must be a positive integer');
    }
  }

  /**
   * Loads an order from its latest snapshot (if any) plus the events after it.
   *
   * @returns The rebuilt order, or null when neither a snapshot nor any event
   *   exists for `orderId`.
   */
  async findById(orderId: string): Promise<Order | null> {
    // 1. Look up the latest snapshot.
    const snapshot = await this.snapshotStore.getLatest(orderId);
    const fromVersion = snapshot ? snapshot.version : 0;

    // 2. Fetch only the events recorded after the snapshot.
    const events = await this.eventStore.getEvents(orderId, fromVersion);

    // Nothing stored under this id: report "not found" rather than handing
    // back an empty, never-created order.
    if (!snapshot && events.length === 0) {
      return null;
    }

    // 3. Restore the snapshot state (or start empty) and replay on top of it.
    const order = snapshot ? Order.fromSnapshot(snapshot.state) : new Order();
    for (const event of events) {
      order.applyEvent(event, false);
    }
    return order;
  }

  /**
   * Appends the order's uncommitted events and, when this save moved the
   * version past a snapshot boundary, stores a new snapshot.
   */
  async save(order: Order): Promise<void> {
    const events = order.getUncommittedEvents();
    for (const event of events) {
      await this.eventStore.append(event);
    }

    // Compare interval "buckets" before and after this save instead of testing
    // `version % interval === 0`: a save that appends several events can jump
    // over the exact multiple (e.g. 98 -> 101), and a save with no new events
    // must not write the same snapshot again.
    // Assumes every uncommitted event advanced the version by exactly one.
    const currentVersion = order.getVersion();
    const previousVersion = currentVersion - events.length;
    const crossedBoundary =
      Math.floor(currentVersion / this.snapshotInterval) >
      Math.floor(previousVersion / this.snapshotInterval);
    if (crossedBoundary) {
      await this.snapshotStore.save(
        order.id,
        currentVersion,
        order.toSnapshot()
      );
    }
    order.markEventsAsCommitted();
  }
}
장단점 정리
Event Sourcing 장점
- ✅ 완전한 감사 로그 (Audit Log)
- ✅ 시간 여행 (과거 상태 재현)
- ✅ 디버깅 용이
- ✅ Event를 여러 Read Model로 변환 가능
Event Sourcing 단점
- ❌ Learning Curve 높음
- ❌ 이벤트 스키마 변경 어려움
- ❌ Snapshot 없으면 성능 저하
- ❌ 저장 공간 많이 사용
CQRS 장점
- ✅ 읽기/쓰기 성능 독립 최적화
- ✅ 복잡한 쿼리 간단히 처리
- ✅ 확장성 (Read Replica 여러 개)
CQRS 단점
- ❌ 복잡도 증가
- ❌ Eventual Consistency
- ❌ Read Model 동기화 관리 필요
핵심 정리
- Event Sourcing: 상태 대신 이벤트 저장
- Event Store가 Single Source of Truth
- CQRS로 읽기/쓰기 분리
- Projection으로 Query 최적화
- Snapshot으로 성능 개선
- Event Replay로 Read Model 재구성 가능
- 복잡도 증가하므로 신중히 선택