avatar

Kafka 採坑紀錄

Message明明都被處理過卻有殘留Lag

先解說架構,在我的練習專案「GeoPulse Pipeline」中,Kafka規劃出了 三個分區,而我一開始寫的commit程式碼為:

var offsets = buffer.Last(); consumer.Commit(offsets);

這樣的寫法代表著我在commit的這個動作,只抓取buffer中最後的紀錄來進行commit,而buffer前面的message就不會被commit。

partition 0 1000 message partition 1 1000 message partition 2 1000 message partition 1 1000 message => buffer.Last() 只會抓取這個紀錄commmit

這會導致明明所有的message都被處理過了,但kafka卻紀錄還有message沒有被處理。

解決方法如下:

var offsets = buffer .GroupBy(msg => msg.TopicPartition) .Select(g => new TopicPartitionOffset( g.Key, g.Last().Offset + 1 )) .ToList(); consumer.Commit(offsets);

這個寫法的話會將buffer整理成一個扁平的List一次進行commit。

Kafka 採坑紀錄 | SIC 個人網站