I want to capture the errors of activity and count it and compare with threshold but not working. Any help?
My code below:
monitor_error.py
from temporalio import activity
from libutil import logger
THRESHOLD = 3
failed_task_counts = {}
def handle_exceptions(func):
async def wrapper(*args, **kwargs):
try:
return await func(*args, **kwargs)
except Exception as e:
logger.error(f"Error executing task: {e}")
task_name = args[1].__name__ if args[1] else args[0].__name__
failed_task_counts[task_name] = failed_task_counts.get(task_name, 0) + 1
return wrapper
@activity.defn
@handle_exceptions
async def handle_activity(target_func, *args, **kwargs):
return await target_func(*args, **kwargs)
async def check_threshold():
for task_name, count in failed_task_counts.items():
if count >= THRESHOLD:
logger.error(f"Alert: Task '{task_name}' has reached the failure threshold ({THRESHOLD}).")
and calling
`from monitor_error import handle_activity, check_threshold
async def main():
client = await temporal_util.get_temporal_client(‘default’)
workers = [
Worker(
client,
task_queue="Test",
workflows=[],
activities=[handle_activity(mytest.process_import)],
interceptors=[SentryInterceptor()],
),
]
logger.info('Workers created, running...')
await asyncio.gather(*[w.run() for w in workers])
await asyncio.create_task(check_threshold())
@temporal_workers.register
def runner():
asyncio.run(main())
if name == ‘main’:
temporal_workers.main()
`
this is throwing error. Your help is appreciated.