Apache camel and tempral integration

Hi All,

I am planning to integrate apache camel springboot with temporal and able to achive and able to use camel components like kafka,amqp to read data and handover payload to workflow .

Rather than writing the components code from scratch want to use camel and temporal using springboot .

	from("kafka:{{kafka.receive_cpo}}?" + "brokers=" + brokers + "&groupId=temporal-poc-consumer-group" +
	        "&autoOffsetReset=earliest" + "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN" +
	        "&saslJaasConfig=" + saslJaasConfig + "&autoCommitEnable=true" + "&autoCommitIntervalMs=" +
	        // + "&specificAvroReader=true"
	        "&keyDeserializer=" + keyDeserializer + "&valueDeserializer=" + valueDeserializer

	        "&additionalProperties.schema.registry.url=" + schemaRegistryURL +
	        "&additionalProperties.basic.auth.credentials.source=USER_INFO" +
	        "&additionalProperties.basic.auth.user.info=" + userPassword)
	    .log("message consumed successfuly Headers: ${headers} Body: ${body} ")
	    .setProperty("CPOOrderNumber").jsonpath(".PurchaseOrders.CPOOrderNumber", String.class)

	    .process(new Processor() {
	        public void process(Exchange exchange) throws Exception {

	            String CPOOrderNumber = (String) exchange.getProperty("CPOOrderNumber");

	            System.out.println("CPOOrderNumber" + exchange.getProperty("CPOOrderNumber"));

	            System.out.println("getExchangeId" + exchange.getExchangeId());

	            WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget("localhost:56123").build());
	            WorkflowClient client = WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder().setNamespace("default").build());

	            WorkflowOptions cpoWorkflowOptions =
	                WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId("parent-workflow-" + CPOOrderNumber).build();

	            // Create the workflow client stub. It is used to start the workflow execution.
	            OrderorchestartionWorkflowInterface workflow = client.newWorkflowStub(OrderorchestartionWorkflowInterface.class, cpoWorkflowOptions);

	            System.out.println("invoking receiveCPO" + exchange.getExchangeId());

	            try {

	                workflow.receiveCPO(CPOOrderNumber, "parent-workflow-" + CPOOrderNumber);
	                //  throw new IllegalStateException("unreachable");
	            } catch (WorkflowException e) {


	                System.out.println("\nStack Trace:\n" + Throwables.getStackTraceAsString(e));

                Throwable cause = Throwables.getRootCause(e);
	                throw new Exception("commit kafka message");

	    }).log("finished sending message to temporal workflow");


I don’t know anything about Apache camel. But I believe that the code snippet you posted creates the WorkflowServiceStubs and WorkflowClient per request. WorkflowServiceStubs encapsulates a very heavyweight gRPC client. So it has to be created once per process lifetime and reused for all the requests.

Integration with Camel is really cool. One comment that I had was that imo the integration as shown is kinda backwards, meaning that Temporal should be the fault-tolerant orchestrator of Camel routes executions and not the other way around, so you could have an Activity that accepts a route and executes it. You might have a use case for this type of integration however.

Hi ,

Please find the below use case . We are planning to do pure event based model with temporal as workflow and want to use apache camel to read from topic and send it to workflow using stepid as correlation and call camel route from activity to send message to topic for micro service to consume .

If you are building your system on top of a legacy topic-based architecture then your architecture makes sense. If you are building a new system then you could eliminate all these queues and reduce the complexity 10x.

1 Like