Thanks for your reply.
In fact, I wrote a Workflow that does not use Schedules:
private final SyncJobActivity activity;
private CancellationScope activityScope;
private Promise<Boolean> activityPromise;
private SyncReq syncReq;
private Duration period;
private boolean stop = false;
private boolean conditionChanged = false;
public SyncJobWorkflowImpl() {
    // Create activity clients.
    activity =
        Workflow.newActivityStub(
            SyncJobActivity.class,
            ActivityOptions.newBuilder()
                .setScheduleToCloseTimeout(Duration.ofSeconds(1000))
                .setHeartbeatTimeout(Duration.ofSeconds(10))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());
}
@Override
public void execute(SyncReq syncReq) {
    this.syncReq = syncReq;
    do {
        this.conditionChanged = false;
        executeRequest();
        executePeriodically(this.syncReq);
    } while (!this.stop);
}
public void executePeriodically(SyncReq syncReq) {
    int counter = 0;
    do {
        this.conditionChanged = false;
        final Duration currentPeriod = this.period;
        // True when a signal ended the timed wait, false when the timer fired.
        boolean periodChanged = false;
        logger.info("Waiting for {}", currentPeriod);
        if (currentPeriod == null) {
            // No period yet: block until a signal sets conditionChanged.
            Workflow.await(() -> this.conditionChanged);
        } else {
            periodChanged = Workflow.await(currentPeriod, () -> this.conditionChanged);
        }
        if (periodChanged) {
            if (!Objects.equals(currentPeriod, this.period)) {
                logger.info("Period changed from {} to {}", currentPeriod, this.period);
                continue;
            }
        }
        if (this.stop) {
            logger.info("Stopping sync job");
            break;
        }
        logger.info("Syncing workflow for {}", this.syncReq.id());
        doSync();
        counter++;
    } while (counter < 10 && !this.stop);
    if (counter == 10 && !this.stop) {
        logger.info("Clearing history of sync job {}", this.syncReq.id());
        SyncJobWorkflow continueAsNew = Workflow.newContinueAsNewStub(SyncJobWorkflow.class);
        // Request that the new run will be invoked by the Temporal system:
        continueAsNew.execute(
            new SyncReq(
                this.syncReq.id(),
                this.syncReq.sourceTable(),
                this.syncReq.targetTable(),
                SyncTypeEnum.SYNC));
    }
}
@Override
public void sync(SyncReq syncReq) {
    logger.info("sync signal received");
    this.syncReq = syncReq;
    this.conditionChanged = true;
}
@Override
public void resync(SyncReq syncReq) {
    logger.info("resync signal received");
}
@Override
public void updatePeriod(SyncReq syncReq, Duration newDuration) {
    logger.info("updatePeriod signal received");
    if (Objects.equals(this.period, newDuration)) return;
    this.period = newDuration;
    this.conditionChanged = true;
}
@Override
public void cancel() {
    logger.info("cancel signal received");
    if (activityScope != null) {
        activityScope.cancel();
    }
    logger.info("cancel signal accepted");
}
@Override
public void stop() {
    logger.info("stop signal received");
    this.stop = true;
    this.conditionChanged = true;
    cancel();
}
private void doSync() {
    logger.info("Syncing for {}", this.syncReq.id());
    run(() -> activityPromise = Async.function(activity::sync, this.syncReq));
}
private void doResync() {
    logger.info("Resyncing for {}", this.syncReq.id());
    run(() -> activityPromise = Async.function(activity::resync, this.syncReq));
}
private void run(Runnable runnable) {
    activityScope = Workflow.newCancellationScope(runnable);
    // Start the cancellation scope; the runnable assigns activityPromise.
    activityScope.run();
    try {
        activityPromise.get();
    } catch (ActivityFailure e) {
        // Swallow only cancellations. An unchecked cast of the cause would throw
        // ClassCastException for timeouts or application failures, so rethrow those.
        if (!(e.getCause() instanceof CanceledFailure)) {
            throw e;
        }
        logger.info("Canceled Failure: {}", e.getMessage());
    }
}
private void executeRequest() {
    if (syncReq.type() == SyncTypeEnum.RESYNC) {
        doResync();
    } else if (syncReq.type() == SyncTypeEnum.SYNC) {
        doSync();
    }
}
I think the cancel() and stop() logic can be omitted, since I can use the cancel/terminate methods of the untyped workflow stub instead.
The period-update signal could also be omitted if I were using the built-in Schedules.
How would you compare this solution with a Schedule-based one?
Is waiting indefinitely (when there is no period) a problem?
logger.info("Waiting for {}", currentPeriod);
if (currentPeriod == null) {
    Workflow.await(() -> this.conditionChanged);
}
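To make the wait logic easier to discuss, here is how I read the branch taken after the wait in executePeriodically() returns, written as a plain-Java sketch with no Temporal APIs. The names WaitDecision, NextStep and decide are made up for illustration and do not exist in my workflow.

```java
import java.time.Duration;
import java.util.Objects;

/** Plain-Java model of the branch taken after the wait in executePeriodically() returns. */
final class WaitDecision {

    /** Hypothetical names, used only for this sketch. */
    enum NextStep { WAIT_AGAIN, STOP, SYNC }

    /**
     * @param periodBefore period captured before waiting (null means an untimed wait)
     * @param periodAfter  value of the period field once the wait has returned
     * @param signaled     true if conditionChanged ended the wait, false if the timer fired
     * @param stop         value of the stop flag once the wait has returned
     */
    static NextStep decide(Duration periodBefore, Duration periodAfter,
                           boolean signaled, boolean stop) {
        // In the workflow the await result is only captured for the timed wait,
        // so a period change is only detected when a period was already set.
        boolean timedWait = periodBefore != null;
        if (timedWait && signaled && !Objects.equals(periodBefore, periodAfter)) {
            // The workflow uses `continue` here; the loop condition still checks stop.
            return NextStep.WAIT_AGAIN;
        }
        if (stop) {
            return NextStep.STOP;
        }
        // Timer fired, a sync signal arrived, or an untimed wait was ended by a signal.
        return NextStep.SYNC;
    }

    public static void main(String[] args) {
        Duration oneMinute = Duration.ofMinutes(1);
        System.out.println(decide(oneMinute, Duration.ofMinutes(5), true, false));
        System.out.println(decide(null, oneMinute, true, false));
    }
}
```

If this reading is right, an updatePeriod signal that arrives during the indefinite wait leads straight to a sync rather than to a wait for the new period, which may or may not be what I want.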
I also have another idea that uses two Workflows, SyncWorkflow and ResyncWorkflow:
When a ResyncWorkflow starts, I write a flag to the database and delete the current schedule for SyncWorkflow.
When a SyncReq (type=sync) arrives, I check the DB and reject the request if the flag is set.
At the end of the resync, if there is a period for this id, I reschedule SyncWorkflow with that period.
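The flag handling in this second idea could be sketched roughly as below. This is only an illustration under assumptions: in-memory collections stand in for the database, the class and method names (ResyncGate, beginResync, acceptSync, endResync) are hypothetical, and deleting or recreating the schedule is left to the caller.

```java
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** In-memory stand-in for the DB flag; all names here are hypothetical. */
final class ResyncGate {
    private final Set<String> resyncInProgress = ConcurrentHashMap.newKeySet();
    private final Map<String, Duration> periods = new ConcurrentHashMap<>();

    /** Remember the sync period configured for an id. */
    void setPeriod(String id, Duration period) {
        periods.put(id, period);
    }

    /**
     * Called when a ResyncWorkflow starts. Returns true if the flag was newly set;
     * the caller would then delete the SyncWorkflow schedule for this id.
     */
    boolean beginResync(String id) {
        return resyncInProgress.add(id);
    }

    /** A SyncReq of type sync is accepted only while no resync flag is set. */
    boolean acceptSync(String id) {
        return !resyncInProgress.contains(id);
    }

    /**
     * Called at the end of the resync. Clears the flag and returns the period
     * to reschedule SyncWorkflow with, if one is configured for this id.
     */
    Optional<Duration> endResync(String id) {
        resyncInProgress.remove(id);
        return Optional.ofNullable(periods.get(id));
    }

    public static void main(String[] args) {
        ResyncGate gate = new ResyncGate();
        gate.setPeriod("job-1", Duration.ofMinutes(5));
        gate.beginResync("job-1");
        System.out.println("sync accepted during resync: " + gate.acceptSync("job-1"));
        System.out.println("reschedule with: " + gate.endResync("job-1"));
    }
}
```

With a real database, the flag check and the schedule deletion would not be atomic, so a sync run that was already started could still overlap with the resync.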
Which one do you suggest?