Signal a Schedule Flow

I am using new Schedule feature.
Suppose I have a SyncJob Workflow.
SyncActivitity has two methods: sync and resync.
Resync is a dominant operation. What I mean is that if a resync request comes, the current operation will be cancelled and resync will be done.
If a resync operation is running, the incoming sync requests will be skipped.

Resync is not periodic, it is one time (like trigger).
Sync operation can be periodic or triggered manually. When a one time sync request comes, if there is already an ongoing operation, the sync request will be ignored.

I have designed a workflow like

@WorkflowInterface
public interface SyncJobPeriodic {

  @WorkflowMethod
  public void execute(SyncReq syncReq);

  @SignalMethod
  void sync(SyncReq syncReq);

  @SignalMethod
  void resync(SyncReq syncReq);
}

SyncReq contains an id, assume it is workflow id. Also contains a type whether it is SYNC or RESYNC.
Only one workflow for given id should be active at a time. Its operation can be sync or resync. Periodic runs should trigger sync. Either sync or resync can be triggered manually.

I created a schedule for this purpose.
Overlap strategy is SCHEDULE_OVERLAP_POLICY_SKIP.
When a new SyncReq(id=4, type=sync) comes, scheduleHandle.trigger(SCHEDULE_OVERLAP_POLICY_SKIP) will be called. So if there is an ongoing sync, it will be continue and the request will be ignored.

Suppose a new SyncReq(id=4, type=resync) came, I cannot scheduleHandle.trigger(SCHEDULE_OVERLAP_POLICY_SKIP) because it has no other argument than the policy.

How can I be sure that sync operations will be skipped when a resync workflow is running?
(When sync operation is running, incoming resync request should cancel the sync.)

Also when the syncjob is not periodic, (it can be periodic after a new request), is using a schedule workflow a performance penalty?

I wouldn’t use schedules for this use case. I would handle all the logic as part of the workflow code. I would recommend using SignalWithStart to start the workflow. Given that SyncReq already contains the type of the request, I would change the signature to:

@WorkflowInterface
public interface SyncJobPeriodic {

  @WorkflowMethod
  public void execute();

  @SignalMethod
  void request(SyncReq syncReq);
}

Thanks for your reply.

In fact, I wrote a Workflow without using Schedule’s.


  private final SyncJobActivity activity;
  private CancellationScope activityScope;
  private Promise<Boolean> activityPromise;
  private SyncReq syncReq;
  private Duration period;
  private boolean stop = false;
  private boolean conditionChanged = false;

  public SyncJobWorkflowImpl() {
    // Create activity clients.
    activity =
        Workflow.newActivityStub(
            SyncJobActivity.class,
            ActivityOptions.newBuilder()
                .setScheduleToCloseTimeout(Duration.ofSeconds(1000))
                .setHeartbeatTimeout(Duration.ofSeconds(10))
                .setCancellationType(ActivityCancellationType.WAIT_CANCELLATION_COMPLETED)
                .build());
  }

  @Override
  public void execute(SyncReq syncReq) {
    this.syncReq = syncReq;

    do {
      this.conditionChanged = false;
      executeRequest();
      executePeriodically(this.syncReq);
    } while (!this.stop);
  }

  public void executePeriodically(SyncReq syncReq) {
    int counter = 0;

    do {
      this.conditionChanged = false;
      final Duration currentPeriod = this.period;
      boolean periodChanged = false;

      logger.info("Waiting for {}", currentPeriod);
      if (currentPeriod == null) {
        Workflow.await(() -> this.conditionChanged);
      } else {
        periodChanged = Workflow.await(currentPeriod, () -> this.conditionChanged);
      }

      if (periodChanged) {
        if (!Objects.equals(currentPeriod, this.period)) {
          logger.info("Period changed from {} to {}", currentPeriod, this.period);
          continue;
        }
      }

      if (this.stop) {
        logger.info("Stopping sync job");
        break;
      }

      logger.info("Syncing workflow for {}", this.syncReq.id());
      doSync();
      counter++;

    } while (counter < 10 && !this.stop);

    if (counter == 10 && !this.stop) {
      logger.info("Clearing history of sync job", this.syncReq.id());

      SyncJobWorkflow continueAsNew = Workflow.newContinueAsNewStub(SyncJobWorkflow.class);
      // Request that the new run will be invoked by the Temporal system:
      continueAsNew.execute(
          new SyncReq(
              this.syncReq.id(),
              this.syncReq.sourceTable(),
              this.syncReq.targetTable(),
              SyncTypeEnum.SYNC));
    }
  }

  @Override
  public void sync(SyncReq syncReq) {
    logger.info("sync signal received");
    this.syncReq = syncReq;
    this.conditionChanged = true;
  }

  @Override
  public void resync(SyncReq syncReq) {
    logger.info("resync signal received");
  }

  @Override
  public void updatePeriod(SyncReq syncReq, Duration newDuration) {
    logger.info("updatePeriod signal received");
    if (Objects.equals(this.period, newDuration)) return;
    this.period = newDuration;
    this.conditionChanged = true;
  }

  @Override
  public void cancel() {
    logger.info("cancel signal received");
    if (activityScope != null) {
      activityScope.cancel();
    }
    logger.info("cancel signal accepted");
  }

  @Override
  public void stop() {
    logger.info("stop signal received");
    this.stop = true;
    this.conditionChanged = true;
    cancel();
  }

  private void doSync() {
    logger.info("Syncing for {}", this.syncReq.id());
    run(() -> activityPromise = Async.function(activity::sync, this.syncReq));
  }

  private void doResync() {
    logger.info("Syncing for {}", this.syncReq.id());
    run(() -> activityPromise = Async.function(activity::resync, this.syncReq));
  }

  private void run(Runnable runnable) {
    activityScope = Workflow.newCancellationScope(runnable);

    // start cancellation scope
    activityScope.run();

    try {
      activityPromise.get();
    } catch (ActivityFailure e) {
      CanceledFailure c = (CanceledFailure) e.getCause();
      System.out.println("Canceled Failure: " + e.getMessage());
    }
  }

  private void executeRequest() {

    if (syncReq.type() == SyncTypeEnum.RESYNC) {
      doResync();
    } else if (syncReq.type() == SyncTypeEnum.SYNC) {
      doSync();
    }
  }

I think cancel(), stop() logic can be omitted. I can use untyped workflow cancel/terminate methods.
Period update signal might be omitted if I were using built-in Scheduling.

Would you compare this solution with a Schedule based solution?

Is waiting indefinitely (if there is no period) a problem?

      logger.info("Waiting for {}", currentPeriod);
      if (currentPeriod == null) {
        Workflow.await(() -> this.conditionChanged);
      } 

Also I have another idea using two Workflows: SyncWorkflow, ResyncWorkflow.
If a ResyncWorkflow has started, I will write a flag into the database. Delete the current schedule flow (SyncWorkflow).
If a SyncReq(type=sync) comes, I will look at the DB, if there is a flag, reject it.
At the end of resync, if there is a period for this id, I will reschedule SyncWorkflow according to the period.

Which one do you suggest?

I think your implementation is OK in principle.

It assumes that only the last signal value is going to be stored. Are you intentionally ignoring all intermediate signals?

Thanks for your reply.
My code is not complete, but only using the last sync is an implementation decision.