問題

接起來 Kafka 之後,Consumer 卻遇到有些資料會接收不到的情況。

解決

  1. Producer Acknowledgements
    argment: acks

    -1 = 等所有的 comsumer server 接收到訊息並回覆確認
    0 = 不等
    1 = 等 leader 那台 comsumer 接收到訊息並回覆確認

    1
    2
    3
    4
    5
    6
    7
    8
    9
    async function audProcessor(config) {
    console.log('.........sending');
    return await producer.send({
    topic: KAFKA.TOPIC,
    messages: [{ value: JSON.stringify(config) }],
    // avoid kafka losing data
    acks: -1, //acks=all
    });
    }
  2. Producer retries

    • 設定 delivery.timeout.ms
      argment: transactionTimeout
    • 設定 retries
      argment: retry
    1
    2
    3
    4
    5
    6
    7
    8
    9
    const producer = kafka.producer({
    ...
    // avoid kafka losing data
    transactionTimeout: 60000, // 1min
    retry: {
    retries: 10,
    },
    });

補充

kafkajs max.in.flight.requests.per.connection 設定
如果資料讀取的順序性對你的 application 很重要的話,要另外設定

max.in.flight.requests.per.connection to 1
but not for my case

資料來源

Help, Kafka ate my data!
Producing Messages
Does Kafka really guarantee the order of messages?