We created a mechanism that we called "The Federator" to make data processed on one Redshift cluster available on other Redshift clusters. This post follows the introduction in part 1, and describes how we solved the challenge of dealing with large data volumes.
Dimension Table Treatment
The previous post described how we would compare a summary of a table in Redshift to the data on S3, and selectively publish or subscribe that data if there was a difference.
With dimension tables of at most a few million rows, we could unload (publish) or copy (subscribe) a whole table in very little time. The simplicity of this approach was very beneficial. We had experienced problems with commercial software that did row-based CDC from a SQL Server source: it occasionally created duplicates in Redshift when under very high load, and these duplicates weren't subsequently corrected. Replacing the whole table at a time prevented this kind of problem.
This meant that a summary of a table on Redshift would be compared to a single manifest on S3. Note that because we were using the parallel unload functionality, there would be a large number of data files created by this unload, but there would be one manifest created for these. Note also that previous publications from a table would be available on S3 for a configurable period - the script identified the most recent one and compared the database summary to that.
This was great for dimension tables, but we had some fact tables with over 7Bn records for which this approach would be too slow.
Fact Table Treatment - The Initial Solution
The large fact tables were designed to be insert only, i.e. no updates would happen to them during normal processing. This meant that we should only need to publish and subscribe the newly inserted rows.
Our initial implementation ignored the update timestamp because it wasn't meant to ever be used. We took the maximum insert timestamp in the database, and compared it to the manifests on S3. As shown in the diagram below, the S3 manifest filenames specified the insert timestamp range of the data contained. We could then publish the range of insert timestamps that hadn't already been published, or subscribe in a range that hadn't already been subscribed.
This had the useful feature that it would reliably publish and subscribe the relevant batches of data, regardless of how many batches had occurred since the last publish or subscribe activity.
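The range calculation described above can be sketched in Python. This is a minimal illustration, not the original script: the manifest naming pattern (`<table>_<from>_<to>.manifest` with `YYYYMMDDHHMMSS` timestamps) and the function names are assumptions, since the post only says that the filenames carried the insert timestamp range.

```python
import re
from datetime import datetime

# Hypothetical naming convention: <table>_<from>_<to>.manifest,
# with both timestamps written as YYYYMMDDHHMMSS.
MANIFEST_RE = re.compile(r"_(\d{14})_(\d{14})\.manifest$")
TS_FORMAT = "%Y%m%d%H%M%S"


def latest_published(manifest_keys):
    """Return the highest upper insert timestamp found in the manifest names, or None."""
    latest = None
    for key in manifest_keys:
        match = MANIFEST_RE.search(key)
        if not match:
            continue  # skip keys that are not manifests in the assumed format
        upper = datetime.strptime(match.group(2), TS_FORMAT)
        if latest is None or upper > latest:
            latest = upper
    return latest


def range_to_publish(db_max_insert_ts, manifest_keys):
    """Return (exclusive lower bound, inclusive upper bound) still to unload.

    A lower bound of None means nothing has been published yet.
    Returns None when S3 is already up to date.
    """
    if db_max_insert_ts is None:
        return None  # empty table, nothing to publish
    published = latest_published(manifest_keys)
    if published is not None and db_max_insert_ts <= published:
        return None
    return (published, db_max_insert_ts)
```

Because the range is derived from whatever is already on S3, a single run covers every batch inserted since the last publish, however many there were. The subscribe side could use the same calculation with the roles of the database and S3 reversed.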
There was only one problem. The fact tables weren't actually being treated as insert only. Instead they were being updated for bug fixes and for improvements. Initially we worked around this by having some automated checksums which would raise an alert if a mismatch was found between the tables. This was done using Splunk database access - we ran an MD5 query on each cluster in turn and compared the results inline in the Splunk search. Thus it took little effort to find out when a change had been made, but the next step was then to reload the entire multi-billion row fact table. This was very inconvenient to schedule, and quite often very inefficient given that the fixes might only have touched a few rows. A common pattern was to update recent batches to improve key assignment, and with our initial solution, this was causing us to need to reload the entire table.
Fact Table Treatment - The Improved Solution
Rather than automating process workarounds to the above challenges, we modified the approach. The change was to logically partition the large fact tables by insert timestamp, and then selectively publish and update each one of these individually.
This meant that each batch, identified by an insert timestamp, would exist as a separate manifest on S3. We only needed to run a single select statement on the database to identify all of the corresponding details on Redshift.
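A single summary query of this kind might look like the sketch below. The table and column names (`fact_sales`, `insert_ts`, `update_ts`) are hypothetical, and an in-memory SQLite database stands in for Redshift purely so that the example is runnable. It also assumes the update timestamp is populated on every row.

```python
import sqlite3

# Hypothetical table and column names; the real schema is not given in this post.
SUMMARY_SQL = """
    SELECT insert_ts,
           MAX(update_ts) AS max_update_ts,
           COUNT(*)       AS record_count
    FROM fact_sales
    GROUP BY insert_ts
    ORDER BY insert_ts
"""


def summarise_batches(conn):
    """Return {insert_ts: (max_update_ts, record_count)} with one entry per batch."""
    return {row[0]: (row[1], row[2]) for row in conn.execute(SUMMARY_SQL)}


def demo_connection():
    """Build a tiny in-memory table standing in for a large fact table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE fact_sales (id INTEGER, insert_ts TEXT, update_ts TEXT)"
    )
    conn.executemany(
        "INSERT INTO fact_sales VALUES (?, ?, ?)",
        [
            (1, "2020-01-01", "2020-01-01"),
            (2, "2020-01-01", "2020-01-03"),  # row updated after its insert
            (3, "2020-01-02", "2020-01-02"),
        ],
    )
    return conn
```

Calling `summarise_batches(demo_connection())` gives one entry per insert timestamp, which is the shape needed to compare each batch against its manifest on S3.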
As shown in the flowchart below, each batch, i.e. each insert timestamp partition, was then compared between the database and S3 to see if the maximum update timestamp and record count matched.
Under normal insert-only operation on the large fact tables, this mechanism would identify that a batch had been inserted into the database that did not exist on S3, and it would be unloaded. All other batches would be ignored because they were already on S3. The corresponding subscribe logic would then load this data into the reporting cluster.
If there have been any changes to any of the historical partitions, either deletions or updates, this same process will identify which partitions have changed and will selectively publish or subscribe only those partitions.
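The per-partition comparison can be sketched as follows. This is an illustration under stated assumptions rather than the original code: both summaries are assumed to be dictionaries keyed by insert timestamp, with a `(max_update_ts, record_count)` tuple as the value, and the function name `plan_publish` is made up for the example.

```python
def plan_publish(db_summary, s3_summary):
    """Return the insert timestamps of batches that need to be unloaded.

    Both arguments map insert_ts -> (max_update_ts, record_count).
    A batch is selected when it is missing from S3, or when its maximum
    update timestamp or record count differs from what S3 holds.
    """
    to_publish = []
    for insert_ts, db_stats in db_summary.items():
        if s3_summary.get(insert_ts) != db_stats:
            to_publish.append(insert_ts)
    return sorted(to_publish)


# Example: one historical batch was updated and one new batch was inserted.
db = {
    "2020-01-01": ("2020-01-03", 2),  # updated since it was published
    "2020-01-02": ("2020-01-02", 1),  # unchanged
    "2020-01-03": ("2020-01-03", 5),  # new, not yet on S3
}
s3 = {
    "2020-01-01": ("2020-01-01", 2),
    "2020-01-02": ("2020-01-02", 1),
}
changed = plan_publish(db, s3)
```

Here only the first and third batches are selected, and the unchanged batch is skipped. The subscribe side could apply the same comparison between S3 and the reporting cluster. A batch deleted entirely from the database would not appear in `db_summary`, so this sketch does not cover that case.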
This gracefully handles updates to historical data without user intervention, and is robust and self-correcting. It is especially efficient where a small number of batches may or may not have changed, for example in the scenario where recent batches are updated to improve key assignment.
These mechanisms rely on the update timestamp being populated when changes are made, so it is recommended that there is a layer of auditing for this. We periodically ran MD5 checksums via multi-database Splunk access on the tables to see if anything was being changed.
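As a rough illustration of this kind of audit, the sketch below computes an order-independent MD5-based checksum over the rows of a table, so that the results from two clusters can be compared. How the original MD5 query combined rows is not described in this post, so the per-row hashing and summation used here are assumptions, as are the function names.

```python
import hashlib


def table_checksum(rows):
    """Return an order-independent checksum for an iterable of row tuples.

    Each row is serialised and hashed with MD5, and the digests are summed
    modulo 2**128 so that the order in which rows arrive does not matter.
    """
    total = 0
    for row in rows:
        encoded = "|".join("" if v is None else str(v) for v in row).encode("utf-8")
        total = (total + int(hashlib.md5(encoded).hexdigest(), 16)) % (1 << 128)
    return format(total, "032x")


def clusters_match(rows_a, rows_b):
    """True when both clusters return the same set of rows for a table."""
    return table_checksum(rows_a) == table_checksum(rows_b)
```

A mismatch on a table whose per-batch summaries still agree would suggest that rows were changed without the update timestamp being populated.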