Skip to content

Commit

Permalink
Iterable ordered map alternative with improved performance (#1152)
Browse files: browse the repository at this point in the history
* Improve ordered map performance (#1151)

* add MapIter benchmark

* use MapIter

* rename Range to Iter

* fix iter map test

* Update interface name

Co-authored-by: Kuba Kaflik <jakub@kaflik.pl>

* Update interface name

Co-authored-by: Kuba Kaflik <jakub@kaflik.pl>

* Update interface name

* add IterableOrderedMapInsertRead example

---------

Co-authored-by: Kuba Kaflik <jakub@kaflik.pl>
  • Loading branch information
hanjm and jkaflik authored Dec 13, 2023
1 parent a6e001e commit 9d8573d
Show file tree
Hide file tree
Showing 5 changed files with 285 additions and 7 deletions.
19 changes: 18 additions & 1 deletion bind.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,12 @@ package clickhouse
import (
std_driver "database/sql/driver"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"reflect"
"regexp"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
)

Expand Down Expand Up @@ -320,6 +320,23 @@ func format(tz *time.Location, scale TimeUnit, v any) (string, error) {
values = append(values, fmt.Sprintf("%s, %s", name, val))
}

return "map(" + strings.Join(values, ", ") + ")", nil
case column.IterableOrderedMap:
values := make([]string, 0)
iter := v.Iterator()
for iter.Next() {
key, value := iter.Key(), iter.Value()
name, err := format(tz, scale, key)
if err != nil {
return "", err
}
val, err := format(tz, scale, value)
if err != nil {
return "", err
}
values = append(values, fmt.Sprintf("%s, %s", name, val))
}

return "map(" + strings.Join(values, ", ") + ")", nil
}
switch v := reflect.ValueOf(v); v.Kind() {
Expand Down
9 changes: 7 additions & 2 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,14 @@ package clickhouse_api
import (
"context"
"fmt"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"math/rand"
"os"
"strconv"
"testing"
"time"

clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -155,6 +156,10 @@ func TestMapInsertRead(t *testing.T) {
require.NoError(t, MapInsertRead())
}

func TestIterableOrderedMapInsertRead(t *testing.T) {
require.NoError(t, IterableOrderedMapInsertRead())
}

func TestMultiHostConnect(t *testing.T) {
require.NoError(t, MultiHostVersion())
require.NoError(t, MultiHostRoundRobinVersion())
Expand Down
96 changes: 96 additions & 0 deletions examples/clickhouse_api/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"context"
"fmt"
"strconv"

"github.com/ClickHouse/clickhouse-go/v2/lib/column"
)

func MapInsertRead() error {
Expand Down Expand Up @@ -80,3 +82,97 @@ func MapInsertRead() error {
rows.Close()
return rows.Err()
}

// IterableOrderedMapInsertRead demonstrates writing and reading a
// Map(String, String) column through a column.IterableOrderedMap
// implementation (OrderedMap).
//
// It (re)creates the `example` table, appends ten rows that each hold two
// entries, sends the batch, then scans every row back into an OrderedMap and
// prints it. The table is dropped again when the function returns.
//
// It returns the first error reported by the connection, batch, or rows.
func IterableOrderedMapInsertRead() error {
	conn, err := GetNativeConnection(nil, nil, nil)
	if err != nil {
		return err
	}
	ctx := context.Background()
	defer func() {
		// Best-effort cleanup: a failed drop must not replace the example's result.
		_ = conn.Exec(ctx, "DROP TABLE example")
	}()
	// Best-effort: clear any leftover table; a real problem surfaces on CREATE below.
	_ = conn.Exec(ctx, "DROP TABLE IF EXISTS example")
	err = conn.Exec(ctx, `
CREATE TABLE example (
Col1 Map(String, String)
) Engine Memory
`)
	if err != nil {
		return err
	}

	batch, err := conn.PrepareBatch(ctx, "INSERT INTO example")
	if err != nil {
		return err
	}
	for i := 0; i < 10; i++ {
		om := NewOrderedMap()
		kv1 := strconv.Itoa(i)
		kv2 := strconv.Itoa(i + 1)
		om.Put(kv1, kv1)
		om.Put(kv2, kv2)
		if err := batch.Append(om); err != nil {
			return err
		}
	}
	if err := batch.Send(); err != nil {
		return err
	}

	rows, err := conn.Query(ctx, "SELECT * FROM example")
	if err != nil {
		return err
	}
	// Close on every exit path, including a failed Scan; previously an early
	// return from the loop left rows open.
	defer rows.Close()
	for rows.Next() {
		var col1 OrderedMap
		if err := rows.Scan(&col1); err != nil {
			return err
		}
		fmt.Printf("row: col1=%v\n", col1)
	}
	return rows.Err()
}

// OrderedMap is a simple (non thread safe) ordered map.
// Entries are kept in two parallel slices: Keys[i] pairs with Values[i],
// in the order the pairs were passed to Put.
type OrderedMap struct {
// Keys holds the keys in insertion order.
Keys []any
// Values holds the value for the key at the same index in Keys.
Values []any
}

// NewOrderedMap returns an empty OrderedMap, exposed through the
// column.IterableOrderedMap interface.
func NewOrderedMap() column.IterableOrderedMap {
	om := new(OrderedMap)
	return om
}

// Put appends the pair to the end of the map. Keys are not de-duplicated:
// putting an existing key again adds a second entry.
func (om *OrderedMap) Put(key any, value any) {
	om.Keys, om.Values = append(om.Keys, key), append(om.Values, value)
}

// Iterator returns a new iterator over om, built by NewOrderedMapIterator.
func (om *OrderedMap) Iterator() column.MapIterator {
	iter := NewOrderedMapIterator(om)
	return iter
}

// OrderedMapIter walks the entries of an OrderedMap by index.
type OrderedMapIter struct {
// om is the map being iterated.
om *OrderedMap
// iterIndex is the position of the current entry in om.Keys / om.Values.
// NewOrderedMapIterator starts it at -1 so the first Next moves to index 0.
iterIndex int
}

// NewOrderedMapIterator returns an iterator over om positioned before the
// first entry; call Next before reading Key or Value.
func NewOrderedMapIterator(om *OrderedMap) column.MapIterator {
	iter := new(OrderedMapIter)
	iter.om = om
	iter.iterIndex = -1
	return iter
}

// Next advances the iterator by one entry and reports whether the new
// position is still inside the map.
func (i *OrderedMapIter) Next() bool {
	next := i.iterIndex + 1
	i.iterIndex = next
	return next < len(i.om.Keys)
}

// Key returns the key at the iterator's current position. The position is
// only valid after a call to Next that returned true.
func (i *OrderedMapIter) Key() any {
	keys := i.om.Keys
	return keys[i.iterIndex]
}

// Value returns the value at the iterator's current position. The position
// is only valid after a call to Next that returned true.
func (i *OrderedMapIter) Value() any {
	values := i.om.Values
	return values[i.iterIndex]
}
42 changes: 41 additions & 1 deletion lib/column/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,11 @@ package column
import (
"database/sql/driver"
"fmt"
"github.com/ClickHouse/ch-go/proto"
"reflect"
"strings"
"time"

"github.com/ClickHouse/ch-go/proto"
)

// https://github.com/ClickHouse/ClickHouse/blob/master/src/Columns/ColumnMap.cpp
Expand All @@ -42,6 +43,17 @@ type OrderedMap interface {
Keys() <-chan any
}

// MapIterator is a cursor over the entries of an IterableOrderedMap.
// Callers in this package drive it as:
//
//	for iter.Next() { key, value := iter.Key(), iter.Value() }
type MapIterator interface {
// Next is expected to advance to the next entry and report whether one is
// available; iteration stops at the first false.
Next() bool
// Key returns the key of the current entry. It is only read after Next
// has returned true.
Key() any
// Value returns the value of the current entry. It is only read after
// Next has returned true.
Value() any
}

// IterableOrderedMap is an ordered map that is read through a MapIterator
// instead of the key channel used by OrderedMap.
type IterableOrderedMap interface {
// Put stores one key/value pair. ScanRow calls it once per entry of the
// scanned row, in the order the entries are returned by orderedRow.
Put(key any, value any)
// Iterator returns an iterator over the map's entries. AppendRow consumes
// it to append every key and value to the column.
Iterator() MapIterator
}

func (col *Map) Reset() {
col.keys.Reset()
col.values.Reset()
Expand Down Expand Up @@ -94,6 +106,13 @@ func (col *Map) ScanRow(dest any, i int) error {
value.Set(col.row(i))
return nil
}
if om, ok := dest.(IterableOrderedMap); ok {
keys, values := col.orderedRow(i)
for i := range keys {
om.Put(keys[i], values[i])
}
return nil
}
if om, ok := dest.(OrderedMap); ok {
keys, values := col.orderedRow(i)
for i := range keys {
Expand Down Expand Up @@ -163,6 +182,27 @@ func (col *Map) AppendRow(v any) error {
return nil
}

if orderedMap, ok := v.(IterableOrderedMap); ok {
var size int64
iter := orderedMap.Iterator()
for iter.Next() {
key, value := iter.Key(), iter.Value()
size++
if err := col.keys.AppendRow(key); err != nil {
return err
}
if err := col.values.AppendRow(value); err != nil {
return err
}
}
var prev int64
if n := col.offsets.Rows(); n != 0 {
prev = col.offsets.col.Row(n - 1)
}
col.offsets.col.Append(prev + size)
return nil
}

if orderedMap, ok := v.(OrderedMap); ok {
var size int64
for key := range orderedMap.Keys() {
Expand Down
126 changes: 123 additions & 3 deletions tests/map_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,11 @@ import (
"context"
"database/sql/driver"
"fmt"
"github.com/stretchr/testify/require"
"reflect"
"testing"

"github.com/stretchr/testify/require"

"github.com/ClickHouse/clickhouse-go/v2"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -215,8 +217,9 @@ func TestMapFlush(t *testing.T) {

// OrderedMap is a simple (non thread safe) ordered map.
//
// keys records insertion order and values maps each key to its value.
// valuesIter mirrors keys index-for-index so that iteration can read a value
// without a map lookup.
//
// Each field is declared exactly once; the earlier text repeated keys and
// values, which does not compile.
type OrderedMap struct {
	keys       []any
	values     map[any]any
	valuesIter []any
}

func NewOrderedMap() *OrderedMap {
Expand All @@ -240,6 +243,7 @@ func (om *OrderedMap) Put(key any, value any) {
}
om.keys = append(om.keys, key)
om.values[key] = value
om.valuesIter = append(om.valuesIter, value)
}

func (om *OrderedMap) Keys() <-chan any {
Expand Down Expand Up @@ -354,3 +358,119 @@ func TestMapValuer(t *testing.T) {
}
require.Equal(t, 1000, i)
}

// KeysUseChanNoGo returns a closed channel that already holds every key in
// insertion order. The channel is buffered to len(om.keys), so it is filled
// synchronously without starting a goroutine.
func (om *OrderedMap) KeysUseChanNoGo() <-chan any {
	total := len(om.keys)
	out := make(chan any, total)
	for idx := 0; idx < total; idx++ {
		out <- om.keys[idx]
	}
	close(out)
	return out
}

// KeysUseSlice returns the map's internal key slice without copying it, so
// the caller shares the backing array with om.
func (om *OrderedMap) KeysUseSlice() []any {
	keys := om.keys
	return keys
}

// Iter returns a MapIter positioned before the first entry of om; call Next
// before reading Key or Value.
func (om *OrderedMap) Iter() MapIter {
	var it MapIter = &mapIter{iterIndex: -1, om: om}
	return it
}

// MapIter is the iterator shape exercised by BenchmarkOrderedMapKeysUseIter:
// call Next, then read Key and Value while Next reports true.
type MapIter interface {
// Next moves to the next entry and reports whether there is one.
Next() bool
// Key returns the key of the current entry.
Key() any
// Value returns the value of the current entry.
Value() any
}

// mapIter is the MapIter implementation returned by OrderedMap.Iter.
type mapIter struct {
// om is the map being walked.
om *OrderedMap
// iterIndex indexes om.keys and om.valuesIter. Iter starts it at -1 so the
// first Next lands on index 0.
iterIndex int
}

// Next advances the iterator by one entry and reports whether the new
// position is still inside the map's key slice.
func (i *mapIter) Next() bool {
	next := i.iterIndex + 1
	i.iterIndex = next
	return next < len(i.om.keys)
}

// Key returns the key at the iterator's current position. The position is
// only valid after a call to Next that returned true.
func (i *mapIter) Key() any {
	keys := i.om.keys
	return keys[i.iterIndex]
}

// Value returns the entry of om.valuesIter at the iterator's current
// position. The position is only valid after Next returned true.
func (i *mapIter) Value() any {
	values := i.om.valuesIter
	return values[i.iterIndex]
}

// BenchmarkOrderedMapUseChanGo measures walking a 10-entry OrderedMap by
// ranging over the channel returned by Keys and fetching each value with Get.
func BenchmarkOrderedMapUseChanGo(b *testing.B) {
	const entries = 10
	om := NewOrderedMap()
	for k := 0; k < entries; k++ {
		om.Put(k, k)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for k := range om.Keys() {
			_, _ = om.Get(k)
		}
	}
}

// BenchmarkOrderedMapKeysUseChanNoGo measures walking a 10-entry OrderedMap
// by ranging over the pre-filled channel from KeysUseChanNoGo and fetching
// each value with Get.
func BenchmarkOrderedMapKeysUseChanNoGo(b *testing.B) {
	const entries = 10
	om := NewOrderedMap()
	for k := 0; k < entries; k++ {
		om.Put(k, k)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for k := range om.KeysUseChanNoGo() {
			_, _ = om.Get(k)
		}
	}
}

// BenchmarkOrderedMapKeysUseSlice measures walking a 10-entry OrderedMap by
// ranging over the indices of the slice from KeysUseSlice and passing each
// index to Get.
func BenchmarkOrderedMapKeysUseSlice(b *testing.B) {
	const entries = 10
	om := NewOrderedMap()
	for k := 0; k < entries; k++ {
		om.Put(k, k)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		// Single-variable range yields the slice index, as in the original loop.
		for idx := range om.KeysUseSlice() {
			_, _ = om.Get(idx)
		}
	}
}

// BenchmarkOrderedMapKeysUseIter measures walking a 10-entry OrderedMap with
// the MapIter returned by Iter, reading both Key and Value for every entry.
func BenchmarkOrderedMapKeysUseIter(b *testing.B) {
	const entries = 10
	om := NewOrderedMap()
	for k := 0; k < entries; k++ {
		om.Put(k, k)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		for it := om.Iter(); it.Next(); {
			_ = it.Key()
			_ = it.Value()
		}
	}
}

// BenchmarkOrderedMapReflectMapIter measures walking the 10-entry values map
// of an OrderedMap through reflect's MapRange, converting every key and
// value back to an interface value.
func BenchmarkOrderedMapReflectMapIter(b *testing.B) {
	const entries = 10
	om := NewOrderedMap()
	for k := 0; k < entries; k++ {
		om.Put(k, k)
	}
	b.ReportAllocs()
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		rv := reflect.Indirect(reflect.ValueOf(om.values))
		for it := rv.MapRange(); it.Next(); {
			_ = it.Key().Interface()
			_ = it.Value().Interface()
		}
	}
}

0 comments on commit 9d8573d

Please sign in to comment.