Kafka Consumer에서 전체 파티션의 offset을 처음부터 재구독하는 샘플과 특정 offset부터 구독하는 샘플 코드를 정리했습니다.
1. 전체 파티션의 오프셋 처음부터 재구독 (처음부터 다시 읽기)
KafkaConsumer의 seekToBeginning() 메서드를 사용해, 구독한 파티션 전체 오프셋을 처음(earliest)으로 이동시켜 메시지를 다시 읽기 시작할 수 있습니다.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;import java.util.Collections;
import java.util.Properties;
import java.util.Set;
public class KafkaSeekToBeginningExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“group.id”, “seek-beginning-group”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“auto.offset.reset”, “latest”); // 보통 사용안됨, 수동제어
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = “my-topic”;
consumer.subscribe(Collections.singletonList(topic));
// 아래는 파티션 할당이 완료되어야 실행 가능하므로 poll()을 먼저 호출해야 함
consumer.poll(Duration.ofMillis(100));
// 구독한 모든 파티션을 가져와 오프셋을 처음으로 이동
Set<TopicPartition> assignedPartitions = consumer.assignment();
consumer.seekToBeginning(assignedPartitions);
System.out.println(“Re-consuming from beginning of all assigned partitions”);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“partition=%d offset=%d key=%s value=%s%n”,
record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
2. 특정 offset부터 구독 시작 (파티션 수동 할당과 seek 사용)

assign() 으로 특정 파티션을 할당하고, seek() 로 원하는 offset 위치로 이동해 구독을 시작합니다.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;import java.time.Duration;import java.util.Collections;
import java.util.Properties;
public class KafkaSeekSpecificOffsetExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(“bootstrap.servers”, “localhost:9092”);
props.put(“group.id”, “seek-specific-offset-group”);
props.put(“key.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“value.deserializer”, “org.apache.kafka.common.serialization.StringDeserializer”);
props.put(“auto.offset.reset”, “none”); // offset 없으면 예외 발생
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
String topic = “my-topic”;
int partitionNumber = 0;
long offsetToStart = 100L; // 원하는 시작 offset
TopicPartition partition = new TopicPartition(topic, partitionNumber);
consumer.assign(Collections.singletonList(partition));
// 원하는 offset 으로 이동한다
consumer.seek(partition, offsetToStart);
System.out.println(“Consuming from offset “ + offsetToStart + ” in partition “ + partitionNumber);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf(“partition=%d offset=%d key=%s value=%s%n”,
record.partition(), record.offset(), record.key(), record.value());
}
}
}
}
요약
-
전체 파티션 offset 처음부터 읽으려면
subscribe()후poll()호출 뒤,seekToBeginning(assignment)사용 -
특정 offset부터 읽으려면
assign()으로 파티션 직접 할당 후seek(partition, offset)으로 offset 지정 -
seek()호출 전에는 반드시 해당 파티션이 컨슈머에 할당되어 있어야 한다
이 두 샘플 코드를 Java Kafka 클라이언트 라이브러리에서 바로 사용 가능합니다.