Skip to content

Commit

Permalink
Generalized merging function (#72)
Browse files Browse the repository at this point in the history
This PR generalizes `Merge()` function and removes now redundant `Add()`
functions for numbers. When `Merge()` function is called on numerical
columns without specifying a custom merge option on the column, the
behavior of `Add()` will be observed (i.e. increments by default).
  • Loading branch information
kelindar authored Oct 29, 2022
1 parent 48677f6 commit 7d654c0
Show file tree
Hide file tree
Showing 24 changed files with 660 additions and 578 deletions.
File renamed without changes.
52 changes: 40 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ This package contains a **high-performance, columnar, in-memory storage engine**
- Support for **concurrent updates** using sharded latches to keep things fast.
- Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
- Support for **expiration** of rows based on time-to-live or expiration column.
- Support for **atomic increment/decrement** of numerical values, transactionally.
- Support for **atomic merging** of any values, transactionally.
- Support for **primary keys** for use-cases where offset can't be used.
- Support for **change data stream** that streams all commits consistently.
- Support for **concurrent snapshotting** allowing to store the entire collection into a file.
Expand Down Expand Up @@ -84,7 +84,7 @@ for _, v := range loadFromJson("players.json") {
While the previous example demonstrated how to insert many objects, it was doing it one by one and is rather inefficient. This is due to the fact that each `InsertObject()` call directly on the collection initiates a separate transacion and there's a small performance cost associated with it. If you want to do a bulk insert and insert many values, faster, that can be done by calling `Insert()` on a transaction, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling the `txn.Insert()` method on the transaction instead the one on the collection.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
for _, v := range loadFromJson("players.json") {
txn.InsertObject(v)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ First, let's try to merge two queries by applying a `Union()` operation with the

```go
// How many rogues and mages?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.With("rogue").Union("mage").Count()
return nil
})
Expand All @@ -139,7 +139,7 @@ Next, let's count everyone who isn't a rogue, for that we can use a `Without()`

```go
// How many rogues and mages?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.Without("rogue").Count()
return nil
})
Expand All @@ -149,7 +149,7 @@ Now, you can combine all of the methods and keep building more complex queries.

```go
// How many rogues that are over 30 years old?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.With("rogue").WithFloat("age", func(v float64) bool {
return v >= 30
}).Count()
Expand All @@ -168,7 +168,7 @@ In order to access the results of the iteration, prior to calling `Range()` meth
In the example below we select all of the rogues from our collection and print out their name by using the `Range()` method and accessing the "name" column using a column reader which is created by calling `txn.String("name")` method.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
names := txn.String("name") // Create a column reader

return txn.With("rogue").Range(func(i uint32) {
Expand All @@ -181,7 +181,7 @@ players.Query(func(txn *Txn) error {
Similarly, if you need to access more columns, you can simply create the appropriate column reader(s) and use them as shown in the example before.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
names := txn.String("name")
ages := txn.Int64("age")

Expand All @@ -198,7 +198,7 @@ players.Query(func(txn *Txn) error {
Taking the `Sum()` of a (numeric) column reader will take into account a transaction's current filtering index.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
totalAge := txn.With("rouge").Int64("age").Sum()
totalRouges := int64(txn.Count())

Expand All @@ -221,7 +221,7 @@ In order to update certain items in the collection, you can simply call `Range()
In the example below we're selecting all of the rogues and updating both their balance and age to certain values. The transaction returns `nil`, hence it will be automatically committed when `Query()` method returns.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
balance := txn.Float64("balance")
age := txn.Int64("age")

Expand All @@ -232,18 +232,46 @@ players.Query(func(txn *Txn) error {
})
```

In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Add()` operation. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the below example we're incrementing the balance of all our rogues by _500_ atomically.
In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Merge()` operation. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the below example we're incrementing the balance of all our rogues by _500_ atomically.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
balance := txn.Float64("balance")

return txn.With("rogue").Range(func(i uint32) {
balance.Add(500.0) // Increment the "balance" by 500
balance.Merge(500.0) // Increment the "balance" by 500
})
})
```

While atomic increment/decrement for numerical values is relatively straightforward, this `Merge()` operation can be specified using `WithMerge()` option and also used for other data types, such as strings. In the example below we are creating a merge function that concatenates two strings together and when `MergeString()` is called, the new string gets appended automatically.

```go
// A merging function that simply concatenates 2 strings together
concat := func(value, delta string) string {
if len(value) > 0 {
value += ", "
}
return value + delta
}

// Create a column with a specified merge function
db := column.NewCollection()
db.CreateColumn("alphabet", column.ForString(column.WithMerge(concat)))

// Insert letter "A"
db.Insert(func(r column.Row) error {
r.SetString("alphabet", "A") // now contains "A"
return nil
})

// Insert letter "B"
db.QueryAt(0, func(r column.Row) error {
r.MergeString("alphabet", "B") // now contains "A, B"
return nil
})
```

## Expiring Values

Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine aynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use `Insert...()` method on the collection that allows to insert an object with a time-to-live duration defined.
Expand Down
1 change: 1 addition & 0 deletions codegen/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func main() {
}

dst, err := os.OpenFile("column_numbers.go", os.O_RDWR|os.O_CREATE, os.ModePerm)
defer dst.Close()
if err != nil {
panic(err)
}
Expand Down
20 changes: 9 additions & 11 deletions codegen/numbers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,19 @@ import (
// --------------------------- {{.Name}} ----------------------------

// make{{.Name}}s creates a new vector for {{.Type}}s
func make{{.Name}}s() Column {
func make{{.Name}}s(opts ...func(*option[{{.Type}}])) Column {
return makeNumeric(
func(buffer *commit.Buffer, idx uint32, value {{.Type}}) {
buffer.Put{{.Name}}(idx, value)
},
func(r *commit.Reader, fill bitmap.Bitmap, data []{{.Type}}) {
func(buffer *commit.Buffer, idx uint32, value {{.Type}}) { buffer.Put{{.Name}}(commit.Put, idx, value) },
func(r *commit.Reader, fill bitmap.Bitmap, data []{{.Type}}, opts option[{{.Type}}]) {
for r.Next() {
offset := r.IndexAtChunk()
switch r.Type {
case commit.Put:
fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = r.{{.Name}}()
case commit.Add:
case commit.Merge:
fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = r.AddTo{{.Name}}(data[offset])
data[offset] = r.Swap{{.Name}}(opts.Merge(data[offset], r.{{.Name}}()))
case commit.Delete:
fill.Remove(offset)
}
Expand All @@ -43,12 +41,12 @@ type {{.Type}}Writer struct {

// Set sets the value at the current transaction cursor
func (s {{.Type}}Writer) Set(value {{.Type}}) {
s.writer.Put{{.Name}}(s.txn.cursor, value)
s.writer.Put{{.Name}}(commit.Put, s.txn.cursor, value)
}

// Add atomically adds a delta to the value at the current transaction cursor
func (s {{.Type}}Writer) Add(delta {{.Type}}) {
s.writer.Add{{.Name}}(s.txn.cursor, delta)
// Merge atomically merges a delta to the value at the current transaction cursor
func (s {{.Type}}Writer) Merge(delta {{.Type}}) {
s.writer.Put{{.Name}}(commit.Merge, s.txn.cursor, delta)
}

// {{.Name}} returns a read-write accessor for {{.Type}} column
Expand Down
2 changes: 1 addition & 1 deletion collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ func (c *columns) Load(columnName string) (*column, bool) {
return nil, false
}

// LoadWithIndex loads a column by its name along with their computed indices.
// LoadWithIndex loads a column by its name along with the triggers.
func (c *columns) LoadWithIndex(columnName string) ([]*column, bool) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
Expand Down
28 changes: 28 additions & 0 deletions column.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"
"github.com/kelindar/simd"
)

// columnType represents a type of a column.
Expand Down Expand Up @@ -116,6 +117,33 @@ func ForKind(kind reflect.Kind) (Column, error) {
}
}

// --------------------------- Generic Options ----------------------------

type optionType interface {
simd.Number | ~string
}

// optInt represents options for variouos columns.
type option[T optionType] struct {
Merge func(value, delta T) T
}

// configure applies options
func configure[T optionType](opts []func(*option[T]), dst option[T]) option[T] {
for _, fn := range opts {
fn(&dst)
}
return dst
}

// WithMerge sets an optional merge function that allows you to merge a delta value to
// an existing value, atomically. The operation is performed transactionally.
func WithMerge[T optionType](fn func(value, delta T) T) func(*option[T]) {
return func(v *option[T]) {
v.Merge = fn
}
}

// --------------------------- Column ----------------------------

// column represents a column wrapper that synchronizes operations
Expand Down
2 changes: 1 addition & 1 deletion column_expire.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (s ttlWriter) Set(ttl time.Duration) {

// Extend extends time-to-live of the row current transaction cursor by a specified amount
func (s ttlWriter) Extend(delta time.Duration) {
s.rw.Add(int64(delta.Nanoseconds()))
s.rw.Merge(int64(delta.Nanoseconds()))
}

// readTTL converts expiration to a TTL
Expand Down
2 changes: 1 addition & 1 deletion column_index.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func (c *columnIndex) Apply(chunk commit.Chunk, r *commit.Reader) {
// on the actual column.
for r.Next() {
switch r.Type {
case commit.Put, commit.Add:
case commit.Put, commit.Merge:
if c.rule(r) {
c.fill.Set(uint32(r.Offset))
} else {
Expand Down
Loading

0 comments on commit 7d654c0

Please sign in to comment.