Change Data Capture (CDC) describes the process of recording and communicating how a collection of data changes. There are several ways to do this, ranging from the rather simple to the seemingly quite clever. However, in many cases the cleverness comes at a cost, one that you might not have realized.
The cost of cleverness is often invisible to the CDC provider, and is borne instead by the recipient. It is not necessarily a bad call to move cost from the CDC provider to the recipient, but it's worth knowing the cost. In several cases this cost is disproportionate, raising each recipient's resource requirements from potentially constant space and work to the (potentially much) more expensive "maintain a mirror of all the data".
For example, depending on your CDC representation, Materialize spends a surprising amount of resources simply "reassembling your data" as its first step. Kafka data modeled with key-based compaction, or "upsert" behavior, needs a second copy of the data maintained with random access before you can work effectively with it. By contrast, Materialize's PostgreSQL source is incredibly cheap, because PostgreSQL can present its CDC data in a form that doesn't impose a disproportionate downstream cost.
We will make the point with some simple examples, where the CDC recipient is charged with tasks only as complicated as:
- Maintaining the number of records in total.
- Maintaining the number of records for each value of one column.
- Maintaining the number of records for each value of two columns.
These are pretty simple tasks, but they become harder the more clever our CDC provider is.
Of course, this all depends on how you record CDC data, so let's start there!
We'll focus our attention on Change Data Capture for a collection of relational data: a multiset of elements ("rows"), each of which has distinct attributes ("columns"). The conclusions generalize, perhaps more so, to richer data models.
The most straightforward representation of a change (in my mind, at least) is a list of rows that are now in, and a list of rows that are now out. To go from the prior collection to the new collection, we remove rows that are out and incorporate rows that are in. This happens to be the representation that differential dataflow uses internally, upon which Materialize is implemented. This representation is not especially clever, in that any change at all to a row results in a republication of the entire row, often twice (to both remove the old and insert the new rows).
For example, here is an insertion, update, and removal of yours truly from some data set, where we have both an age and ZIP code as columns (it's Brooklyn).
Each row has either `insert` or `remove` to indicate in or out, and a `time` to indicate which changes should be bundled up when.
frank mcsherry 45 11201 insert time1
frank mcsherry 45 11201 remove time2
frank mcsherry 46 11201 insert time2
frank mcsherry 46 11201 remove time3
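To make the "rows in, rows out" reading concrete, here is a minimal Python sketch that rebuilds the collection as of a given time from the four updates above. The tuple layout, the integer times, and the `collection_at` helper are assumptions made for illustration; they are not how differential dataflow or Materialize store updates.

```python
from collections import Counter

# Each update is (row, kind, time); kind is "insert" or "remove".
# The row layout (first, last, age, zip) mirrors the example above.
updates = [
    (("frank", "mcsherry", 45, "11201"), "insert", 1),
    (("frank", "mcsherry", 45, "11201"), "remove", 2),
    (("frank", "mcsherry", 46, "11201"), "insert", 2),
    (("frank", "mcsherry", 46, "11201"), "remove", 3),
]

def collection_at(updates, time):
    """Rebuild the multiset of rows as of `time` (inclusive)."""
    rows = Counter()
    for row, kind, t in updates:
        if t <= time:
            rows[row] += 1 if kind == "insert" else -1
    # Drop rows whose multiplicity has returned to zero.
    return {row: n for row, n in rows.items() if n != 0}
```

Calling `collection_at(updates, 2)` leaves only the age-46 row, because the removal and the insertion at `time2` are bundled together.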
A more clever representation can be seen with Debezium, which transmits each change as a pair of records: `before` and `after`.
This single transmission couples both states of the changed row, and from a list of these pairs you could produce the two lists above of records in and out.
What's clever about this representation is that by coupling the changes, there is the opportunity to more efficiently draw attention to the changes.
One could, for example, represent the `{ before, after }` pair by reporting `before` and only the columns that have changed in `after`, let's call it `changed`.
This can both use less space and more directly call attention to the changes.
The three updates above might be represented (without the detail) as:
{ before: None, after: { ... }, time: time1 }
{ before: { ... }, after: { ... }, time: time2 }
{ before: { ... }, after: None, time: time3 }
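Going from these pairs back to the in and out lists needs no state at all, which the following Python sketch tries to show. The dictionary shape and the `to_in_out` helper are hypothetical and only mirror the simplified records above; they are not Debezium's actual message envelope.

```python
def to_in_out(change):
    """Split one { before, after, time } record into insert/remove updates."""
    out = []
    if change["before"] is not None:
        out.append((change["before"], "remove", change["time"]))
    if change["after"] is not None:
        out.append((change["after"], "insert", change["time"]))
    return out

old = ("frank", "mcsherry", 45, "11201")
new = ("frank", "mcsherry", 46, "11201")
changes = [
    {"before": None, "after": old, "time": 1},
    {"before": old, "after": new, "time": 2},
    {"before": new, "after": None, "time": 3},
]

# Each record is translated on its own; nothing is remembered between records.
in_out = [update for change in changes for update in to_in_out(change)]
```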
Getting more clever, collections often have primary keys.
These are columns that mean to uniquely identify a row, where any one value occurs at most once in that column in the collection, at any time.
This is exciting, especially for clever people, because it is a concise way to reference the contents of `before` without having to present them: the prior value of the record (`before`) has already been presented to the recipient, and is identified by some `key`, so why not transmit `{ key, after }` instead?
The recipient can look up `before`, and retract it.
If there is no `before` that means that this is an insertion of a new record; if `after` is `NULL` (a special value) that means that you should just delete `before` and not replace it with anything.
You can see this representation in Kafka's key-based retention.
Let's imagine a primary key and rewrite the above example as
3263827 { frank mcsherry 45 11201 } time1
3263827 { frank mcsherry 46 11201 } time2
3263827 None time3
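The "look up `before`, and retract it" step can be sketched in Python as follows. The `(key, after, time)` tuples and the `upserts_to_in_out` helper are assumptions for illustration, not Kafka's API; the point is the `mirror` dictionary, which the recipient has to keep for as long as it wants to recover the in and out lists.

```python
def upserts_to_in_out(messages):
    """Translate (key, after, time) messages into insert/remove updates."""
    mirror = {}  # key -> current row; one entry per live key
    out = []
    for key, after, time in messages:
        before = mirror.get(key)
        if before is not None:
            out.append((before, "remove", time))  # retract the prior value
        if after is None:
            mirror.pop(key, None)                 # deletion: nothing replaces it
        else:
            mirror[key] = after
            out.append((after, "insert", time))
    return out, mirror

r45 = ("frank", "mcsherry", 45, "11201")
r46 = ("frank", "mcsherry", 46, "11201")
messages = [(3263827, r45, 1), (3263827, r46, 2), (3263827, None, 3)]
in_out, mirror = upserts_to_in_out(messages)
```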
Pushing the very limits of cleverness, let's combine these two techniques.
If `before` has been transmitted already, we could transmit as little as `{ key, changed }`, indicating only the primary key and the changed column values.
This could be tiny, or at least finally proportional to the size of the change, rather than depending somehow on shuttling entire (potentially large!) records around.
This could end up being as concise as, in some imagined syntax:
3263827 { frank mcsherry 45 11201 } time1
3263827 { age: 46 } time2
3263827 None time3
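A recipient of this imagined format would have to patch its own copy of each row to recover `before` and `after`. The sketch below assumes rows are dictionaries of column values and that `None` marks a deletion; `apply_patch` is a hypothetical helper, not part of any real CDC tool.

```python
def apply_patch(mirror, key, changed):
    """Apply one { key, changed } message; return the (before, after) rows."""
    if changed is None:
        return mirror.pop(key, None), None  # deletion
    before = mirror.get(key)
    # Start from the prior row (if any) and overwrite only the changed columns.
    after = {**(before or {}), **changed}
    mirror[key] = after
    return before, after

mirror = {}
full = {"first": "frank", "last": "mcsherry", "age": 45, "zip": "11201"}
step1 = apply_patch(mirror, 3263827, full)
step2 = apply_patch(mirror, 3263827, {"age": 46})
step3 = apply_patch(mirror, 3263827, None)
```

The message at `time2` carries only `age`, so the unchanged `zip` can only come from the recipient's mirror.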
There are probably additional clever things beyond these, or perhaps orthogonal to them, but we'll just talk about these in this post.
Our discussion so far has been about the CDC provider: the one producing the Change Data Capture stream. Presumably though, you capture data with the intent of using it somehow. How you hope to use it is what leads us to our more nuanced evaluation of cleverness.
There are some pretty straightforward uses, and we'll knock them out because I think they do highlight the cleverness of the techniques, without grim downsides.
- You might want to mirror your data to another OLTP database. This database almost certainly supports point look-ups (referencing data by `key`) and can receive even the most clever of representations and fish out the `before` records and update them. Depending on the database implementation, you may even have to go fish them up in order to update them, so there's potentially relatively little marginal cost to doing so.
- You might want to land your data in an analytic data warehouse. This data warehouse probably doesn't support efficient point look-ups, but instead can efficiently merge your updates periodically. Batch warehouses economize on the costs of data updates by only redoing the work periodically, work that would be expensive to perform continually because of the cost of looking up `before` values without indexes.
If this is your plan for CDC, I think all the cleverness above is probably just raw unbridled cleverness, and you should be delighted.
However.
There are even cooler things you can do with CDC streams, faster and leaner and more capable things you can do, and they start to reveal that the cleverness is really a trade-off. Fundamentally, these things need to be more complicated than simply mirroring your data, and more responsive than periodic batch updates would support.
Let's say you want to keep track of how many records are in your CDC stream. It's not a very complicated task; most tasks are strictly more complicated than this: maintaining histograms, maintaining averages, maintaining even more complicated SQL.
But let's just start with keeping track of how many records are in your CDC stream.
Let's do the thought experiment of maintaining `SELECT COUNT(*)` for each of the representations we've discussed.
For the `insert` / `remove` representation it is a matter of maintaining `#insert - #remove`, which is just adding things up; super easy.
Same with the `before` / `after` representation of the same information.
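As a sketch of how little the recipient needs here, the following Python keeps the count as a single integer. The `(row, kind, time)` tuples and the `running_count` helper are assumptions carried over from the earlier example, not a real API.

```python
def running_count(updates):
    """Yield the total record count after each insert/remove update."""
    count = 0  # the only state the recipient keeps
    for _row, kind, _time in updates:
        count += 1 if kind == "insert" else -1
        yield count

updates = [
    (("frank", "mcsherry", 45, "11201"), "insert", 1),
    (("frank", "mcsherry", 45, "11201"), "remove", 2),
    (("frank", "mcsherry", 46, "11201"), "insert", 2),
    (("frank", "mcsherry", 46, "11201"), "remove", 3),
]
counts = list(running_count(updates))
```

The row contents are never inspected, and nothing about earlier rows is retained.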
However, if you are the recipient of a stream of `{ key, after }` tuples, or `{ key, changed }` tuples, or any of the CDC representations that optimize out the `before` field, what does your implementation look like?
A `key`-optimized CDC representation asks you to determine the prior state of a record.
To maintain the total count, for any `key` you need to know whether it already exists, in which case the count does not change, or whether it does not already exist, in which case you should increment the count by one. If `after` is `None`, indicating a deletion, you can probably rely on `before` existing and just decrement the count.
So, you have to maintain all of the `key` values you've seen.
That's kind of annoying, and potentially quite a lot of data.
At least, it is proportional to the size of the input data, rather than proportional to the size of the thing you are maintaining: a single count.
The work you have to do for each update is also much greater: random access into a potentially large pile of data, versus just incrementing or decrementing the count.
Maintaining `SELECT COUNT(*)` from a `key`-optimized CDC representation can require substantially more resources than what the naive CDC formats require: nothing.
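For contrast, here is a sketch of the same count maintained from `(key, after, time)` messages. The message shape and the `count_from_upserts` helper are assumptions for illustration. The answer is still a single integer, but producing it requires remembering every live key.

```python
def count_from_upserts(messages):
    """Maintain the total record count from (key, after, time) messages."""
    live_keys = set()  # one entry per live key: proportional to the input
    count = 0
    for key, after, _time in messages:
        if after is None:
            # Deletion: only decrement if we actually knew about the key.
            if key in live_keys:
                live_keys.remove(key)
                count -= 1
        elif key not in live_keys:
            # First sighting of this key: an insertion.
            live_keys.add(key)
            count += 1
        # Otherwise it is an update to a known key: the count is unchanged.
    return count, len(live_keys)

messages = [
    (1, ("ann", 30), 1),
    (2, ("bob", 41), 1),
    (1, ("ann", 31), 2),  # update, not a new record
    (2, None, 3),         # deletion
]
total, keys_held = count_from_upserts(messages)
```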
Clever folks may realize that the problem with the clever approaches is that you couldn't tell insertions from updates. That's a pretty easy fix, in that you could just add that information to the CDC messages. This fixes up the problem with maintaining the count, and perhaps it fixes up all problems?
This time we aren't just maintaining a total count, but a count of the number of records with some value for one column. Any update tells us the new value, and so it's not so hard to figure out which count to increment, but we also need to find out which counts to decrement. At least if we want to maintain the correct answer for data that might contain deletions, and most CDC data does (it's why you are using CDC instead of an append-only event stream).
We can re-do the thought experiment above, where the `insert` / `remove` and `before` / `after` representations require almost no additional resources.
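A minimal sketch of the per-column count from `insert` / `remove` updates, assuming the same hypothetical `(row, kind, time)` tuples as before with `age` in the third position. The state is one counter per distinct age, no matter how many records there are.

```python
from collections import Counter

def counts_by_age(updates):
    """Maintain the number of records per age from insert/remove updates."""
    counts = Counter()  # one entry per distinct age currently present
    for row, kind, _time in updates:
        age = row[2]
        counts[age] += 1 if kind == "insert" else -1
        if counts[age] == 0:
            del counts[age]
    return counts

updates = [
    (("frank", "mcsherry", 45, "11201"), "insert", 1),
    (("frank", "mcsherry", 45, "11201"), "remove", 2),
    (("frank", "mcsherry", 46, "11201"), "insert", 2),
]
by_age = counts_by_age(updates)
```

Each update names the row it removes, so the count to decrement is read straight off the message.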
At the same time, any CDC representation that optimizes out the `before` value of all columns obliges the recipient, at least one who needs to maintain `SELECT column, COUNT(*)`, to mirror the corresponding data, to determine how to correctly update the results.
How much data needs to be mirrored?
All of the `{ key, age }` entries for all of the records.
If you get a `{ key, changed }`, even with the ability to distinguish between inserts and updates, you need the specific prior `age` associated with `key`, which means you need to maintain the full map from `key` to `age`.
Even though the result likely has some small number of counts, one for each age, the recipient must maintain all distinct keys of a potentially large collection, and their ages.
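The sketch below tries to show where that map comes in. It assumes a hypothetical message shape `(op, key, changed)`, where `op` is the added insert/update/delete flag discussed earlier; none of these names come from a real CDC format.

```python
from collections import Counter

def counts_by_age_from_patches(messages):
    """Maintain per-age counts from (op, key, changed) messages."""
    age_of = {}         # key -> current age: one entry per live key
    counts = Counter()  # the result we actually care about
    for op, key, changed in messages:
        if op == "insert":
            age_of[key] = changed["age"]
            counts[changed["age"]] += 1
        elif op == "update":
            if "age" in changed:
                counts[age_of[key]] -= 1  # needs the prior age
                age_of[key] = changed["age"]
                counts[changed["age"]] += 1
        else:  # "delete"
            counts[age_of.pop(key)] -= 1  # needs the prior age
    return +counts, age_of  # unary plus drops counts that reached zero

messages = [
    ("insert", 1, {"age": 45, "zip": "11201"}),
    ("insert", 2, {"age": 45, "zip": "10001"}),
    ("update", 1, {"age": 46}),
    ("delete", 2, None),
]
by_age, age_of = counts_by_age_from_patches(messages)
```

Knowing that a message is an update tells us a decrement is due, but only `age_of` can say which count to decrement.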
Again, clever folks might realize that the problem is leaving out the `before` values of columns, not the part of the optimization that narrows the columns down to only those that have changed. What if we ship `{ key, before_cols, after_cols }`, thereby presenting only the changed columns but with their prior values?
We now need to maintain a count with two columns used as the key.
Let's imagine for the moment that movement between ZIP codes is uncorrelated with birthdays: the changes that flow in will likely change either `age` or `zip`, but rarely both.
Unfortunately, to correctly update counts when, say, an `age` changes, we'll need to track down the `zip` of the corresponding `key`, both to determine which count to increment and which count to decrement.
We can again re-do the reasoning, and again the naive approaches have almost no cost for the recipient.
The `key`-optimized representations require the full map from `key` to both `age` and `zip`, again proportional to the input data rather than the maintained aggregates.
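Here is a sketch of the two-column case under the `{ key, before_cols, after_cols }` proposal. The tuple shape is an assumption: an insertion carries `before_cols = None` and the full row in `after_cols`, and a deletion carries `after_cols = None`. Note that `before_cols` goes unused, because it names only the changed column and cannot identify the `(age, zip)` group by itself.

```python
from collections import Counter

def counts_by_age_zip(messages):
    """Maintain per-(age, zip) counts from (key, before_cols, after_cols)."""
    current = {}        # key -> {"age": ..., "zip": ...}: a mirror of both columns
    counts = Counter()
    for key, before_cols, after_cols in messages:
        old = current.get(key)
        if old is not None:
            counts[(old["age"], old["zip"])] -= 1  # group found via the mirror
        if after_cols is None:
            current.pop(key, None)                 # deletion
        else:
            new = {**(old or {}), **after_cols}    # fill in unchanged columns
            current[key] = new
            counts[(new["age"], new["zip"])] += 1
    return +counts, current  # unary plus drops counts that reached zero

messages = [
    (1, None, {"age": 45, "zip": "11201"}),
    (1, {"age": 45}, {"age": 46}),             # birthday: zip not transmitted
    (2, None, {"age": 46, "zip": "11201"}),
    (2, {"zip": "11201"}, {"zip": "10001"}),   # move: age not transmitted
]
by_group, current = counts_by_age_zip(messages)
```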
At this point it may seem that in order to avoid disproportionate costs for the receiver, you need to transmit full `before` and `after` information for the columns they require.
I'm not myself aware of a way to avoid it.
That information can be compressed, for example into `{ before, changed }`, but it appears fundamental that you either present the relevant information as part of the CDC stream, or ask the recipient to maintain it for you.
In the cases above, simple and naive CDC representations result in more efficient implementations for the recipients.
That doesn't mean you should always use them: your OLTP CDC provider is potentially critical infrastructure under great load, and you should offload as much work as possible.
However, by using the more sophisticated CDC representations, you are introducing a downstream cost.
If you didn't realize that, it's worth a think; see if you can tolerate setting `wal_level` to `logical` in PostgreSQL, or use direct replication out of PostgreSQL rather than through Kafka.
What about Debezium, then?
Seems pretty good in the round-up above, and uses Kafka to decouple your infrastructure.
At the time of writing, to the best of my understanding, Debezium has some deduplication gotchas that mean that you cannot rely on the `before` and `after` being correct, or at least correct in the order you receive them, and you kinda need to maintain a second copy of the data if you want to be certain.
Materialize wants to be certain, and so it maintains the second copy, at some cost.
But the representation they've chosen seems fine.
All in all, these choices represent trade-offs. Especially with Change Data Capture, which means to be fast and cheap, it can be valuable to understand that you may be imposing a cost, either in latency or resources, for your downstream uses of your data. Being aware of the costs puts you in a position to reduce them, and unlock valuable potential use cases.