CQRS (Command Query Responsibility Segregation) 패턴을 Kafka와 함께 Java (Spring Boot 기반)로 구현하는 샘플

CQRS는 데이터를 변경하는 Command 측면과 데이터를 조회하는 Query 측면을 분리하는 패턴입니다. Kafka는 이 두 시스템 간의 비동기적 통신 및 데이터 동기화를 위한 이벤트 버스(Event Bus) 역할을 합니다.


 

1. 프로젝트 구조 및 설정 개요

 

CQRS 패턴은 일반적으로 다음 두 개의 별도 마이크로서비스로 구성됩니다.

구분 역할 데이터베이스 통신 수단
Command Service (Write Side) 생성, 수정, 삭제 요청 처리 및 Kafka 이벤트 발행 쓰기 전용 DB (예: PostgreSQL) REST API (클라이언트 요청), Kafka Producer
Query Service (Read Side) 조회 요청 처리 및 Kafka 이벤트 구독 읽기 전용 DB (예: MongoDB, Redis) REST API (클라이언트 요청), Kafka Consumer

 

application.yml (Kafka 설정)

 

두 서비스 모두 Kafka 통신을 위해 비슷한 설정을 사용합니다.

YAML

# application.yml (Command Service 또는 Query Service)

spring:
  kafka:
    # Kafka 서버 주소
    bootstrap-servers: localhost:9092
    
    # Producer 설정 (Command Service)
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # JSON 직렬화
      
    # Consumer 설정 (Query Service)
    consumer:
      group-id: product-group # 소비자 그룹 ID
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer # JSON 역직렬화
      auto-offset-reset: earliest

 

2. Command Service (쓰기 측면)

 

사용자의 요청을 받아 DB에 데이터를 저장하고, 상태 변경 이벤트를 Kafka로 발행합니다.

 

2.1. Event 클래스

 

상태 변경을 나타내는 이벤트 객체입니다.

Java

// ProductCreatedEvent.java
public class ProductCreatedEvent {
    private String id;
    private String name;
    private double price;

    // Getters, Setters, Constructors...
}

 

2.2. Kafka Producer

 

이벤트를 Kafka 토픽에 전송하는 컴포넌트입니다.

Java

// ProductEventProducer.java

import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class ProductEventProducer {

    private final KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate;
    private static final String TOPIC = "product-events";

    // Lombok의 @RequiredArgsConstructor 등으로 주입 가능
    public ProductEventProducer(KafkaTemplate<String, ProductCreatedEvent> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void sendProductCreatedEvent(ProductCreatedEvent event) {
        System.out.println("[COMMAND] 이벤트 발행: " + event.getName());
        // Kafka에 이벤트 발행 (Key: ID, Value: Event 객체)
        kafkaTemplate.send(TOPIC, event.getId(), event);
    }
}

 

2.3. Command Service 로직

 

ProductCommandService는 Command DB에 저장한 후, 반드시 Kafka에 이벤트를 발행합니다.

Java

// ProductCommandService.java (Command Service의 핵심 비즈니스 로직)

@Service
public class ProductCommandService {

    private final ProductRepository commandRepository; // 쓰기 전용 DB Repository
    private final ProductEventProducer eventProducer;

    // Lombok의 @RequiredArgsConstructor 등으로 주입 가능
    public ProductCommandService(ProductRepository commandRepository, ProductEventProducer eventProducer) {
        this.commandRepository = commandRepository;
        this.eventProducer = eventProducer;
    }

    // Command: 새로운 상품 생성 요청 처리
    public Product createProduct(Product newProduct) {
        // 1. 쓰기 전용 DB에 저장 (트랜잭션 보장)
        Product savedProduct = commandRepository.save(newProduct);

        // 2. Event 객체 생성
        ProductCreatedEvent event = new ProductCreatedEvent(
            savedProduct.getId(), 
            savedProduct.getName(), 
            savedProduct.getPrice()
        );

        // 3. Kafka에 이벤트 발행
        eventProducer.sendProductCreatedEvent(event);
        
        System.out.println("[COMMAND] DB 저장 및 이벤트 발행 완료.");
        return savedProduct;
    }
}

 

3. Query Service (읽기 측면)

 

Kafka로부터 이벤트를 구독하여 읽기 전용 DB의 **Materialized View (구체화된 뷰)**를 업데이트합니다.

 

3.1. Kafka Consumer (Event Handler)

 

Kafka 토픽을 구독하고 이벤트를 처리하여 읽기 DB를 동기화합니다.

Java

// ProductEventListener.java (Query Service의 Event Handler)

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class ProductEventListener {

    private final ProductViewRepository queryRepository; // 읽기 전용 DB Repository

    // Lombok의 @RequiredArgsConstructor 등으로 주입 가능
    public ProductEventListener(ProductViewRepository queryRepository) {
        this.queryRepository = queryRepository;
    }

    @KafkaListener(topics = "product-events", groupId = "product-group")
    public void handleProductCreated(ProductCreatedEvent event) {
        System.out.println("[QUERY] Kafka 이벤트 수신: " + event.getName());
        
        // 1. Read Model (조회용 엔티티) 생성 또는 업데이트
        ProductView productView = new ProductView();
        productView.setId(event.getId());
        productView.setName(event.getName());
        productView.setPrice(event.getPrice());
        
        // 2. 읽기 전용 DB에 저장 (데이터 동기화)
        queryRepository.save(productView);

        System.out.println("[QUERY] Read DB 업데이트 완료: " + productView.getName());
    }
}

 

3.2. Query Service 로직

 

클라이언트의 조회 요청을 읽기 전용 DB에서 처리합니다.

Java

// ProductQueryService.java

@Service
public class ProductQueryService {

    private final ProductViewRepository queryRepository; // 읽기 전용 DB Repository

    // Lombok의 @RequiredArgsConstructor 등으로 주입 가능
    public ProductQueryService(ProductViewRepository queryRepository) {
        this.queryRepository = queryRepository;
    }

    // Query: 상품 ID로 조회 요청 처리
    public ProductView getProductById(String id) {
        System.out.println("[QUERY] Read DB에서 상품 조회: " + id);
        // 읽기 전용 DB에서 조회
        return queryRepository.findById(id).orElse(null);
    }
}

CQRS (Command Query Responsibility Segregation) 패턴을 Kafka와 함께 Java (Spring Boot 기반)로 구현하는 샘플

CQRS 아키텍처는 Microservice | CQRS Design Pattern with SpringBoot & Apache Kafka | JavaTechie와 같이 Spring Boot와 Kafka를 사용하여 효과적으로 구현할 수 있습니다.

Microservice | CQRS Design Pattern with SpringBoot & Apache Kafka | JavaTechie