
Partitioning Oplog Information

zkasheff edited this page Dec 12, 2013 · 4 revisions

This document describes how to use partitioned collections, along with some supporting changes, to rework how entries are deleted from the oplog and oplog.refs collections.

The Problem

Oplog trimming, as of TokuMX 1.3, can be problematic. A background thread deletes oplog entries one by one, and another thread garbage collects the beginning of the oplog to reclaim space. This has the following issues:

  • A non-trivial amount of CPU can be used for this process
  • If the optimizer is behind the deleter thread, and many garbage entries are piled up at the beginning, finding the first row of the oplog can be time consuming
  • Long queries will cause a lot of garbage to pile up at the beginning of the oplog, exacerbating the first two issues
  • We still depend on the block allocator to reclaim space. This may not happen immediately and may be unpredictable, based on the layout of the nodes in the oplog
  • The optimizer thread does nothing to reclaim garbage in the oplog.refs collection. If garbage accumulates in the oplog.refs collection, the user needs to manually do a reIndex to (hopefully) reclaim the space.

The High Level Solution

At a high level, we want the oplog to be broken up into multiple dictionaries so that once a day (or possibly once an hour), we can just drop a dictionary that represents a day's worth of data (or possibly an hour's worth). This reclaims the space immediately and cheaply.

We have two high level ideas of how to do this:

  • Create a new collection type, called partitioned collections, implemented at the NamespaceDetails layer and below. Make the oplog a partitioned collection.
  • Have replication logic manually do the partitioning and manage multiple collections.

I want partitioned collections. We will partition the oplog and oplog.refs, and once a day or hour, drop a partition.

The oplog is a bit of a user facing collection. Users, and tests, are used to querying it to get results. Changing it to something that is manually managed by replication and tests seems to be more effort. Also, such a change will make it difficult, if not impossible, to have TokuMX 1.3 servers replicate from these servers, making upgrade more challenging. Implementing a partitioned collection also seems easier to unit test.

So, the oplog is a partitioned collection, with each partition storing roughly a day's worth of oplog data. Every day, a new partition is created for the new data, and the oldest partition is dropped. Cursors over the partitioned collection will seamlessly transition from one collection to another under the covers.

Oplog.refs will also be a partitioned collection, but some maintenance needs to go into keeping track of whether a partition can be dropped. We cannot blindly drop a partition, because the oplog may have a row still referencing it. More details follow below.

Solution Details

Partitioned Collections

What they are

Let's first address what the oplog will become. We add a new collection type called "partitioned collections". The concept is what it sounds like:

  • Underneath the covers, the collection will be broken into smaller logical collections, one per partition.

Here are the rules for the partitioned collection. Most of this is for simplicity of the project on the first go around. If we want to expand partitioned collections into a more fully featured, um, feature, we can expand on these and find solutions.

  • There are no secondary indexes. Only the _id index. Secondary indexes are a nice feature for down the road.
  • Collections must be partitioned on the _id index, and the _id index must be the primary key. This is for simplicity. A different primary key means maintaining a secondary index on _id.
  • No index creation, no index drops, no hot indexes. This comes from the above bullet points.
  • No fast updates. Honestly, this is laziness. There is likely no real reason why this wouldn't "just work".
  • No other fancy features that I can't think of at the moment.

By definition, partitioned collections have N partitions to define where the data resides. Each partition has an inclusive upper bound value for its data. The last partition has an upper bound of infinity. So, suppose partition i has a defined upper bound of M_i. All keys k in partition i satisfy M_(i-1) < k <= M_i (the first partition has no lower bound). As an example, suppose we have 5 partitions, with upper bounds of 10, 20, 30, 40, and infinity, and keys of 1 to 50. Documents 1 through 10 would be in the first partition, 11-20 in the second, 21-30 in the third, 31-40 in the fourth, and 41-50 in the last.

The number of partitions must be something "reasonable", as reads and writes need to do a binary search on the partitions before finding the appropriate partition to operate on. As a result, having 100 or 1000 partitions is probably fine. Having 1 million may not be.
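The lookup described above can be sketched as a binary search over the upper bounds. This is a minimal illustration and not TokuMX code: the names `UPPER_BOUNDS` and `find_partition` are made up for this example, and the bounds are the ones from the example above.

```python
import bisect
import math

# Hypothetical inclusive upper bounds, one per partition; the last is infinity.
UPPER_BOUNDS = [10, 20, 30, 40, math.inf]


def find_partition(upper_bounds, key):
    """Return the index i of the partition with M_(i-1) < key <= M_i."""
    # bisect_left returns the first index whose bound is >= key, which is
    # the partition with the smallest inclusive upper bound covering key.
    return bisect.bisect_left(upper_bounds, key)


# Key 10 sits on the inclusive bound of the first partition; key 11 does not.
first = find_partition(UPPER_BOUNDS, 10)
second = find_partition(UPPER_BOUNDS, 11)
last = find_partition(UPPER_BOUNDS, 50)
```

Each lookup costs O(log N) comparisons, which is why hundreds or thousands of partitions are unlikely to be a problem.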

Partitioned Collections will have two additional basic commands for manipulating the collection:

  • dropPartition. Self-explanatory
  • addPartition. This adds a partition at the end. The user can define the bound capping the previous partition, so long as it is greater than or equal to the max document in the previous partition.

These commands cannot run in multi-statement transactions. We can probably make it work, but I don't want to focus on that effort, nor do I want to commit to it.

There will also be miscellaneous commands for querying the state of the partitioned collection, such as how many partitions exist, what they are called, what their bounds are, etc... These can be figured out as we develop the feature.
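To make the bound rule for addPartition concrete, here is a toy in-memory model. It is only a sketch under assumptions: the class `PartitionedCollection`, its methods, and the default of capping at the largest existing key are hypothetical, and the sketch only drops the oldest partition, which is the case the oplog needs.

```python
import math


class PartitionedCollection:
    """Toy in-memory model of a partitioned collection (illustrative only)."""

    def __init__(self):
        # One partition to start with; the last partition is always capped at infinity.
        self.partitions = [{"bound": math.inf, "docs": {}}]

    def insert(self, key, doc):
        # Linear scan for brevity; partitions are ordered by upper bound.
        for part in self.partitions:
            if key <= part["bound"]:
                part["docs"][key] = doc
                return

    def add_partition(self, bound=None):
        """Cap the current last partition at `bound` and append a new last partition."""
        last = self.partitions[-1]
        max_key = max(last["docs"], default=None)
        if bound is None:
            bound = max_key  # assumed default: cap at the largest existing key
        if bound is None:
            raise ValueError("cannot pick a bound for an empty partition")
        if max_key is not None and bound < max_key:
            raise ValueError("bound must be >= the max key in the last partition")
        if len(self.partitions) > 1 and bound <= self.partitions[-2]["bound"]:
            raise ValueError("bound must exceed the previous partition's bound")
        last["bound"] = bound
        self.partitions.append({"bound": math.inf, "docs": {}})

    def drop_oldest_partition(self):
        """Drop the first partition and return it."""
        if len(self.partitions) == 1:
            raise ValueError("cannot drop the only partition")
        return self.partitions.pop(0)
```

In this model, inserting keys 1 through 10, calling `add_partition(10)`, and then inserting 11 leaves two partitions with bounds 10 and infinity, and key 11 lands in the second.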

How they work

Partitioned collections will have a metadata dictionary describing the state of the partitions. Additionally, they will also have an in-memory data structure holding this same information. The reason for this is speed of finding the appropriate partition for operations. This is similar to what we already have in place for normal collections.

Underneath the covers, partitioned collections will subclass the following:

  • NamespaceDetails
  • IndexDetails
  • Cursor (or maybe IndexCursor, not sure which)

Basically, partitioned collections need their own implementation of these objects. The NamespaceDetails subclass will do the following:

  • disable the operations we mentioned above, like adding/dropping an index.
  • provide new methods for insert, delete, and update. The old methods use the XXX_multiple API, so they will need to be overridden.

The IndexDetails subclass will be associated with the sole _id index and maintain the following:

  • an actual IndexDetails for each partition. Whether each of these is opened when the collection is opened or opened lazily is not yet determined.
  • a reference to the metadata dictionary on disk for maintaining partition info
  • implementations of update/delete/insert that the NamespaceDetails subclass will use
  • modify other methods if necessary (like whatever deletes or drops the collection)

The Cursor subclass will have knowledge of the partition metadata, and implement a cursor on top of other IndexCursors. Somehow, the special optimizations like count cursors will have to be disabled for partitioned collections. I have not dived deep enough into the code yet to learn how to do this, nor have I looked into the subclassing of cursors deeply enough to know how other things like IndexScanCursor or BasicCursor would interact here.

But the idea is simple. Any cursor that is to run over a partitioned collection will, underneath the covers, create a PartitionedCursor (or some other name). The implementation of PartitionedCursor will be smart enough to continue scanning the next appropriate partition when one partition ends. To make it simpler, we can make the PartitionedCursors never use FieldRangeVectors underneath. The underlying cursors they use can all have start/end bounds, and the PartitionedCursor will prune out rows that don't match the passed in bounds. This will make queries less efficient, but for the first go, it should be correct and good enough.
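The cursor idea can be sketched as a generator that walks the partitions in order and prunes rows outside the requested bounds. This is a simplified illustration, not the real cursor classes: `partitioned_scan` is a hypothetical name, each partition is modeled as a sorted list of rows, and the per-partition loop stands in for an underlying IndexCursor.

```python
import math


def partitioned_scan(partitions, start, end):
    """Yield (key, doc) pairs with start <= key <= end, in key order.

    `partitions` is a list of (upper_bound, rows) tuples ordered by upper
    bound, where rows is a key-sorted list of (key, doc) pairs.
    """
    lower = -math.inf
    for upper, rows in partitions:
        # Partition covers (lower, upper]; skip it if it cannot overlap [start, end].
        if upper >= start and lower < end:
            for key, doc in rows:  # stands in for a per-partition cursor
                if start <= key <= end:  # prune rows outside the query bounds
                    yield key, doc
        lower = upper


parts = [
    (10, [(k, None) for k in range(1, 11)]),
    (20, [(k, None) for k in range(11, 21)]),
    (math.inf, [(k, None) for k in range(21, 26)]),
]
# The scan moves from the first partition into the second without the caller noticing.
keys = [k for k, _ in partitioned_scan(parts, 8, 13)]
```

Scanning a whole partition and filtering is wasteful compared with seeking to the start bound, which matches the trade-off accepted above: correct first, efficient later.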

The commands addPartition and dropPartition will be treated as schema changes, and will hold write locks throughout their duration, similar to dropIndex. They will not work in multi-statement transactions. Just to reiterate, this decision is out of current laziness. We can likely one day easily support these operations in a multi-statement transaction. (Personal Note: be careful of committed deletes at the end of the latest partition when creating a new partition. Not properly taking them into account may cause read transactions to break.)

Another question to address is how widely we will support partitioned collections. If we are to allow any collection to be partitioned, then we have to support the replication of addPartition and dropPartition. If not, we can restrict their usage to the local database.

The cloner will also need to change to support partitioned collections. I have not given much thought to this problem.

Breaking Up oplog.refs

The challenge with partitioning the oplog.refs collection is that we don't know when it is safe to remove a partition. We don't know if any GTID still in the oplog has a reference to the partition we are to delete. In fact, if a transaction has been alive long enough, it may have uncommitted data to commit to that partition. I could not think of an elegant way to store a partitioned collection and keep track of when a partition was safe to delete.

Here is what we will do. Somewhere, in some dictionary, we will store the maximum GTID that has a reference in a given partition. This dictionary may be the metadata dictionary of the partitioned oplog.refs, or it may be something else. When a transaction that has spilled into oplog.refs gets a GTID assigned, we will, in an alternate transaction stack, check the recorded maximum GTID for each partition touched (there may be more than one) and update it if the new GTID is larger; otherwise we do nothing. We use an alternate transaction stack because we don't want to bundle the row lock used to update this value with all the other work done by this transaction. Doing so can hurt concurrency. It's ok to do this work in a separate transaction because we don't need this max GTID to be consistent with the data after a crash. We just need the max GTID to be some "safe" value. If the actual maximum is something less than the recorded maximum, that's ok.

The benefit of this solution is that secondaries can still seamlessly query oplog.refs as they did in 1.3. This makes upgrade much simpler.

When we wish to drop a partition, we first check the maximum GTID recorded. If that GTID has already been deleted due to trimming, we can safely drop this partition.
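The bookkeeping and the drop check can be sketched as follows. Everything here is an assumption made for illustration: the class `RefsPartitionTracker` and its methods are hypothetical, GTIDs are modeled as tuples that compare in order, and a partition with no recorded GTID is conservatively treated as not droppable, a case the design does not settle.

```python
class RefsPartitionTracker:
    """Tracks, per oplog.refs partition, the largest GTID known to reference it."""

    def __init__(self):
        self.max_gtid = {}  # partition id -> largest GTID recorded so far

    def note_reference(self, partition_ids, gtid):
        """Called once a spilled transaction is assigned its GTID."""
        for pid in partition_ids:
            current = self.max_gtid.get(pid)
            # Only ever move the recorded value upward; otherwise do nothing.
            if current is None or gtid > current:
                self.max_gtid[pid] = gtid

    def can_drop(self, pid, min_live_gtid):
        """True if every GTID that may reference `pid` is already trimmed.

        `min_live_gtid` is the smallest GTID still present in the oplog.
        """
        recorded = self.max_gtid.get(pid)
        if recorded is None:
            return False  # assumed conservative default for this sketch
        return recorded < min_live_gtid


tracker = RefsPartitionTracker()
tracker.note_reference(["p0", "p1"], (1, 5))
tracker.note_reference(["p0"], (1, 3))  # smaller GTID, recorded max is unchanged
```

Because the recorded value only needs to be a safe upper bound, a stale or overly large value delays a drop but never makes one unsafe.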

Changes to Replication Using These New Tools

Normal Replication

Normal replication seems straightforward.

Once a day or hour, a new partition will need to be added to the oplog, and a new oplog reference collection will be created. At that time, if data is old enough, we may also drop the oldest partition in the oplog and one or more oplog reference collections, if the associated GTIDs referencing them are dead.

The nice thing is that syncing requires no changes. Any query that used to work on the oplog will still work on the oplog.
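The periodic rotation of the oplog can be sketched like this. It is a simplified model under stated assumptions: `rotate_oplog` is a hypothetical name, each partition is represented only by its creation time, a partition is taken to hold the entries written between its creation and the creation of the next partition, and the extra GTID check needed for oplog.refs is left out.

```python
from datetime import datetime, timedelta


def rotate_oplog(partition_starts, now, expire_after):
    """Add a partition for new data and drop partitions that are fully expired.

    `partition_starts` lists partition creation times, oldest first.
    Returns the new list; the input is not modified.
    """
    starts = partition_starts + [now]  # new partition for upcoming entries
    cutoff = now - expire_after
    # The oldest partition holds entries written before starts[1], so it is
    # fully expired once starts[1] is at or before the cutoff.
    while len(starts) > 1 and starts[1] <= cutoff:
        starts = starts[1:]
    return starts


day = timedelta(days=1)
start = datetime(2013, 12, 1)
before = [start, start + day, start + 2 * day]
after = rotate_oplog(before, now=start + 3 * day, expire_after=2 * day)
```

In this run the partition created on December 1 is dropped, since everything in it was written before December 2, which is the cutoff.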

Initial Sync

Initial sync changes are more work. We need to support the clone of a partitioned collection. This is work.

Upgrade Challenges

One upgrade challenge is dealing with new oplog entries. If we expose partitioned collections to be replicated, we need to add oplog messages for "addPartition" and "dropPartition". This is no different from the challenge presented by the new $inc optimization. So, this particular challenge is not introduced by this work.

Another upgrade challenge is converting existing non-partitioned oplog and oplog.refs collections into partitioned ones. We will need to add a "convertToPartitioned" command (that will likely need to be replicated if partitioned collections are supported widely), and we need to run this on startup on the oplog and oplog.refs collections. The conversion will likely create a giant partition that is the entire oplog. So, suppose expireOplogDays is 14, and on startup, the oplog has 14 days of data. The "convertToPartitioned" command will create a partition that has 14 days of data. Trying to break up the existing data into separate partitions will be costly for the user, so we shouldn't do it. However, the user needs to be aware of the oplog space usage over the initial thirteen days after upgrade. Thirteen more partitions each having a day's worth of data will be created, and then the first giant partition will be dropped. So, on that thirteenth day, users will have 27 days of oplog data lying around. Users need to be aware that for a period of time, their oplog may grow unless they manually decide to drop the giant partition after a few days (and run for a period of time with less oplog data than specified via expireOplogDays). This is something users need to decide on their own how to manage.
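The space arithmetic above can be checked with a small calculation. This is only a rough model following the count in the paragraph: the function name `oplog_days_on_disk` is made up, and the exact day on which the giant partition is dropped depends on trimming details this document leaves open.

```python
def oplog_days_on_disk(expire_oplog_days, days_since_upgrade):
    """Approximate days of oplog data on disk after converting to one giant partition."""
    giant_partition_days = expire_oplog_days
    if days_since_upgrade < expire_oplog_days:
        # The giant partition still holds unexpired entries, so it is kept
        # alongside one daily partition per elapsed day.
        return giant_partition_days + days_since_upgrade
    # The giant partition has been dropped; only daily partitions remain.
    return expire_oplog_days


# With expireOplogDays = 14, usage peaks on day 13 at 14 + 13 = 27 days of data.
peak = max(oplog_days_on_disk(14, d) for d in range(30))
```

In general the peak under this model is 2 * expireOplogDays - 1 days of data, reached just before the giant partition is dropped.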

Open Questions as of now

  • Will the initial implementation allow partitioned collections outside of the local database? If so, replication of the new commands needs to be implemented.
  • At some point, will we want to make partitioned collections more fully featured? Adding secondary indexes should not be too difficult. It's just work. Having them work with some of our other fancier features like bulk loading or hot indexing may be difficult if not impossible. Then again, practically no thought has gone into them.
  • Does the answer to the first question depend on the second? Does it make sense to delay replicating partitioned collections until they are more fully featured and we know what we want to do with them?