-
Notifications
You must be signed in to change notification settings - Fork 51
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: Add thread safe transactional in-memory datastore #947
Conversation
Codecov Report
@@ Coverage Diff @@
## develop #947 +/- ##
===========================================
+ Coverage 57.11% 57.75% +0.64%
===========================================
Files 164 167 +3
Lines 19020 19408 +388
===========================================
+ Hits 10863 11210 +347
- Misses 7173 7204 +31
- Partials 984 994 +10
|
13fffcc
to
802b146
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall, great implementation!!
There's some notable suggestions, and some minimal ones.
But looking great! Should be easy enough to resolve everything
datastore/memory/memory.go
Outdated
// NewStore constructs an empty Store. | ||
func NewStore() (d *Store) { | ||
return &Store{ | ||
values: btree.NewMap[string, []byte](2), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: I can't actually find any info/docs on the degree parameter of 2
in this NewMap
call.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It represents the number of items and children a node will have. A degree of 2 means a node will have 1-3 items and 2-4 children.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What made you choose 2?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest exposing this param, as it sounds very use-case specific (can be done outside of the PR, but might be worth a ticket+todo now)
datastore/memory/txn.go
Outdated
for k, v := range t.ops { | ||
if _, handled := handledOps[k]; handled { | ||
continue | ||
} | ||
|
||
if v.delete { | ||
continue | ||
} | ||
e := dsq.Entry{Key: k.String(), Size: len(v.value)} | ||
if !q.KeysOnly { | ||
e.Value = v.value | ||
} | ||
re = append(re, e) | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: Appending the ops
entries to the results slice re
will produce an un-ordered slice. Specifically, it will produce one ordered slice with extra unordered elements.
There's a few options here.
-
We leave it as is, but need to ensure that within
NaiveQueryApply
we make sure thatNaiveOrder
is executing properly. As the name suggests, it will effectively do a sort on the results, then pass it along.
This is likely exactly whats happening now, but its pretty in-effecient, ideally should be avoided -
We insert "in-order" in this function here, but that might be awkward as it will require a bunch of scans to find the insert position since we're insert records from an unordered map into an ordered slice.
-
We merge the
re
values slice, with a new sorted slice from the ops map. Then its just merging two sorted arrays which is pretty easy (similar to the Mergeiterator design below) -
Looking at how Badger does this, it effectively produces two iterators, one for
values
and another forops
. Then it combines them into aMergeIterator
, which will consume both iterators, returning the smaller of the two values returned from the two individual iterators. -
Iunno..something else?? Maybe ops is a linked list, but that seems dumb 🙃.
Note: eventually this whole func can be closer to the badger impl which is a "streaming" iterator system, instead of loading all store values into an array.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just noting that Option 2 can be made really quite efficient if we want to avoid anything too fancy here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the txn store to a btree so the ops are ordered. Check out the latest changes and let me know what you think.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed the txn store to a btree so the ops are ordered. Check out the latest changes and let me know what you think.
I did originally have this as an option, but removed it. Its basically trading efficiency between the individual ops (Put, Delete) and Commit. So individual ops are more expensive so the Commit is cheaper. Its certainly a valid option, just wanted to outline the (potential) downsides.
datastore/memory/txn.go
Outdated
t.target.mu.Lock() | ||
defer t.target.mu.Unlock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This part makes things a bit tricky w.r.t the mutexs on the main store. In general, this transaction implementation doesn't protect against conflicts, its more akin to a basic batch, exposed as a transaction interface.
We could create a new lock for the target store, called txnmu
or something (transaction mutex) which is a Read/Write mutex. The main store operations (Get, Set, Delete, etc...) can apply a read lock. And the transaction Commit()
can apply a write
lock.
This still won't give us "ACID" transactions, which we are pretty far from, but does give us better protection/semantics, as it means that all the operations from the transactions are applied, without getting overwritten during a commit.
Technically at the moment we have this "all operations" committed system with the current design, but thats because we are being very aggressive with the locks, which aren't necessary as Ive mentioned on the main store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Check out the latest changes and let me know what you think.
I gave it a quick scan and it looks good - will probably review in more detail in a bit, but nothing jumped out as needing serious attention. I do have a couple of general questions though - why do we need this, and what are we planning on using it for? In my eyes any serious use case would be much better off using badger-im or similar, and from memory it was surprisingly performant/lightweight too have you compared the benchmarks against this implementation? |
The main reason I created this is that after a discussion with John, we came to the conclusion that we don't want to use badger-im for the version store because of the overhead involved. Also, the currently used |
Are you guys sure the badger-im overhead is significant, and worth the risk to avoid? And are we really sure we want to preserve the current versionedFetcher mechanic of creating a new temp-store, instead of directly fetching from the underlying persisted-stores? |
6c3c06a
to
31f4147
Compare
Yeah, Badger (even in memory) does a lot on initialization. Heres the overhead of the two implementations, using |
Heres the benchmark code in case anyone is curious: var resultBadger *badgerds.Datastore
var resultBtree *memoryds.Datastore
func BenchmarkBadgerNew(b *testing.B) {
var d *badgerds.Datastore
for i := 0; i < b.N; i++ {
opts := badgerds.Options{Options: badger.DefaultOptions("").WithInMemory(true)}
d, _ = badgerds.NewDatastore("", &opts)
}
resultBadger = d
}
func BenchmarkBtreeNew(b *testing.B) {
var d *memoryds.Datastore
for i := 0; i < b.N; i++ {
d = memoryds.NewDatastore()
}
resultBtree = d
} |
Sweet - cheers for running that John - they look like sensible benchmarks. Badger-im is much heavier than I had in my head (maybe the vague memory in my head was just read-write benches?). I'd still be tempted to stick with badger anyway for time-travelling, as that is still only 0.4ms per query, but I do see why you guys are avoiding it :) |
I don't understand the version fetcher enough at the moment to comment on this. @jsimnz can you add your input? |
datastore/memory/txn.go
Outdated
@@ -16,23 +16,24 @@ import ( | |||
|
|||
ds "github.com/ipfs/go-datastore" | |||
dsq "github.com/ipfs/go-datastore/query" | |||
"github.com/tidwall/btree" | |||
) | |||
|
|||
// basicTxn implements ds.Txn | |||
type basicTxn struct { | |||
mu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: The Txn Mutex is now no longer necessary since youre using a btree instead of a map to store txn ops.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
true
datastore/memory/txn.go
Outdated
iterOpsHasMore := iterOps.Next() | ||
for iter.Next() { | ||
e := dsq.Entry{} | ||
if op, exists := t.ops[ds.NewKey(iter.Key())]; exists { | ||
handledOps[ds.NewKey(iter.Key())] = struct{}{} | ||
if op.delete { | ||
continue | ||
} | ||
e.Key = iter.Key() | ||
e.Size = len(op.value) | ||
if !q.KeysOnly { | ||
e.Value = op.value | ||
for { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: This either needs clear documentation on the intent/design of this entire looping mechanic, or it needs to be re-worked.
Im (slightly) feeling like it should be reworked as its a little all over the place. 2 nested for loops, then 2 nested if loops, with break
s and continue
s to track, followed by an additional for loop with its own break
and continue
. Feels like it could be more succinct.
Happy to make suggestions if you want :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I reworked the looping mechanic. The nested loops are unavoidable if we care about the order of the keys. But you'll see that it's much simplified.
datastore/memory/memory.go
Outdated
mu sync.Mutex | ||
// Datastore uses a btree for internal storage. | ||
type Datastore struct { | ||
txnmu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: I think we talked about this during the standup by might not have effectively communicated - this could be a sync.RWMutex
and each regular Datastore operation can acquire a readlock, and the transaction commit can acquire a writelock.
That way regular (non-txn) ops dont step on eachother (read lock) and you can make sure the transaction is atomic (no other ops can happen when a txn is being commited)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed it. Let me know if that's what you meant.
datastore/memory/memory.go
Outdated
|
||
// Datastore uses a btree for internal storage. | ||
type Datastore struct { | ||
txnmu sync.RWMutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: This datastore will run very poorly at scale - it is essentially single threaded and would represent a very serious bottleneck in any real-world system. The transaction also does not not thread-safe (particularly the iterators in Query), and the testing is really insufficient for any serious-usage.
Given that this type is public-public, it would be worth documenting this as it seems irresponsible for us to suggest to users that they should keep their data in it in it's present form. Looks like it'll work well for for the version stuff so long as planner remains single-threaded.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This datastore will run very poorly at scale - it is essentially single threaded and would represent a very serious bottleneck in any real-world system. The transaction also does not not thread-safe (particularly the iterators in Query)
What makes you say that? I disagree completely with that statement. I might be missing something in my analysis but if you're making that statement, please give an example to prove it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Which bit?
The datastore itself is locked behind a single mutex - only single operation can be handled at once - regardless of where or what it might be, so your 256 core server is still only able to handle a single op at a time.
In txn.Query you are iterating over multiple iterators in-tandem without any locks, that doesn't look like it is safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Well, looks like @jsimnz and I got duped. tidwall/btree#22
The package does implement locks but not for the Map version. I had them initially and will put them back.
The datastore itself is locked behind a single mutex - only single operation can be handled at once - regardless of where or what it might be, so your 256 core server is still only able to handle a single op at a time.
(answering this as if lock was implemented). There can be "unlimited" reads at once but only one write. I don't think having multiple write at once is safe for a database.
In txn.Query you are iterating over multiple iterators in-tandem without any locks, that doesn't look like it is safe?
This was implemented with the thought that btee.Map implemented locks. Will fix.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(answering this as if lock was implemented). There can be "unlimited" reads at once but only one write. I don't think having multiple write at once is safe for a database.
Nothing can read if a write lock is held - a single write will lock any thread out of the whole datastore
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would you expect anything different?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are many ways in which you can reduce the scope of any given lock, although at the moment due to the issue flagged in Put/Delete you aren't ever actually locking this mutex.
For example consider a store with two keys A and B. There is no reason to lock reads of the B value if the A value is being written to.
In larger real-world situations you can chunk this, so that you dont have a mutex per value etc
It is actually fairly easy to see this chunking mechanic in SQL implementations - there are a small number of operations (like creating indexes in Oracle...) that will lock an entire table, but I dont recall coming across one that would lock the entire database.
UPDATE: It might be quite unpleasant to do this properly if locking outside of the btree as the code is trying to do so now, you might need to change that, or accept and document that this is essentially a single-thread-only bottleneck.
) | ||
|
||
// Datastore uses a btree for internal storage. | ||
type Datastore struct { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thought: If we are going to maintain our own datastore we should probably look at implementing the IterableDatastore interface too so that we dont have to shim it
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes that will be implemented in a separate PR :)
datastore/memory/memory.go
Outdated
|
||
// Put implements ds.Put | ||
func (d *Datastore) Put(ctx context.Context, key ds.Key, value []byte) (err error) { | ||
d.txnmu.RLock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
todo: It does look like this should be Lock and not RLock, as you are writing? Same for in Delete
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No this is correct but there needs to be an other lock (see response above regarding no locks on btree.Map)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the mutex can be dropped then - if you are only calling ReadLock, it is not doing anything at all besides creating an illusion of thread-safety.
EDIT: To clarify - I am extremely skeptical that this doesnt need to be a WriteLock, but will see once you have made your other changes as it sounds like you have a cunning plan :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@AndrewSisley It is very much doing something :)
The hint is that its called txnmu
and not just mu
. Basically, non-transactional operations ds.Put(), ds.Delete, etc...
will acquire a read lock as they are single operations on the Btree (The Btree is already thread safe btw).
The goal is here is regarding the actual Txn
type. During Commit
, the basicTxn
will aquire a write lock on that datastore txnmu
so that it can commit all the ops contained within the transaction, without other operations conflicting with it. This is required for atomicity.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
[Screams] You guys are exposing the lock and locking on it outside of the host object? I missed that - can you please please hide that behind a function on DataStore, with a very large and clear warning in the function documentation:
func (d *Datastore) Lock() {
d.txnmu.Lock()
}
Directly locking on an object you don't own is about the easiest way I know of in programming to generate really painful to debug and frequently occuring lock related issues 😅 😅 😅
Sorry for being a bit dramatic, but that is really smelly 😆 On the plus side, the code makes sense now 😁
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The datastore owns the object. It was to be specific that this lock would only be used during a transaction.
// best effort allocation | ||
re := make([]dsq.Entry, 0, t.ds.values.Len()+t.ops.Len()) | ||
iter := t.ds.values.Iter() | ||
iterOps := t.ops.Iter() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
to expand on my earlier comment RE thread-safety in transactions, even if foo.Iter()
returned a perfectly cloned instance of the datastore that was 100% threadsafe, you'd still have a race condition here between calling t.ds.values.Iter()
and t.ops.Iter()
.
I am somewhat doubtful that the iterables are that safe too, and I would be supprised if the values that they yield are set in iterator instantiation - meaning that when you iterate through them below their values may be changing causing Next
and Value
calls to return a jumbled mess.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Indeed they aren't safe. tidwall/btree#22. Will fix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one is certainly a difficult one, but it is independent of the other comment you made (However they are related on the general concept of thread safety :) ).
The part that makes this difficult is the "Snapshot Isolation" mechanics missing from this implementation in general. Due to its complexity, I recommend we leave the SI mechanics for a follow up PR.
As far as ACID (Atomic, Consistent, Isolation, Durable) goes, this implementation is fairly atomic, semi consistent, not isolated, and durable. But its a process, and the only place this is used atm is the VersionedFetcher. Work will continue before its an officially supported general-purpose in-memory store.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the Write lock issue is very much a blocker for me, so just marking this as 'request changes'.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Happy with the design etc - was a good call, thanks Fred
2c3e71c
to
dc5e25d
Compare
dc5e25d
to
cea676e
Compare
mu is widely used in the go community to represent a sync.Mutex
fix typo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Few minor things, but its up to you on how you'd want to (or need to) handle them.
Marking this as approved for now, as its in a good enough state 👍.
if t.readOnly { | ||
return ErrReadOnlyTxn | ||
} | ||
t.ops.Set(dsItem{key: key.String(), version: t.getTxnVersion(), val: value}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Can you run through the relationship between txn.dsVersion
and txn.txnVersion
? Especially since in the commitHandler
we run ds.nextVerision
before we actually set the value in the ds btree.
Seems like we don't need both of them. Lmk.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
txn.dsVersion
is the version of the datastore when the transaction was initiated. txn.txnVersion
Is used simply to have a version that is higher than that of the txn.dsVersion
while we operate the transaction. I could remove it and simply add 1 to txn.dsVersion
and we would get the same result.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed txnVersion
from both the Datastore and the transaction.
✅
} | ||
|
||
// Query implements ds.Query | ||
func (t *basicTxn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: Technically Query
needs to track the read keys like we do in Get
via the isGet
bool, since iterating is "reading" each key, so they need to be tracked for conflict checks. But whether we need this atm is debatable.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I'm already doing what you're suggesting.
txnVersion: t.getTxnVersion(), | ||
}, | ||
) | ||
t.ds.clearOldInFlightTxn(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
suggestion: This seems like a heavy function to call on every txn.Discard
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are no other in flight transactions, it's a no-op. Otherwise, their might just be a handfull of other in-flight transaction and the btree iter will be quite fast. I wouldn't qualify that as a heavy function. The alternative was to run in on time.After(x)
in the purgeOldVersions
loop.
datastore/memory/txn.go
Outdated
for iter.Next() { | ||
expectedItem := t.ds.get(ctx, ds.NewKey(iter.Item().key), t.getDSVersion()) | ||
latestItem := t.ds.get(ctx, ds.NewKey(iter.Item().key), t.ds.getVersion()) | ||
if latestItem.isDeleted || latestItem.version != expectedItem.version { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
question: I'm not 100% convinced on this conflict routine. Specifically the isDelete
check.
If the last operation to an item before the start of a transaction is a Delete()
then the latest version will be isDelete = true
. If the current transaction Updates that value, that is a non-conflicting change, but this will mark is a conflict because of the isDelete
check.
If im understanding this correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I removed the check of isDeleted
.
✅
d9b68fc
to
aceed6e
Compare
…k#947) Relevant issue(s) sourcenetwork#946 Description This PR provides a thread safe in memory store that supports both ds.Batching and ds.TxnFeature. It uses a btree as storage.
Relevant issue(s)
Resolves #946
Description
This PR provides a thread safe in memory store that supports both
ds.Batching
andds.TxnFeature
. It uses a btree as storage.Tasks
How has this been tested?
make test
Specify the platform(s) on which this was tested: