CQRS는 데이터를 변경하는 Command 측면과 데이터를 조회하는 Query 측면을 분리하는 패턴입니다. Kafka는 이 두 시스템 간의 비동기적 통신 및 데이터 동기화를 위한 이벤트 버스(Event Bus) 역할을 합니다.
1. 프로젝트 구조 및 설정 개요
CQRS 패턴은 일반적으로 다음 두 개의 별도 마이크로서비스로 구성됩니다.
application.yml (Kafka 설정)
두 서비스 모두 Kafka 통신을 위해 비슷한 설정을 사용합니다.
YAML
# application.yml (Command Service 또는 Query Service)
spring:
kafka:
# Kafka 서버 주소
bootstrap-servers: localhost:9092
# Producer 설정 (Command Service)
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # JSON 직렬화
# Consumer 설정 (Query Service)
consumer:
group-id: product-group # 소비자 그룹 ID
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # JSON 역직렬화
auto-offset-reset: earliest
2. Command Service (쓰기 측면)
사용자의 요청을 받아 DB에 데이터를 저장하고, 상태 변경 이벤트를 Kafka로 발행합니다.
2.1. Event 클래스
상태 변경을 나타내는 이벤트 객체입니다.
Java
// ProductCreatedEvent.java
public class ProductCreatedEvent {
private String id;
private String name;
private double price;
// Getters, Setters, Constructors...
}
2.2. Kafka Producer
이벤트를 Kafka 토픽에 전송하는 컴포넌트입니다.
Java
// ProductEventProducer.java
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@Component
public class ProductEventProducer {
private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
private static final String TOPIC = "product-events";
// Lombok의 @RequiredArgsConstructor 등으로 주입 가능
public ProductEventProducer(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
this.kafkaTemplate = kafkaTemplate;
}
public void sendProductCreatedEvent(ProductCreatedEvent event) {
System.out.println("[COMMAND] 이벤트 발행: " + event.getName());
// Kafka에 이벤트 발행 (Key: ID, Value: Event 객체)
kafkaTemplate.send(TOPIC, event.getId(), event);
}
}
2.3. Command Service 로직
ProductCommandService는 Command DB에 저장한 후, 반드시 Kafka에 이벤트를 발행합니다.
Java
// ProductCommandService.java (Command Service의 핵심 비즈니스 로직)
@Service
public class ProductCommandService {
private final ProductRepository commandRepository; // 쓰기 전용 DB Repository
private final ProductEventProducer eventProducer;
// Lombok의 @RequiredArgsConstructor 등으로 주입 가능
public ProductCommandService(ProductRepository commandRepository, ProductEventProducer eventProducer) {
this.commandRepository = commandRepository;
this.eventProducer = eventProducer;
}
// Command: 새로운 상품 생성 요청 처리
public Product createProduct(Product newProduct) {
// 1. 쓰기 전용 DB에 저장 (트랜잭션 보장)
Product savedProduct = commandRepository.save(newProduct);
// 2. Event 객체 생성
ProductCreatedEvent event = new ProductCreatedEvent(
savedProduct.getId(),
savedProduct.getName(),
savedProduct.getPrice()
);
// 3. Kafka에 이벤트 발행
eventProducer.sendProductCreatedEvent(event);
System.out.println("[COMMAND] DB 저장 및 이벤트 발행 완료.");
return savedProduct;
}
}
3. Query Service (읽기 측면)
Kafka로부터 이벤트를 구독하여 읽기 전용 DB의 **Materialized View (구체화된 뷰)**를 업데이트합니다.
3.1. Kafka Consumer (Event Handler)
Kafka 토픽을 구독하고 이벤트를 처리하여 읽기 DB를 동기화합니다.
Java
// ProductEventListener.java (Query Service의 Event Handler)
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class ProductEventListener {
private final ProductViewRepository queryRepository; // 읽기 전용 DB Repository
// Lombok의 @RequiredArgsConstructor 등으로 주입 가능
public ProductEventListener(ProductViewRepository queryRepository) {
this.queryRepository = queryRepository;
}
@KafkaListener(topics = "product-events", groupId = "product-group")
public void handleProductCreated(ProductCreatedEvent event) {
System.out.println("[QUERY] Kafka 이벤트 수신: " + event.getName());
// 1. Read Model (조회용 엔티티) 생성 또는 업데이트
ProductView productView = new ProductView();
productView.setId(event.getId());
productView.setName(event.getName());
productView.setPrice(event.getPrice());
// 2. 읽기 전용 DB에 저장 (데이터 동기화)
queryRepository.save(productView);
System.out.println("[QUERY] Read DB 업데이트 완료: " + productView.getName());
}
}
3.2. Query Service 로직
클라이언트의 조회 요청을 읽기 전용 DB에서 처리합니다.
Java
// ProductQueryService.java
@Service
public class ProductQueryService {
private final ProductViewRepository queryRepository; // 읽기 전용 DB Repository
// Lombok의 @RequiredArgsConstructor 등으로 주입 가능
public ProductQueryService(ProductViewRepository queryRepository) {
this.queryRepository = queryRepository;
}
// Query: 상품 ID로 조회 요청 처리
public ProductView getProductById(String id) {
System.out.println("[QUERY] Read DB에서 상품 조회: " + id);
// 읽기 전용 DB에서 조회
return queryRepository.findById(id).orElse(null);
}
}
