We would like to use Temporal to orchestrate a large number of ETL transformations of blockchain data. We are a small team, and we want to be able to process 100+ chains, and 10-50 ETLs per chain. Our previous setup was based on a dyanmodb master table → event stream → lambda → dynamodb target tables, however, managing such a large number of services has become a pain. Other issues include losing events after 24 hours, and not being able to replay all events out of the box for new ETLs.
It looks like temporal can solve all of that, but we are not confident on the design we should follow. Here is what we have right now:
- One eternal temporal workflow per chain implemented in Go
- The workflow calls and awaits on an activity that syncs block-batches to master-table. The activity reports back the last block successfully synced or an error. On success, we call the activity again with the next batch. After a few days, the workflow will catch up with the chain height which grows at 1 block per 12 seconds.
- We need to process all the blocks added to the master-table through ETL activities. An ETL activity will read data from the master-table, enrich it, transform it, and save it to the target tables (idempotent). We don’t see the point returning the data back to the workflow to decouple transformation from writing. 99% of the time it will be the transform that fails, and if a write fails, we are ok to redo the transform.
- Every ETL job has its own block height (up to what block# this ETL job has processed blocks without error). ETLs are independent and new ETLs may be added or removed in the future through signals.
- Every few seconds, we loop through every ETL spec
- if there is a pending ETL activity for that spec, we ignore/continue. This means that a workflow.goroutine is working on this ETL right now (heartbeating, not timed-out); else
- else, we check if the current synced block height is > the ETL block height; if it is, we start an activity in a goroutine for the ETL to catch up
The design also includes the following signals/queries
- Every N blocks, save the ETLS block heights to checkpoints table, so we can inform users if a certain chart is up-to-date or not
- Every M blocks, flush signals and continue-as-new to clean history
- On signal, update an ETL’s higher block to force reprocessing blocks in the next run (all writes are idempotent)
- On signal, add new ETL spec, or remove ETL spec
- On workflow init, load ETL block heights from checkpoints table; to allow for workflow termination and restart