패킷을 UDP로 수신하여 Kafka의 특정 토픽(Topic)으로 전송하는 구조로 수정해 드리겠습니다.
이 코드를 실행하려면 프로젝트에 Kafka Client 라이브러리가 포함되어야 합니다. (Maven 또는 Gradle 설정 필요)
1. Maven 의존성 추가 (pom.xml)
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.5.0</version>
</dependency>
2. Kafka 연동 Java 샘플 (KrxUdpToKafka.java)
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import java.net.*;
import java.util.Properties;
public class KrxUdpToKafka {
public static void main(String[] args) {
// --- 1. 설정 정보 ---
// UDP 설정
String mcastIp = "239.1.1.1";
int udpPort = 12345;
String interfaceName = "eth0";
// Kafka 설정
String bootstrapServers = "localhost:9092"; // 카프카 브로커 주소
String topicName = "krx-market-data"; // 전송할 토픽명
// --- 2. Kafka Producer 설정 ---
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// 바이너리 데이터 보존을 위해 ByteArraySerializer 사용 (또는 String 변환 후 StringSerializer)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
// 성능을 위한 옵션 (선택)
props.put(ProducerConfig.ACKS_CONFIG, "1"); // 0: 성능중시, 1: 안전중시
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 지연시간을 줄여 실시간성 확보
KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
// --- 3. UDP Multicast 소켓 설정 ---
try (MulticastSocket socket = new MulticastSocket(udpPort)) {
InetAddress group = InetAddress.getByName(mcastIp);
NetworkInterface networkInterface = NetworkInterface.getByName(interfaceName);
InetSocketAddress groupAddress = new InetSocketAddress(group, udpPort);
socket.joinGroup(groupAddress, networkInterface);
socket.setReuseAddress(true);
System.out.println("Started: UDP Multicast -> Kafka Pipeline...");
System.out.println("Receiving from: " + mcastIp + ":" + udpPort);
System.out.println("Sending to Kafka Topic: " + topicName);
byte[] buffer = new byte[2048];
// 애플리케이션 종료 시 안전하게 닫기 위한 훅
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
System.out.println("Closing Producer and Socket...");
producer.close();
socket.close();
}));
while (true) {
// 1) UDP 패킷 수신
DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
socket.receive(packet);
// 2) 실제 데이터 크기만큼 복사
byte[] rawData = new byte[packet.getLength()];
System.arraycopy(packet.getData(), 0, rawData, 0, packet.getLength());
// 3) Kafka로 전송
// Key값은 종목코드 등을 파싱해서 넣으면 좋지만, 여기서는 null로 설정
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topicName, null, rawData);
// 비동기 전송 및 콜백 처리
producer.send(record, (metadata, exception) -> {
if (exception != null) {
System.err.println("Kafka Send Failed: " + exception.getMessage());
} else {
// 성공 시 로그 (운영 환경에서는 생략하거나 레벨 조정)
// System.out.println("Sent to Kafka: Offset " + metadata.offset());
}
});
// 디버깅을 위한 출력 (EUC-KR 변환)
// String msg = new String(rawData, "EUC-KR");
// System.out.println("Forwarded to Kafka: " + msg);
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.close();
}
}
}
3. 주요 수정 사항 설명

-
Kafka Producer 통합:
Properties를 통해 Kafka 브로커(bootstrap.servers) 정보를 설정합니다.ByteArraySerializer를 사용하여 KRX의 바이너리 전문 데이터를 훼손 없이 Kafka로 전달합니다. 만약 가독성을 위해 문자열로만 넘기고 싶다면StringSerializer를 쓰고new String(rawData, "EUC-KR")로 변환하여 보낼 수도 있습니다.
-
ProducerRecord 생성:
new ProducerRecord<>(topicName, key, value)구조입니다.key를null로 주면 라운드 로빈 방식으로 카프카 파티션에 분산됩니다. 특정 종목의 체결 정보를 순서대로 처리해야 한다면 종목코드를key로 지정하는 것이 좋습니다.
-
성능 최적화 (Linger, ACKS):
- 시세 데이터는 지연시간(Latency)이 중요하므로
LINGER_MS_CONFIG를 낮게 설정(1ms 이하)하여 패킷이 들어오는 즉시 전송하도록 유도했습니다.
- 시세 데이터는 지연시간(Latency)이 중요하므로
-
Shutdown Hook:
Ctrl+C등으로 프로그램 종료 시 Kafka Producer의 내부 버퍼에 남은 데이터를 모두 쏟아내고(flush) 안전하게 종료되도록 훅을 추가했습니다.
4. 수신 확인 방법 (Kafka CLI)
데이터가 Kafka에 잘 들어가는지 확인하려면 터미널에서 다음 명령어를 사용하세요:
# Kafka가 설치된 경로에서 실행
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic krx-market-data --from-beginning
이 구조를 바탕으로, 실제 운영 환경에서는 수신된 rawData 내의 **전문 구분자(메시지 ID)**를 파싱하여 서로 다른 Kafka 토픽(예: krx-trade, krx-orderbook)으로 라우팅하는 로직을 추가하시면 됩니다.