Issue
Kafka throws "consumer poll timeout has expired" Exception
Diagnosis
When we execute v2 connectors, we can observe log entries like the following:
2024-05-08T01:41:57.041Z - WARN [kafka-coordinator-heartbeat-thread | index-pipeline--entity-product-item--fusion.connectors.datasource-pmdmProductItems:org.apache.kafka.clients.consumer.internals.AbstractCoordinator$HeartbeatThread@1408] -
[Consumer clientId=entity-product-item-BqTgYu, groupId=index-pipeline--entity-product-item--fusion.connectors.datasource-pmdmProductItems]
consumer poll timeout has expired.
This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically suggests that the poll loop is spending too much time processing messages. We can address this by either increasing the max.poll.interval.ms or by reducing the maximum size of batches returned by poll() using max.poll.records.
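The failure mode described in the log can be modeled with a few lines of code. This is a simplified sketch of the broker-side check, not actual Kafka client code: the class and method names are illustrative, and real enforcement happens inside the Kafka client/broker coordination, not in application code.

```java
// Sketch: a simplified model of the check described in the log message.
// If the gap between two consecutive poll() calls exceeds
// max.poll.interval.ms, the consumer is considered failed and its
// partitions are rebalanced. Names here are illustrative only.
public class PollIntervalModel {
    static boolean pollTimeoutExpired(long msSinceLastPoll, long maxPollIntervalMs) {
        return msSinceLastPoll > maxPollIntervalMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000; // default: 5 minutes
        // 500 records at 700 ms each = 350,000 ms between polls,
        // which exceeds the 300,000 ms interval.
        System.out.println(pollTimeoutExpired(500 * 700L, maxPollIntervalMs));
    }
}
```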
This issue is present in version 5.9.x but not in version 5.5.1: connector plugins began using Kafka to send documents for indexing in Fusion 5.6, so in Fusion 5.5.1 there were no max.poll.records or max.poll.interval.ms values to configure.
This issue impacts all v2 connectors. When time-intensive processing steps run in the index pipeline, a v2 connector can malfunction: it re-processes the same documents multiple times and hangs instead of stopping after document processing completes.
The defaults are max.poll.records=500 and max.poll.interval.ms=300000 (5 minutes), and the connector configuration offers no option to change them.
The combination of these properties means the index pipeline must consume 500 records within 5 minutes, which gives it an average of 600 ms per record. That budget can be difficult to meet when each record requires significant processing.
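The per-record budget above follows directly from the two defaults. A minimal sketch of the arithmetic (the class and method names are illustrative):

```java
// Sketch: the per-record time budget implied by the consumer settings.
public class PollBudget {
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    public static void main(String[] args) {
        // Defaults described above: 300,000 ms interval, 500 records.
        System.out.println(perRecordBudgetMs(300_000, 500)); // 600 ms per record
    }
}
```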
Environment
Fusion 5.6 - Fusion 5.9.3
Cause
These two values are not configurable in 5.9.3. A fix has been applied in 5.9.4 at the code-base level with the following changes:
private static final int MAX_POLL_INTERVAL_DEFAULT = 900000; //15 minutes
private static final int MAX_POLL_RECORDS_DEFAULT = 100;
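The new defaults relax the per-record budget substantially. A sketch of the before/after comparison, using the values quoted above (the class and method names are illustrative):

```java
// Sketch: per-record budget before (5.9.x defaults) and after (5.9.4 defaults).
public class BudgetComparison {
    static long budgetMs(long intervalMs, int records) {
        return intervalMs / records;
    }

    public static void main(String[] args) {
        long before = budgetMs(300_000, 500); // 600 ms per record
        long after  = budgetMs(900_000, 100); // 9,000 ms per record
        // The 5.9.4 defaults give each record 15x more processing time.
        System.out.println(before + " ms -> " + after + " ms");
    }
}
```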
Resolution
We can change these two properties in the configuration files.
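As a rough illustration, the two properties are standard Kafka consumer settings, so an override could look like the sketch below. The values shown (10 minutes, 100 records) are examples only, and the exact file name and override mechanism in a Fusion deployment may differ from this generic Kafka-style snippet.

```java
import java.util.Properties;

// Sketch: overriding the two poll-related consumer properties.
// The keys are standard Kafka consumer settings; the values are
// illustrative, not Fusion defaults.
public class PollTuning {
    static Properties consumerOverrides() {
        Properties props = new Properties();
        // Allow more time between poll() calls before the broker
        // considers the consumer failed and triggers a rebalance.
        props.setProperty("max.poll.interval.ms", "600000"); // 10 minutes
        // Return smaller batches so each batch fits within the interval.
        props.setProperty("max.poll.records", "100");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerOverrides());
    }
}
```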
Here are a few questions and related answers:
- What happens when the index pipeline fails to finish processing the polled records within the configured max.poll.interval.ms?
If the Kafka client does not finish processing the records returned by poll() within the interval, the broker assumes the client has failed and proceeds accordingly (for example, by rebalancing the consumers for the given topic). The default values allow only 600 ms of processing per record, which may be insufficient in many cases.
- Can you confirm that max.poll.records is set to 500 and max.poll.interval.ms is set to 300,000 for the index pipeline?
Yes.
- Are these values configurable?
Yes.