Kafka 환경 구성(Gradle)
-
build.gradle파일에 다음 의존성을 추가합니다.
dependencies {
implementation 'org.apache.kafka:kafka-clients:3.7.0' // 예시 버전
}
Kafka 서버가 로컬 또는 원격에 있어야 하고, BOOTSTRAP_SERVERS에는 해당 호스트:포트 정보를 명시합니다.
Java Producer 예제 (메시지 보내기)
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class MyKafkaProducer {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
String message = "Hello, Kafka from Java!";
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, message);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.println("메시지 전송 성공: " + metadata.topic() + ", 파티션: " + metadata.partition());
} else {
exception.printStackTrace();
}
});
producer.flush();
producer.close();
}
}
위 코드는 Java 콘솔에서 실행 가능하며, 메시지를 지정한 토픽에 전송합니다.
Java Consumer 예제 (메시지 구독)
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class MyKafkaConsumer {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-group";
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("수신한 메시지: %s, 파티션: %d, 오프셋: %d\n", record.value(), record.partition(), record.offset());
}
}
}
}
이 코드는 지정한 토픽의 메시지를 계속해서 구독·출력합니다.
참고 사항 및 사용 방법

-
위 코드는 독립 실행형 Java 클래스로 Gradle 프로젝트에 바로 사용할 수 있습니다.
-
Kafka 브로커가 반드시 먼저 실행되고 있어야 하며, 토픽명을 일치시켜야 합니다.
-
Gradle 빌드 후
Producer → Consumer순서로 실행하면 메시지 Pub/Sub 테스트가 가능합니다.
이 방식은 Spring/Kafka Template 없이 core Java에서 동작하는 예시이며, 실무에서는 구성 방식에 따라 객체 재사용, Acks, Retires 세부 설정 등을 추가할 수 있습니다.