Hi All,
I am planning to integrate Apache Camel (Spring Boot) with Temporal. So far I have been able to achieve this, using Camel components such as Kafka and AMQP to read data and hand the payload over to a workflow.
Rather than writing the component code from scratch, I want to use Camel and Temporal together with Spring Boot.
from("kafka:{{kafka.receive_cpo}}?" + "brokers=" + brokers + "&groupId=temporal-poc-consumer-group" +
"&autoOffsetReset=earliest" + "&securityProtocol=SASL_SSL" + "&saslMechanism=PLAIN" +
"&saslJaasConfig=" + saslJaasConfig + "&autoCommitEnable=true" + "&autoCommitIntervalMs=" +
autoCommitIntervalMs
// + "&specificAvroReader=true"
+
"&keyDeserializer=" + keyDeserializer + "&valueDeserializer=" + valueDeserializer
+
"&additionalProperties.schema.registry.url=" + schemaRegistryURL +
"&additionalProperties.basic.auth.credentials.source=USER_INFO" +
"&additionalProperties.basic.auth.user.info=" + userPassword)
.log("message consumed successfuly Headers: ${headers} Body: ${body} ")
.setProperty("CPOOrderNumber").jsonpath(".PurchaseOrders.CPOOrderNumber", String.class)
.process(new Processor() {
public void process(Exchange exchange) throws Exception {
String CPOOrderNumber = (String) exchange.getProperty("CPOOrderNumber");
System.out.println("CPOOrderNumber" + exchange.getProperty("CPOOrderNumber"));
System.out.println("getExchangeId" + exchange.getExchangeId());
WorkflowServiceStubs service = WorkflowServiceStubs.newInstance(WorkflowServiceStubsOptions.newBuilder().setTarget("localhost:56123").build());
WorkflowClient client = WorkflowClient.newInstance(service, WorkflowClientOptions.newBuilder().setNamespace("default").build());
WorkflowOptions cpoWorkflowOptions =
WorkflowOptions.newBuilder().setTaskQueue(TASK_QUEUE).setWorkflowId("parent-workflow-" + CPOOrderNumber).build();
// Create the workflow client stub. It is used to start the workflow execution.
OrderorchestartionWorkflowInterface workflow = client.newWorkflowStub(OrderorchestartionWorkflowInterface.class, cpoWorkflowOptions);
System.out.println("invoking receiveCPO" + exchange.getExchangeId());
try {
workflow.receiveCPO(CPOOrderNumber, "parent-workflow-" + CPOOrderNumber);
// throw new IllegalStateException("unreachable");
} catch (WorkflowException e) {
/*
*/
System.out.println("\nStack Trace:\n" + Throwables.getStackTraceAsString(e));
Throwable cause = Throwables.getRootCause(e);
throw new Exception("commit kafka message");
}
}
}).log("finished sending message to temporal workflow");
Thanks
phani