I have written the code below, but it is not working. Any idea how to get the task queue size?
import asyncio

from temporalio.api.enums.v1 import DescribeTaskQueueMode
from temporalio.api.taskqueue.v1 import TaskQueue
from temporalio.api.workflowservice.v1 import DescribeTaskQueueRequest
from temporalio.api.workflowservice.v1 import ListTaskQueuePartitionsRequest

from libutil import logger
class QueueMonitor:
    """Poll a Temporal task queue and alert when its backlog is too large.

    The backlog is read with the ``DescribeTaskQueue`` RPC in enhanced mode
    with ``report_stats`` enabled.  ``ListTaskQueuePartitions`` cannot be used
    for this purpose: its response only lists partition keys and owner host
    names and carries no backlog or status information.
    """

    def __init__(
        self,
        client,
        task_queue: str,
        threshold: float,
        poll_interval: float = 5,
    ) -> None:
        """Store the monitoring configuration.

        Args:
            client: Temporal client; its ``workflow_service`` and
                ``namespace`` attributes are used to query the server.
            task_queue: Name of the task queue to watch.
            threshold: Backlog size above which an alert is sent.
            poll_interval: Seconds to wait between two consecutive polls.
        """
        self.client = client
        self.task_queue = task_queue
        self.threshold = threshold
        self.poll_interval = poll_interval

    async def monitor_queue(self) -> None:
        """Poll the queue forever and alert whenever the threshold is exceeded."""
        while True:
            queue_size = await self.get_queue_size()
            if queue_size > self.threshold:
                self.send_alert(queue_size)
            # Sleep on every iteration (not only after an alert) so the
            # server is never polled in a tight loop.
            await asyncio.sleep(self.poll_interval)

    async def get_queue_size(self) -> int:
        """Return the approximate number of backlogged tasks in the queue.

        Returns:
            The sum of ``approximate_backlog_count`` over every task queue
            type reported by the server, or 0 if the request fails (the
            failure is logged).
        """
        # Deliberately best-effort: a failed poll is logged and reported as an
        # empty queue so that the monitoring loop keeps running.
        try:
            request = DescribeTaskQueueRequest(
                namespace=self.client.namespace,
                task_queue=TaskQueue(name=self.task_queue),
                # Backlog statistics are reported per version/task type in
                # enhanced mode when report_stats is set.
                api_mode=DescribeTaskQueueMode.DESCRIBE_TASK_QUEUE_MODE_ENHANCED,
                report_stats=True,
            )
            response = await self.client.workflow_service.describe_task_queue(
                request
            )
            queue_size = self._total_backlog(response)
            logger.info(f"Queue size for {self.task_queue}: {queue_size}")
            return queue_size
        except Exception as e:
            logger.error(f"Error getting queue size for {self.task_queue}: {e}")
            return 0

    @staticmethod
    def _total_backlog(response) -> int:
        """Sum the backlog counts of all versions and task types in *response*."""
        return sum(
            type_info.stats.approximate_backlog_count
            for version_info in response.versions_info.values()
            for type_info in version_info.types_info.values()
        )

    def send_alert(self, queue_size) -> None:
        """Log a warning stating that the backlog exceeded the threshold.

        Args:
            queue_size: Backlog size that triggered the alert.
        """
        # Implement your alerting mechanism here
        logger.warning(
            f"ALERT: Temporal queue size exceeded threshold for {self.task_queue}. "
            f"Size: {queue_size}, Threshold: {self.threshold}."
        )