Generalized merging function #72

Merged
merged 6 commits into from
Oct 29, 2022
File renamed without changes.
52 changes: 40 additions & 12 deletions README.md
@@ -24,7 +24,7 @@ This package contains a **high-performance, columnar, in-memory storage engine**
- Support for **concurrent updates** using sharded latches to keep things fast.
- Support for **transaction isolation**, allowing you to create transactions and commit/rollback.
- Support for **expiration** of rows based on time-to-live or expiration column.
- Support for **atomic increment/decrement** of numerical values, transactionally.
- Support for **atomic merging** of any values, transactionally.
- Support for **primary keys** for use-cases where offset can't be used.
- Support for **change data stream** that streams all commits consistently.
- Support for **concurrent snapshotting** allowing to store the entire collection into a file.
@@ -84,7 +84,7 @@ for _, v := range loadFromJson("players.json") {
While the previous example demonstrated how to insert many objects, it did so one by one, which is rather inefficient. This is because each `InsertObject()` call made directly on the collection initiates a separate transaction, and there's a small performance cost associated with it. If you want to bulk-insert many values faster, you can call `InsertObject()` on a transaction instead, as demonstrated in the example below. Note that the only difference is instantiating a transaction by calling the `Query()` method and calling `txn.InsertObject()` on the transaction instead of the method on the collection.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
for _, v := range loadFromJson("players.json") {
txn.InsertObject(v)
}
Expand Down Expand Up @@ -129,7 +129,7 @@ First, let's try to merge two queries by applying a `Union()` operation with the

```go
// How many rogues and mages?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.With("rogue").Union("mage").Count()
return nil
})
@@ -139,7 +139,7 @@ Next, let's count everyone who isn't a rogue, for that we can use a `Without()`

```go
// How many players are not rogues?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.Without("rogue").Count()
return nil
})
@@ -149,7 +149,7 @@ Now, you can combine all of the methods and keep building more complex queries.

```go
// How many rogues are over 30 years old?
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
txn.With("rogue").WithFloat("age", func(v float64) bool {
return v >= 30
}).Count()
@@ -168,7 +168,7 @@ In order to access the results of the iteration, prior to calling `Range()` meth
In the example below we select all of the rogues from our collection and print out their names using the `Range()` method, accessing the "name" column through a column reader created by calling the `txn.String("name")` method.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
names := txn.String("name") // Create a column reader

return txn.With("rogue").Range(func(i uint32) {
@@ -181,7 +181,7 @@ players.Query(func(txn *Txn) error {
Similarly, if you need to access more columns, you can simply create the appropriate column reader(s) and use them as shown in the example before.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
names := txn.String("name")
ages := txn.Int64("age")

@@ -198,7 +198,7 @@ players.Query(func(txn *Txn) error {
Taking the `Sum()` of a (numeric) column reader will take into account a transaction's current filtering index.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
totalAge := txn.With("rogue").Int64("age").Sum()
totalRogues := int64(txn.Count())

@@ -221,7 +221,7 @@ In order to update certain items in the collection, you can simply call `Range()
In the example below we're selecting all of the rogues and updating both their balance and age to certain values. The transaction returns `nil`, hence it will be automatically committed when the `Query()` method returns.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
balance := txn.Float64("balance")
age := txn.Int64("age")

@@ -232,18 +232,46 @@ players.Query(func(txn *Txn) error {
})
```

In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Add()` operation. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the below example we're incrementing the balance of all our rogues by _500_ atomically.
In certain cases, you might want to atomically increment or decrement numerical values. In order to accomplish this you can use the provided `Merge()` operation. Note that the indexes will also be updated accordingly and the predicates re-evaluated with the most up-to-date values. In the below example we're incrementing the balance of all our rogues by _500_ atomically.

```go
players.Query(func(txn *Txn) error {
players.Query(func(txn *column.Txn) error {
balance := txn.Float64("balance")

return txn.With("rogue").Range(func(i uint32) {
balance.Add(500.0) // Increment the "balance" by 500
balance.Merge(500.0) // Increment the "balance" by 500
})
})
```

While atomic increment/decrement of numerical values is relatively straightforward, the `Merge()` operation can be customized with the `WithMerge()` option and used for other data types as well, such as strings. In the example below we create a merge function that concatenates two strings together; when `MergeString()` is called, the new string gets appended automatically.

```go
// A merging function that simply concatenates 2 strings together
concat := func(value, delta string) string {
if len(value) > 0 {
value += ", "
}
return value + delta
}

// Create a column with a specified merge function
db := column.NewCollection()
db.CreateColumn("alphabet", column.ForString(column.WithMerge(concat)))

// Insert letter "A"
db.Insert(func(r column.Row) error {
r.SetString("alphabet", "A") // now contains "A"
return nil
})

// Insert letter "B"
db.QueryAt(0, func(r column.Row) error {
r.MergeString("alphabet", "B") // now contains "A, B"
return nil
})
```
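
The same option should also apply to numeric columns, where a custom merge function replaces the default additive behaviour of `Merge()`. The snippet below is a minimal sketch, assuming that `ForInt64` accepts the `WithMerge` option in the same way as `ForString`:

```go
// A merge function that keeps a running maximum instead of summing deltas.
// Assumption: ForInt64 accepts the same WithMerge option as ForString.
maximum := func(value, delta int64) int64 {
	if delta > value {
		return delta
	}
	return value
}

db := column.NewCollection()
db.CreateColumn("score", column.ForInt64(column.WithMerge(maximum)))

db.Query(func(txn *column.Txn) error {
	score := txn.Int64("score")
	return txn.Range(func(i uint32) {
		score.Merge(42) // with the custom function, the value only ever grows
	})
})
```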

## Expiring Values

Sometimes, it is useful to automatically delete certain rows when you do not need them anymore. In order to do this, the library automatically adds an `expire` column to each new collection and starts a cleanup goroutine asynchronously that runs periodically and cleans up the expired objects. In order to set this, you can simply use the `Insert...()` method on the collection that allows you to insert an object with a time-to-live duration defined.
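
As a quick illustration, here is a minimal sketch of inserting an object together with a time-to-live; the method name `InsertObjectWithTTL` is assumed here and may differ between versions of the library:

```go
// Sketch: insert an object that expires automatically after 5 seconds.
// The InsertObjectWithTTL name is an assumption, not confirmed by this PR.
players.InsertObjectWithTTL(map[string]interface{}{
	"name":  "Merlin",
	"class": "mage",
}, 5*time.Second)
```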
1 change: 1 addition & 0 deletions codegen/main.go
@@ -21,6 +21,7 @@ func main() {
}

dst, err := os.OpenFile("column_numbers.go", os.O_RDWR|os.O_CREATE, os.ModePerm)
defer dst.Close()
if err != nil {
panic(err)
}
20 changes: 9 additions & 11 deletions codegen/numbers.tpl
@@ -12,21 +12,19 @@ import (
// --------------------------- {{.Name}} ----------------------------

// make{{.Name}}s creates a new vector for {{.Type}}s
func make{{.Name}}s() Column {
func make{{.Name}}s(opts ...func(*option[{{.Type}}])) Column {
return makeNumeric(
func(buffer *commit.Buffer, idx uint32, value {{.Type}}) {
buffer.Put{{.Name}}(idx, value)
},
func(r *commit.Reader, fill bitmap.Bitmap, data []{{.Type}}) {
func(buffer *commit.Buffer, idx uint32, value {{.Type}}) { buffer.Put{{.Name}}(commit.Put, idx, value) },
func(r *commit.Reader, fill bitmap.Bitmap, data []{{.Type}}, opts option[{{.Type}}]) {
for r.Next() {
offset := r.IndexAtChunk()
switch r.Type {
case commit.Put:
fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = r.{{.Name}}()
case commit.Add:
case commit.Merge:
fill[offset>>6] |= 1 << (offset & 0x3f)
data[offset] = r.AddTo{{.Name}}(data[offset])
data[offset] = r.Swap{{.Name}}(opts.Merge(data[offset], r.{{.Name}}()))
case commit.Delete:
fill.Remove(offset)
}
@@ -43,12 +41,12 @@ type {{.Type}}Writer struct {

// Set sets the value at the current transaction cursor
func (s {{.Type}}Writer) Set(value {{.Type}}) {
s.writer.Put{{.Name}}(s.txn.cursor, value)
s.writer.Put{{.Name}}(commit.Put, s.txn.cursor, value)
}

// Add atomically adds a delta to the value at the current transaction cursor
func (s {{.Type}}Writer) Add(delta {{.Type}}) {
s.writer.Add{{.Name}}(s.txn.cursor, delta)
// Merge atomically merges a delta to the value at the current transaction cursor
func (s {{.Type}}Writer) Merge(delta {{.Type}}) {
s.writer.Put{{.Name}}(commit.Merge, s.txn.cursor, delta)
}

// {{.Name}} returns a read-write accessor for {{.Type}} column
2 changes: 1 addition & 1 deletion collection.go
@@ -404,7 +404,7 @@ func (c *columns) Load(columnName string) (*column, bool) {
return nil, false
}

// LoadWithIndex loads a column by its name along with their computed indices.
// LoadWithIndex loads a column by its name along with the triggers.
func (c *columns) LoadWithIndex(columnName string) ([]*column, bool) {
cols := c.cols.Load().([]columnEntry)
for _, v := range cols {
28 changes: 28 additions & 0 deletions column.go
@@ -12,6 +12,7 @@ import (

"github.com/kelindar/bitmap"
"github.com/kelindar/column/commit"
"github.com/kelindar/simd"
)

// columnType represents a type of a column.
@@ -116,6 +117,33 @@ func ForKind(kind reflect.Kind) (Column, error) {
}
}

// --------------------------- Generic Options ----------------------------

type optionType interface {
simd.Number | ~string
}

// option represents options for various columns.
type option[T optionType] struct {
Merge func(value, delta T) T
}

// configure applies options
func configure[T optionType](opts []func(*option[T]), dst option[T]) option[T] {
for _, fn := range opts {
fn(&dst)
}
return dst
}

// WithMerge sets an optional merge function that allows you to merge a delta value to
// an existing value, atomically. The operation is performed transactionally.
func WithMerge[T optionType](fn func(value, delta T) T) func(*option[T]) {
return func(v *option[T]) {
v.Merge = fn
}
}

// --------------------------- Column ----------------------------

// column represents a column wrapper that synchronizes operations
2 changes: 1 addition & 1 deletion column_expire.go
@@ -70,7 +70,7 @@ func (s ttlWriter) Set(ttl time.Duration) {

// Extend extends time-to-live of the row current transaction cursor by a specified amount
func (s ttlWriter) Extend(delta time.Duration) {
s.rw.Add(int64(delta.Nanoseconds()))
s.rw.Merge(int64(delta.Nanoseconds()))
}

// readTTL converts expiration to a TTL
2 changes: 1 addition & 1 deletion column_index.go
@@ -65,7 +65,7 @@ func (c *columnIndex) Apply(chunk commit.Chunk, r *commit.Reader) {
// on the actual column.
for r.Next() {
switch r.Type {
case commit.Put, commit.Add:
case commit.Put, commit.Merge:
if c.rule(r) {
c.fill.Set(uint32(r.Offset))
} else {