Kafka Consumer Advanced: Perilaku Reset Offset Otomatis Konsumer Kafka

Perilaku Reset Offset Otomatis Konsumer Kafka

Dalam Apache Kafka, konsumer diharapkan membaca data dari log secara terus-menerus, dengan offset menentukan posisi terakhir yang dibaca dalam partisi. Ketika offset yang dikomit sebelumnya tidak tersedia (misalnya, karena konsumer mati lebih lama dari periode retensi offset atau karena bug aplikasi), konsumer harus menentukan dari mana memulai membaca. Artikel ini menjelaskan pengaturan auto.offset.reset, peran offset.retention.minutes, cara melakukan replay data untuk pemulihan, dan alternatif menggunakan API tingkat rendah seperti seek() dan assign(). Kami menyertakan contoh kode Java, langkah-langkah pengujian dengan CLI, dan praktik terbaik untuk memastikan pengelolaan offset yang andal di lingkungan produksi.

Perilaku Offset Konsumer

Konsumer Kafka melacak posisi baca menggunakan offset, yang disimpan di topik internal __consumer_offsets. Offset menunjukkan pesan terakhir yang telah diproses dari partisi tertentu. Namun, offset dapat menjadi "tidak valid" jika:

  • Konsumer mati lebih lama dari periode retensi offset (default: 7 hari).

  • Topik dihapus atau partisi diubah.

  • Grup konsumer baru digunakan tanpa riwayat offset.

Diagram Perilaku Offset:

[Log Partisi: Pesan 0 -> Pesan 1 -> ... -> Pesan N]
[Konsumer: Offset dikomit di Pesan N-1, membaca Pesan N berikutnya]

Jika offset tidak tersedia, pengaturan auto.offset.reset menentukan perilaku konsumer.

Pengaturan auto.offset.reset

Pengaturan auto.offset.reset mengontrol dari mana konsumer mulai membaca jika tidak ada offset yang valid untuk grup konsumer. Nilai yang mungkin adalah:

  1. latest (default):

    • Konsumer mulai membaca dari ujung (tail) partisi, yaitu pesan terbaru yang masuk setelah konsumer aktif.

    • Implikasi: Pesan sebelumnya diabaikan, cocok untuk aplikasi yang hanya peduli dengan data baru (misalnya, pemantauan real-time).

    • Kasus Penggunaan: Sistem yang dapat mentoleransi kehilangan data historis, seperti analitik streaming.

  2. earliest:

    • Konsumer mulai membaca dari offset tertua yang tersedia di partisi (biasanya dari awal log, tergantung pada retensi data topik).

    • Implikasi: Semua pesan yang tersedia di partisi akan dibaca, cocok untuk pemrosesan ulang data historis.

    • Kasus Penggunaan: Sistem yang memerlukan pemrosesan semua data, seperti ETL atau audit.

  3. none:

    • Konsumer melempar pengecualian (OffsetOutOfRangeException) jika tidak ada offset yang valid.

    • Implikasi: Memaksa aplikasi untuk menangani kasus offset tidak valid secara eksplisit.

    • Kasus Penggunaan: Sistem kritis yang memerlukan kontrol ketat atas offset.

Catatan:

  • Pilihan auto.offset.reset hanya berlaku saat offset tidak ditemukan untuk grup konsumer (misalnya, grup baru atau offset dihapus karena retensi).

  • Nilai default (latest) cocok untuk sebagian besar kasus, tetapi sesuaikan berdasarkan kebutuhan aplikasi.

offset.retention.minutes

  • Deskripsi: Menentukan periode retensi (dalam menit) untuk offset yang disimpan di topik __consumer_offsets.

  • Default: 10080 menit (7 hari) pada Kafka versi ≥ 2.0.

  • Fungsi:

    • Offset yang lebih lama dari periode retensi akan dihapus, menyebabkan offset menjadi tidak valid.

    • Jika konsumer mati lebih lama dari offset.retention.minutes (misalnya, >7 hari) atau topik memiliki throughput rendah, offset yang dikomit sebelumnya mungkin hilang.

  • Implikasi:

    • Jika offset hilang, konsumer akan menggunakan auto.offset.reset untuk menentukan posisi baca.

    • Untuk aplikasi dengan konsumer yang sering mati lama atau topik dengan throughput rendah, tingkatkan offset.retention.minutes (misalnya, ke 1 bulan atau 43200 menit).

  • Pengaturan: Ini adalah pengaturan tingkat broker, diatur melalui offsets.retention.minutes di konfigurasi broker atau perubahan dinamis:

    kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \
    --add-config offsets.retention.minutes=43200

Replay Data untuk Konsumer

Dalam situasi tertentu, Anda mungkin perlu memutar ulang (replay) data dari offset sebelumnya untuk pemulihan dari kesalahan atau analisis ulang. Langkah-langkah untuk melakukan replay data:

  1. Hentikan Semua Konsumer dalam Grup:

    • Pastikan semua konsumer dalam grup konsumer berhenti untuk mencegah konflik.

  2. Atur Ulang Offset:

    • Gunakan perintah kafka-consumer-groups.sh untuk mengatur ulang offset ke posisi yang diinginkan (misalnya, earliest, offset spesifik, atau berdasarkan waktu).

    • Contoh: Atur ulang offset ke awal untuk grup consumer-group:

      kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group \
      --topic poll-topic --reset-offsets --to-earliest --execute
    • Verifikasi offset baru:

      kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
  3. Restart Konsumer:

    • Jalankan kembali konsumer untuk mulai membaca dari offset yang telah diatur ulang.

Catatan:

  • Replay data dapat menyebabkan duplikasi jika pengolahan tidak idempoten. Pastikan logika pengolahan menangani duplikasi (misalnya, menggunakan kunci unik).

  • Gunakan alat seperti Conduktor untuk antarmuka grafis dalam mengatur ulang offset.

Alternatif: Menggunakan API Tingkat Rendah (seek dan assign)

Sebagai alternatif untuk grup konsumer dan offset otomatis, Anda dapat menggunakan API tingkat rendah seperti seek() dan assign() untuk mengontrol posisi baca secara manual tanpa bergantung pada grup konsumer.

  • assign(): Menetapkan partisi tertentu ke konsumer secara eksplisit, bukan menggunakan langganan grup konsumer.

  • seek(): Mengatur offset spesifik untuk partisi yang ditugaskan, memungkinkan pembacaan dari posisi tertentu.

Contoh Kode Java untuk Seek dan Assign

Berikut adalah contoh kode Java yang menggunakan assign() dan seek() untuk membaca dari offset tertentu:

import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class SeekAssignConsumer {
    public static void main(String[] args) {
        // Konfigurasi konsumer
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Buat konsumer
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        // Tetapkan partisi secara manual
        String topic = "poll-topic";
        TopicPartition partition = new TopicPartition(topic, 0);
        consumer.assign(Collections.singletonList(partition));

        // Seek ke offset tertentu (misalnya, offset 0 untuk replay dari awal)
        consumer.seek(partition, 0);

        try {
            while (true) {
                // Poll pesan
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                
                // Proses setiap pesan
                for (ConsumerRecord<String, String> record : records) {
                    String key = record.key();
                    String value = record.value();
                    System.out.printf("Menerima pesan: key=%s, value=%s, partition=%d, offset=%d%n",
                            key, value, record.partition(), record.offset());
                    
                    // Simulasi pengolahan idempoten
                    try {
                        processMessage(key, value);
                    } catch (Exception e) {
                        System.err.printf("Gagal memproses pesan: %s%n", e.getMessage());
                        continue;
                    }
                }
                
                // Komit offset secara manual
                if (!records.isEmpty()) {
                    try {
                        consumer.commitSync();
                        System.out.println("Offset dikomit");
                    } catch (Exception e) {
                        System.err.printf("Gagal mengomit offset: %s%n", e.getMessage());
                    }
                }
            }
        } finally {
            consumer.close();
        }
    }

    private static void processMessage(String key, String value) {
        // Simulasi pengolahan idempoten
        System.out.printf("Memproses pesan: key=%s, value=%s%n", key, value);
    }
}

Penjelasan Kode:

  • Menggunakan assign() untuk menetapkan partisi secara eksplisit, menghindari koordinasi grup konsumer.

  • seek(partition, 0) mengatur konsumer untuk membaca dari offset awal partisi.

  • Komit manual dengan commitSync() untuk semantik At Least Once.

  • Cocok untuk kasus di mana Anda perlu kontrol penuh atas offset, seperti replay data atau konsumsi partisi spesifik.

Praktik Konfigurasi dan Pengujian

Berikut adalah langkah-langkah praktis untuk mengonfigurasi konsumer dengan pengaturan reset offset, pengujian replay data, dan penggunaan API tingkat rendah.

Prasyarat

  • Pastikan klaster Kafka berjalan (versi ≥ 2.0, mode Zookeeper atau KRaft).

  • Gunakan ekstensi CLI yang sesuai: .sh untuk Linux/Mac, .bat untuk Windows.

  • Pastikan broker Kafka aktif di localhost:9092.

  • Klaster memiliki setidaknya tiga broker untuk mendukung replication.factor=3.

Langkah-langkah Praktis

1. Buat Topik

Buat topik bernama poll-topic dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic poll-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2

Verifikasi konfigurasi topik:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic

Contoh Keluaran:

Topic: poll-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: poll-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: poll-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: poll-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

2. Produksi Pesan untuk Pengujian

Gunakan produser konsol untuk mengirim pesan:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--property parse.key=true \
--property key.separator=,

Kirim beberapa pesan:

key1,{"id": 1, "data": "Test message 1"}
key2,{"id": 2, "data": "Test message 2"}
key3,{"id": 3, "data": "Test message 3"}

3. Uji Reset Offset dengan CLI

Hentikan semua konsumer dalam grup consumer-group. Atur ulang offset ke earliest:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group \
--topic poll-topic --reset-offsets --to-earliest --execute

Verifikasi offset baru:

kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe

Contoh Keluaran:

Consumer group 'consumer-group' has no active members.
GROUP           TOPIC        PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID
consumer-group  poll-topic   0          0               3               3    -
consumer-group  poll-topic   1          0               2               2    -
consumer-group  poll-topic   2          0               1               1    -

Jalankan konsumer konsol untuk menguji perilaku reset:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--from-beginning \
--group consumer-group \
--property print.key=true \
--property key.separator=, \
--consumer-property auto.offset.reset=earliest

Contoh Keluaran:

key1,{"id": 1, "data": "Test message 1"}
key2,{"id": 2, "data": "Test message 2"}
key3,{"id": 3, "data": "Test message 3"}

4. Uji Konsumer dengan Seek dan Assign

Jalankan kode Java di atas untuk menguji konsumsi dari offset spesifik (misalnya, offset 0). Verifikasi bahwa konsumer membaca dari awal partisi.

5. Uji Perilaku dengan Offset Retention

  • Simulasi Offset Tidak Valid:

    • Atur offsets.retention.minutes ke nilai rendah (misalnya, 1 menit) untuk pengujian:

      kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \
      --add-config offsets.retention.minutes=1
    • Hentikan konsumer selama >1 menit, lalu jalankan kembali dengan auto.offset.reset=earliest.

    • Periksa apakah konsumer membaca dari awal log.

  • Pantau Offset:

    kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
  • Pantau Performa:

    • Gunakan metrik JMX seperti RecordsConsumedTotal dan OffsetCommitRate untuk memantau efisiensi konsumsi.

Catatan

  • Retensi Data vs. Offset: Pastikan periode retensi data topik (retention.ms) lebih lama dari offset.retention.minutes jika Anda ingin replay data historis.

  • Mode KRaft: Pengelolaan offset serupa di mode KRaft, tetapi pastikan controller stabil untuk topik __consumer_offsets.

  • API Tingkat Rendah: Gunakan seek() dan assign() hanya untuk kasus khusus, seperti konsumsi partisi spesifik atau replay data manual.

Praktik Terbaik

  1. Pilih auto.offset.reset yang Tepat:

    • Gunakan latest untuk aplikasi yang hanya memerlukan data baru.

    • Gunakan earliest untuk aplikasi yang memerlukan semua data historis.

    • Gunakan none untuk sistem kritis yang memerlukan penanganan eksplisit offset tidak valid.

  2. Sesuaikan offset.retention.minutes:

    • Tingkatkan ke 43200 (1 bulan) untuk konsumer yang sering mati lama atau topik dengan throughput rendah.

    • Monitor retensi offset dengan:

      kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type brokers
  3. Gunakan Replay untuk Pemulihan:

    • Atur ulang offset dengan kafka-consumer-groups.sh untuk memutar ulang data saat pemulihan dari kesalahan.

    • Pastikan pengolahan idempoten untuk menangani duplikasi.

  4. Gunakan API Tingkat Rendah untuk Kontrol Halus:

    • Gunakan seek() dan assign() untuk konsumsi partisi spesifik atau replay offset tertentu.

    • Hindari untuk aplikasi berskala besar karena memerlukan pengelolaan manual yang kompleks.

  5. Pantau Offset dan Performa:

    • Gunakan kafka-consumer-groups.sh untuk memeriksa lag dan offset grup konsumer.

    • Pantau metrik JMX seperti OffsetCommitRate dan RecordsConsumedTotal.

  6. Uji di Lingkungan Non-Produksi:

    • Simulasikan offset tidak valid dengan mengatur offsets.retention.minutes rendah.

    • Uji perilaku auto.offset.reset dengan menghapus offset grup konsumer.

  7. Pastikan Pengolahan Idempoten:

    • Terapkan logika idempoten (misalnya, kunci unik di database) untuk menangani duplikasi saat replay atau reset offset.

  8. Dokumentasikan Konfigurasi:

    • Catat pengaturan auto.offset.reset, offset.retention.minutes, dan strategi replay untuk memudahkan debugging.

Penjelasan Tambahan

Hubungan dengan Semantik Pengiriman

  • At Least Once: Gunakan auto.offset.reset=earliest untuk memastikan semua data diproses ulang jika offset hilang, dengan komit manual setelah pengolahan.

  • At Most Once: Gunakan auto.offset.reset=latest untuk mengabaikan data historis, tetapi risiko kehilangan data meningkat.

  • Exactly Once: Gunakan Kafka Streams dengan processing.guarantee=exactly_once_v2 untuk mengelola offset secara atomik, dengan seek() untuk replay spesifik.

Pemecahan Masalah

Jika konsumer membaca dari posisi yang salah:

  • Periksa auto.offset.reset:

    • Pastikan diatur ke earliest untuk replay data atau latest untuk data baru.

  • Periksa offset.retention.minutes:

    • Tingkatkan jika konsumer sering kehilangan offset:

      kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type brokers --entity-name 0 \
      --add-config offsets.retention.minutes=43200

Jika terjadi OffsetOutOfRangeException:

  • Periksa auto.offset.reset:

    • Ubah ke earliest atau latest jika diatur ke none.

  • Periksa Retensi Data:

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic

    Pastikan retention.ms cukup panjang untuk menyimpan data yang diperlukan.

Jika replay data menyebabkan duplikasi:

  • Terapkan logika idempoten di aplikasi (misalnya, simpan offset atau kunci unik di database).

  • Gunakan seek() untuk kontrol offset yang lebih spesifik.

Kesimpulan

Perilaku reset offset otomatis pada konsumer Kafka sangat penting untuk menangani situasi ketika offset tidak valid, seperti setelah konsumer mati lama atau offset dihapus. Pengaturan auto.offset.reset (latest, earliest, none) menentukan dari mana konsumer mulai membaca, sementara offset.retention.minutes mengontrol berapa lama offset disimpan. Replay data menggunakan kafka-consumer-groups.sh atau API tingkat rendah seperti seek() dan assign() memungkinkan pemulihan dari kesalahan. Contoh kode Java dan CLI menunjukkan cara menerapkan pengaturan ini, sementara praktik terbaik seperti pengujian, pemantauan offset, dan logika idempoten memastikan pengolahan yang andal. Untuk sebagian besar aplikasi, gunakan auto.offset.reset=earliest dengan komit manual untuk semantik At Least Once dan tingkatkan offset.retention.minutes untuk konsumer yang tidak stabil.

Last updated