問題
接起來 Kafka 之後,Consumer 卻遇到有些資料會接收不到的情況。
解決
Producer Acknowledgements
argment: acks-1 = 等所有的 comsumer server 接收到訊息並回覆確認
0 = 不等
1 = 等 leader 那台 comsumer 接收到訊息並回覆確認1
2
3
4
5
6
7
8
9async function audProcessor(config) {
console.log('.........sending');
return await producer.send({
topic: KAFKA.TOPIC,
messages: [{ value: JSON.stringify(config) }],
// avoid kafka losing data
acks: -1, //acks=all
});
}Producer retries
- 設定
delivery.timeout.ms
argment: transactionTimeout - 設定
retries
argment: retry
1
2
3
4
5
6
7
8
9const producer = kafka.producer({
...
// avoid kafka losing data
transactionTimeout: 60000, // 1min
retry: {
retries: 10,
},
});- 設定
補充
kafkajs max.in.flight.requests.per.connection 設定
如果資料讀取的順序性對你的 application 很重要的話,要另外設定
max.in.flight.requests.per.connection to 1
but not for my case
資料來源
Help, Kafka ate my data!
Producing Messages
Does Kafka really guarantee the order of messages?