Kafka Producer Advanced: Konfigurasi Retries Produser Kafka
Konfigurasi Retries Produser Kafka
Dalam Apache Kafka, produser dapat menghadapi kesalahan saat mengirim pesan ke broker, seperti kegagalan jaringan atau replika yang tidak tersedia. Kesalahan ini dapat bersifat retriable (dapat dicoba ulang) atau non-retriable (tidak dapat dicoba ulang). Mengonfigurasi retries dengan benar sangat penting untuk memastikan pesan tidak hilang, sambil menjaga pengurutan dan performa sistem. Artikel ini akan menjelaskan jenis-jenis kesalahan, parameter konfigurasi retries, cara mengonfigurasi produser untuk retries yang aman, serta praktik terbaik untuk menyeimbangkan ketahanan (durability), pengurutan (ordering), dan throughput. Contoh lengkap menggunakan kode Java juga disertakan untuk mengilustrasikan implementasi.
Jenis Kesalahan Produser
Saat produser mengirim pesan ke broker Kafka, broker dapat mengembalikan kode sukses atau kode kesalahan. Kesalahan ini terbagi menjadi dua kategori:
Kesalahan Retriable:
Kesalahan yang dapat diselesaikan dengan mencoba ulang pengiriman pesan.
Contoh:
NotEnoughReplicasException
, yang terjadi ketika jumlah replika In-Sync Replicas (ISR) di bawahmin.insync.replicas
. Mencoba ulang dapat berhasil jika replika kembali online.Produser dapat dikonfigurasi untuk mencoba ulang secara otomatis untuk kesalahan ini.
Kesalahan Non-Retriable:
Kesalahan yang tidak akan terselesaikan meskipun dicoba ulang.
Contoh:
InvalidConfigException
, yang menunjukkan masalah konfigurasi yang tidak dapat diperbaiki dengan pengiriman ulang.Produser akan langsung menandai pesan sebagai gagal untuk kesalahan ini.
Mengaktifkan retries sangat dianjurkan untuk mencegah kehilangan pesan akibat kesalahan retriable, tetapi konfigurasi harus diatur dengan hati-hati untuk menghindari masalah seperti pengurutan pesan yang salah atau penurunan performa.
Parameter Konfigurasi Retries
Berikut adalah parameter konfigurasi utama untuk mengelola retries pada produser Kafka:
1. retries
retries
Deskripsi: Menentukan jumlah percobaan ulang yang dilakukan produser sebelum menandai pesan sebagai gagal.
Default:
Kafka ≤ 2.0:
0
(tidak ada retries).Kafka ≥ 2.1:
2147483647
(nilai maksimum integer,Integer.MAX_VALUE
).
Rekomendasi: Biarkan
retries
tidak diatur secara eksplisit dan gunakandelivery.timeout.ms
untuk mengontrol perilaku retries, karena ini lebih intuitif dan fleksibel.
2. delivery.timeout.ms
delivery.timeout.ms
Deskripsi: Batas waktu total (dalam milidetik) untuk pengiriman pesan, termasuk semua percobaan ulang. Jika pesan tidak dapat dikirim dalam waktu ini, pesan akan ditandai sebagai gagal.
Default:
120000
(2 menit).Contoh: Dengan
delivery.timeout.ms=120000
, produser akan berhenti mencoba setelah 2 menit, meskipunretries
diatur ke nilai besar.Catatan: Diperkenalkan melalui KIP-91 (Kafka 2.1) untuk memberikan kontrol yang lebih baik atas batas waktu pengiriman.
3. retry.backoff.ms
retry.backoff.ms
Deskripsi: Waktu tunggu (dalam milidetik) antara percobaan ulang.
Default:
100
(100 milidetik).Rekomendasi: Sesuaikan nilai ini berdasarkan latensi jaringan dan frekuensi kesalahan. Nilai yang terlalu kecil dapat membebani broker, sedangkan nilai yang terlalu besar meningkatkan latensi pengiriman.
4. max.in.flight.requests.per.connection
max.in.flight.requests.per.connection
Deskripsi: Jumlah maksimum permintaan yang belum diakui (in-flight) yang dapat dikirim produser ke satu koneksi broker.
Default:
5
.Penting:
Jika diatur ke
1
, menjamin pengurutan pesan berdasarkan kunci, karena hanya satu batch yang diproses pada satu waktu.Jika lebih dari
1
(misalnya,5
), batch kedua dapat berhasil sebelum batch pertama yang gagal dicoba ulang, menyebabkan pengurutan pesan berubah. Ini bermasalah jika aplikasi bergantung pada pengurutan kunci.Dengan
enable.idempotence=true
,max.in.flight.requests.per.connection
harus ≤5
untuk menjaga pengurutan pesan.
5. enable.idempotence
enable.idempotence
Deskripsi: Mengaktifkan produser idempoten, yang mencegah duplikasi pesan akibat retries, memastikan pengiriman exactly-once untuk pesan dalam satu sesi produser.
Default:
false
.Persyaratan:
acks=all
harus diatur.max.in.flight.requests.per.connection
harus ≤5
.retries
harus >0
(defaultInteger.MAX_VALUE
pada Kafka ≥ 2.1 sudah cukup).
Keuntungan: Menjamin tidak ada duplikasi pesan dan pengurutan tetap terjaga, bahkan dengan retries.
Konfigurasi Produser Aman untuk Retries
Untuk memastikan retries aman, hindari kehilangan pesan, dan menjaga pengurutan, gunakan konfigurasi berikut:
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
Penjelasan:
enable.idempotence=true
: Mengaktifkan produser idempoten untuk mencegah duplikasi dan menjaga pengurutan.acks=all
: Memastikan pesan ditulis ke semua replika ISR, meningkatkan ketahanan.retries=Integer.MAX_VALUE
: Memungkinkan percobaan ulang maksimum, dibatasi olehdelivery.timeout.ms
.max.in.flight.requests.per.connection=5
: Menjaga pengurutan dengan idempotence (≤5
diperlukan).delivery.timeout.ms=120000
: Membatasi waktu pengiriman hingga 2 menit.retry.backoff.ms=100
: Waktu tunggu standar antara retries.
Trade-off: Keamanan vs. Throughput
Keamanan Maksimum:
Gunakan
enable.idempotence=true
,acks=all
, danmax.in.flight.requests.per.connection=1
untuk menjamin pengurutan dan pengiriman exactly-once.Kekurangan: Throughput menurun karena hanya satu permintaan yang diproses pada satu waktu.
Throughput Tinggi:
Gunakan
max.in.flight.requests.per.connection=5
(dengan idempotence) untuk meningkatkan throughput sambil tetap menjaga pengurutan.Jika pengurutan tidak penting,
acks=1
atauacks=0
dapat digunakan untuk throughput lebih tinggi, tetapi dengan risiko kehilangan data.
Catatan: Dengan
enable.idempotence=true
, pengurutan terjaga untukmax.in.flight.requests.per.connection
≤5
, sehingga ini adalah kompromi yang baik.
Praktik Konfigurasi Retries
Berikut adalah langkah-langkah praktis untuk mengonfigurasi produser dengan retries menggunakan CLI dan kode Java, dengan contoh pengujian.
Prasyarat
Pastikan klaster Kafka berjalan (mode Zookeeper atau KRaft).
Gunakan ekstensi CLI yang sesuai:
.sh
untuk Linux/Mac,.bat
untuk Windows.Pastikan broker Kafka aktif di
localhost:9092
.Klaster memiliki setidaknya tiga broker untuk mendukung
replication.factor=3
.
Langkah-langkah Praktis
1. Buat Topik
Buat topik bernama retry-topic
dengan 3 partisi, faktor replikasi 3, dan min.insync.replicas=2
:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic retry-topic \
--partitions 3 --replication-factor 3 \
--config min.insync.replicas=2
Verifikasi konfigurasi topik:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic retry-topic
Contoh Keluaran:
Topic: retry-topic TopicId: XYZ123 PartitionCount: 3 ReplicationFactor: 3 Configs: min.insync.replicas=2
Topic: retry-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
Topic: retry-topic Partition: 1 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
Topic: retry-topic Partition: 2 Leader: 3 Replicas: 3,1,2 Isr: 3,1,2
2. Implementasi Produser dengan Java
Berikut adalah contoh kode Java untuk produser dengan konfigurasi retries aman:
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class SafeRetryProducer {
public static void main(String[] args) {
// Konfigurasi produser
Properties properties = new Properties();
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
properties.setProperty(ProducerConfig.ACKS_CONFIG, "all");
properties.setProperty(ProducerConfig.RETRIES_CONFIG, Integer.toString(Integer.MAX_VALUE));
properties.setProperty(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "5");
properties.setProperty(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "120000");
properties.setProperty(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, "100");
// Buat produser
KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
// Kirim pesan
for (int i = 0; i < 10; i++) {
String key = "key-" + i;
String value = "Message " + i;
ProducerRecord<String, String> record = new ProducerRecord<>("retry-topic", key, value);
producer.send(record, (metadata, exception) -> {
if (exception == null) {
System.out.printf("Pesan dikirim: key=%s, value=%s, partition=%d, offset=%d%n",
key, value, metadata.partition(), metadata.offset());
} else {
System.err.printf("Gagal mengirim pesan: %s%n", exception.getMessage());
}
});
}
// Tutup produser
producer.flush();
producer.close();
}
}
Penjelasan Kode:
Menggunakan konfigurasi retries aman dengan idempotence.
Mengirim 10 pesan dengan kunci dan nilai untuk pengujian.
Callback menangani hasil pengiriman, mencetak sukses atau kesalahan.
3. Uji dengan Produser Konsol
Jalankan produser konsol dengan konfigurasi retries:
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic retry-topic \
--producer-property acks=all \
--producer-property retries=2147483647 \
--producer-property delivery.timeout.ms=120000 \
--producer-property max.in.flight.requests.per.connection=5 \
--producer-property enable.idempotence=true
Kirim beberapa pesan:
key1,Message 1
key2,Message 2
key3,Message 3
4. Konsumsi Pesan
Jalankan konsumer untuk memverifikasi pesan:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic retry-topic \
--from-beginning \
--property print.key=true \
--property key.separator=,
Contoh Keluaran:
key1,Message 1
key2,Message 2
key3,Message 3
5. Simulasi Kesalahan
Untuk menguji retries:
Matikan satu broker untuk memicu
NotEnoughReplicasException
.Amati apakah produser mencoba ulang secara otomatis (periksa log produser atau callback).
Pastikan semua pesan akhirnya dikirim setelah broker kembali online, selama dalam
delivery.timeout.ms
.
Catatan
Idempotence: Dengan
enable.idempotence=true
, Kafka secara otomatis menangani duplikasi pesan, tetapi memerlukanacks=all
danmax.in.flight.requests.per.connection
≤5
.Pengurutan: Untuk menjamin pengurutan kunci, atur
max.in.flight.requests.per.connection=1
, tetapi ini mengurangi throughput.Mode KRaft: Dalam mode KRaft, perilaku retries tetap sama, tetapi manajemen replika lebih efisien.
Praktik Terbaik
Aktifkan Idempotence untuk Keamanan:
Gunakan
enable.idempotence=true
untuk mencegah duplikasi dan menjaga pengurutan, terutama untuk data kritis.Pastikan
acks=all
danmax.in.flight.requests.per.connection
≤5
.
Gunakan
delivery.timeout.ms
:Alih-alih mengatur
retries
secara eksplisit, gunakandelivery.timeout.ms
(misalnya,120000
) untuk membatasi waktu pengiriman.
Sesuaikan
retry.backoff.ms
:Atur
retry.backoff.ms
berdasarkan latensi jaringan (misalnya,100
untuk jaringan cepat, lebih tinggi untuk jaringan lambat).
Pilih
max.in.flight.requests.per.connection
dengan Hati-hati:Gunakan
1
untuk pengurutan ketat, atau5
dengan idempotence untuk kompromi antara pengurutan dan throughput.
Pantau Metrik:
Gunakan metrik JMX seperti
RequestLatencyAvg
danRetryCount
untuk memantau performa retries.Periksa log produser untuk kesalahan seperti
NotEnoughReplicasException
.
Uji di Lingkungan Non-Produksi:
Simulasikan kegagalan broker untuk memastikan retries bekerja sesuai harapan tanpa kehilangan pesan.
Kombinasikan dengan
min.insync.replicas
:Gunakan
min.insync.replicas=2
denganacks=all
untuk memastikan ketahanan, terutama untuk topik denganreplication.factor=3
.
Dokumentasikan Konfigurasi:
Catat pengaturan retries dan alasan pemilihannya untuk memudahkan pemeliharaan dan debugging.
Penjelasan Tambahan
Hubungan dengan Pengurutan
Retries tanpa batasan
max.in.flight.requests.per.connection=1
dapat mengganggu pengurutan kunci. Misalnya, jika batch pertama gagal tetapi batch kedua berhasil, pesan dalam batch kedua dapat muncul lebih dulu di partisi.Dengan
enable.idempotence=true
, Kafka menangani pengurutan dengan benar hinggamax.in.flight.requests.per.connection=5
.
Pemecahan Masalah
Jika pesan hilang atau pengurutan salah:
Periksa Konfigurasi:
kafka-configs.sh --bootstrap-server localhost:9092 --describe --entity-type topics --entity-name retry-topic
Pastikan
min.insync.replicas
cukup tinggi.Periksa Log Produser: Cari kesalahan seperti
NotEnoughReplicasException
atau timeout.Uji dengan
max.in.flight.requests.per.connection=1
: Jika pengurutan bermasalah, turunkan ke1
untuk debugging.
Jika throughput rendah:
Tinjau
max.in.flight.requests.per.connection
dan tingkatkan ke5
jika idempotence diaktifkan.Pertimbangkan
acks=1
untuk data non-kritis.
Kesimpulan
Konfigurasi retries pada produser Kafka adalah kunci untuk memastikan pesan tidak hilang akibat kesalahan retriable seperti NotEnoughReplicasException
. Dengan mengatur enable.idempotence=true
, acks=all
, retries=Integer.MAX_VALUE
, delivery.timeout.ms=120000
, dan max.in.flight.requests.per.connection=5
, Anda dapat mencapai pengiriman yang aman dengan pengurutan terjaga. Untuk aplikasi yang memerlukan pengurutan ketat, gunakan max.in.flight.requests.per.connection=1
, tetapi waspadai penurunan throughput. Dengan mengikuti praktik terbaik seperti pengujian, pemantauan metrik, dan dokumentasi, Anda dapat mengelola retries secara efektif untuk mendukung aplikasi Kafka yang andal.
Last updated