How to model fetching from Kafka and posting to Slack?

Requirement

I need to periodically (once a day) fetch messages from a Kafka topic and write all these messages to a Slack channel.

Design

I have modeled three activities (a rough workflow sketch follows the list):

  1. Fetch Messages: Fetch a batch of messages from the Kafka topic.
  2. Post to Slack: Post each message to Slack (this activity is called for each message).
  3. Commit Offsets: Commit the offsets to Kafka so that the next read starts from the committed offset.
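
The workflow side of this design looks roughly like the sketch below (Temporal Python SDK). The activity names fetch_messages and post_to_slack, the shape of the batch payload, and the timeouts are illustrative stand-ins rather than my exact code; commit_kafka_offsets is the activity that appears in the error below.

```python
from datetime import timedelta

from temporalio import workflow


@workflow.defn
class KafkaToSlackWorkflow:
    @workflow.run
    async def run(self, topic: str, channel: str) -> None:
        # 1. Fetch a batch of messages (plus their offsets) from the Kafka topic.
        batch = await workflow.execute_activity(
            "fetch_messages",
            topic,
            start_to_close_timeout=timedelta(minutes=5),
        )
        # 2. Post each message to Slack, one activity call per message.
        for message in batch["messages"]:
            await workflow.execute_activity(
                "post_to_slack",
                args=[channel, message],
                start_to_close_timeout=timedelta(minutes=1),
            )
        # 3. Commit the offsets so the next run starts after this batch.
        await workflow.execute_activity(
            "commit_kafka_offsets",
            batch["offsets"],
            start_to_close_timeout=timedelta(minutes=1),
        )
```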

Problem Statement

The issue I’m encountering is that the activity responsible for committing offsets runs into the following error:

{"message":"CommitFailedError: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max_poll_interval_ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the rebalance timeout with max_poll_interval_ms, or by reducing the maximum size of batches returned in poll() with max_poll_records.","source":"","stackTrace":"  File \"/path/to/file\", line 447, in _run_activity\n    result = await impl.execute_activity(input)\n             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/path/to/file\", line 703, in execute_activity\n    return await input.fn(*input.args)\n           ^^^^^^^^^^^^^^^^^^^^^^^^^^^\n\n  File \"/path/to/file\", line 140, in commit_kafka_offsets\n    consumer.commit(offsets=offsets)\n\n  File \"/path/to/file\", line 527, in commit\n    self._coordinator.commit_offsets_sync(offsets)\n\n  File \"/path/to/file\", line 521, in commit_offsets_sync\n    raise future.exception # pylint: disable-msg=raising-bad-type\n    ^^^^^^^^^^^^^^^^^^^^^^\n","encodedAttributes":null,"cause":null,"applicationFailureInfo":{"type":"CommitFailedError","nonRetryable":false,"details":null}}

I tried setting max_poll_interval_ms to a very high value, but that does not fix the issue. The batches I am processing are already small, so I cannot reduce them further.
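
For reference, the consumer configuration I am using looks roughly like this (kafka-python); the topic, broker address, group id, and the exact max_poll_interval_ms value shown here are illustrative:

```python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    "my-topic",                          # illustrative topic name
    bootstrap_servers="localhost:9092",  # illustrative broker address
    group_id="slack-forwarder",          # illustrative consumer group
    enable_auto_commit=False,            # offsets are committed explicitly
    max_poll_interval_ms=3_600_000,      # raised to 1 hour; the rebalance still happens
)
```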

Proposed Solution

The only solution I can think of is to have a single activity that does everything (sketched after this list):

  • Fetch messages in batches.
  • Send them to Slack.
  • Commit the offsets.
  • Iterate this process.

The downside is that if posting to Slack fails, we would re-fetch messages from Kafka, potentially duplicating a few messages. However, this might not be a huge problem.
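
A rough sketch of this single-activity approach, assuming kafka-python and a Slack incoming webhook, is below; the webhook URL, broker address, group id, and timeouts are placeholders. The key point is that the commit happens while the same consumer connection is still alive and in the group, which is what should avoid the rebalance error.

```python
import json
import urllib.request

from kafka import KafkaConsumer
from temporalio import activity

SLACK_WEBHOOK_URL = "https://hooks.slack.com/services/..."  # placeholder


@activity.defn
def forward_kafka_to_slack(topic: str) -> None:
    consumer = KafkaConsumer(
        topic,
        bootstrap_servers="localhost:9092",  # placeholder broker address
        group_id="slack-forwarder",          # placeholder consumer group
        enable_auto_commit=False,
        consumer_timeout_ms=10_000,          # stop iterating once the topic is drained
    )
    try:
        for record in consumer:
            # Post the message to Slack; if this raises, the activity retries and
            # anything after the last commit may be re-sent (at-least-once delivery).
            body = json.dumps({"text": record.value.decode("utf-8")}).encode("utf-8")
            request = urllib.request.Request(
                SLACK_WEBHOOK_URL,
                data=body,
                headers={"Content-Type": "application/json"},
            )
            urllib.request.urlopen(request)
            # Commit after each message (or after each small batch) while the
            # consumer is still connected and part of the group.
            consumer.commit()
    finally:
        consumer.close()
```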

Questions

  1. Are there any designs similar to the three-activity model that can meet this requirement?
  2. How can I reliably commit the offsets to Kafka with this design?

I can provide the code in Python if that helps.

So this is actually a Kafka question. A Kafka consumer is, by default, designed to run continuously. If you were going to use a Kafka consumer in the standard Kafka way, you’d run a group of consumer processes that continuously forward new messages to your workflows via a Temporal signal.
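
For context, that standard continuous-consumer approach would look roughly like the sketch below (Temporal Python SDK plus kafka-python); the workflow id, signal name, topic, and addresses are placeholders:

```python
import asyncio

from kafka import KafkaConsumer
from temporalio.client import Client


async def main() -> None:
    client = await Client.connect("localhost:7233")         # placeholder Temporal address
    handle = client.get_workflow_handle("kafka-to-slack")   # placeholder workflow id
    consumer = KafkaConsumer(
        "my-topic",                          # placeholder topic
        bootstrap_servers="localhost:9092",  # placeholder broker address
        group_id="slack-forwarder",          # placeholder consumer group
    )
    # The consumer runs continuously and forwards each new message to the
    # workflow as a signal; offsets are auto-committed by the consumer group.
    for record in consumer:
        await handle.signal("new_message", record.value.decode("utf-8"))


if __name__ == "__main__":
    asyncio.run(main())
```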

I think your proposed solution might run into the same issue, if you implement it in the same way on the Kafka side as you’re doing now.

You want to download a batch of messages from Kafka once a day, which is fine; you can do that with Kafka. I don’t remember how off the top of my head 🙂, but that is a question you can ask on a Kafka forum: “How do I download a batch of messages once a day and commit the offsets? I don’t want to run a consumer group continuously; I just want to connect once a day, download the messages, and then disconnect.” Then you can implement your activity to do that.

I think you can just run a single activity that reads from Kafka and posts to Slack, using heartbeating to save progress.
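
A minimal sketch of that heartbeating idea is below (Temporal Python SDK plus kafka-python). Reading a single explicitly assigned partition, the post_to_slack helper, and the way the heartbeat offset is used to resume are all assumptions, not a complete implementation:

```python
from kafka import KafkaConsumer, TopicPartition
from temporalio import activity


@activity.defn
def forward_with_heartbeat(topic: str, partition: int) -> None:
    consumer = KafkaConsumer(
        bootstrap_servers="localhost:9092",  # placeholder broker address
        enable_auto_commit=False,
        consumer_timeout_ms=10_000,          # stop iterating once the partition is drained
    )
    tp = TopicPartition(topic, partition)
    consumer.assign([tp])

    details = activity.info().heartbeat_details
    if details:
        # Retrying: resume from the offset saved in the last heartbeat instead of
        # re-sending everything that was already posted to Slack.
        consumer.seek(tp, details[0])
    else:
        # Fresh run: pick a starting point explicitly (a stored offset, or the
        # beginning of the partition as a simple placeholder choice).
        consumer.seek_to_beginning(tp)

    try:
        for record in consumer:
            post_to_slack(record.value.decode("utf-8"))  # hypothetical helper
            # Record progress; on retry, heartbeat_details carries this offset back.
            activity.heartbeat(record.offset + 1)
    finally:
        consumer.close()
```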