TokuMX Bulk Loading

John Esmet edited this page Jul 15, 2013 · 15 revisions

Introduction

We want to improve the experience of importing bulk data. Straight inserts into TokuMX collections are fast, but the fractal tree bulk loader is faster because it is multithreaded, especially when secondary indexes are defined before the import.

Design

Introduce a bulk load mode to the Client object. A Client enters bulk load mode by naming one or more collections. Bulk load mode requires an existing multi-statement transaction and creates a child transaction for the load. Once the Client is in bulk load mode, any named collection that is empty will use a bulk loader for subsequent inserts from the calling Client. Until the load is committed or aborted, a collection being loaded may not be updated, queried, or deleted from.
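The rules in this paragraph can be modeled with a small in-memory sketch. It is illustrative only: `ToyClient`, its fields, and the "loader"/"insert" labels are hypothetical names invented here, not TokuMX internals, and the transaction is reduced to a flag.

```javascript
// Toy model of the bulk load rules described above. All names are hypothetical.
class ToyClient {
  constructor(docCounts) {
    this.docCounts = docCounts; // plain object: collection name -> document count
    this.inTransaction = false;
    this.loaderFor = null; // Set of names using a loader, or null when not loading
  }

  beginTransaction() {
    this.inTransaction = true;
  }

  beginLoad(names) {
    // Bulk load mode requires an existing multi-statement transaction.
    if (!this.inTransaction) throw new Error("beginLoad requires a transaction");
    if (this.loaderFor) throw new Error("a load is already in progress");
    // Only collections that are empty at this point get a bulk loader.
    this.loaderFor = new Set(
      names.filter((n) => (this.docCounts[n] || 0) === 0)
    );
  }

  // Which path an insert from this client would take right now.
  insertPath(name) {
    return this.loaderFor && this.loaderFor.has(name) ? "loader" : "insert";
  }

  // Stands in for both commitLoad and abortLoad: either one leaves bulk load mode.
  endLoad() {
    if (!this.loaderFor) throw new Error("no load in progress");
    this.loaderFor = null;
  }
}
```

With counts of `{ coll1: 0, coll2: 1 }`, calling `beginLoad(['coll1', 'coll2'])` routes coll1 to the loader and leaves coll2 on ordinary inserts.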

API

db.runCommand({ 'beginLoad' : [ 'coll1' , ... ] })

...

db.runCommand({ 'commitLoad' : 1 }) or db.runCommand({ 'abortLoad' : 1 })

Example workflow

db.runCommand({ 'beginTransaction' : 1 });

db.coll1.drop();

db.coll2.insert({ z: 1 })

db.coll1.ensureIndex({ a: 1 })

db.coll1.ensureIndex({ b: 1 })

db.runCommand({ 'beginLoad' : [ 'coll1', 'coll2' ] })

db.coll2.insert({ z: 100 }) // does not utilize a loader, just uses inserts. queries are ok.

db.coll1.insert({ a: 101, b: 102 }) // utilizes a loader for all indexes. queries are not ok.

db.runCommand({ 'commitLoad' : 1 }) // or db.runCommand({ 'abortLoad' : 1 }) to abort the load

db.runCommand({ 'commitTransaction' : 1 }) // or db.runCommand({ 'rollbackTransaction' : 1 }) to roll back

// queries are now ok for both
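One way to script this workflow so that a failed step never leaves the load or the transaction open is sketched below. The helper name `bulkLoad` and its arguments are hypothetical. It issues only the commands listed on this page and assumes each command reply carries the usual `ok` field. Because `db` is passed in, the function can be exercised against a stub as well as a real shell handle.

```javascript
// Hypothetical helper: runs the workflow above and cleans up on failure.
// `db` is a shell-style database handle; `docsByCollection` maps a collection
// name to an array of documents to insert.
function bulkLoad(db, docsByCollection) {
  function run(cmd) {
    var res = db.runCommand(cmd);
    // Assumption: replies carry the usual `ok` field.
    if (!res || !res.ok) {
      throw new Error("command failed: " + JSON.stringify(cmd));
    }
  }

  var names = Object.keys(docsByCollection);
  var loading = false;
  run({ beginTransaction: 1 });
  try {
    run({ beginLoad: names });
    loading = true;
    names.forEach(function (name) {
      docsByCollection[name].forEach(function (doc) {
        db.getCollection(name).insert(doc);
      });
    });
    run({ commitLoad: 1 });
    loading = false;
    run({ commitTransaction: 1 });
  } catch (e) {
    // Best-effort cleanup; errors here are ignored so the original one surfaces.
    if (loading) {
      try { db.runCommand({ abortLoad: 1 }); } catch (ignored) {}
    }
    try { db.runCommand({ rollbackTransaction: 1 }); } catch (ignored) {}
    throw e;
  }
}
```

The explicit `abortLoad` may be redundant if rolling back the transaction also aborts the load, but issuing it keeps the cleanup independent of that behavior.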

Issues

  • Collections undergoing a bulk load must not be the target of writes from any other client.

    • Proposed solution: Maintain the connection id of the loading client. Reject write operations from all other clients.
  • Big transactional imports combined with replication can cause slave delay. On commit, there will be a massive oplog entry to process. New transactions will not be applied on secondaries until the big transaction finishes (which can take a very long time).

    • Proposed solution: Send uncommitted load inserts to the secondary, where they will be processed by a local loader. On commit, the secondary's load commits as well. Aborting is still cheap (that is, there is no massive transaction to roll back on the secondary).
  • Loads into a sharded environment will need to run the begin/abort load commands on each shard in the cluster since client inserts will be routed to any one of the shards by the mongos process. Further, the balancer will need to be disabled during load.

    • Proposed solution: We'll just need to make those changes to mongos.
  • If we load a very large collection into one chunk and it doesn't get a chance to split until after the load is done, this causes two problems:

    • splitVector can only return 8192 split points, so if the collection is larger than (chunk size) * 8192, it will fail (the same problem as trying to shard an existing large collection).
    • It will be expensive to do all the migrations that will be necessary after you finish the load.

    Therefore, it would be better to pre-split the collection before the load happens. We may want to enforce this somehow. It probably only makes sense as a way to restore from a copy of an already sharded collection (so you have the chunk boundaries up front). This is not really a sensible thing to do for an initial load into a sharded collection, if the collection is very large. In this case it would almost certainly be better to just do normal inserts and let balancing happen.

  • Replication write concern won't work on a per-insert basis if everything is done in one transaction. This matters because the mongoimport and mongorestore tools allow users to specify what the write concern should be during load.

    • This probably doesn't matter too much, since a bulk load would be an all-or-nothing operation anyway. Per-insert write concern makes sense in vanilla MongoDB, where there are no transactions.
  • The mongoimport and mongorestore tools allow users to upsert backed-up data into collections that already have data. The loader can't be used here since the tree isn't searchable until it's finished building.

    • We can easily detect whether or not upserts are going to be used and not use a loader in this case. It's up to Tokutek and friends to advise against upsert mode.
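The splitVector limit mentioned in the sharding item above is easy to put a number on. The sketch below computes the largest collection that one post-load split pass could cover, taking the 8192 split-point cap from the text and assuming a 64 MB chunk size (an assumption here; substitute the configured value). The function name is hypothetical.

```javascript
// Largest collection size covered by one splitVector pass, using the
// (chunk size) * 8192 bound stated in the text.
var MAX_SPLIT_POINTS = 8192; // cap quoted in the text

function maxSplittableBytes(chunkSizeMB) {
  return chunkSizeMB * 1024 * 1024 * MAX_SPLIT_POINTS;
}

// Assumed chunk size of 64 MB: 64 MB * 8192 = 512 GB.
var limitGB = maxSplittableBytes(64) / (1024 * 1024 * 1024);
console.log(limitGB + " GB");
```

Under these assumptions, a load of more than about 512 GB into a single chunk would run into the limit, which is one reason to pre-split before loading.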

Testing

  • Conditions to initiate load:
    • ns must not exist
      • especially if it's provisionally created or provisionally dropped in another transaction
    • Indexes must be an array of index spec objects
      • each spec's ns field, if provided, must be correct
      • name must be provided
    • Options must be an object
  • Conditions during load:
    • Only the loading client may access the ns
    • No writes other than insert on the ns
      • no updates w/ upsert
      • no findAndModify w/ upsert
    • Queries of all flavors must return the empty set, with no error
      • non-upsert updates should always match nothing
      • deletes should always match nothing
    • drop/dropIndexes/dropDatabase() must fail, from any client, and not affect the load
    • abort transaction should automatically abort load
    • commit transaction should fail while a load is in progress; it should succeed once the load is committed or aborted.
  • Conditions after load
    • Unique indexes are properly verified.
    • Sparse indexes are indeed sparse (how do we test that?)
    • Committing a load makes its data visible to the loading client immediately, and visible to other clients after transaction commit.
      • Index options should be correct
        • ns field must be the correct value if not provided in beginLoad
      • Collection options should be correct
    • After aborting the transaction, the collection should be in a sane state.
      • the collection should revert to non-existent
      • the reversion should be visible to the calling client and other clients immediately.
      • should be able to later create the ns
        • using insert
        • using create command
  • Replication conditions:
    • Primary should replicate begin/commit load commands
    • Secondary should enter load state for the ns
      • All reads return the empty set
      • update/delete oplog entries should never be processed for the ns
    • begin/commit load should be atomic with respect to fail-over
  • Sharding conditions:
    • mongos should reject begin/commit load commands
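The "conditions during load" above amount to a small decision table, which can be written as a pure function and unit-tested before any server code exists. This is a sketch of one reading of those bullets: the function name, the operation labels, and the three outcomes ("insert", "empty", "reject") are hypothetical, and it follows the proposal to track the loading client's connection id.

```javascript
// Decide how an operation on a namespace under load should be treated.
// loaderConnId: connection id of the client that began the load.
// op: { connId, type, upsert }, where type is one of the labels used below.
function duringLoad(loaderConnId, op) {
  // drop/dropIndexes/dropDatabase must fail from any client.
  if (["drop", "dropIndexes", "dropDatabase"].indexOf(op.type) !== -1) {
    return "reject";
  }
  // Only the loading client may access the namespace.
  if (op.connId !== loaderConnId) return "reject";
  if (op.type === "insert") return "insert";
  // Upserts could write, so they are refused.
  if ((op.type === "update" || op.type === "findAndModify") && op.upsert) {
    return "reject";
  }
  // Queries, non-upsert updates, and deletes see the empty set, with no error.
  if (["query", "update", "delete"].indexOf(op.type) !== -1) return "empty";
  // Anything the bullets do not cover is rejected conservatively (an assumption).
  return "reject";
}
```

Each bullet then maps to a one-line check, for example `duringLoad(7, { connId: 8, type: "insert" })` for a write from a client that does not own the load.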

Action items

  • Enhance the ydb put multiple API to support multiple secondary keys per primary row
  • Modify mongoimport / mongorestore.
  • Later: More parallelism in the fractal tree bulk loader? Performance measurements will tell.