Kafka Consumer Advanced: Poll and Internal Threads Behavior
Pengaturan Penting Konsumer Kafka: Perilaku Poll dan Thread Internal
Dalam Apache Kafka, konsumer menggunakan metode poll()
untuk mengambil batch data dari broker, mengelola koordinasi grup, rebalancing partisi, dan pengiriman heartbeat untuk menjaga keanggotaan dalam grup konsumer. Perilaku poll dan thread internal seperti heartbeat thread dan poll thread sangat penting untuk memastikan konsumer tetap aktif, responsif, dan efisien dalam memproses pesan. Artikel ini menjelaskan cara kerja poll, pengaturan utama seperti heartbeat.interval.ms
, session.timeout.ms
, max.poll.interval.ms
, fetch.min.bytes
, fetch.max.wait.ms
, max.partition.fetch.bytes
, fetch.max.bytes
, dan max.poll.records
, serta bagaimana mengonfigurasi konsumer untuk performa optimal. Kami menyertakan contoh kode Java, langkah-langkah pengujian dengan CLI, dan praktik terbaik untuk lingkungan produksi.
Perilaku Poll Konsumer Kafka
Metode poll()
adalah inti dari pengoperasian konsumer Kafka. Setelah konsumer berlangganan ke topik, poll loop menangani:
Koordinasi Grup: Mengelola keanggotaan dalam grup konsumer.
Rebalancing Partisi: Menangani redistribusi partisi saat konsumer bergabung atau keluar dari grup.
Heartbeat: Mengirim sinyal ke koordinator grup untuk menunjukkan konsumer masih aktif.
Pengambilan Data: Mengambil batch pesan dari partisi yang ditugaskan.
Optimasi Internal
Konsumer Kafka dioptimalkan untuk efisiensi:
Jika konsumer berhasil mengambil data, ia akan memulai permintaan fetch berikutnya secara asinkronus (pre-fetching) saat memproses batch saat ini.
Ini mengurangi waktu tunggu pada pemanggilan
poll()
berikutnya, meningkatkan throughput dan mengurangi latensi.
Diagram Perilaku Poll:
[poll() -> Ambil Batch -> Proses Batch | Pre-fetch Batch Berikutnya]
[poll() berikutnya -> Kembalikan Batch yang Sudah Di-fetch]
Kontrol melalui Poll
Metode poll()
memungkinkan konsumer untuk mengontrol:
Posisi Konsumsi: Mengatur dari mana dalam log konsumer mulai membaca (misalnya,
earliest
,latest
).Kecepatan Konsumsi: Mengatur jumlah data yang diambil per poll (via
max.poll.records
,fetch.max.bytes
).Replay Pesan: Memungkinkan konsumsi ulang pesan dengan mengatur ulang offset.
Thread Internal Konsumer
Konsumer Kafka menggunakan dua thread utama untuk operasinya:
Heartbeat Thread: Mengirim heartbeat untuk menjaga keanggotaan grup.
Poll Thread: Menangani pengambilan data melalui
poll()
.
Diagram Thread Internal:
[Heartbeat Thread -> Kirim Heartbeat ke Koordinator]
[Poll Thread -> Panggil poll() -> Ambil & Proses Pesan]
Heartbeat Thread
Fungsi: Mengirim heartbeat secara periodik ke koordinator grup untuk menunjukkan konsumer masih aktif.
Pengaturan:
heartbeat.interval.ms
(default: 3000 ms atau 3 detik): Waktu antara pengiriman heartbeat. Menentukan seberapa sering konsumer melaporkan statusnya.session.timeout.ms
(Kafka ≥ 3.0: 45000 ms atau 45 detik; Kafka ≤ 2.8: 10000 ms atau 10 detik): Waktu maksimum konsumer dianggap aktif tanpa heartbeat. Jika melebihi waktu ini, koordinator menganggap konsumer mati dan memicu rebalance.
Aturan:
heartbeat.interval.ms
harus lebih kecil darisession.timeout.ms
, biasanya tidak lebih dari sepertiga dari nilaisession.timeout.ms
(misalnya, 3 detik untuk 10 detik).Nilai default biasanya cukup, tetapi dapat disesuaikan untuk kasus khusus.
Implikasi:
Heartbeat yang terlalu jarang dapat menyebabkan rebalance prematur.
Session timeout yang terlalu pendek meningkatkan risiko rebalance yang tidak perlu.
Poll Thread
Fungsi: Mengambil batch pesan dari partisi menggunakan
poll()
dan memprosesnya di thread utama.Pengaturan:
max.poll.interval.ms
(default: 300000 ms atau 5 menit): Waktu maksimum antara dua pemanggilanpoll()
. Jika waktu ini terlampaui, konsumer dianggap gagal (livelock), dan koordinator memicu rebalance.max.poll.records
(default: 500): Jumlah maksimum record yang dikembalikan per pemanggilanpoll()
. Mengontrol jumlah data yang diproses dalam satu iterasi.
Implikasi:
max.poll.interval.ms
yang terlalu rendah dapat menyebabkan rebalance jika pengolahan lambat (misalnya, di Apache Spark).max.poll.records
yang terlalu besar meningkatkan penggunaan memori dan waktu pengolahan, berisiko melebihimax.poll.interval.ms
.
Perilaku Fetch Konsumer
Saat poll()
dipanggil, konsumer mengambil data dari partisi dan menyimpannya dalam cache internal untuk diproses. Perilaku fetch dikontrol oleh:
fetch.min.bytes (default: 1 byte): Jumlah minimum data yang diinginkan konsumer per permintaan fetch. Broker menunda pengiriman hingga data mencapai jumlah ini atau waktu tunggu habis.
fetch.max.wait.ms (default: 500 ms): Waktu maksimum broker menunda respons fetch jika
fetch.min.bytes
belum terpenuhi.max.partition.fetch.bytes (default: 1048576 atau 1MB): Jumlah maksimum data per partisi per fetch. Jika batch pertama melebihi batas ini, batch tetap dikirim untuk memastikan kemajuan.
fetch.max.bytes (default: 57671680 atau 55MB): Jumlah maksimum data yang dikembalikan per permintaan fetch dari semua partisi.
Implikasi:
fetch.min.bytes
danfetch.max.wait.ms
mengurangi overhead jaringan dengan menunggu batch besar.max.partition.fetch.bytes
danfetch.max.bytes
membatasi penggunaan memori konsumer.max.poll.records
tidak memengaruhi fetch tetapi membatasi jumlah record yang dikembalikan ke aplikasi perpoll()
.
Konfigurasi Konsumer untuk Performa Optimal
Berikut adalah konfigurasi konsumer untuk performa tinggi dengan perilaku poll dan fetch yang dioptimalkan:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024"); // 1KB
properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1 * 1024 * 1024)); // 1MB
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.toString(55 * 1024 * 1024)); // 55MB
Penjelasan:
enable.auto.commit=false
: Menggunakan komit manual untuk semantik At Least Once.heartbeat.interval.ms=3000
dansession.timeout.ms=10000
: Menjaga keanggotaan grup dengan heartbeat reguler.max.poll.interval.ms=300000
: Memberikan waktu cukup untuk pengolahan batch.max.poll.records=500
: Membatasi jumlah record per poll untuk pengolahan terkendali.fetch.min.bytes=1024
danfetch.max.wait.ms=500
: Mengoptimalkan fetch untuk batch besar.max.partition.fetch.bytes=1MB
danfetch.max.bytes=55MB
: Menjaga penggunaan memori wajar.
Praktik Konfigurasi dan Pengujian
Berikut adalah langkah-langkah praktis untuk mengonfigurasi konsumer dengan pengaturan poll dan fetch yang optimal menggunakan kode Java dan alat CLI, serta pengujian untuk memverifikasi performa dan keandalan.
Prasyarat
Pastikan klaster Kafka berjalan (versi ≥ 0.11, mode Zookeeper atau KRaft).
Gunakan ekstensi CLI yang sesuai:
.sh
untuk Linux/Mac,.bat
untuk Windows.Pastikan broker Kafka aktif di
localhost:9092
.Klaster memiliki setidaknya tiga broker untuk mendukung
replication.factor=3
.
Langkah-langkah Praktis
1. Buat Topik
Buat topik bernama poll-topic
dengan 10 partisi, faktor replikasi 3, dan min.insync.replicas=2
:
kafka-topics.sh --bootstrap-server localhost:9092 --create --topic poll-topic \
--partitions 10 --replication-factor 3 \
--config min.insync.replicas=2 \
--config compression.type=producer
Verifikasi konfigurasi topik:
kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic poll-topic
Contoh Keluaran:
Topic: poll-topic TopicId: XYZ123 PartitionCount: 10 ReplicationFactor: 3 Configs: min.insync.replicas=2,compression.type=producer
Topic: poll-topic Partition: 0 Leader: 1 Replicas: 1,2,3 Isr: 1,2,3
...
Topic: poll-topic Partition: 9 Leader: 2 Replicas: 2,3,1 Isr: 2,3,1
2. Produksi Pesan untuk Pengujian
Gunakan produser konsol untuk mengirim pesan:
kafka-console-producer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--property parse.key=true \
--property key.separator=,
Kirim beberapa pesan:
key1,{"id": 1, "data": "Poll test message 1"}
key2,{"id": 2, "data": "Poll test message 2"}
key3,{"id": 3, "data": "Poll test message 3"}
3. Implementasi Konsumer dengan Poll Optimal (Java)
Berikut adalah contoh kode Java untuk konsumer dengan pengaturan poll dan fetch yang dioptimalkan:
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
public class PollOptimizedConsumer {
public static void main(String[] args) {
// Konfigurasi konsumer
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
properties.setProperty(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, "3000");
properties.setProperty(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "10000");
properties.setProperty(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
properties.setProperty(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1024");
properties.setProperty(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500");
properties.setProperty(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, Integer.toString(1 * 1024 * 1024));
properties.setProperty(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, Integer.toString(55 * 1024 * 1024));
// Buat konsumer
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singletonList("poll-topic"));
try {
while (true) {
// Poll pesan
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// Proses setiap pesan
for (ConsumerRecord<String, String> record : records) {
String key = record.key();
String value = record.value();
System.out.printf("Menerima pesan: key=%s, value=%s, partition=%d, offset=%d%n",
key, value, record.partition(), record.offset());
// Simulasi pengolahan idempoten
try {
processMessage(key, value);
} catch (Exception e) {
System.err.printf("Gagal memproses pesan: %s%n", e.getMessage());
continue;
}
}
// Komit offset setelah semua pesan diproses
if (!records.isEmpty()) {
try {
consumer.commitSync();
System.out.println("Offset dikomit");
} catch (Exception e) {
System.err.printf("Gagal mengomit offset: %s%n", e.getMessage());
}
}
}
} finally {
consumer.close();
}
}
private static void processMessage(String key, String value) {
// Simulasi pengolahan idempoten
System.out.printf("Memproses pesan: key=%s, value=%s%n", key, value);
}
}
Penjelasan Kode:
Menggunakan pengaturan poll dan fetch yang dioptimalkan untuk throughput dan keandalan.
enable.auto.commit=false
untuk semantik At Least Once dengan komit manual.fetch.min.bytes=1024
danfetch.max.wait.ms=500
untuk batch besar.max.poll.records=500
untuk mengontrol jumlah record per poll.Heartbeat diatur dengan
heartbeat.interval.ms=3000
dansession.timeout.ms=10000
.
4. Uji Konsumer dengan CLI
Jalankan konsumer konsol untuk membandingkan perilaku:
kafka-console-consumer.sh --bootstrap-server localhost:9092 \
--topic poll-topic \
--from-beginning \
--group consumer-group \
--property print.key=true \
--property key.separator=, \
--consumer-property fetch.min.bytes=1024 \
--consumer-property fetch.max.wait.ms=500 \
--consumer-property max.partition.fetch.bytes=1048576 \
--consumer-property fetch.max.bytes=57671680 \
--consumer-property max.poll.records=500
Contoh Keluaran:
key1,{"id": 1, "data": "Poll test message 1"}
key2,{"id": 2, "data": "Poll test message 2"}
key3,{"id": 3, "data": "Poll test message 3"}
5. Uji Performa dan Keandalan
Simulasi Pengolahan Lambat:
Tambahkan penundaan (misalnya,
Thread.sleep(1000)
) diprocessMessage
untuk mengujimax.poll.interval.ms
.Pastikan konsumer tidak memicu rebalance selama pengolahan dalam batas 5 menit.
Simulasi Kegagalan Heartbeat:
Hentikan konsumer sementara untuk melebihi
session.timeout.ms
(10 detik).Periksa apakah rebalance terjadi menggunakan:
kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group consumer-group --describe
Pantau Performa:
Gunakan metrik JMX seperti
RecordsConsumedTotal
,FetchSizeAvg
, danFetchLatencyAvg
untuk mengukur efisiensi fetch.Periksa
ConsumerCoordinatorMetrics
untuk memantau heartbeat dan rebalance.
Uji Fetch Behavior:
Kurangi
fetch.min.bytes
ke1
dan amati peningkatan frekuensi fetch (dengan latensi lebih rendah).Tingkatkan
fetch.max.bytes
ke100MB
untuk menguji kapasitas memori konsumer.
Catatan
Default Biasanya Cukup: Nilai default untuk
heartbeat.interval.ms
,session.timeout.ms
, dan lainnya biasanya cukup untuk sebagian besar kasus. Jangan ubah tanpa pengujian.Mode KRaft: Perilaku poll dan heartbeat serupa di mode KRaft, tetapi pastikan controller stabil untuk koordinasi grup.
Pengolahan Berat: Untuk aplikasi seperti Apache Spark, tingkatkan
max.poll.interval.ms
untuk mengakomodasi waktu pengolahan yang lama.
Praktik Terbaik
Gunakan Komit Manual untuk At Least Once:
Atur
enable.auto.commit=false
dan gunakancommitSync()
setelah pengolahan untuk menjamin semantik At Least Once.
Jaga Heartbeat Konsisten:
Gunakan
heartbeat.interval.ms=3000
dansession.timeout.ms=10000
untuk klaster dengan latensi rendah.Pastikan
heartbeat.interval.ms
≤ 1/3 darisession.timeout.ms
.
Sesuaikan max.poll.interval.ms untuk Pengolahan Berat:
Tingkatkan ke
600000
(10 menit) untuk aplikasi seperti Spark yang memerlukan waktu pengolahan lama.Pastikan pengolahan selesai dalam batas ini untuk menghindari rebalance.
Optimalkan Fetch Behavior:
Gunakan
fetch.min.bytes=1024
danfetch.max.wait.ms=500
untuk batch besar dan efisiensi jaringan.Sesuaikan
max.partition.fetch.bytes
danfetch.max.bytes
berdasarkan kapasitas memori konsumer.
Kontrol Jumlah Record:
Gunakan
max.poll.records=500
untuk pengolahan terkendali. Kurangi ke100
untuk aplikasi dengan memori terbatas.
Pantau Performa dan Rebalance:
Gunakan
kafka-consumer-groups.sh
untuk memeriksa lag dan status grup.Pantau metrik JMX seperti
FetchSizeAvg
,FetchLatencyAvg
, danRebalanceRate
untuk mendeteksi masalah.
Uji di Lingkungan Non-Produksi:
Simulasikan pengolahan lambat, kegagalan jaringan, atau rebalance untuk menguji pengaturan poll dan heartbeat.
Bandingkan performa dengan berbagai nilai
fetch.min.bytes
danmax.poll.records
.
Dokumentasikan Konfigurasi:
Catat pengaturan
heartbeat.interval.ms
,session.timeout.ms
,max.poll.interval.ms
, dan lainnya, serta alasan pemilihannya.
Penjelasan Tambahan
Hubungan dengan Semantik Pengiriman
At Least Once: Gunakan komit manual (
commitSync
) denganenable.auto.commit=false
untuk memastikan pesan diproses sebelum offset dikomit.At Most Once: Gunakan auto-commit (
enable.auto.commit=true
) hanya jika kehilangan pesan dapat ditoleransi.Exactly Once: Gunakan Kafka Streams (
processing.guarantee=exactly_once_v2
) untuk alur Kafka ke Kafka, dengan pengaturan poll yang konsisten.
Pemecahan Masalah
Jika konsumer memicu rebalance:
Periksa Heartbeat:
Pastikan
heartbeat.interval.ms
cukup kecil relatif terhadapsession.timeout.ms
.Periksa koneksi jaringan ke koordinator grup:
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Periksa max.poll.interval.ms:
Tingkatkan jika pengolahan lambat (misalnya, ke
600000
).Optimalkan logika pengolahan untuk mempercepat
poll()
.
Jika throughput rendah:
Periksa Fetch Settings:
Tingkatkan
fetch.min.bytes
ke10240
(10KB) untuk batch lebih besar.Kurangi
fetch.max.wait.ms
ke200
untuk latensi lebih rendah.
Periksa max.poll.records:
Tingkatkan ke
1000
jika memori memadai, tetapi pantau waktu pengolahan.
Jika penggunaan memori tinggi:
Kurangi
max.partition.fetch.bytes
ke512KB
ataufetch.max.bytes
ke10MB
.Kurangi
max.poll.records
ke100
untuk mengurangi beban memori.
Kesimpulan
Perilaku poll dan thread internal (heartbeat dan poll thread) pada konsumer Kafka sangat penting untuk menjaga keanggotaan grup, pengambilan data yang efisien, dan keandalan pengolahan pesan. Pengaturan seperti heartbeat.interval.ms
, session.timeout.ms
, max.poll.interval.ms
, fetch.min.bytes
, fetch.max.wait.ms
, max.partition.fetch.bytes
, fetch.max.bytes
, dan max.poll.records
memungkinkan kontrol halus atas performa konsumer. Contoh kode Java dan CLI menunjukkan cara menerapkan konfigurasi ini, sementara praktik terbaik seperti pengujian, pemantauan metrik, dan dokumentasi memastikan operasi yang andal di lingkungan produksi. Untuk sebagian besar aplikasi, gunakan pengaturan default dengan komit manual untuk semantik At Least Once, dan sesuaikan parameter fetch untuk throughput tinggi.
Last updated