Kafka Consumer Advanced: Perilaku Reset Offset Otomatis Konsumer Kafka
Perilaku Reset Offset Otomatis Konsumer Kafka
Dalam Apache Kafka, konsumer diharapkan membaca data dari log secara terus-menerus, dengan offset menentukan posisi terakhir yang dibaca dalam partisi. Ketika offset yang dikomit sebelumnya tidak tersedia (misalnya, karena konsumer mati lebih lama dari periode retensi offset atau karena bug aplikasi), konsumer harus menentukan dari mana memulai membaca. Artikel ini menjelaskan pengaturan auto.offset.reset
, peran offset.retention.minutes
, cara melakukan replay data untuk pemulihan, dan alternatif menggunakan API tingkat rendah seperti seek()
dan assign()
. Kami menyertakan contoh kode Java, langkah-langkah pengujian dengan CLI, dan praktik terbaik untuk memastikan pengelolaan offset yang andal di lingkungan produksi.
Perilaku Offset Konsumer
Konsumer Kafka melacak posisi baca menggunakan offset, yang disimpan di topik internal __consumer_offsets
. Offset menunjukkan pesan terakhir yang telah diproses dari partisi tertentu. Namun, offset dapat menjadi "tidak valid" jika:
Konsumer mati lebih lama dari periode retensi offset (default: 7 hari).
Topik dihapus atau partisi diubah.
Grup konsumer baru digunakan tanpa riwayat offset.
Diagram Perilaku Offset:
[Log Partisi: Pesan 0 -> Pesan 1 -> ... -> Pesan N]
[Konsumer: Offset dikomit di Pesan N-1, membaca Pesan N berikutnya]
Jika offset tidak tersedia, pengaturan auto.offset.reset
menentukan perilaku konsumer.
Pengaturan auto.offset.reset
Pengaturan auto.offset.reset
mengontrol dari mana konsumer mulai membaca jika tidak ada offset yang valid untuk grup konsumer. Nilai yang mungkin adalah:
latest (default):
Konsumer mulai membaca dari ujung (tail) partisi, yaitu pesan terbaru yang masuk setelah konsumer aktif.
Implikasi: Pesan sebelumnya diabaikan, cocok untuk aplikasi yang hanya peduli dengan data baru (misalnya, pemantauan real-time).
Kasus Penggunaan: Sistem yang dapat mentoleransi kehilangan data historis, seperti analitik streaming.
earliest:
Konsumer mulai membaca dari offset tertua yang tersedia di partisi (biasanya dari awal log, tergantung pada retensi data topik).
Implikasi: Semua pesan yang tersedia di partisi akan dibaca, cocok untuk pemrosesan ulang data historis.
Kasus Penggunaan: Sistem yang memerlukan pemrosesan semua data, seperti ETL atau audit.
none:
Konsumer melempar pengecualian (
OffsetOutOfRangeException
) jika tidak ada offset yang valid.Implikasi: Memaksa aplikasi untuk menangani kasus offset tidak valid secara eksplisit.
Kasus Penggunaan: Sistem kritis yang memerlukan kontrol ketat atas offset.
Catatan:
Pilihan
auto.offset.reset
hanya berlaku saat offset tidak ditemukan untuk grup konsumer (misalnya, grup baru atau offset dihapus karena retensi).Nilai default (
latest
) cocok untuk sebagian besar kasus, tetapi sesuaikan berdasarkan kebutuhan aplikasi.
offset.retention.minutes
Deskripsi: Menentukan periode retensi (dalam menit) untuk offset yang disimpan di topik
__consumer_offsets
.Default: 10080 menit (7 hari) pada Kafka versi ≥ 2.0.
Fungsi:
Offset yang lebih lama dari periode retensi akan dihapus, menyebabkan offset menjadi tidak valid.
Jika konsumer mati lebih lama dari
offset.retention.minutes
(misalnya, >7 hari) atau topik memiliki throughput rendah, offset yang dikomit sebelumnya mungkin hilang.
Implikasi:
Jika offset hilang, konsumer akan menggunakan
auto.offset.reset
untuk menentukan posisi baca.Untuk aplikasi dengan konsumer yang sering mati lama atau topik dengan throughput rendah, tingkatkan
offset.retention.minutes
(misalnya, ke 1 bulan atau 43200 menit).
Pengaturan: Ini adalah pengaturan tingkat broker, diatur melalui
offsets.retention.minutes
di konfigurasi broker atau perubahan dinamis:kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \ --add-config offsets.retention.minutes=43200
Replay Data untuk Konsumer
Dalam situasi tertentu, Anda mungkin perlu memutar ulang (replay) data dari offset sebelumnya untuk pemulihan dari kesalahan atau analisis ulang. Langkah-langkah untuk melakukan replay data:
Hentikan Semua Konsumer dalam Grup:
Pastikan semua konsumer dalam grup konsumer berhenti untuk mencegah konflik.
Atur Ulang Offset:
Gunakan perintah
kafka-consumer-groups.sh
untuk mengatur ulang offset ke posisi yang diinginkan (misalnya,earliest
, offset spesifik, atau berdasarkan waktu).Contoh: Atur ulang offset ke awal untuk grup
consumer-group
:kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group \ --topic poll-topic --reset-offsets --to-earliest --execute
Verifikasi offset baru:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
Restart Konsumer:
Jalankan kembali konsumer untuk mulai membaca dari offset yang telah diatur ulang.
Catatan:
Replay data dapat menyebabkan duplikasi jika pengolahan tidak idempoten. Pastikan logika pengolahan menangani duplikasi (misalnya, menggunakan kunci unik).
Gunakan alat seperti Conduktor untuk antarmuka grafis dalam mengatur ulang offset.
Alternatif: Menggunakan API Tingkat Rendah (seek dan assign)
Sebagai alternatif untuk grup konsumer dan offset otomatis, Anda dapat menggunakan API tingkat rendah seperti seek()
dan assign()
untuk mengontrol posisi baca secara manual tanpa bergantung pada grup konsumer.
assign(): Menetapkan partisi tertentu ke konsumer secara eksplisit, bukan menggunakan langganan grup konsumer.
seek(): Mengatur offset spesifik untuk partisi yang ditugaskan, memungkinkan pembacaan dari posisi tertentu.
Contoh Kode Java untuk Seek dan Assign
Berikut adalah contoh kode Java yang menggunakan assign()
dan seek()
untuk membaca dari offset tertentu:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class SeekAssignConsumer {
public static void main(String[] args) {
// Konfigurasi konsumer
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// Buat konsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
// Tetapkan partisi secara manual
String topic = "poll-topic";
TopicPartition partition = new TopicPartition(topic, 0);
consumer.assign(Collections.singletonList(partition));
// Seek ke offset tertentu (misalnya, offset 0 untuk replay dari awal)
consumer.seek(partition, 0);
try {
while (true) {
// Poll pesan
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Proses setiap pesan
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
System.out.printf("Menerima pesan: key=%s, value=%s, partition=%d, offset=%d%n",
key, value, record.partition(), record.offset());
// Simulasi pengolahan idempoten
try {
processMessage(key, value);
} catch (Exception e) {
System.err.printf("Gagal memproses pesan: %s%n", e.getMessage());
continue;
}
}
// Komit offset secara manual
if (!records.isEmpty()) {
try {
consumer.commitSync();
System.out.println("Offset dikomit");
} catch (Exception e) {
System.err.printf("Gagal mengomit offset: %s%n", e.getMessage());
}
}
}
} finally {
consumer.close();
}
}
private static void processMessage(String key, String value) {
// Simulasi pengolahan idempoten
System.out.printf("Memproses pesan: key=%s, value=%s%n", key, value);
}
}
Penjelasan Kode:
Menggunakan
assign()
untuk menetapkan partisi secara eksplisit, menghindari koordinasi grup konsumer.seek(partition, 0)
mengatur konsumer untuk membaca dari offset awal partisi.Komit manual dengan
commitSync()
untuk semantik At Least Once.Cocok untuk kasus di mana Anda perlu kontrol penuh atas offset, seperti replay data atau konsumsi partisi spesifik.
Praktik Konfigurasi dan Pengujian
Berikut adalah langkah-langkah praktis untuk mengonfigurasi konsumer dengan pengaturan reset offset, pengujian replay data, dan penggunaan API tingkat rendah.
Prasyarat
Pastikan klaster Kafka berjalan (versi ≥ 2.0, mode Zookeeper atau KRaft).
Gunakan ekstensi CLI yang sesuai:
.sh
untuk Linux/Mac,.bat
untuk Windows.Pastikan broker Kafka aktif di
localhost:9092
.Klaster memiliki setidaknya tiga broker untuk mendukung
replication.factor=3
.
Langkah-langkah Praktis
1. Buat Topik
Buat topik bernama poll-topic
dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2
:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic poll-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2
Verifikasi konfigurasi topik:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic
Contoh Keluaran:
Topic: poll-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: poll-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: poll-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: poll-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
2. Produksi Pesan untuk Pengujian
Gunakan produser konsol untuk mengirim pesan:
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--property parse.key=true \
--property key.separator=,
Kirim beberapa pesan:
key1,{"id": 1, "data": "Test message 1"}
key2,{"id": 2, "data": "Test message 2"}
key3,{"id": 3, "data": "Test message 3"}
3. Uji Reset Offset dengan CLI
Hentikan semua konsumer dalam grup consumer-group
. Atur ulang offset ke earliest
:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group \
--topic poll-topic --reset-offsets --to-earliest --execute
Verifikasi offset baru:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
Contoh Keluaran:
Consumer group 'consumer-group' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID
consumer-group poll-topic 0 0 3 3 -
consumer-group poll-topic 1 0 2 2 -
consumer-group poll-topic 2 0 1 1 -
Jalankan konsumer konsol untuk menguji perilaku reset:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--from-beginning \
--group consumer-group \
--property print.key=true \
--property key.separator=, \
--consumer-property auto.offset.reset=earliest
Contoh Keluaran:
key1,{"id": 1, "data": "Test message 1"}
key2,{"id": 2, "data": "Test message 2"}
key3,{"id": 3, "data": "Test message 3"}
4. Uji Konsumer dengan Seek dan Assign
Jalankan kode Java di atas untuk menguji konsumsi dari offset spesifik (misalnya, offset 0). Verifikasi bahwa konsumer membaca dari awal partisi.
5. Uji Perilaku dengan Offset Retention
Simulasi Offset Tidak Valid:
Atur
offsets.retention.minutes
ke nilai rendah (misalnya, 1 menit) untuk pengujian:kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \ --add-config offsets.retention.minutes=1
Hentikan konsumer selama >1 menit, lalu jalankan kembali dengan
auto.offset.reset=earliest
.Periksa apakah konsumer membaca dari awal log.
Pantau Offset:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
Pantau Performa:
Gunakan metrik JMX seperti
RecordsConsumedTotal
danOffsetCommitRate
untuk memantau efisiensi konsumsi.
Catatan
Retensi Data vs. Offset: Pastikan periode retensi data topik (
retention.ms
) lebih lama darioffset.retention.minutes
jika Anda ingin replay data historis.Mode KRaft: Pengelolaan offset serupa di mode KRaft, tetapi pastikan controller stabil untuk topik
__consumer_offsets
.API Tingkat Rendah: Gunakan
seek()
danassign()
hanya untuk kasus khusus, seperti konsumsi partisi spesifik atau replay data manual.
Praktik Terbaik
Pilih auto.offset.reset yang Tepat:
Gunakan
latest
untuk aplikasi yang hanya memerlukan data baru.Gunakan
earliest
untuk aplikasi yang memerlukan semua data historis.Gunakan
none
untuk sistem kritis yang memerlukan penanganan eksplisit offset tidak valid.
Sesuaikan offset.retention.minutes:
Tingkatkan ke
43200
(1 bulan) untuk konsumer yang sering mati lama atau topik dengan throughput rendah.Monitor retensi offset dengan:
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
Gunakan Replay untuk Pemulihan:
Atur ulang offset dengan
kafka-consumer-groups.sh
untuk memutar ulang data saat pemulihan dari kesalahan.Pastikan pengolahan idempoten untuk menangani duplikasi.
Gunakan API Tingkat Rendah untuk Kontrol Halus:
Gunakan
seek()
danassign()
untuk konsumsi partisi spesifik atau replay offset tertentu.Hindari untuk aplikasi berskala besar karena memerlukan pengelolaan manual yang kompleks.
Pantau Offset dan Performa:
Gunakan
kafka-consumer-groups.sh
untuk memeriksa lag dan offset grup konsumer.Pantau metrik JMX seperti
OffsetCommitRate
danRecordsConsumedTotal
.
Uji di Lingkungan Non-Produksi:
Simulasikan offset tidak valid dengan mengatur
offsets.retention.minutes
rendah.Uji perilaku
auto.offset.reset
dengan menghapus offset grup konsumer.
Pastikan Pengolahan Idempoten:
Terapkan logika idempoten (misalnya, kunci unik di database) untuk menangani duplikasi saat replay atau reset offset.
Dokumentasikan Konfigurasi:
Catat pengaturan
auto.offset.reset
,offset.retention.minutes
, dan strategi replay untuk memudahkan debugging.
Penjelasan Tambahan
Hubungan dengan Semantik Pengiriman
At Least Once: Gunakan
auto.offset.reset=earliest
untuk memastikan semua data diproses ulang jika offset hilang, dengan komit manual setelah pengolahan.At Most Once: Gunakan
auto.offset.reset=latest
untuk mengabaikan data historis, tetapi risiko kehilangan data meningkat.Exactly Once: Gunakan Kafka Streams dengan
processing.guarantee=exactly_once_v2
untuk mengelola offset secara atomik, denganseek()
untuk replay spesifik.
Pemecahan Masalah
Jika konsumer membaca dari posisi yang salah:
Periksa auto.offset.reset:
Pastikan diatur ke
earliest
untuk replay data ataulatest
untuk data baru.
Periksa offset.retention.minutes:
Tingkatkan jika konsumer sering kehilangan offset:
kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \ --add-config offsets.retention.minutes=43200
Jika terjadi OffsetOutOfRangeException
:
Periksa auto.offset.reset:
Ubah ke
earliest
ataulatest
jika diatur kenone
.
Periksa Retensi Data:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic
Pastikan
retention.ms
cukup panjang untuk menyimpan data yang diperlukan.
Jika replay data menyebabkan duplikasi:
Terapkan logika idempoten di aplikasi (misalnya, simpan offset atau kunci unik di database).
Gunakan
seek()
untuk kontrol offset yang lebih spesifik.
Kesimpulan
Perilaku reset offset otomatis pada konsumer Kafka sangat penting untuk menangani situasi ketika offset tidak valid, seperti setelah konsumer mati lama atau offset dihapus. Pengaturan auto.offset.reset
(latest
, earliest
, none
) menentukan dari mana konsumer mulai membaca, sementara offset.retention.minutes
mengontrol berapa lama offset disimpan. Replay data menggunakan kafka-consumer-groups.sh
atau API tingkat rendah seperti seek()
dan assign()
memungkinkan pemulihan dari kesalahan. Contoh kode Java dan CLI menunjukkan cara menerapkan pengaturan ini, sementara praktik terbaik seperti pengujian, pemantauan offset, dan logika idempoten memastikan pengolahan yang andal. Untuk sebagian besar aplikasi, gunakan auto.offset.reset=earliest
dengan komit manual untuk semantik At Least Once dan tingkatkan offset.retention.minutes
untuk konsumer yang tidak stabil.
Last updated