Kafka Producer Advanced: Idempotent Kafka Producer
Idempotent Kafka Producer
Dalam Apache Kafka, retries pada produser dapat menyebabkan duplikasi pesan jika pengakuan (acknowledgment) dari broker tidak diterima karena masalah jaringan, meskipun pesan telah berhasil ditulis. Fitur produser idempoten diperkenalkan untuk mengatasi masalah ini dengan memastikan pengiriman exactly-once tanpa duplikasi, bahkan saat retries dilakukan. Artikel ini akan menjelaskan apa itu produser idempoten, bagaimana fitur ini bekerja untuk mencegah duplikasi, mekanisme internalnya, cara mengaktifkannya, dan contoh praktis menggunakan kode Java serta alat CLI. Kami juga menyertakan praktik terbaik untuk menerapkan produser idempoten dengan aman dan efisien.
Masalah dengan Retries dan Duplikasi Pesan
Saat produser Kafka mengirim pesan, ada risiko kecil bahwa pesan yang sama ditulis lebih dari sekali ke broker, menyebabkan duplikasi. Skenario ini dapat terjadi sebagai berikut:
Produser mengirim pesan ke broker Kafka.
Pesan berhasil ditulis dan direplikasi ke replika In-Sync Replicas (ISR).
Masalah jaringan mencegah pengakuan dari broker sampai ke produser.
Produser menganggap kegagalan pengakuan sebagai masalah jaringan sementara dan mencoba ulang pengiriman pesan.
Broker menerima dan menulis pesan yang sama untuk kedua kalinya, menghasilkan duplikasi.
Duplikasi ini bermasalah untuk aplikasi yang memerlukan pengiriman exactly-once, seperti transaksi keuangan atau pembaruan status, karena dapat menyebabkan data yang tidak konsisten.
Apa itu Produser Idempoten?
Produser idempoten adalah fitur Kafka yang memastikan pesan hanya ditulis sekali ke broker, bahkan jika produser melakukan retries karena kegagalan pengakuan. Fitur ini diperkenalkan pada Kafka versi 0.11 dan menjadi default mulai Kafka 3.0 (lihat KIP-679).
Manfaat Produser Idempoten
Mencegah Duplikasi: Menjamin pengiriman exactly-once dalam satu sesi produser, bahkan dengan retries.
Menjaga Pengurutan: Memastikan pesan dikirim dan ditulis dalam urutan yang benar berdasarkan kunci.
Ketahanan: Meningkatkan keandalan untuk aplikasi kritis tanpa overhead tambahan yang signifikan.
Bagaimana Produser Idempoten Bekerja?
Saat enable.idempotence=true
diaktifkan, Kafka menerapkan mekanisme berikut:
Producer ID (PID):
Setiap produser diberi Producer ID (PID) unik saat diinisialisasi.
PID dikirim bersama setiap pesan ke broker untuk mengidentifikasi sumber pesan.
Nomor Urut (Sequence Number):
Setiap pesan diberi nomor urut yang meningkat secara monoton (monotonically increasing sequence number), terpisah untuk setiap partisi topik.
Nomor urut ini berbeda dari offset dan digunakan hanya untuk keperluan protokol idempotence.
Pelacakan di Sisi Broker:
Broker melacak kombinasi PID dan nomor urut tertinggi yang berhasil ditulis untuk setiap partisi.
Jika broker menerima pesan dengan nomor urut yang lebih rendah dari yang sudah ada untuk PID yang sama, pesan tersebut dibuang sebagai duplikat.
Proses Pengiriman:
Produser mengirim pesan dengan PID dan nomor urut.
Jika pengakuan gagal diterima (misalnya, karena masalah jaringan), produser mencoba ulang dengan pesan yang sama (dengan PID dan nomor urut yang sama).
Broker memeriksa nomor urut:
Jika nomor urut sama atau lebih rendah dari yang sudah diterima, pesan diabaikan (mencegah duplikasi).
Jika nomor urut lebih tinggi, pesan ditulis dan nomor urut tertinggi diperbarui.
Mekanisme ini memastikan bahwa meskipun retries terjadi, hanya satu salinan pesan yang ditulis ke partisi, menjaga semantik exactly-once.
Persyaratan untuk Mengaktifkan Idempotence
Untuk mengaktifkan produser idempoten, parameter berikut harus dikonfigurasi:
enable.idempotence=true
acks=all
: Memastikan pesan ditulis ke semua replika ISR.retries
> 0: Default pada Kafka ≥ 2.1 adalahInteger.MAX_VALUE
(2147483647).max.in.flight.requests.per.connection
≤ 5: Untuk menjaga pengurutan pesan.
Catatan:
Mulai Kafka 3.0,
enable.idempotence=true
danacks=all
adalah default (lihat KIP-679).Jika Anda menggunakan pustaka klien selain Java (misalnya, Python, Go), pastikan pustaka tersebut mendukung idempotence.
Konfigurasi Produser Idempoten
Berikut adalah konfigurasi produser idempoten yang aman:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
Penjelasan:
enable.idempotence=true
: Mengaktifkan fitur idempotence.acks=all
: Memastikan ketahanan dengan menulis ke semua replika ISR.retries=Integer.MAX_VALUE
: Memungkinkan retries maksimum, dibatasi olehdelivery.timeout.ms
.max.in.flight.requests.per.connection=5
: Menjaga pengurutan dengan idempotence.delivery.timeout.ms=120000
: Membatasi waktu pengiriman hingga 2 menit.
Praktik Konfigurasi dan Pengujian
Berikut adalah langkah-langkah praktis untuk mengonfigurasi dan menguji produser idempoten menggunakan CLI dan kode Java.
Prasyarat
Pastikan klaster Kafka berjalan (versi ≥ 0.11, mode Zookeeper atau KRaft).
Gunakan ekstensi CLI yang sesuai:
.sh
untuk Linux/Mac,.bat
untuk Windows.Pastikan broker Kafka aktif di
localhost:9092
.Klaster memiliki setidaknya tiga broker untuk mendukung
replication.factor=3
.
Langkah-langkah Praktis
1. Buat Topik
Buat topik bernama idempotent-topic
dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2
:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic idempotent-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2
Verifikasi konfigurasi topik:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic idempotent-topic
Contoh Keluaran:
Topic: idempotent-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: idempotent-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: idempotent-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: idempotent-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
2. Implementasi Produser Idempoten dengan Java
Berikut adalah contoh kode Java untuk produser idempoten:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class IdempotentProducer {
public static void main(String[] args) {
// Konfigurasi produser
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
// Buat produser
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Kirim pesan
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>("idempotent-topic", key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Pesan dikirim: key=%s, value=%s, partition=%d, offset=%d%n",
key, value, metadata.partition(), metadata.offset());
} else {
System.err.printf("Gagal mengirim pesan: %s%n", exception.getMessage());
}
});
}
// Tutup produser
producer.flush();
producer.close();
}
}
Penjelasan Kode:
Menggunakan konfigurasi idempoten dengan
enable.idempotence=true
.Mengirim 10 pesan dengan kunci dan nilai untuk pengujian.
Callback menangani hasil pengiriman, mencetak sukses atau kesalahan.
Produser akan secara otomatis menangani retries tanpa menghasilkan duplikasi.
3. Uji dengan Produser Konsol
Jalankan produser konsol dengan konfigurasi idempoten:
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic idempotent-topic \
--producer-property enable.idempotence=true \
--producer-property acks=all \
--producer-property retries=2147483647 \
--producer-property max.in.flight.requests.per.connection=5 \
--producer-property delivery.timeout.ms=120000 \
--property parse.key=true \
--property key.separator=,
Kirim beberapa pesan:
key1,Message 1
key2,Message 2
key3,Message 3
4. Konsumsi Pesan
Jalankan konsumer untuk memverifikasi bahwa tidak ada duplikasi:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic idempotent-topic \
--from-beginning \
--property print.key=true \
--property key.separator=,
Contoh Keluaran:
key1,Message 1
key2,Message 2
key3,Message 3
5. Simulasi Kesalahan untuk Menguji Idempotence
Matikan satu broker untuk memicu
NotEnoughReplicasException
.Kirim pesan menggunakan produser di atas.
Amati log produser atau callback untuk memastikan retries dilakukan tanpa duplikasi.
Pulihkan broker dan verifikasi bahwa konsumer hanya menerima satu salinan dari setiap pesan.
Catatan
Keterbatasan:
Idempotence hanya menjamin exactly-once dalam satu sesi produser. Jika produser dimulai ulang, PID baru diberikan, dan urutan nomor urut direset.
Untuk semantik exactly-once lintas sesi, gunakan Kafka Transactions (tersedia mulai Kafka 0.11).
Mode KRaft: Dalam mode KRaft (Kafka tanpa Zookeeper), idempotence tetap berfungsi sama, tetapi manajemen replika lebih efisien.
Kompatibilitas Klien: Jika menggunakan pustaka non-Java (misalnya,
confluent-kafka
untuk Python), pastikan idempotence didukung. Misalnya, di Python:from confluent_kafka import Producer config = { 'bootstrap.servers': 'localhost:9092', 'enable.idempotence': True, 'acks': 'all', 'retries': 2147483647, 'max.in.flight.requests.per.connection': 5, 'delivery.timeout.ms': 120000 } producer = Producer(config)
Praktik Terbaik
Aktifkan Idempotence untuk Semua Produser:
Gunakan
enable.idempotence=true
untuk semua produser, terutama pada Kafka ≥ 3.0, di mana ini adalah default.Pastikan
acks=all
danmax.in.flight.requests.per.connection
≤5
.
Gunakan
delivery.timeout.ms
:Atur
delivery.timeout.ms
(misalnya,120000
) untuk membatasi waktu retries, mencegah produser mencoba ulang tanpa batas.
Jaga Pengurutan dengan
max.in.flight.requests.per.connection
:Gunakan
1
untuk pengurutan ketat, atau5
dengan idempotence untuk kompromi antara pengurutan dan throughput.
Pantau Metrik:
Gunakan metrik JMX seperti
RequestLatencyAvg
danRetryCount
untuk memantau performa retries.Periksa log produser untuk kesalahan seperti
NotEnoughReplicasException
.
Uji di Lingkungan Non-Produksi:
Simulasikan kegagalan jaringan atau broker untuk memastikan idempotence mencegah duplikasi.
Kombinasikan dengan
min.insync.replicas
:Gunakan
min.insync.replicas=2
denganreplication.factor=3
untuk ketahanan tinggi.
Gunakan Pustaka Klien yang Kompatibel:
Pastikan pustaka klien mendukung idempotence (misalnya, Java,
confluent-kafka
untuk Python,librdkafka
untuk C/C++).
Dokumentasikan Konfigurasi:
Catat pengaturan idempotence dan alasan pemilihannya untuk memudahkan debugging dan pemeliharaan.
Penjelasan Tambahan
Hubungan dengan Retries
Tanpa idempotence, retries dapat menyebabkan duplikasi jika pengakuan gagal diterima. Idempotence menghilangkan risiko ini dengan pelacakan nomor urut di sisi broker.
Idempotence memerlukan
retries
> 0 untuk menangani kesalahan retriable sepertiNotEnoughReplicasException
.
Pemecahan Masalah
Jika duplikasi masih terjadi:
Periksa apakah
enable.idempotence=true
danacks=all
diatur dengan benar.Pastikan
max.in.flight.requests.per.connection
≤5
.Verifikasi status ISR:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic idempotent-topic
Jika performa rendah:
Tinjau
max.in.flight.requests.per.connection
. Nilai5
biasanya optimal untuk idempotence.Pertimbangkan kompresi (
compression.type=gzip
) untuk mengurangi latensi pengiriman.
Kesimpulan
Produser idempoten Kafka adalah fitur penting untuk mencegah duplikasi pesan akibat retries, menjamin pengiriman exactly-once dalam satu sesi produser. Dengan mengaktifkan enable.idempotence=true
, acks=all
, dan konfigurasi seperti max.in.flight.requests.per.connection=5
, Anda dapat memastikan pengiriman yang andal tanpa duplikasi sambil menjaga pengurutan. Contoh kode Java dan CLI menunjukkan cara menerapkan dan menguji fitur ini, sementara praktik terbaik seperti pengujian, pemantauan, dan dokumentasi membantu menjaga operasi yang efisien. Fitur ini sangat direkomendasikan untuk semua produser, terutama pada aplikasi kritis, dan didukung secara default mulai Kafka 3.0.
Last updated