What are the optimal settings for quick XDC replication? What metrics can one track?

I have an XDC setup with about 1500 workflows, each with a reasonable amount of history (say 200 to 300 activities). The first-time replication is taking quite a while (3+ hours already). I tried increasing the number of frontends in my active cluster and the number of workers in the standby cluster, but that did not make much difference. Are there any metrics I can track to figure out whether the active and standby clusters are in sync?

Also, which component is responsible for replication: the worker or the history service?

It seems this is answered here: XDC Replication in Practice - #25 by yux

For instance, in my active cluster I have 752336 entries in the replication_tasks table and 140 entries in the replication_tasks_dlq table.

In my secondary (standby) cluster I have 0 tasks in replication_tasks and 6158 entries in replication_tasks_dlq.

I haven't seen these numbers change for a very long time now (5+ hours).

:::::::::::::::::::::::::::::::::::::::::::Active Cluster :::::::::::::::::::::::::::::::::::


use temporal;select (select count(1) from replication_tasks ) replication_tasks , (select count(1) from replication_tasks_dlq ) replication_dlq; 

Database changed

+-------------------+-----------------+
| replication_tasks | replication_dlq |
+-------------------+-----------------+
|            752336 |             140 |
+-------------------+-----------------+
1 row in set (4.445 sec)


 tctl -ns mynamespace wf la --op | wc;

     3544     35447    648552

:::::::::::::::::::::::::::::::Standby Cluster :::::::::::::::::::::::::::::::::::::::::


use temporal;select (select count(1) from replication_tasks ) replication_tasks , (select count(1) from replication_tasks_dlq ) replication_dlq; 
Database changed
+-------------------+-----------------+
| replication_tasks | replication_dlq |
+-------------------+-----------------+
|                 0 |            6158 |
+-------------------+-----------------+
1 row in set (0.003 sec)

 tctl -ns mynamespace  wf la --op | wc;

     3382     33827    618906


How can I ensure that all the workflows have actually been copied to the secondary/standby region?

The history service does the replication job.
Besides adding hosts, there are some replication configs you can tune to speed up the replication rate (see the dynamic-config sketch after the list):

  1. history.ReplicationTaskFetcherAggregationInterval (in the remote/standby cluster): this controls the polling frequency. Try a small value: 5-10s.
  2. history.replicatorTaskBatchSize (in the local/active cluster): this controls the batch size of the replication response. Increase as needed.
  3. history.ReplicationTaskProcessorHostQPS and history.ReplicationTaskProcessorShardQPS (in the remote/standby cluster): these control the rate at which replication tasks are applied. Increase as needed. (The default values should be able to support a normal load.)
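
If these are set through the dynamic config file (dynamic-config.yaml), the entries might look like the sketch below. This assumes the standard dynamic config file format; the values are purely illustrative, not recommendations.

# standby cluster dynamic-config.yaml (illustrative values)
history.ReplicationTaskFetcherAggregationInterval:
  - value: "5s"
history.ReplicationTaskProcessorHostQPS:
  - value: 3000
history.ReplicationTaskProcessorShardQPS:
  - value: 100

# active cluster dynamic-config.yaml (illustrative value)
history.replicatorTaskBatchSize:
  - value: 100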

One issue I see in your information: when replication tasks go into the DLQ, those tasks encountered errors and could not be replicated. Do you see any replication-related errors on the remote/standby cluster that could help us debug the issue?

If you don't have new workflow starts in the active cluster, you can use your query to check the replication task count in the active cluster; eventually it will go to zero.
Or you can monitor the standby task queue latency/count; the task count will drop to zero after all workflows have been replicated over.
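
For example, a small loop around the same query can serve as the "in sync" check (a sketch only; the DB host and user are placeholders for your MySQL setup, and "temporal" is the database name used above):

while true; do
  # same counts as above, run against the active cluster's database
  mysql -h <active-db-host> -u <user> -p temporal \
    -e "select (select count(1) from replication_tasks) replication_tasks, (select count(1) from replication_tasks_dlq) replication_dlq;"
  sleep 60
done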

If you have constant workflow start traffic in the active cluster, you can monitor the standby task queue latency and check that it stays stable.

Should this be done in dynamic-config.yaml?

In my active cluster I see errors like

{"level":"error","ts":"2022-01-24T13:45:33.817Z","msg":"Failed to get replication tasks","service":"history","error":"context deadline exceeded","logging-call-at":"replicationTaskFetcher.go:431","stacktrace":"go.temporal.io/server/common/log.(*zapLogger).Error\n\t/temporal/common/log/zap_logger.go:142\ngo.temporal.io/server/service/history.(*replicationTaskFetcherWorker).getMessages\n\t/temporal/service/history/replicationTaskFetcher.go:431\ngo.temporal.io/server/service/history.(*replicationTaskFetcherWorker).fetchTasks\n\t/temporal/service/history/replicationTaskFetcher.go:374"}

In my standby cluster I see errors like

"level":"error","ts":"2022-01-24T13:47:51.486Z","msg":"Error updating ack level for shard","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"active","error":"Failed to update shard. Previous range ID: 1; new range ID: 2","operation-result":"OperationFailed","logging-call-at":"queueAckMgr.go:225","stacktrace":"go.temporal.io/server/common/log.(*zapLogger).Error\n\t/temporal/common/log/zap_logger.go:142\ngo.temporal.io/server/service/history.(*queueAckMgrImpl).updateQueueAckLevel\n\t/temporal/service/history/queueAckMgr.go:225\ngo.temporal.io/server/service/history.(*queueProcessorBase).processorPump\n\t/temporal/service/history/queueProcessor.go:223"}
{"level":"info","ts":"2022-01-24T13:47:51.486Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","lifecycle":"Stopping","component":"shard-context","shard-id":236,"logging-call-at":"controller_impl.go:229"}
{"level":"info","ts":"2022-01-24T13:47:51.486Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","lifecycle":"Stopping","component":"shard-engine","logging-call-at":"context_impl.go:1270"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"history-engine","lifecycle":"Stopping","logging-call-at":"historyEngine.go:270"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"standby","lifecycle":"Stopping","component":"transfer-queue-processor","logging-call-at":"queueProcessor.go:156"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Queue processor pump shut down.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"standby","logging-call-at":"queueProcessor.go:231"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Task processor shutdown.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"standby","logging-call-at":"taskProcessor.go:155"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"standby","lifecycle":"Stopped","component":"transfer-queue-processor","logging-call-at":"queueProcessor.go:169"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"active","lifecycle":"Stopping","component":"transfer-queue-processor","logging-call-at":"queueProcessor.go:156"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Queue processor pump shut down.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"active","logging-call-at":"queueProcessor.go:231"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Task processor shutdown.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"active","logging-call-at":"taskProcessor.go:155"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"none","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"transfer-queue-processor","cluster-name":"active","lifecycle":"Stopped","component":"transfer-queue-processor","logging-call-at":"queueProcessor.go:169"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer queue processor pump shutting down.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"standby","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:191"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer processor exiting.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"standby","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:192"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Task processor shutdown.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"standby","component":"timer-queue-processor","logging-call-at":"taskProcessor.go:155"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer queue processor stopped.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"standby","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:172"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer queue processor pump shutting down.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"active","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:191"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer processor exiting.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"active","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:192"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Task processor shutdown.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"active","component":"timer-queue-processor","logging-call-at":"taskProcessor.go:155"}
{"level":"info","ts":"2022-01-24T13:47:51.487Z","msg":"Timer queue processor stopped.","service":"history","shard-id":236,"address":"10.aaa.bbb.ccc:7234","component":"timer-queue-processor","cluster-name":"active","component":"timer-queue-processor","logging-call-at":"timerQueueProcessorBase.go:172"}

If I restart the workers in the standby cluster, I see

{"level":"info","ts":"2022-01-24T15:03:41.564Z","msg":"cleaning up replication task queue","service":"history","shard-id":264,"address":"10.yyy.ddd.eee:7234","read-level":4194311,"logging-call-at":"replicationTaskProcessor.go:488"}

and after a while I see

{"level":"error","ts":"2022-01-24T15:06:45.607Z","msg":"Error updating ack level for shard","service":"history","shard-id":110,"address":"10.xxx.yyy.zzz:7234","component":"transfer-queue-processor","cluster-name":"standby","error":"Failed to update shard. Previous range ID: 6; new range ID: 7","operation-result":"OperationFailed","logging-call-at":"queueAckMgr.go:225","stacktrace":"go.temporal.io/server/common/log.(*zapLogger).Error\n\t/temporal/common/log/zap_logger.go:142\ngo.temporal.io/server/service/history.(*queueAckMgrImpl).updateQueueAckLevel\n\t/temporal/service/history/queueAckMgr.go:225\ngo.temporal.io/server/service/history.(*queueProcessorBase).processorPump\n\t/temporal/service/history/queueProcessor.go:223"}

Once this error occurs, the replication task fetcher, task processor, and timer queue processor pumps stop/shut down, and I think the history pods stop working. They continue to report healthy and ready for a while, but eventually the pod crashes and is replaced by a new pod. This goes on and on!

I ran multiple tests (I had about 3500 workflows in my active/primary):

The first time, after 1600 workflows got replicated, I saw "Failed to update shard" in the standby and replication stopped.

The second time, it stopped after only 1 or 2 workflows got replicated.

I am able to reproduce this consistently. In all my tests, the workflows were never replicated completely.

Can you please guide me? (I am still using the default settings; I did not add more workers or change the configs.)

This error indicates the active cluster could not connect to the standby. Try removing the standby cluster and adding it again to see if you still see this error in the active cluster.

tctl admin cluster remove-remote-cluster --cluster standby

tctl admin cluster upsert-remote-cluster  --frontend_address <standby address>

The ack level update error seems to be a result of a pod restart; after a pod restart, the range ID is updated. Is there anything that caused the pod restart? Resource exhaustion?

No, resource usage is not even 30% for CPU or memory, and network-wise I don't see packet losses.

I tried doing an rrc on both primary and standby and it did not help; in fact, I also tried deleting the cluster_metadata_info table. That did not help either.

On the standby, I deleted the entire temporal and visibility schema and set it up as a fresh cluster; that also did not help much :frowning:
My active cluster is in use by our dev folks, so I can't delete it or recreate the workflows.

I think both clusters are able to connect to each other: when I bring my standby down by setting replicas=0, I see errors in the active cluster's history pod like "transport: Error while dialing dial tcp 10.xxx.yyy.zzz:7233: connect: connection refused" instead of "context deadline exceeded".
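
For reference, a plain TCP check from the active side is enough to confirm the standby frontend port is reachable (the address below is a placeholder; 7233 is the frontend port from the error above):

# run from a host/pod in the active cluster
nc -vz 10.xxx.yyy.zzz 7233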

We released a patch for some multi-cluster issues last Friday. Could you also try this patch to see if it resolves the issue?

Thanks @yux, I upgraded to 1.14.4; unfortunately, the problem still persists.

Thanks for all the help @yux

Finally, we figured out that there was no issue with the configuration.

It appears that most of my workflows are sleeping/waiting for an event, and by design Temporal does not replicate sleeping workflows. So one way to force replication is to send a fake signal to all open workflows; this forces the replication to happen immediately.

I use a shell script around tctl to find all open workflows and send each of them a signal.

Please be aware this is just a one-time activity: any newly created workflow, and any workflow that receives a signal in due course, will get replicated on its own.

Here is the shell script I use:

#!/usr/bin/env bash

# remove the generated command file from a previous run
rm -f force_replicate.sh

# create a fresh command file: for every open workflow, generate a tctl command that sends it a fake signal
# (field 3 of the listing is used as the workflow ID; NR>1 skips the header row)
tctl -ns mynamespace wf la --op | awk '{$1="echo -n \"" ; $2=" --> \"  ;tctl -ns \"mynamespace\" wf s  --name fakesignal --wid ";print $1 $3 $2 $3}' | awk 'NR>1' > force_replicate.sh

# grant execute permission
chmod +x force_replicate.sh

# send the fake signals
./force_replicate.sh
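
Afterwards, to double-check that everything made it over, you can compare the open-workflow IDs on both sides with the same listing command (the frontend addresses are placeholders; field 3 is used as the workflow ID, as in the script above):

tctl --address <active-frontend>:7233 -ns mynamespace wf la --op | awk 'NR>1 {print $3}' | sort > active_wids.txt
tctl --address <standby-frontend>:7233 -ns mynamespace wf la --op | awk 'NR>1 {print $3}' | sort > standby_wids.txt
# no output from diff means the two ID lists match
diff active_wids.txt standby_wids.txt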