패킷을 UDP로 수신하여 Kafka의 특정 토픽(Topic)으로 전송

패킷을 UDP로 수신하여 Kafka의 특정 토픽(Topic)으로 전송하는 구조로 수정해 드리겠습니다.

이 코드를 실행하려면 프로젝트에 Kafka Client 라이브러리가 포함되어야 합니다. (Maven 또는 Gradle 설정 필요)

1. Maven 의존성 추가 (pom.xml)

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.5.0</version>
</dependency>

2. Kafka 연동 Java 샘플 (KrxUdpToKafka.java)

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.net.*;
import java.util.Properties;

public class KrxUdpToKafka {

    public static void main(String[] args) {
        // --- 1. 설정 정보 ---
        // UDP 설정
        String mcastIp = "239.1.1.1";
        int udpPort = 12345;
        String interfaceName = "eth0";

        // Kafka 설정
        String bootstrapServers = "localhost:9092"; // 카프카 브로커 주소
        String topicName = "krx-market-data";        // 전송할 토픽명

        // --- 2. Kafka Producer 설정 ---
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // 바이너리 데이터 보존을 위해 ByteArraySerializer 사용 (또는 String 변환 후 StringSerializer)
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
        // 성능을 위한 옵션 (선택)
        props.put(ProducerConfig.ACKS_CONFIG, "1"); // 0: 성능중시, 1: 안전중시
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 지연시간을 줄여 실시간성 확보

        KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);

        // --- 3. UDP Multicast 소켓 설정 ---
        try (MulticastSocket socket = new MulticastSocket(udpPort)) {
            
            InetAddress group = InetAddress.getByName(mcastIp);
            NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
            InetSocketAddress groupAddress = new InetSocketAddress(group, udpPort);
            
            socket.joinGroup(groupAddress, networkInterface);
            socket.setReuseAddress(true);

            System.out.println("Started: UDP Multicast -> Kafka Pipeline...");
            System.out.println("Receiving from: " + mcastIp + ":" + udpPort);
            System.out.println("Sending to Kafka Topic: " + topicName);

            byte[] buffer = new byte[2048];

            // 애플리케이션 종료 시 안전하게 닫기 위한 훅
            Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                System.out.println("Closing Producer and Socket...");
                producer.close();
                socket.close();
            }));

            while (true) {
                // 1) UDP 패킷 수신
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                socket.receive(packet);

                // 2) 실제 데이터 크기만큼 복사
                byte[] rawData = new byte[packet.getLength()];
                System.arraycopy(packet.getData(), 0, rawData, 0, packet.getLength());

                // 3) Kafka로 전송
                // Key값은 종목코드 등을 파싱해서 넣으면 좋지만, 여기서는 null로 설정
                ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, null, rawData);
                
                // 비동기 전송 및 콜백 처리
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Kafka Send Failed: " + exception.getMessage());
                    } else {
                        // 성공 시 로그 (운영 환경에서는 생략하거나 레벨 조정)
                        // System.out.println("Sent to Kafka: Offset " + metadata.offset());
                    }
                });

                // 디버깅을 위한 출력 (EUC-KR 변환)
                // String msg = new String(rawData, "EUC-KR");
                // System.out.println("Forwarded to Kafka: " + msg);
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            producer.close();
        }
    }
}

3. 주요 수정 사항 설명

패킷을 UDP로 수신하여 Kafka의 특정 토픽(Topic)으로 전송

  1. Kafka Producer 통합:

    • Properties를 통해 Kafka 브로커(bootstrap.servers) 정보를 설정합니다.
    • ByteArraySerializer를 사용하여 KRX의 바이너리 전문 데이터를 훼손 없이 Kafka로 전달합니다. 만약 가독성을 위해 문자열로만 넘기고 싶다면 StringSerializer를 쓰고 new String(rawData, "EUC-KR")로 변환하여 보낼 수도 있습니다.
  2. ProducerRecord 생성:

    • new ProducerRecord<>(topicName, key, value) 구조입니다.
    • key를 null로 주면 라운드 로빈 방식으로 카프카 파티션에 분산됩니다. 특정 종목의 체결 정보를 순서대로 처리해야 한다면 종목코드를 key로 지정하는 것이 좋습니다.
  3. 성능 최적화 (Linger, ACKS):

    • 시세 데이터는 지연시간(Latency)이 중요하므로 LINGER_MS_CONFIG를 낮게 설정(1ms 이하)하여 패킷이 들어오는 즉시 전송하도록 유도했습니다.
  4. Shutdown Hook:

    • Ctrl+C 등으로 프로그램 종료 시 Kafka Producer의 내부 버퍼에 남은 데이터를 모두 쏟아내고(flush) 안전하게 종료되도록 훅을 추가했습니다.

4. 수신 확인 방법 (Kafka CLI)

데이터가 Kafka에 잘 들어가는지 확인하려면 터미널에서 다음 명령어를 사용하세요:

# Kafka가 설치된 경로에서 실행
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic krx-market-data --from-beginning

이 구조를 바탕으로, 실제 운영 환경에서는 수신된 rawData 내의 **전문 구분자(메시지 ID)**를 파싱하여 서로 다른 Kafka 토픽(예: krx-tradekrx-orderbook)으로 라우팅하는 로직을 추가하시면 됩니다.