Workflow not completing end-to-end

Hello,
I have a workflow for user registration, but it is not completing end-to-end. Further details below. All code is in Python using FastAPI.

  1. I have a website that takes input for user registration.
  2. A POST from the website triggers the workflow.
  3. I start the worker job separately.
  4. Once “submit” is clicked on the website, the POST fires.
  5. From my custom logging I can see the POST method triggering the workflow, which in turn triggers the activity; at this point, however, the worker job has exited and nothing happens further.
  6. On the workflow class I have a start_to_close timeout of 600 seconds.
  7. I don’t see any error anywhere, not even in the UI. A snippet of the log is below.

Can anyone explain what’s going on, or whether I am doing something wrong?

Note: the log below is truncated for sensitive info

============================LOG ==============================
12:42:21,708 - INFO -app-log - Worker initiated for user registration
2024-09-08 12:42:37,947 - INFO -app-log - Landing page rendered.
2024-09-08 12:42:43,629 - DEBUG -watchfiles.main - 1 change detected: {(<Change.modified: 2>, 'C:\\temporal_http\\app.log')}
2024-09-08 12:42:50,451 - INFO -app-log - Form data submitted with parameter ==> username
2024-09-08 12:42:50,795 - INFO -app-log - Connection to temporal server completed.
2024-09-08 12:42:50,796 - INFO -app-log - Workflow starting.
2024-09-08 12:42:50,907 - INFO -app-log - Executing workflow => RegisterUser.
2024-09-08 12:42:50,928 - DEBUG -temporalio.worker._activity - Running activity register_user (token b'\n$fa3ac80e-340c-4ee8-b33a-c26b9462df05\x12\x16user_register_workflow\x1a$8c169dd6-7682-4df6-b816-45fb56948063 \x05(\x012\x011B\rregister_userJ\x08\x08\x01\x10\x9b\x80@\x18\x01')
2024-09-08 12:42:50,928 - DEBUG -temporalio.activity - Starting activity ({'activity_id': '1', 'activity_type': 'register_user', 'attempt': 1, 'namespace': 'default', 'task_queue': 'user_register_queue', 'workflow_id': 'user_register_workflow', 'workflow_run_id': '8c169dd6-7682-4df6-b816-45fb56948063', 'workflow_type': 'RegisterUser'})
2024-09-08 12:42:50,928 - INFO -app-log - User registration activity initiated.
2024-09-08 12:42:50,929 - INFO -app-log - Working on user username
2024-09-08 12:42:52,469 - DEBUG -watchfiles.main - 1 change detected: {(<Change.modified: 2>, 'C:\\temporal_http\\app.log')}
2024-09-08 12:42:52,826 - DEBUG -watchfiles.main - 1 change detected: {(<Change.modified: 2>, 'C:\\temporal_http\\app.log')}
2024-09-08 12:42:53,443 - DEBUG -watchfiles.main - 1 change detected: {(<Change.modified: 2>, 'C:\\temporal_http\\app.log')}
2024-09-08 12:43:50,868 - DEBUG -watchfiles.main - 1 change detected: {(<Change.modified: 2>, 'C:\\temporal_http\\app.log')}
2024-09-08 12:44:59,072 - DEBUG -watchfiles.main - 3 changes detected: {(<Change.added: 1>, 'C:\\temporal_http\\worker.py~'), (<Change.modified: 2>, 'C:\\temporal_http\\worker.py'), (<Change.deleted: 3>, 'C:\\temporal_http\\worker.py~')}
2024-09-08 12:59:36,836 - DEBUG -asyncio - Using proactor: IocpProactor
2024-09-08 12:59:37,170 - INFO -app-log - Worker initiated for user registration
2024-09-08 12:59:37,434 - DEBUG -temporalio.worker._activity - Running activity register_user (token b'\n$fa3ac80e-340c-4ee8-b33a-c26b9462df05\x12\x16user_register_workflow\x1a$8c169dd6-7682-4df6-b816-45fb56948063 \x05(\x022\x011B\rregister_userJ\x08\x08\x01\x10\x9e\x80@\x18\x01')
2024-09-08 12:59:37,434 - DEBUG -temporalio.activity - Starting activity ({'activity_id': '1', 'activity_type': 'register_user', 'attempt': 2, 'namespace': 'default', 'task_queue': 'user_register_queue', 'workflow_id': 'user_register_workflow', 'workflow_run_id': '8c169dd6-7682-4df6-b816-45fb56948063', 'workflow_type': 'RegisterUser'})
2024-09-08 12:59:37,434 - INFO -app-log - User registration activity initiated.
2024-09-08 12:59:37,434 - INFO -app-log - Working on user

What is the “worker job”? Is it a process that hosts the workflow and activity workers? It should always run and never exit unless you deploy the next version.

Yes, the worker job has the workflow and the activity registered in it. Is there a way to check why the worker job is exiting abruptly?
I tried bringing the worker up again, but that didn’t take the workflow forward…

Have you seen the samples? They show how to run the workers in a way that they don’t exit.

No, can you please point me to the correct URL?
I was referring to this one - Core application - Python SDK feature guide | Temporal Documentation - and building from there…

I tried going through the samples and modified the code, but the flow does not come back from activities.py to the caller (worker.py). Everything up to and including the activity works OK, but then worker.py exits, which I am unable to decode since neither the terminal nor the Temporal UI shows any error… Any idea what else I should look at?
activities.py

@activity.defn
async def get_loc(url: str) -> dict:
    logger.info("==> Activity get_loc")

    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=60) as response:
                json_response = await response.json()
                logger.info(f"API response {json_response}")

                return json_response
    except aiohttp.ClientError as err:
        logger.error(f"Request failed = {str(err)}")
        return {"error": "request failed"}

    except Exception as err:
        logger.error(f"Unexpected error {str(err)}")
        return {"error": "request failed"}

Worker

async def main():
    # connect to temporal server/client
    client = await Client.connect(constants.TEMPORAL_HOST)
    logger.info("==> Worker started.")

    # worker setup
    async with Worker(
        client,
        task_queue=constants.TASK_QUEUE_1,
        workflows=[MainWorkflow],
        activities=[get_loc],  # register activity with worker
    ):
        result = await client.execute_workflow(
            MainWorkflow.run,
            constants.BASE_URL,
            id=constants.WORKFLOW_ID_1,
            task_queue=constants.TASK_QUEUE_1,
        )

        print(f"Result ==> {result}")
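A side note on the snippet above: with `async with Worker(...)`, the worker shuts down as soon as the block is exited, i.e., right after `execute_workflow` returns. A stdlib-only mock of that control flow (no Temporal involved) shows the ordering:

```python
import asyncio

# Mock of the "async with Worker(...)" control flow: __aexit__ runs as soon
# as the body finishes, which is what stops the real worker.
events = []

class MockWorker:
    async def __aenter__(self):
        events.append("worker polling")
        return self

    async def __aexit__(self, *exc):
        events.append("worker shut down")

async def main():
    async with MockWorker():
        events.append("execute_workflow returned")
    # by this point the worker is gone and nothing keeps the process alive

asyncio.run(main())
print(events)  # ['worker polling', 'execute_workflow returned', 'worker shut down']
```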

The worker process must keep running as in our samples. Usually, execute_workflow is called from a different process than the one that hosts the worker.

Yes, my execute_workflow was in a different process. I added it to worker.py to see if that makes any difference; I saw a similar approach in the samples, so I gave it a try.

Sorry, I didn’t catch whether you are able to execute the samples.

Yes, I am able to execute the samples.

How is your worker initialization code different from the samples?

The worker is defined similarly to the samples, except that it is all in separate .py files. The worker works OK: when triggered, it goes to workflow.py and then to activities.py. At this point the worker gets aborted, and I cannot see any errors, neither in the terminal nor in the Temporal UI… It is making the debugging process very challenging…

What do you mean by “aborted”? Does it die without any exception thrown from any of the methods?

That’s right, the worker process exits without any error message.

When I retry, I get the following error in the UI under the “Pending Activities” section:
{
    "message": "activity StartToClose timeout",
    "source": "Server",
    "timeoutFailureInfo": {
        "timeoutType": "TIMEOUT_TYPE_START_TO_CLOSE"
    }
}
I don’t understand what this error is stating; my workflow config has the following defined:

start_to_close_timeout=timedelta(seconds=600), # activity timeout
retry_policy=retry_policy_def,

Also, does this look similar to Dropped tasks when cancelling polling requests under load · Issue #1058 · temporalio/temporal · GitHub

Can you provide a simple, standalone replication of your issue? May be easiest to slightly alter the working samples to demonstrate your issue.

Code as below… this is all run locally.

workflow.py ==>

@workflow.defn
class MainWorkflow:

    @workflow.run
    async def run(self, url: str) -> dict:
        try:
            logger.info("==> loc api call initiated")
            loc_api_output = await workflow.execute_activity(
                activities.get_loc,
                url,
                start_to_close_timeout=timedelta(seconds=600),  # activity timeout
                retry_policy=retry_policy_def,
            )
            logger.info("==> loc api call completed")
            return loc_api_output
        except Exception as err:
            logger.error(f"Error during the call {str(err)}")
            return {"error": str(err)}

activities.py ==>

@activity.defn
async def get_loc(url: str) -> dict:
    logger.info("==> Activity get_loc")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:            
            return await response.json()

worker.py ==>

async def main():
    # connect to temporal server/client
    client = await Client.connect(constants.TEMPORAL_HOST)
    logger.info("==> Worker started.")

    # worker setup
    worker = Worker(
        client,
        task_queue=constants.TASK_QUEUE_1,
        workflows=[MainWorkflow],
        activities=[get_loc]  # register activity with worker
    )

    # start the worker
    await worker.run()
    logger.info("==> Worker activity completed.")

run_workflow.py ==>

async def main():
    # create client connected to server
    client = await Client.connect(constants.TEMPORAL_HOST)

    # execute the workflow
    logger.info("==> Starting Workflow.")
    handle = await client.start_workflow(MainWorkflow.run,
                                         constants.BASE_URL,
                                         id=constants.WORKFLOW_ID_1,
                                         task_queue=constants.TASK_QUEUE_1,
                                         )
    # execution_timeout = timedelta(seconds=30),
    result = await handle.result()
    logger.info("==> Workflow Completed.")
    # print(f"Result ==> {result.get('zone_id')}")
    print(f"Result ==> {result}")

main.py ==> (http endpoint)

app = Flask(__name__)
@app.route("/")
def home():
    return jsonify({
        "loc_id": "zone"
    }
    )

@app.route("/zone")
def get_zone():
    return jsonify({
        "zone_id": 1234
    })

if __name__ == "__main__":
    app.run(debug=True)
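The repro imports a constants module that is not shown. A plausible version, inferred from the Flask app above, the task queue and workflow id in the log below, and standard Temporal defaults; every value here is an assumption, not my actual config:

```python
# constants.py (assumed): values inferred from the surrounding snippets
# and logs; treat each one as an assumption.
TEMPORAL_HOST = "localhost:7233"       # default Temporal frontend address
TASK_QUEUE_1 = "api_queue"             # task queue seen in the log below
WORKFLOW_ID_1 = "main_workflow"        # workflow id seen in the log below
BASE_URL = "http://127.0.0.1:5000/"    # Flask dev server default address
```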

**Log snippet ==>** [the last line shows the flow up to the activity; then the worker exits, with no error in the terminal or the Temporal UI]

2024-09-18 11:13:53,353 - INFO -app-log - ==> Worker started.
2024-09-18 11:13:58,720 - DEBUG -asyncio - Using proactor: IocpProactor
2024-09-18 11:13:59,041 - INFO -app-log - ==> Starting Workflow.
2024-09-18 11:13:59,146 - INFO -app-log - ==> loc api call initiated
2024-09-18 11:13:59,156 - DEBUG -temporalio.worker._activity - Running activity get_loc (token b'\n$927cd0a2-b79d-49f9-bec8-f6aeda70665c\x12\rmain_workflow\x1a$2c10242b-d5f3-4eea-9393-9bcdc41f0796 \x05(\x012\x011B\x07get_locJ\x08\x08\x01\x10\x9b\x80@\x18\x01')
2024-09-18 11:13:59,156 - DEBUG -temporalio.activity - Starting activity ({'activity_id': '1', 'activity_type': 'get_loc', 'attempt': 1, 'namespace': 'default', 'task_queue': 'api_queue', 'workflow_id': 'main_workflow', 'workflow_run_id': '2c10242b-d5f3-4eea-9393-9bcdc41f0796', 'workflow_type': 'MainWorkflow'})
2024-09-18 11:13:59,156 - INFO -app-log - ==> Activity get_loc

So to confirm: this code gives you a start-to-close timeout on the activity, but you are sure the activity is actually running and completing well within the 600 seconds? I only see in your logs that the activity is starting. Are you sure it’s not hung on the actual HTTP call? That is, can you confirm the activity code is completing (e.g. by adding a log after receiving the response)? The only way a start-to-close timeout would occur is if the activity does not complete within that time.
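Note that StartToClose is measured per activity attempt, from when a worker picks the task up until it reports completion; if the worker process dies mid-attempt, the server cannot tell, and only notices when the timer fires, after which a retry is scheduled. The attempt-1 to attempt-2 gap in the first log in this thread is consistent with that reading (a stdlib check, using the timestamps from that log):

```python
from datetime import datetime

# Timestamps from the first log in this thread: attempt 1 starts at
# 12:42:50 and attempt 2 at 12:59:37, right after the worker was restarted.
attempt_1 = datetime(2024, 9, 8, 12, 42, 50)
attempt_2 = datetime(2024, 9, 8, 12, 59, 37)

gap = attempt_2 - attempt_1
print(gap)  # 0:16:47 -- longer than the 600 s (10 min) StartToClose,
            # consistent with the timeout firing and the retried task
            # then waiting for a worker to come back up
```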

The worker doesn’t continue running, something is making the worker crash? Or are you saying you kill the worker?

  1. Yes, the activity is completing and not getting into a hung state. I modified the code to re-confirm this:
@activity.defn
async def get_loc(url: str) -> dict:
    logger.info("==> Activity get_loc")
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            logger.info(f"==> URL fetched {url}")
            return await response.json()

Log output:

2024-09-24 07:38:30,011 - INFO -app-log - ==> loc api call initiated
2024-09-24 07:38:30,059 - DEBUG -temporalio.worker._activity - Running activity get_loc (token b'\n$bdb1923b-ee97-4b82-a646-5dce90821168\x12\rmain_workflow\x1a$b7dd2705-afaf-4378-bfe3-c0b2989d9ecd \x05(\x012\x011B\x07get_locJ\x08\x08\x01\x10\x9b\x80@\x18\x01')
2024-09-24 07:38:30,061 - DEBUG -temporalio.activity - Starting activity ({'activity_id': '1', 'activity_type': 'get_loc', 'attempt': 1, 'namespace': 'default', 'task_queue': 'api_queue', 'workflow_id': 'main_workflow', 'workflow_run_id': 'b7dd2705-afaf-4378-bfe3-c0b2989d9ecd', 'workflow_type': 'MainWorkflow'})
2024-09-24 07:38:30,061 - INFO -app-log - ==> Activity get_loc
2024-09-24 07:38:30,078 - INFO -app-log - ==> URL fetched http://127.0.0.1:5000/
  2. The worker is not manually killed. It aborts (crashes) at the end of the activity… The overall code is quite light, i.e., no intense operations are involved… Debugging is tough as I do not see any error message anywhere.