Kafka Producer Advanced: Idempotent Kafka Producer

Idempotent Kafka Producer

Dalam Apache Kafka, retries pada produser dapat menyebabkan duplikasi pesan jika pengakuan (acknowledgment) dari broker tidak diterima karena masalah jaringan, meskipun pesan telah berhasil ditulis. Fitur produser idempoten diperkenalkan untuk mengatasi masalah ini dengan memastikan pengiriman exactly-once tanpa duplikasi, bahkan saat retries dilakukan. Artikel ini akan menjelaskan apa itu produser idempoten, bagaimana fitur ini bekerja untuk mencegah duplikasi, mekanisme internalnya, cara mengaktifkannya, dan contoh praktis menggunakan kode Java serta alat CLI. Kami juga menyertakan praktik terbaik untuk menerapkan produser idempoten dengan aman dan efisien.

Masalah dengan Retries dan Duplikasi Pesan

Saat produser Kafka mengirim pesan, ada risiko kecil bahwa pesan yang sama ditulis lebih dari sekali ke broker, menyebabkan duplikasi. Skenario ini dapat terjadi sebagai berikut:

  1. Produser mengirim pesan ke broker Kafka.

  2. Pesan berhasil ditulis dan direplikasi ke replika In-Sync Replicas (ISR).

  3. Masalah jaringan mencegah pengakuan dari broker sampai ke produser.

  4. Produser menganggap kegagalan pengakuan sebagai masalah jaringan sementara dan mencoba ulang pengiriman pesan.

  5. Broker menerima dan menulis pesan yang sama untuk kedua kalinya, menghasilkan duplikasi.

Duplikasi ini bermasalah untuk aplikasi yang memerlukan pengiriman exactly-once, seperti transaksi keuangan atau pembaruan status, karena dapat menyebabkan data yang tidak konsisten.

Apa itu Produser Idempoten?

Produser idempoten adalah fitur Kafka yang memastikan pesan hanya ditulis sekali ke broker, bahkan jika produser melakukan retries karena kegagalan pengakuan. Fitur ini diperkenalkan pada Kafka versi 0.11 dan menjadi default mulai Kafka 3.0 (lihat KIP-679).

Manfaat Produser Idempoten

  • Mencegah Duplikasi: Menjamin pengiriman exactly-once dalam satu sesi produser, bahkan dengan retries.

  • Menjaga Pengurutan: Memastikan pesan dikirim dan ditulis dalam urutan yang benar berdasarkan kunci.

  • Ketahanan: Meningkatkan keandalan untuk aplikasi kritis tanpa overhead tambahan yang signifikan.

Bagaimana Produser Idempoten Bekerja?

Saat enable.idempotence=true diaktifkan, Kafka menerapkan mekanisme berikut:

  1. Producer ID (PID):

    • Setiap produser diberi Producer ID (PID) unik saat diinisialisasi.

    • PID dikirim bersama setiap pesan ke broker untuk mengidentifikasi sumber pesan.

  2. Nomor Urut (Sequence Number):

    • Setiap pesan diberi nomor urut yang meningkat secara monoton (monotonically increasing sequence number), terpisah untuk setiap partisi topik.

    • Nomor urut ini berbeda dari offset dan digunakan hanya untuk keperluan protokol idempotence.

  3. Pelacakan di Sisi Broker:

    • Broker melacak kombinasi PID dan nomor urut tertinggi yang berhasil ditulis untuk setiap partisi.

    • Jika broker menerima pesan dengan nomor urut yang lebih rendah dari yang sudah ada untuk PID yang sama, pesan tersebut dibuang sebagai duplikat.

  4. Proses Pengiriman:

    • Produser mengirim pesan dengan PID dan nomor urut.

    • Jika pengakuan gagal diterima (misalnya, karena masalah jaringan), produser mencoba ulang dengan pesan yang sama (dengan PID dan nomor urut yang sama).

    • Broker memeriksa nomor urut:

      • Jika nomor urut sama atau lebih rendah dari yang sudah diterima, pesan diabaikan (mencegah duplikasi).

      • Jika nomor urut lebih tinggi, pesan ditulis dan nomor urut tertinggi diperbarui.

Mekanisme ini memastikan bahwa meskipun retries terjadi, hanya satu salinan pesan yang ditulis ke partisi, menjaga semantik exactly-once.

Persyaratan untuk Mengaktifkan Idempotence

Untuk mengaktifkan produser idempoten, parameter berikut harus dikonfigurasi:

  • enable.idempotence=true

  • acks=all: Memastikan pesan ditulis ke semua replika ISR.

  • retries > 0: Default pada Kafka ≥ 2.1 adalah Integer.MAX_VALUE (2147483647).

  • max.in.flight.requests.per.connection ≤ 5: Untuk menjaga pengurutan pesan.

Catatan:

  • Mulai Kafka 3.0, enable.idempotence=true dan acks=all adalah default (lihat KIP-679).

  • Jika Anda menggunakan pustaka klien selain Java (misalnya, Python, Go), pastikan pustaka tersebut mendukung idempotence.

Konfigurasi Produser Idempoten

Berikut adalah konfigurasi produser idempoten yang aman:

Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

Penjelasan:

  • enable.idempotence=true: Mengaktifkan fitur idempotence.

  • acks=all: Memastikan ketahanan dengan menulis ke semua replika ISR.

  • retries=Integer.MAX_VALUE: Memungkinkan retries maksimum, dibatasi oleh delivery.timeout.ms.

  • max.in.flight.requests.per.connection=5: Menjaga pengurutan dengan idempotence.

  • delivery.timeout.ms=120000: Membatasi waktu pengiriman hingga 2 menit.

Praktik Konfigurasi dan Pengujian

Berikut adalah langkah-langkah praktis untuk mengonfigurasi dan menguji produser idempoten menggunakan CLI dan kode Java.

Prasyarat

  • Pastikan klaster Kafka berjalan (versi ≥ 0.11, mode Zookeeper atau KRaft).

  • Gunakan ekstensi CLI yang sesuai: .sh untuk Linux/Mac, .bat untuk Windows.

  • Pastikan broker Kafka aktif di localhost:9092.

  • Klaster memiliki setidaknya tiga broker untuk mendukung replication.factor=3.

Langkah-langkah Praktis

1. Buat Topik

Buat topik bernama idempotent-topic dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2:

kafka-topics.sh --bootstrap-server localhost:9092 --create --topic idempotent-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2

Verifikasi konfigurasi topik:

kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic idempotent-topic

Contoh Keluaran:

Topic: idempotent-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: idempotent-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: idempotent-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: idempotent-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2

2. Implementasi Produser Idempoten dengan Java

Berikut adalah contoh kode Java untuk produser idempoten:

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class IdempotentProducer {
    public static void main(String[] args) {
        // Konfigurasi produser
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
        properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
        properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
        properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
        properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");

        // Buat produser
        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        // Kirim pesan
        for (int i = 0; i < 10; i++) {
            String key = "key-" + i;
            String value = "Message " + i;
            ProducerRecord<String, String> record = new ProducerRecord<>("idempotent-topic", key, value);

            producer.send(record, (metadata, exception) -> {
                if (exception == null) {
                    System.out.printf("Pesan dikirim: key=%s, value=%s, partition=%d, offset=%d%n",
                            key, value, metadata.partition(), metadata.offset());
                } else {
                    System.err.printf("Gagal mengirim pesan: %s%n", exception.getMessage());
                }
            });
        }

        // Tutup produser
        producer.flush();
        producer.close();
    }
}

Penjelasan Kode:

  • Menggunakan konfigurasi idempoten dengan enable.idempotence=true.

  • Mengirim 10 pesan dengan kunci dan nilai untuk pengujian.

  • Callback menangani hasil pengiriman, mencetak sukses atau kesalahan.

  • Produser akan secara otomatis menangani retries tanpa menghasilkan duplikasi.

3. Uji dengan Produser Konsol

Jalankan produser konsol dengan konfigurasi idempoten:

kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic idempotent-topic \
--producer-property enable.idempotence=true \
--producer-property acks=all \
--producer-property retries=2147483647 \
--producer-property max.in.flight.requests.per.connection=5 \
--producer-property delivery.timeout.ms=120000 \
--property parse.key=true \
--property key.separator=,

Kirim beberapa pesan:

key1,Message 1
key2,Message 2
key3,Message 3

4. Konsumsi Pesan

Jalankan konsumer untuk memverifikasi bahwa tidak ada duplikasi:

kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic idempotent-topic \
--from-beginning \
--property print.key=true \
--property key.separator=,

Contoh Keluaran:

key1,Message 1
key2,Message 2
key3,Message 3

5. Simulasi Kesalahan untuk Menguji Idempotence

  • Matikan satu broker untuk memicu NotEnoughReplicasException.

  • Kirim pesan menggunakan produser di atas.

  • Amati log produser atau callback untuk memastikan retries dilakukan tanpa duplikasi.

  • Pulihkan broker dan verifikasi bahwa konsumer hanya menerima satu salinan dari setiap pesan.

Catatan

  • Keterbatasan:

    • Idempotence hanya menjamin exactly-once dalam satu sesi produser. Jika produser dimulai ulang, PID baru diberikan, dan urutan nomor urut direset.

    • Untuk semantik exactly-once lintas sesi, gunakan Kafka Transactions (tersedia mulai Kafka 0.11).

  • Mode KRaft: Dalam mode KRaft (Kafka tanpa Zookeeper), idempotence tetap berfungsi sama, tetapi manajemen replika lebih efisien.

  • Kompatibilitas Klien: Jika menggunakan pustaka non-Java (misalnya, confluent-kafka untuk Python), pastikan idempotence didukung. Misalnya, di Python:

    from confluent_kafka import Producer
    config = {
        'bootstrap.servers': 'localhost:9092',
        'enable.idempotence': True,
        'acks': 'all',
        'retries': 2147483647,
        'max.in.flight.requests.per.connection': 5,
        'delivery.timeout.ms': 120000
    }
    producer = Producer(config)

Praktik Terbaik

  1. Aktifkan Idempotence untuk Semua Produser:

    • Gunakan enable.idempotence=true untuk semua produser, terutama pada Kafka ≥ 3.0, di mana ini adalah default.

    • Pastikan acks=all dan max.in.flight.requests.per.connection5.

  2. Gunakan delivery.timeout.ms:

    • Atur delivery.timeout.ms (misalnya, 120000) untuk membatasi waktu retries, mencegah produser mencoba ulang tanpa batas.

  3. Jaga Pengurutan dengan max.in.flight.requests.per.connection:

    • Gunakan 1 untuk pengurutan ketat, atau 5 dengan idempotence untuk kompromi antara pengurutan dan throughput.

  4. Pantau Metrik:

    • Gunakan metrik JMX seperti RequestLatencyAvg dan RetryCount untuk memantau performa retries.

    • Periksa log produser untuk kesalahan seperti NotEnoughReplicasException.

  5. Uji di Lingkungan Non-Produksi:

    • Simulasikan kegagalan jaringan atau broker untuk memastikan idempotence mencegah duplikasi.

  6. Kombinasikan dengan min.insync.replicas:

    • Gunakan min.insync.replicas=2 dengan replication.factor=3 untuk ketahanan tinggi.

  7. Gunakan Pustaka Klien yang Kompatibel:

    • Pastikan pustaka klien mendukung idempotence (misalnya, Java, confluent-kafka untuk Python, librdkafka untuk C/C++).

  8. Dokumentasikan Konfigurasi:

    • Catat pengaturan idempotence dan alasan pemilihannya untuk memudahkan debugging dan pemeliharaan.

Penjelasan Tambahan

Hubungan dengan Retries

  • Tanpa idempotence, retries dapat menyebabkan duplikasi jika pengakuan gagal diterima. Idempotence menghilangkan risiko ini dengan pelacakan nomor urut di sisi broker.

  • Idempotence memerlukan retries > 0 untuk menangani kesalahan retriable seperti NotEnoughReplicasException.

Pemecahan Masalah

Jika duplikasi masih terjadi:

  • Periksa apakah enable.idempotence=true dan acks=all diatur dengan benar.

  • Pastikan max.in.flight.requests.per.connection5.

  • Verifikasi status ISR:

    kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic idempotent-topic

Jika performa rendah:

  • Tinjau max.in.flight.requests.per.connection. Nilai 5 biasanya optimal untuk idempotence.

  • Pertimbangkan kompresi (compression.type=gzip) untuk mengurangi latensi pengiriman.

Kesimpulan

Produser idempoten Kafka adalah fitur penting untuk mencegah duplikasi pesan akibat retries, menjamin pengiriman exactly-once dalam satu sesi produser. Dengan mengaktifkan enable.idempotence=true, acks=all, dan konfigurasi seperti max.in.flight.requests.per.connection=5, Anda dapat memastikan pengiriman yang andal tanpa duplikasi sambil menjaga pengurutan. Contoh kode Java dan CLI menunjukkan cara menerapkan dan menguji fitur ini, sementara praktik terbaik seperti pengujian, pemantauan, dan dokumentasi membantu menjaga operasi yang efisien. Fitur ini sangat direkomendasikan untuk semua produser, terutama pada aplikasi kritis, dan didukung secara default mulai Kafka 3.0.

Last updated