
Fixed duplicate primary keys #68

Merged
merged 2 commits on Jun 12, 2022
58 changes: 41 additions & 17 deletions README.md
@@ -17,13 +17,15 @@ This package contains a **high-performance, columnar, in-memory storage engine**
- Optimized, cache-friendly **columnar data layout** that minimizes cache-misses.
- Optimized for **zero heap allocation** during querying (see benchmarks below).
- Optimized **batch updates/deletes**, an update during a transaction takes around `12ns`.
- Support for **SIMD-enabled aggregate functions** such as "sum", "avg", "min" and "max".
- Support for **SIMD-enabled filtering** (i.e. "where" clause) by leveraging [bitmap indexing](https://github.com/kelindar/bitmap).
- Support for **columnar projection** (i.e. "select" clause) for fast retrieval.
- Support for **computed indexes** that are dynamically calculated based on a provided predicate.
- Support for **concurrent updates** using sharded latches to keep things fast.
- Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
- Support for **expiration** of rows based on time-to-live or an expiration column.
- Support for **atomic increment/decrement** of numerical values, transactionally.
- Support for **primary keys** for use-cases where the row offset can't be used.
- Support for **change data stream** that streams all commits consistently.
- Support for **concurrent snapshotting**, allowing you to persist the entire collection to a file.
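The sharded-latch idea behind concurrent updates can be sketched in a few lines. This is an illustrative, self-contained example, not the library's internals: writers lock only the shard that owns their row offset, so updates to different shards proceed in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// shardedLock is a hypothetical sketch of sharded latching: instead of a
// single global mutex, each row offset maps to one of a fixed number of
// shard locks, reducing contention between unrelated writers.
type shardedLock struct {
	shards [16]sync.RWMutex
}

// lockFor picks the shard lock that owns a given row offset.
func (s *shardedLock) lockFor(offset uint32) *sync.RWMutex {
	return &s.shards[offset%16]
}

func main() {
	var locks shardedLock
	var wg sync.WaitGroup
	counters := make([]int, 16)

	// 64 concurrent writers, each locking only its own shard
	for i := 0; i < 64; i++ {
		wg.Add(1)
		go func(offset uint32) {
			defer wg.Done()
			mu := locks.lockFor(offset)
			mu.Lock()
			counters[offset%16]++
			mu.Unlock()
		}(uint32(i))
	}
	wg.Wait()

	total := 0
	for _, c := range counters {
		total += c
	}
	fmt.Println(total) // all 64 updates applied
}
```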

@@ -37,6 +39,7 @@ The general idea is to leverage cache-friendly ways of organizing data in [struc
- [Updating Values](#updating-values)
- [Expiring Values](#expiring-values)
- [Transaction Commit and Rollback](#transaction-commit-and-rollback)
- [Using Primary Keys](#using-primary-keys)
- [Streaming Changes](#streaming-changes)
- [Snapshot and Restore](#snapshot-and-restore)
- [Complete Example](#complete-example)
@@ -192,7 +195,7 @@ players.Query(func(txn *Txn) error {
})
```

Taking the `Sum()` of a (numeric) column reader will take into account a transaction's current filtering index.

```go
players.Query(func(txn *Txn) error {
@@ -204,7 +207,7 @@ players.Query(func(txn *Txn) error {
txn.WithInt("age", func(v float64) bool {
return v < avgAge
})

// get total balance for 'all rogues younger than the average rogue'
balance := txn.Float64("balance").Sum()
return nil
@@ -243,31 +246,26 @@ players.Query(func(txn *Txn) error {

## Expiring Values

Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine asynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use the `InsertWithTTL()` method on the collection, which allows you to insert an object with a time-to-live duration defined.
Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine asynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use the `Insert...()` method on the collection, which allows you to insert an object with a time-to-live duration defined.

In the example below we are inserting an object to the collection and setting the time-to-live to _5 seconds_ from the current time. After this time, the object will be automatically evicted from the collection and its space can be reclaimed.

```go
players.InsertObjectWithTTL(map[string]interface{}{
"name": "Merlin",
"class": "mage",
"age": 55,
"balance": 500,
}, 5 * time.Second) // The time-to-live of 5 seconds
players.Insert(func(r column.Row) error {
r.SetString("name", "Merlin")
r.SetString("class", "mage")
r.SetTTL(5 * time.Second) // time-to-live of 5 seconds
return nil
})
```

On an interesting note, since the `expire` column which is automatically added to each collection is a regular column, you can query and even update it. In the example below we query and conditionally update the expiration column. The example loads a time, adds one hour and updates it, but in practice if you want to do this you should use the `Add()` method, which can perform it atomically.
On an interesting note, since the `expire` column which is automatically added to each collection is a regular column, you can query and even update it. In the example below we query and extend the time-to-live by 1 hour using the `Extend()` method.

```go
players.Query(func(txn *column.Txn) error {
expire := txn.Int64("expire")

ttl := txn.TTL()
return txn.Range(func(i uint32) {
if v, ok := expire.Get(); ok && v > 0 {
oldExpire := time.Unix(0, v) // Convert expiration to time.Time
newExpire := expireAt.Add(1 * time.Hour).UnixNano() // Add some time
expire.Set(newExpire)
}
ttl.Extend(1 * time.Hour) // Add some time
})
})
```
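Under the hood, expiration boils down to comparing a stored timestamp against the current time on each cleanup pass. The sketch below is a simplified, self-contained stand-in for that sweep (the names `evictExpired` and the map layout are illustrative, not the library's actual internals): rows with a non-zero expiration timestamp in the past are dropped, while zero means "no TTL set".

```go
package main

import (
	"fmt"
	"time"
)

// evictExpired sketches the periodic cleanup pass: rows whose expiration
// timestamp (nanoseconds since epoch) is non-zero and in the past are
// removed. A zero timestamp means the row never expires.
func evictExpired(expireAt map[string]int64, now int64) {
	for key, ts := range expireAt {
		if ts != 0 && now >= ts {
			delete(expireAt, key)
		}
	}
}

func main() {
	now := time.Now().UnixNano()
	rows := map[string]int64{
		"merlin": now - int64(time.Second), // already expired
		"arthur": now + int64(time.Hour),   // still alive
		"morgan": 0,                        // no expiration set
	}
	evictExpired(rows, now)
	fmt.Println(len(rows)) // merlin evicted, two rows remain
}
```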
@@ -304,6 +302,32 @@ players.Query(func(txn *column.Txn) error {
})
```

## Using Primary Keys

In certain cases it is useful to access a specific row by its primary key instead of the index generated internally by the collection. For such use-cases, the library provides a `Key` column type that enables seamless lookup by a user-defined _primary key_. In the example below we create a collection with a `name` primary key using the `CreateColumn()` method with the `ForKey()` column type. Then, we use the `InsertKey()` method to insert a value.

```go
players := column.NewCollection()
players.CreateColumn("name", column.ForKey()) // Create a "name" as a primary-key
players.CreateColumn("class", column.ForString()) // .. and some other columns

// Insert a player with "merlin" as its primary key
players.InsertKey("merlin", func(r column.Row) error {
r.SetString("class", "mage")
return nil
})
```

Similarly, you can use the primary key to query the data directly, without knowing the exact offset. Do note that primary keys incur some overhead, as they require an additional lookup of the offset through an internally managed hash table.

```go
// Query merlin's class
players.QueryKey("merlin", func(r column.Row) error {
class, _ := r.String("class")
return nil
})
```
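The hash-table lookup mentioned above is also what this PR's fix for duplicate primary keys relies on: the same key must always resolve to the same internal offset. The sketch below is a hypothetical, self-contained illustration of that idea (`keyIndex` and `upsert` are invented names, not the library's API).

```go
package main

import "fmt"

// keyIndex sketches the internal key-to-offset mapping: a primary key is
// resolved to a row offset through a hash table before the row is touched,
// which is the extra step that gives QueryKey its overhead.
type keyIndex struct {
	offsets map[string]uint32
	next    uint32
}

func newKeyIndex() *keyIndex {
	return &keyIndex{offsets: make(map[string]uint32)}
}

// upsert returns the existing offset for a key, or allocates a new one.
// Reusing the existing offset is what prevents duplicate primary keys
// on repeated inserts of the same key.
func (k *keyIndex) upsert(key string) uint32 {
	if off, ok := k.offsets[key]; ok {
		return off
	}
	off := k.next
	k.offsets[key] = off
	k.next++
	return off
}

func main() {
	idx := newKeyIndex()
	a := idx.upsert("merlin")
	b := idx.upsert("merlin") // same key resolves to the same offset
	c := idx.upsert("arthur")
	fmt.Println(a == b, a != c) // true true
}
```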

## Streaming Changes

This library also supports streaming out all transaction commits consistently, as they happen. This allows you to implement your own change data capture (CDC) listeners, stream data into Kafka, or replicate into a remote database for durability. In order to enable it, you can simply provide an implementation of the `commit.Logger` interface during the creation of the collection.
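To make the logger idea concrete, here is a simplified, self-contained sketch. The `Commit` struct and `Logger` interface below are stand-ins for the library's `commit` package types, not its actual API; the point is only the shape of the pattern: every commit is handed to the logger, which can forward it anywhere.

```go
package main

import "fmt"

// Commit is a simplified stand-in for a committed transaction's payload.
type Commit struct {
	Chunk   uint32
	Updates []string
}

// Logger receives every commit as it happens, mirroring the role of the
// library's commit.Logger interface.
type Logger interface {
	Append(commit Commit) error
}

// channelLogger streams commits into a buffered channel, from which a
// consumer goroutine could forward them to Kafka or a remote database.
type channelLogger chan Commit

func (c channelLogger) Append(commit Commit) error {
	c <- commit
	return nil
}

func main() {
	stream := make(channelLogger, 8)
	var log Logger = stream // the collection would hold this reference

	log.Append(Commit{Chunk: 0, Updates: []string{"set name=merlin"}})
	got := <-stream
	fmt.Println(got.Chunk, len(got.Updates)) // 0 1
}
```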
76 changes: 37 additions & 39 deletions collection.go
@@ -91,12 +91,20 @@ func NewCollection(opts ...Options) *Collection {
func (c *Collection) next() uint32 {
c.lock.Lock()
idx := c.findFreeIndex(atomic.AddUint64(&c.count, 1))

c.fill.Set(idx)
c.lock.Unlock()
return idx
}

// free marks the index as free, atomically.
func (c *Collection) free(idx uint32) {
c.lock.Lock()
c.fill.Remove(idx)
atomic.StoreUint64(&c.count, uint64(c.fill.Count()))
c.lock.Unlock()
}

// findFreeIndex finds a free index for insertion
func (c *Collection) findFreeIndex(count uint64) uint32 {
fillSize := len(c.fill)
@@ -147,16 +155,6 @@ func (c *Collection) Insert(fn func(Row) error) (index uint32, err error) {
return
}

// InsertWithTTL executes a mutable cursor transactionally at a new offset and sets the expiration time
// based on the specified time-to-live and returns the allocated index.
func (c *Collection) InsertWithTTL(ttl time.Duration, fn func(Row) error) (index uint32, err error) {
err = c.Query(func(txn *Txn) (innerErr error) {
index, innerErr = txn.InsertWithTTL(ttl, fn)
return
})
return
}

// DeleteAt attempts to delete an item at the specified index for this collection. If the item
// exists, it marks at as deleted and returns true, otherwise it returns false.
func (c *Collection) DeleteAt(idx uint32) (deleted bool) {
@@ -284,14 +282,6 @@ func (c *Collection) QueryAt(idx uint32, fn func(Row) error) error {
})
}

// QueryAt jumps at a particular key in the collection, sets the cursor to the
// provided position and executes given callback fn.
func (c *Collection) QueryKey(key string, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.QueryKey(key, fn)
})
}

// Query creates a transaction which allows for filtering and iteration over the
// columns in this collection. It also allows for individual rows to be modified or
// deleted during iteration (range), but the actual operations will be queued and
@@ -319,26 +309,34 @@ func (c *Collection) Close() error {
return nil
}

// vacuum cleans up the expired objects on a specified interval.
func (c *Collection) vacuum(ctx context.Context, interval time.Duration) {
ticker := time.NewTicker(interval)
for {
select {
case <-ctx.Done():
ticker.Stop()
return
case <-ticker.C:
now := time.Now().UnixNano()
c.Query(func(txn *Txn) error {
expire := txn.Int64(expireColumn)
return txn.With(expireColumn).Range(func(idx uint32) {
if expirateAt, ok := expire.Get(); ok && expirateAt != 0 && now >= expirateAt {
txn.DeleteAt(idx)
}
})
})
}
}
// --------------------------- Primary Key ----------------------------

// InsertKey inserts a row given its corresponding primary key.
func (c *Collection) InsertKey(key string, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.InsertKey(key, fn)
})
}

// UpsertKey inserts or updates a row given its corresponding primary key.
func (c *Collection) UpsertKey(key string, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.UpsertKey(key, fn)
})
}

// QueryKey queries/updates a row given its corresponding primary key.
func (c *Collection) QueryKey(key string, fn func(Row) error) error {
return c.Query(func(txn *Txn) error {
return txn.QueryKey(key, fn)
})
}

// DeleteKey deletes a row for a given primary key.
func (c *Collection) DeleteKey(key string) error {
return c.Query(func(txn *Txn) error {
return txn.DeleteKey(key)
})
}

// --------------------------- column registry ---------------------------
64 changes: 53 additions & 11 deletions collection_test.go
@@ -338,11 +338,11 @@ func TestExpire(t *testing.T) {
// Insert an object
col.InsertObjectWithTTL(obj, time.Microsecond)
col.Query(func(txn *Txn) error {
expire := txn.Int64(expireColumn)
ttl := txn.TTL()
return txn.Range(func(idx uint32) {
value, _ := expire.Get()
expireAt := time.Unix(0, value)
expire.Set(expireAt.Add(1 * time.Microsecond).UnixNano())
remaining, ok := ttl.TTL()
assert.True(t, ok)
assert.NotZero(t, remaining)
})
})
assert.Equal(t, 1, col.Count())
@@ -355,6 +355,44 @@
assert.Equal(t, 0, col.Count())
}

func TestExpireExtend(t *testing.T) {
col := loadPlayers(500)
assert.NoError(t, col.Query(func(txn *Txn) error {
ttl := txn.TTL()
return txn.Range(func(idx uint32) {

// When loaded, we should not have any expiration set
_, hasExpiration := ttl.ExpiresAt()
assert.False(t, hasExpiration)
_, hasRemaining := ttl.TTL()
assert.False(t, hasRemaining)

// Set a 1-hour TTL, then extend it by another hour
ttl.Set(time.Hour)
ttl.Extend(time.Hour)
})
}))

// Now we should have expiration time set
assert.NoError(t, col.Query(func(txn *Txn) error {
ttl := txn.TTL()
return txn.Range(func(idx uint32) {
_, hasExpiration := ttl.ExpiresAt()
assert.True(t, hasExpiration)
_, hasRemaining := ttl.TTL()
assert.True(t, hasRemaining)
})
}))

// Reset back to zero
assert.NoError(t, col.Query(func(txn *Txn) error {
ttl := txn.TTL()
return txn.Range(func(idx uint32) {
ttl.Set(0) // Reset to zero
})
}))
}

func TestCreateIndex(t *testing.T) {
row := Object{
"age": 35,
@@ -530,16 +568,20 @@ func TestInsertWithTTL(t *testing.T) {
c := NewCollection()
c.CreateColumn("name", ForString())

idx, err := c.InsertWithTTL(time.Hour, func(r Row) error {
r.SetString("name", "Roman")
idx, err := c.Insert(func(r Row) error {
if _, ok := r.TTL(); !ok {
r.SetTTL(time.Hour)
r.SetString("name", "Roman")
}
return nil
})

assert.Equal(t, uint32(0), idx)
assert.NoError(t, err)
assert.NoError(t, c.QueryAt(idx, func(r Row) error {
expire, ok := r.Int64(expireColumn)
ttl, ok := r.TTL()
assert.True(t, ok)
assert.NotZero(t, expire)
assert.NotZero(t, ttl)
return nil
}))
}
@@ -594,7 +636,7 @@ func TestReplica(t *testing.T) {
}
}()

source.Insert(func (r Row) error {
source.Insert(func(r Row) error {
r.SetAny("id", "bob")
r.SetInt("cnt", 2)
return nil
@@ -603,7 +645,7 @@
// give the replica stream a moment
time.Sleep(100 * time.Millisecond)

target.Query(func (txn *Txn) error {
target.Query(func(txn *Txn) error {
assert.Equal(t, 1, txn.Count())
return nil
})
@@ -637,7 +679,7 @@ func newEmpty(capacity int) *Collection {
})

// Load the items into the collection
out.CreateColumn("serial", ForKey())
out.CreateColumn("serial", ForString())
out.CreateColumn("name", ForEnum())
out.CreateColumn("active", ForBool())
out.CreateColumn("class", ForEnum())