## Requirement
I need to periodically (once a day) fetch all new messages from a Kafka topic and post them to a Slack channel.
## Design
I have modeled this as three activities:
- Fetch Messages: Fetch a batch of messages from the Kafka topic.
- Post to Slack: Post each message to Slack (this activity is called for each message).
- Commit Offsets: Commit the offsets to Kafka so that the next read starts from the committed offset.
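As a rough sketch, the three activities might look like this in Python. This is simplified: the `poll`/`commit` signatures stand in for kafka-python's `KafkaConsumer` (whose `commit` actually takes a `{TopicPartition: OffsetAndMetadata}` mapping), and `poster` stands in for a Slack client. One detail worth noting is the Kafka convention that the committed offset is the offset of the *next* message to read, hence `offset + 1`:

```python
from dataclasses import dataclass

@dataclass
class Message:
    partition: int
    offset: int
    value: str

def fetch_messages(consumer, max_records=100):
    """Activity 1: fetch a batch of messages from the Kafka topic."""
    return consumer.poll(max_records)

def post_to_slack(poster, message):
    """Activity 2: post a single message to Slack (called once per message)."""
    poster(message.value)

def commit_offsets(consumer, batch):
    """Activity 3: commit offsets so the next read starts after this batch.

    Kafka convention: the committed offset is the next offset to read,
    so we commit offset + 1 for the highest offset seen on each partition.
    """
    offsets = {}
    for m in batch:
        offsets[m.partition] = max(offsets.get(m.partition, -1), m.offset + 1)
    consumer.commit(offsets)
    return offsets
```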
## Problem Statement
The issue I’m encountering is that the activity responsible for committing offsets fails with the following error:
{"message":"CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.","source":"","stackTrace":" File \"/path/to/file\", line 447, in _run_activity\n result = await impl.execute_activity(input)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n File \"/path/to/file\", line 703, in execute_activity\n return await input.fn(*input.args)\n ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n File \"/path/to/file\", line 140, in commit_kafka_offsets\n consumer.commit(offsets=offsets)\n\n File \"/path/to/file\", line 527, in commit\n self._coordinator.commit_offsets_sync(offsets)\n\n File \"/path/to/file\", line 521, in commit_offsets_sync\n raise future.exception # pylint: disable-msg=raising-bad-type\n ^^^^^^^^^^^^^^^^^^^^^^\n","encodedAttributes":null,"cause":null,"applicationFailureInfo":{"type":"CommitFailedError","nonRetryable":false,"details":null}}
I tried setting `max_poll_interval_ms` to a very high value, but that doesn’t fix the issue. The batches I am processing are already small enough that I cannot reduce them further.
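For reference, the two knobs the error message mentions correspond to these kafka-python consumer settings (a sketch; the group id is hypothetical, and these would be passed as keyword arguments to `KafkaConsumer`):

```python
# Consumer settings relevant to the rebalance error (kafka-python parameter names).
consumer_config = {
    "group_id": "slack-forwarder",     # hypothetical consumer group name
    "enable_auto_commit": False,       # offsets are committed manually
    "max_poll_interval_ms": 600_000,   # rebalance timeout: allow 10 min between polls
    "max_poll_records": 50,            # cap the batch size returned by poll()
}
```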
## Proposed Solution
The only solution I can think of is to model this as a single activity that does everything:
- Fetch messages in batches.
- Send them to Slack.
- Commit the offsets.
- Iterate this process.
The downside is that if posting to Slack fails partway through, the retried activity would re-fetch the same messages from Kafka and post a few duplicates. However, this might not be a huge problem for my use case.
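The steps above could be sketched as a single loop. Here `fetch`, `post`, and `commit` are injected as plain callables (in the real activity they would wrap the Kafka consumer and the Slack client), which makes the at-least-once behavior easy to see:

```python
def forward_kafka_to_slack(fetch, post, commit, max_batches=1000):
    """One activity doing everything: fetch -> post -> commit, iterated.

    fetch()         -> list of (partition, offset, value) tuples; empty when drained
    post(value)     -> posts one message value to Slack (may raise)
    commit(offsets) -> commits {partition: next_offset_to_read} to Kafka
    """
    for _ in range(max_batches):
        batch = fetch()
        if not batch:
            break  # topic drained for this run
        for _, _, value in batch:
            # If this raises, the retried activity re-fetches from the last
            # committed offset: at-least-once delivery, duplicates possible.
            post(value)
        # Commit the next offset to read on each partition in the batch.
        offsets = {}
        for partition, offset, _ in batch:
            offsets[partition] = max(offsets.get(partition, -1), offset + 1)
        commit(offsets)
```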
## Questions
- Are there any alternative designs, similar to the three-activity model, that can achieve this requirement?
- How can I commit the offsets to Kafka effectively with this design?
I can provide the code in Python if that helps.