diff --git a/README.md b/README.md index 6488393..8759ebe 100644 --- a/README.md +++ b/README.md @@ -17,6 +17,7 @@ This package contains a **high-performance, columnar, in-memory storage engine** - Optimized, cache-friendly **columnar data layout** that minimizes cache-misses. - Optimized for **zero heap allocation** during querying (see benchmarks below). - Optimized **batch updates/deletes**, an update during a transaction takes around `12ns`. +- Support for **SIMD-enabled aggregate functions** such as "sum", "avg", "min" and "max". - Support for **SIMD-enabled filtering** (i.e. "where" clause) by leveraging [bitmap indexing](https://github.com/kelindar/bitmap). - Support for **columnar projection** (i.e. "select" clause) for fast retrieval. - Support for **computed indexes** that are dynamically calculated based on provided predicate. @@ -24,6 +25,7 @@ This package contains a **high-performance, columnar, in-memory storage engine** - Support for **transaction isolation**, allowing you to create transactions and commit/rollback. - Support for **expiration** of rows based on time-to-live or expiration column. - Support for **atomic increment/decrement** of numerical values, transactionally. +- Support for **primary keys** for use-cases where offset can't be used. - Support for **change data stream** that streams all commits consistently. - Support for **concurrent snapshotting** allowing to store the entire collection into a file. 
@@ -37,6 +39,7 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc - [Updating Values](#updating-values) - [Expiring Values](#expiring-values) - [Transaction Commit and Rollback](#transaction-commit-and-rollback) +- [Using Primary Keys](#using-primary-keys) - [Streaming Changes](#streaming-changes) - [Snapshot and Restore](#snapshot-and-restore) - [Complete Example](#complete-example) @@ -192,7 +195,7 @@ players.Query(func(txn *Txn) error { }) ``` -Taking the `Sum()` of a (numeric) column reader will take into account a transaction's current filtering index. +Taking the `Sum()` of a (numeric) column reader will take into account a transaction's current filtering index. ```go players.Query(func(txn *Txn) error { @@ -204,7 +207,7 @@ players.Query(func(txn *Txn) error { txn.WithInt("age", func(v float64) bool { return v < avgAge }) - + // get total balance for 'all rouges younger than the average rouge' balance := txn.Float64("balance").Sum() return nil @@ -243,31 +246,26 @@ players.Query(func(txn *Txn) error { ## Expiring Values -Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine aynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use `InsertWithTTL()` method on the collection that allows to insert an object with a time-to-live duration defined. +Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine asynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use the `Insert...()` method on the collection, which allows you to insert an object with a time-to-live duration defined. 
In the example below we are inserting an object to the collection and setting the time-to-live to _5 seconds_ from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed. ```go -players.InsertObjectWithTTL(map[string]interface{}{ - "name": "Merlin", - "class": "mage", - "age": 55, - "balance": 500, -}, 5 * time.Second) // The time-to-live of 5 seconds +players.Insert(func(r column.Row) error { + r.SetString("name", "Merlin") + r.SetString("class", "mage") + r.SetTTL(5 * time.Second) // time-to-live of 5 seconds + return nil +}) ``` -On an interesting note, since `expire` column which is automatically added to each collection is an actual normal column, you can query and even update it. In the example below we query and conditionally update the expiration column. The example loads a time, adds one hour and updates it, but in practice if you want to do it you should use `Add()` method which can perform this atomically. +On an interesting note, since the `expire` column, which is automatically added to each collection, is an actual normal column, you can query and even update it. In the example below we query and extend the time-to-live by 1 hour using the `Extend()` method. ```go players.Query(func(txn *column.Txn) error { - expire := txn.Int64("expire") - + ttl := txn.TTL() return txn.Range(func(i uint32) { - if v, ok := expire.Get(); ok && v > 0 { - oldExpire := time.Unix(0, v) // Convert expiration to time.Time - newExpire := expireAt.Add(1 * time.Hour).UnixNano() // Add some time - expire.Set(newExpire) - } + ttl.Extend(1 * time.Hour) // Add some time }) }) ``` @@ -304,6 +302,32 @@ players.Query(func(txn *column.Txn) error { }) ``` +## Using Primary Keys + +In certain cases it is useful to access a specific row by its primary key instead of by the index that is generated internally by the collection. 
For such use-cases, the library provides `Key` column type that enables a seamless lookup by a user-defined _primary key_. In the example below we create a collection with a primary key `name` using `CreateColumn()` method with a `ForKey()` column type. Then, we use `InsertKey()` method to insert a value. + +```go +players := column.NewCollection() +players.CreateColumn("name", column.ForKey()) // Create a "name" as a primary-key +players.CreateColumn("class", column.ForString()) // .. and some other columns + +// Insert a player with "merlin" as its primary key +players.InsertKey("merlin", func(r column.Row) error { + r.SetString("class", "mage") + return nil +}) +``` + +Similarly, you can use primary key to query that data directly, without knowing the exact offset. Do note that using primary keys will have an overhead, as it requires an additional step of looking up the offset using a hash table managed internally. + +```go +// Query merlin's class +players.QueryKey("merlin", func(r column.Row) error { + class, _ := r.String("class") + return nil +}) +``` + ## Streaming Changes This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, stream data into kafka or into a remote database for durability. In order to enable it, you can simply provide an implementation of a `commit.Logger` interface during the creation of the collection. diff --git a/collection.go b/collection.go index 4460c55..c903538 100644 --- a/collection.go +++ b/collection.go @@ -91,12 +91,20 @@ func NewCollection(opts ...Options) *Collection { func (c *Collection) next() uint32 { c.lock.Lock() idx := c.findFreeIndex(atomic.AddUint64(&c.count, 1)) - c.fill.Set(idx) c.lock.Unlock() return idx } +// free marks the index as free, atomically. 
+func (c *Collection) free(idx uint32) { + c.lock.Lock() + c.fill.Remove(idx) + atomic.StoreUint64(&c.count, uint64(c.fill.Count())) + c.lock.Unlock() + return +} + // findFreeIndex finds a free index for insertion func (c *Collection) findFreeIndex(count uint64) uint32 { fillSize := len(c.fill) @@ -147,16 +155,6 @@ func (c *Collection) Insert(fn func(Row) error) (index uint32, err error) { return } -// InsertWithTTL executes a mutable cursor transactionally at a new offset and sets the expiration time -// based on the specified time-to-live and returns the allocated index. -func (c *Collection) InsertWithTTL(ttl time.Duration, fn func(Row) error) (index uint32, err error) { - err = c.Query(func(txn *Txn) (innerErr error) { - index, innerErr = txn.InsertWithTTL(ttl, fn) - return - }) - return -} - // DeleteAt attempts to delete an item at the specified index for this collection. If the item // exists, it marks at as deleted and returns true, otherwise it returns false. func (c *Collection) DeleteAt(idx uint32) (deleted bool) { @@ -284,14 +282,6 @@ func (c *Collection) QueryAt(idx uint32, fn func(Row) error) error { }) } -// QueryAt jumps at a particular key in the collection, sets the cursor to the -// provided position and executes given callback fn. -func (c *Collection) QueryKey(key string, fn func(Row) error) error { - return c.Query(func(txn *Txn) error { - return txn.QueryKey(key, fn) - }) -} - // Query creates a transaction which allows for filtering and iteration over the // columns in this collection. It also allows for individual rows to be modified or // deleted during iteration (range), but the actual operations will be queued and @@ -319,26 +309,34 @@ func (c *Collection) Close() error { return nil } -// vacuum cleans up the expired objects on a specified interval. 
-func (c *Collection) vacuum(ctx context.Context, interval time.Duration) { - ticker := time.NewTicker(interval) - for { - select { - case <-ctx.Done(): - ticker.Stop() - return - case <-ticker.C: - now := time.Now().UnixNano() - c.Query(func(txn *Txn) error { - expire := txn.Int64(expireColumn) - return txn.With(expireColumn).Range(func(idx uint32) { - if expirateAt, ok := expire.Get(); ok && expirateAt != 0 && now >= expirateAt { - txn.DeleteAt(idx) - } - }) - }) - } - } +// --------------------------- Primary Key ---------------------------- + +// InsertKey inserts a row given its corresponding primary key. +func (c *Collection) InsertKey(key string, fn func(Row) error) error { + return c.Query(func(txn *Txn) error { + return txn.InsertKey(key, fn) + }) +} + +// UpsertKey inserts or updates a row given its corresponding primary key. +func (c *Collection) UpsertKey(key string, fn func(Row) error) error { + return c.Query(func(txn *Txn) error { + return txn.UpsertKey(key, fn) + }) +} + +// QueryKey queries/updates a row given its corresponding primary key. +func (c *Collection) QueryKey(key string, fn func(Row) error) error { + return c.Query(func(txn *Txn) error { + return txn.QueryKey(key, fn) + }) +} + +// DeleteKey deletes a row for a given primary key. 
+func (c *Collection) DeleteKey(key string) error { + return c.Query(func(txn *Txn) error { + return txn.DeleteKey(key) + }) } // --------------------------- column registry --------------------------- diff --git a/collection_test.go b/collection_test.go index d623827..12f63b0 100644 --- a/collection_test.go +++ b/collection_test.go @@ -338,11 +338,11 @@ func TestExpire(t *testing.T) { // Insert an object col.InsertObjectWithTTL(obj, time.Microsecond) col.Query(func(txn *Txn) error { - expire := txn.Int64(expireColumn) + ttl := txn.TTL() return txn.Range(func(idx uint32) { - value, _ := expire.Get() - expireAt := time.Unix(0, value) - expire.Set(expireAt.Add(1 * time.Microsecond).UnixNano()) + remaining, ok := ttl.TTL() + assert.True(t, ok) + assert.NotZero(t, remaining) }) }) assert.Equal(t, 1, col.Count()) @@ -355,6 +355,44 @@ func TestExpire(t *testing.T) { assert.Equal(t, 0, col.Count()) } +func TestExpireExtend(t *testing.T) { + col := loadPlayers(500) + assert.NoError(t, col.Query(func(txn *Txn) error { + ttl := txn.TTL() + return txn.Range(func(idx uint32) { + + // When loaded, we should n ot have any expiration set + _, hasExpiration := ttl.ExpiresAt() + assert.False(t, hasExpiration) + _, hasRemaining := ttl.TTL() + assert.False(t, hasRemaining) + + // Extend by 2 hours + ttl.Set(time.Hour) + ttl.Extend(time.Hour) + }) + })) + + // Now we should have expiration time set + assert.NoError(t, col.Query(func(txn *Txn) error { + ttl := txn.TTL() + return txn.Range(func(idx uint32) { + _, hasExpiration := ttl.ExpiresAt() + assert.True(t, hasExpiration) + _, hasRemaining := ttl.TTL() + assert.True(t, hasRemaining) + }) + })) + + // Reset back to zero + assert.NoError(t, col.Query(func(txn *Txn) error { + ttl := txn.TTL() + return txn.Range(func(idx uint32) { + ttl.Set(0) // Reset to zero + }) + })) +} + func TestCreateIndex(t *testing.T) { row := Object{ "age": 35, @@ -530,16 +568,20 @@ func TestInsertWithTTL(t *testing.T) { c := NewCollection() 
c.CreateColumn("name", ForString()) - idx, err := c.InsertWithTTL(time.Hour, func(r Row) error { - r.SetString("name", "Roman") + idx, err := c.Insert(func(r Row) error { + if _, ok := r.TTL(); !ok { + r.SetTTL(time.Hour) + r.SetString("name", "Roman") + } return nil }) + assert.Equal(t, uint32(0), idx) assert.NoError(t, err) assert.NoError(t, c.QueryAt(idx, func(r Row) error { - expire, ok := r.Int64(expireColumn) + ttl, ok := r.TTL() assert.True(t, ok) - assert.NotZero(t, expire) + assert.NotZero(t, ttl) return nil })) } @@ -594,7 +636,7 @@ func TestReplica(t *testing.T) { } }() - source.Insert(func (r Row) error { + source.Insert(func(r Row) error { r.SetAny("id", "bob") r.SetInt("cnt", 2) return nil @@ -603,7 +645,7 @@ func TestReplica(t *testing.T) { // give the replica stream a moment time.Sleep(100 * time.Millisecond) - target.Query(func (txn *Txn) error { + target.Query(func(txn *Txn) error { assert.Equal(t, 1, txn.Count()) return nil }) @@ -637,7 +679,7 @@ func newEmpty(capacity int) *Collection { }) // Load the items into the collection - out.CreateColumn("serial", ForKey()) + out.CreateColumn("serial", ForString()) out.CreateColumn("name", ForEnum()) out.CreateColumn("active", ForBool()) out.CreateColumn("class", ForEnum()) diff --git a/column_expire.go b/column_expire.go new file mode 100644 index 0000000..7d9bc30 --- /dev/null +++ b/column_expire.go @@ -0,0 +1,106 @@ +// Copyright (c) Roman Atachiants and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. + +package column + +import ( + "context" + "time" +) + +// --------------------------- Expiration (Vacuum) ---------------------------- + +// vacuum cleans up the expired objects on a specified interval. 
+func (c *Collection) vacuum(ctx context.Context, interval time.Duration) { + ticker := time.NewTicker(interval) + for { + select { + case <-ctx.Done(): + ticker.Stop() + return + case <-ticker.C: + c.Query(func(txn *Txn) error { + ttl, now := txn.TTL(), time.Now() + return txn.With(expireColumn).Range(func(idx uint32) { + if expiresAt, ok := ttl.ExpiresAt(); ok && now.After(expiresAt) { + txn.DeleteAt(idx) + } + }) + }) + } + } +} + +// --------------------------- Expiration (Column) ---------------------------- + +// TTL returns a read-write accessor for the time-to-live column +func (txn *Txn) TTL() ttlWriter { + return ttlWriter{ + rw: int64Writer{ + numericReader: numericReaderFor[int64](txn, expireColumn), + writer: txn.bufferFor(expireColumn), + }, + } +} + +type ttlWriter struct { + rw int64Writer +} + +// TTL returns the remaining time-to-live duration +func (s ttlWriter) TTL() (time.Duration, bool) { + if expireAt, ok := s.rw.Get(); ok && expireAt != 0 { + return readTTL(expireAt), true + } + return 0, false +} + +// ExpiresAt returns the expiration time +func (s ttlWriter) ExpiresAt() (time.Time, bool) { + if expireAt, ok := s.rw.Get(); ok && expireAt != 0 { + return time.Unix(0, expireAt), true + } + return time.Time{}, false +} + +// Set sets the time-to-live value at the current transaction cursor +func (s ttlWriter) Set(ttl time.Duration) { + s.rw.Set(writeTTL(ttl)) +} + +// Extend extends time-to-live of the row current transaction cursor by a specified amount +func (s ttlWriter) Extend(delta time.Duration) { + s.rw.Add(int64(delta.Nanoseconds())) +} + +// readTTL converts expiration to a TTL +func readTTL(expireAt int64) time.Duration { + return time.Unix(0, expireAt).Sub(time.Now()) +} + +// writeTTL converts ttl to expireAt +func writeTTL(ttl time.Duration) int64 { + if ttl > 0 { + return time.Now().Add(ttl).UnixNano() + } + return 0 +} + +// --------------------------- Expiration (Row) ---------------------------- + +// TTL retrieves the time 
left before the row will be cleaned up +func (r Row) TTL() (time.Duration, bool) { + if expireAt, ok := r.Int64(expireColumn); ok { + return readTTL(expireAt), true + } + return 0, false +} + +// SetTTL sets a time-to-live for a row +func (r Row) SetTTL(ttl time.Duration) { + if ttl > 0 { + r.SetInt64(expireColumn, time.Now().Add(ttl).UnixNano()) + } else { + r.SetInt64(expireColumn, 0) + } +} diff --git a/column_key.go b/column_key.go deleted file mode 100644 index 4a5f1c1..0000000 --- a/column_key.go +++ /dev/null @@ -1,95 +0,0 @@ -// Copyright (c) Roman Atachiants and contributors. All rights reserved. -// Licensed under the MIT license. See LICENSE file in the project root for details. - -package column - -import ( - "fmt" - "sync" - - "github.com/kelindar/column/commit" -) - -// --------------------------- Key ---------------------------- - -// columnKey represents the primary key column implementation -type columnKey struct { - columnString - name string // Name of the column - lock sync.RWMutex // Lock to protect the lookup table - seek map[string]uint32 // Lookup table for O(1) index seek -} - -// makeKey creates a new primary key column -func makeKey() Column { - return &columnKey{ - seek: make(map[string]uint32, 64), - columnString: columnString{ - chunks: make(chunks[string], 0, 4), - }, - } -} - -// Apply applies a set of operations to the column. 
-func (c *columnKey) Apply(chunk commit.Chunk, r *commit.Reader) { - fill, data := c.chunkAt(chunk) - from := chunk.Min() - - for r.Next() { - offset := r.Offset - int32(from) - switch r.Type { - case commit.Put: - value := string(r.Bytes()) - - fill[offset>>6] |= 1 << (offset & 0x3f) - data[offset] = value - c.lock.Lock() - c.seek[value] = uint32(r.Offset) - c.lock.Unlock() - - case commit.Delete: - fill.Remove(uint32(offset)) - c.lock.Lock() - delete(c.seek, string(data[offset])) - c.lock.Unlock() - } - } -} - -// OffsetOf returns the offset for a particular value -func (c *columnKey) OffsetOf(v string) (uint32, bool) { - c.lock.RLock() - idx, ok := c.seek[v] - c.lock.RUnlock() - return idx, ok -} - -// slice accessor for keys -type keySlice struct { - cursor *uint32 - writer *commit.Buffer - reader *columnKey -} - -// Set sets the value at the current transaction index -func (s keySlice) Set(value string) { - s.writer.PutString(commit.Put, *s.cursor, value) -} - -// Get loads the value at the current transaction index -func (s keySlice) Get() (string, bool) { - return s.reader.LoadString(*s.cursor) -} - -// Enum returns a enumerable column accessor -func (txn *Txn) Key() keySlice { - if txn.owner.pk == nil { - panic(fmt.Errorf("column: primary key column does not exist")) - } - - return keySlice{ - cursor: &txn.cursor, - writer: txn.bufferFor(txn.owner.pk.name), - reader: txn.owner.pk, - } -} diff --git a/column_numeric.go b/column_numeric.go index e98099f..d5a3086 100644 --- a/column_numeric.go +++ b/column_numeric.go @@ -1,3 +1,6 @@ +// Copyright (c) Roman Atachiants and contributors. All rights reserved. +// Licensed under the MIT license. See LICENSE file in the project root for details. 
+ package column import ( diff --git a/column_strings.go b/column_strings.go index 84aa5bb..d37a4dc 100644 --- a/column_strings.go +++ b/column_strings.go @@ -6,6 +6,7 @@ package column import ( "fmt" "math" + "sync" "github.com/kelindar/bitmap" "github.com/kelindar/column/commit" @@ -303,3 +304,92 @@ func (txn *Txn) String(columnName string) stringWriter { writer: txn.bufferFor(columnName), } } + +// --------------------------- Key ---------------------------- + +// columnKey represents the primary key column implementation +type columnKey struct { + columnString + name string // Name of the column + lock sync.RWMutex // Lock to protect the lookup table + seek map[string]uint32 // Lookup table for O(1) index seek +} + +// makeKey creates a new primary key column +func makeKey() Column { + return &columnKey{ + seek: make(map[string]uint32, 64), + columnString: columnString{ + chunks: make(chunks[string], 0, 4), + }, + } +} + +// Apply applies a set of operations to the column. +func (c *columnKey) Apply(chunk commit.Chunk, r *commit.Reader) { + fill, data := c.chunkAt(chunk) + from := chunk.Min() + + for r.Next() { + offset := r.Offset - int32(from) + switch r.Type { + case commit.Put: + value := string(r.Bytes()) + + fill[offset>>6] |= 1 << (offset & 0x3f) + data[offset] = value + c.lock.Lock() + c.seek[value] = uint32(r.Offset) + c.lock.Unlock() + + case commit.Delete: + fill.Remove(uint32(offset)) + c.lock.Lock() + delete(c.seek, string(data[offset])) + c.lock.Unlock() + } + } +} + +// OffsetOf returns the offset for a particular value +func (c *columnKey) OffsetOf(v string) (uint32, bool) { + c.lock.RLock() + idx, ok := c.seek[v] + c.lock.RUnlock() + return idx, ok +} + +// slice accessor for keys +type keySlice struct { + cursor *uint32 + writer *commit.Buffer + reader *columnKey +} + +// Set sets the value at the current transaction index +func (s keySlice) Set(value string) error { + if _, ok := s.reader.OffsetOf(value); !ok { + 
s.writer.PutString(commit.Put, *s.cursor, value) + return nil + } + + return fmt.Errorf("column: unable to set duplicate key '%s'", value) +} + +// Get loads the value at the current transaction index +func (s keySlice) Get() (string, bool) { + return s.reader.LoadString(*s.cursor) +} + +// Enum returns a enumerable column accessor +func (txn *Txn) Key() keySlice { + if txn.owner.pk == nil { + panic(fmt.Errorf("column: primary key column does not exist")) + } + + return keySlice{ + cursor: &txn.cursor, + writer: txn.bufferFor(txn.owner.pk.name), + reader: txn.owner.pk, + } +} diff --git a/column_test.go b/column_test.go index 1fe6eba..1541319 100644 --- a/column_test.go +++ b/column_test.go @@ -251,14 +251,22 @@ func TestForKindInvalid(t *testing.T) { } func TestAtKey(t *testing.T) { - const serial = "c68a66f6-7b90-4b3a-8105-cfde490df780" + const testKey = "key=20" // Update a name players := loadPlayers(500) - players.QueryKey(serial, func(r Row) error { + players.CreateColumn("pk", ForKey()) + assert.NoError(t, players.Query(func(txn *Txn) error { + pk := txn.Key() + return txn.Range(func(idx uint32) { + pk.Set(fmt.Sprintf("key=%d", idx)) + }) + })) + + assert.NoError(t, players.QueryKey(testKey, func(r Row) error { r.SetEnum("name", "Roman") return nil - }) + })) // Read back and assert assertion := func(r Row) error { @@ -269,9 +277,9 @@ func TestAtKey(t *testing.T) { return nil } - assert.NoError(t, players.QueryKey(serial, assertion)) + assert.NoError(t, players.QueryKey(testKey, assertion)) assert.NoError(t, players.Query(func(txn *Txn) error { - assert.NoError(t, txn.QueryKey(serial, assertion)) + assert.NoError(t, txn.QueryKey(testKey, assertion)) return nil })) } @@ -286,7 +294,24 @@ func TestUpdateAtKeyWithoutPK(t *testing.T) { func TestSelectAtKeyWithoutPK(t *testing.T) { col := NewCollection() - assert.Error(t, col.QueryKey("test", func(r Row) error { + assert.Error(t, col.QueryKey("test", func(r Row) error { return nil })) + assert.Error(t, 
col.InsertKey("test", func(r Row) error { return nil })) + assert.Error(t, col.UpsertKey("test", func(r Row) error { return nil })) + assert.Error(t, col.DeleteKey("test")) +} + +func TestBulkUpdateDuplicatePK(t *testing.T) { + col := NewCollection() + col.CreateColumn("key", ForKey()) + assert.NoError(t, col.InsertKey("1", func(r Row) error { return nil })) + assert.NoError(t, col.InsertKey("2", func(r Row) error { return nil })) + + // If we attempt to change to an already persisted key, we should get an error + assert.NoError(t, col.Query(func(txn *Txn) error { + pk := txn.Key() + assert.Error(t, txn.QueryKey("1", func(Row) error { + return pk.Set("2") + })) return nil })) } @@ -461,8 +486,7 @@ func TestPKAccessor(t *testing.T) { assert.NoError(t, col.CreateColumn("name", ForKey())) // Insert a primary key value - _, err := col.Insert(func(r Row) error { - r.txn.Key().Set("Roman") + err := col.InsertKey("Roman", func(r Row) error { return nil }) assert.NoError(t, err) @@ -497,6 +521,24 @@ func TestIndexValue(t *testing.T) { assert.True(t, ok) } +func TestDuplicatePK(t *testing.T) { + col := NewCollection() + assert.NoError(t, col.CreateColumn("name", ForKey())) + + // Insert a primary key value + assert.NoError(t, col.InsertKey("Roman", func(r Row) error { + return nil + })) + + // Insert a duplicate + assert.Error(t, col.InsertKey("Roman", func(r Row) error { + return nil + })) + + // Must have one value + assert.Equal(t, 1, col.Count()) +} + func invoke(any interface{}, name string, args ...interface{}) []reflect.Value { inputs := make([]reflect.Value, len(args)) for i := range args { diff --git a/examples/cache/cache.go b/examples/cache/cache.go index 18b776a..1aec6f9 100644 --- a/examples/cache/cache.go +++ b/examples/cache/cache.go @@ -34,7 +34,7 @@ func (c *Cache) Get(key string) (value string, found bool) { // Set updates or inserts a new value func (c *Cache) Set(key, value string) { - if err := c.store.QueryKey(key, func(r column.Row) error { + if err 
:= c.store.UpsertKey(key, func(r column.Row) error { r.SetString("val", value) return nil }); err != nil { diff --git a/txn.go b/txn.go index 9643de9..b8dd52f 100644 --- a/txn.go +++ b/txn.go @@ -5,6 +5,7 @@ package column import ( "errors" + "fmt" "sync" "sync/atomic" "time" @@ -14,7 +15,8 @@ import ( ) var ( - errNoKey = errors.New("column: collection does not have a key column") + errNoKey = errors.New("column: collection does not have a key column") + errUnkeyedInsert = errors.New("column: use InsertKey or UpsertKey methods instead") ) // --------------------------- Pool of Transactions ---------------------------- @@ -80,8 +82,8 @@ type Txn struct { cursor uint32 // The current cursor setup bool // Whether the transaction was set up or not owner *Collection // The target collection - index bitmap.Bitmap // The filtering index - dirty bitmap.Bitmap // The dirty chunks + index bitmap.Bitmap // The filtering index + dirty bitmap.Bitmap // The dirty chunks updates []*commit.Buffer // The update buffers columns []columnCache // The column mapping logger commit.Logger // The optional commit logger @@ -282,23 +284,6 @@ func (txn *Txn) Count() int { return int(txn.index.Count()) } -// QueryKey jumps at a particular key in the collection, sets the cursor to the -// provided position and executes given callback fn. -func (txn *Txn) QueryKey(key string, fn func(Row) error) error { - if txn.owner.pk == nil { - return errNoKey - } - - if idx, ok := txn.owner.pk.OffsetOf(key); ok { - return txn.QueryAt(idx, fn) - } - - // If not found, insert at a new index - idx, err := txn.insert(fn, 0) - txn.bufferFor(txn.owner.pk.name).PutString(commit.Put, idx, key) - return err -} - // DeleteAt attempts to delete an item at the specified index for this transaction. If the item // exists, it marks at as deleted and returns true, otherwise it returns false. 
func (txn *Txn) DeleteAt(index uint32) bool { @@ -318,36 +303,30 @@ func (txn *Txn) deleteAt(idx uint32) { // InsertObject adds an object to a collection and returns the allocated index. func (txn *Txn) InsertObject(object Object) (uint32, error) { - return txn.insertObject(object, 0) + return txn.InsertObjectWithTTL(object, 0) } // InsertObjectWithTTL adds an object to a collection, sets the expiration time // based on the specified time-to-live and returns the allocated index. func (txn *Txn) InsertObjectWithTTL(object Object, ttl time.Duration) (uint32, error) { - return txn.insertObject(object, time.Now().Add(ttl).UnixNano()) -} - -// Insert executes a mutable cursor transactionally at a new offset. -func (txn *Txn) Insert(fn func(Row) error) (uint32, error) { - return txn.insert(fn, 0) -} - -// InsertWithTTL executes a mutable cursor transactionally at a new offset and sets the expiration time -// based on the specified time-to-live and returns the allocated index. -func (txn *Txn) InsertWithTTL(ttl time.Duration, fn func(Row) error) (uint32, error) { - return txn.insert(fn, time.Now().Add(ttl).UnixNano()) -} - -// insertObject inserts all of the keys of a map, if previously registered as columns. -func (txn *Txn) insertObject(object Object, expireAt int64) (uint32, error) { - return txn.insert(func(Row) error { + return txn.Insert(func(r Row) error { + r.SetTTL(ttl) for k, v := range object { if _, ok := txn.columnAt(k); ok { txn.bufferFor(k).PutAny(commit.Put, txn.cursor, v) } } return nil - }, expireAt) + }) +} + +// Insert executes a mutable cursor transactionally at a new offset. +func (txn *Txn) Insert(fn func(Row) error) (uint32, error) { + if txn.owner.pk != nil { + return 0, errUnkeyedInsert + } + + return txn.insert(fn, 0) } // insert creates an insertion cursor for a given column and expiration time. 
@@ -357,26 +336,16 @@ func (txn *Txn) insert(fn func(Row) error, expireAt int64) (uint32, error) { idx := txn.owner.next() txn.bufferFor(rowColumn).PutOperation(commit.Insert, idx) - // If no expiration was specified, simply insert - if expireAt == 0 { - return idx, txn.QueryAt(idx, fn) + // If there was an error during insertion, free the index so it can be re-used + if err := txn.QueryAt(idx, fn); err != nil { + txn.owner.free(idx) + return idx, err } - // If expiration was specified, set it - return idx, txn.QueryAt(idx, func(r Row) error { - r.SetInt64(expireColumn, expireAt) - return fn(r) - }) + return idx, nil } -// DeleteAll marks all of the items currently selected by this transaction for deletion. The -// actual delete will take place once the transaction is committed. -func (txn *Txn) DeleteAll() { - txn.initialize() - txn.index.Range(func(x uint32) { - txn.deleteAt(x) - }) -} +// --------------------------- Iteration ---------------------------- // Range selects and iterates over result set. In each iteration step, the internal // transaction cursor is updated and can be used by various column accessors. @@ -392,10 +361,86 @@ func (txn *Txn) Range(fn func(idx uint32)) error { return nil } +// DeleteAll marks all of the items currently selected by this transaction for deletion. The +// actual delete will take place once the transaction is committed. +func (txn *Txn) DeleteAll() { + txn.initialize() + txn.index.Range(func(x uint32) { + txn.deleteAt(x) + }) +} + +// --------------------------- Primary Key ---------------------------- + +// InsertKey inserts a row given its corresponding primary key. 
+func (txn *Txn) InsertKey(key string, fn func(Row) error) error { + if txn.owner.pk == nil { + return errNoKey + } + + if idx, ok := txn.owner.pk.OffsetOf(key); ok { + return fmt.Errorf("column: key '%s' already exists at offset %d", key, idx) + } + + // If not found, insert at a new index + idx, err := txn.insert(fn, 0) + txn.bufferFor(txn.owner.pk.name).PutString(commit.Put, idx, key) + return err +} + +// UpsertKey inserts or updates a row given its corresponding primary key. +func (txn *Txn) UpsertKey(key string, fn func(Row) error) error { + if txn.owner.pk == nil { + return errNoKey + } + + if idx, ok := txn.owner.pk.OffsetOf(key); ok { + return txn.QueryAt(idx, fn) + } + + // If not found, insert at a new index + idx, err := txn.insert(fn, 0) + txn.bufferFor(txn.owner.pk.name).PutString(commit.Put, idx, key) + return err +} + +// QueryKey queries/updates a row given its corresponding primary key. +func (txn *Txn) QueryKey(key string, fn func(Row) error) error { + if txn.owner.pk == nil { + return errNoKey + } + + if idx, ok := txn.owner.pk.OffsetOf(key); ok { + return txn.QueryAt(idx, fn) + } + + return fmt.Errorf("column: key '%s' was not found", key) +} + +// DeleteKey deletes a row for a given primary key. +func (txn *Txn) DeleteKey(key string) error { + if txn.owner.pk == nil { + return errNoKey + } + + if idx, ok := txn.owner.pk.OffsetOf(key); ok { + txn.deleteAt(idx) + return nil + } + + return fmt.Errorf("column: key '%s' was not found", key) +} + +// --------------------------- Commit & Rollback ---------------------------- + // Rollback empties the pending update and delete queues and does not apply any of // the pending updates/deletes. This operation can be called several times for // a transaction in order to perform partial rollbacks. 
func (txn *Txn) rollback() { + txn.owner.lock.Lock() + atomic.StoreUint64(&txn.owner.count, uint64(txn.owner.fill.Count())) + txn.owner.lock.Unlock() + txn.reset() } @@ -505,16 +550,6 @@ func (txn *Txn) commitMarkers(chunk commit.Chunk, fill bitmap.Bitmap, buffer *co txn.owner.lock.Unlock() } -// findMarkers finds a set of insert/deletes -func (txn *Txn) findMarkers() (*commit.Buffer, bool) { - for _, u := range txn.updates { - if !u.IsEmpty() && u.Column == rowColumn { - return u, true - } - } - return nil, false -} - // commitCapacity grows all columns until they reach the max index func (txn *Txn) commitCapacity(last commit.Chunk) { txn.owner.lock.Lock() @@ -535,3 +570,15 @@ func (txn *Txn) commitCapacity(last commit.Chunk) { column.Grow(max) }) } + +// --------------------------- Buffer Lookups ---------------------------- + +// findMarkers finds a set of insert/deletes +func (txn *Txn) findMarkers() (*commit.Buffer, bool) { + for _, u := range txn.updates { + if !u.IsEmpty() && u.Column == rowColumn { + return u, true + } + } + return nil, false +} diff --git a/txn_test.go b/txn_test.go index 57faa5e..f2c35d5 100644 --- a/txn_test.go +++ b/txn_test.go @@ -465,13 +465,13 @@ func TestUpsertKey(t *testing.T) { c := NewCollection() c.CreateColumn("key", ForKey()) c.CreateColumn("val", ForString()) - assert.NoError(t, c.QueryKey("1", func(r Row) error { + assert.NoError(t, c.UpsertKey("1", func(r Row) error { r.SetString("val", "Roman") return nil })) count := 0 - assert.NoError(t, c.QueryKey("1", func(r Row) error { + assert.NoError(t, c.UpsertKey("1", func(r Row) error { count++ return nil })) @@ -484,14 +484,111 @@ func TestUpsertKeyNoColumn(t *testing.T) { c.CreateColumn("key", ForKey()) assert.Panics(t, func() { - c.QueryKey("1", func(r Row) error { + c.UpsertKey("1", func(r Row) error { r.Enum("xxx") return nil }) }) } -func TestDuplicateKey(t *testing.T) { +func TestDeleteKey(t *testing.T) { + c := NewCollection() + c.CreateColumn("key", ForKey()) + 
c.CreateColumn("val", ForString()) + assert.NoError(t, c.InsertKey("1", func(r Row) error { + r.SetString("val", "Roman") + return nil + })) + + // Only one should succeed + assert.NoError(t, c.DeleteKey("1")) + assert.Error(t, c.DeleteKey("1")) + assert.Equal(t, 0, c.Count()) +} + +func TestInsertKey(t *testing.T) { + c := NewCollection() + c.CreateColumn("key", ForKey()) + + // Only one should succeed + assert.NoError(t, c.InsertKey("1", func(r Row) error { + return nil + })) + assert.Error(t, c.InsertKey("1", func(r Row) error { + return nil + })) + assert.Equal(t, 1, c.Count()) +} + +func TestQueryKey(t *testing.T) { + c := NewCollection() + c.CreateColumn("key", ForKey()) + c.CreateColumn("val", ForString()) + + assert.Error(t, c.QueryKey("1", func(r Row) error { + return nil + })) + + assert.NoError(t, c.InsertKey("1", func(r Row) error { + r.SetString("val", "Roman") + return nil + })) + + assert.NoError(t, c.QueryKey("1", func(r Row) error { + return nil + })) +} + +func TestChangeKey(t *testing.T) { + c := NewCollection() + c.CreateColumn("key", ForKey()) + + // Try to change the key from "1" to "2" + assert.NoError(t, c.InsertKey("1", func(r Row) error { return nil })) + assert.NoError(t, c.QueryKey("1", func(r Row) error { + r.SetKey("2") + return nil + })) + + // Must now have "2" + assert.NoError(t, c.QueryKey("2", func(r Row) error { return nil })) + assert.Equal(t, 1, c.Count()) +} + +func TestRollbackInsert(t *testing.T) { + col := NewCollection() + assert.NoError(t, col.CreateColumn("name", ForString())) + + // Insert successfully + idx0, err := col.Insert(func(r Row) error { + return nil + }) + assert.NoError(t, err) + assert.Equal(t, uint32(0), idx0) + + // Insert with error + idx1, err := col.Insert(func(r Row) error { + return fmt.Errorf("error") + }) + assert.Error(t, err) + assert.Equal(t, uint32(1), idx1) + + // Should only have 1 element + assert.Equal(t, 1, col.Count()) +} + +func TestUnkeyedInsert(t *testing.T) { + col := NewCollection() 
+ assert.NoError(t, col.CreateColumn("key", ForKey())) + + // Insert should fail, as one should use InsertKey() method + _, err := col.Insert(func(r Row) error { + return nil + }) + assert.Error(t, err) +} + +func TestDuplicateKeyColumn(t *testing.T) { c := NewCollection() assert.NoError(t, c.CreateColumn("key1", ForKey())) assert.Error(t, c.CreateColumn("key2", ForKey())) @@ -513,8 +610,7 @@ func TestRowMethods(t *testing.T) { c.CreateColumn("float32", ForFloat32()) c.CreateColumn("float64", ForFloat64()) - c.Insert(func(r Row) error { - r.SetKey("key") + c.InsertKey("key", func(r Row) error { r.SetBool("bool", true) r.SetAny("name", "Roman") @@ -575,9 +671,7 @@ func TestRow(t *testing.T) { wg.Add(2) go c.Query(func(txn *Txn) error { - txn.Insert(func(r Row) error { - name := txn.Key() - name.Set("Roman") + txn.InsertKey("Roman", func(r Row) error { return nil }) wg.Done()