Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WithAllocBufferColStrProvider string column allocator for batch insert performance boost #1181

Merged
merged 3 commits into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/column/column_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

45 changes: 45 additions & 0 deletions lib/column/column_gen_option.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Licensed to ClickHouse, Inc. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. ClickHouse, Inc. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package column

import "github.com/ClickHouse/ch-go/proto"

// ColStrProvider defines provider of proto.ColStr
type ColStrProvider func() proto.ColStr

// colStrProvider provide proto.ColStr for Column() when type is String
var colStrProvider ColStrProvider = defaultColStrProvider

// defaultColStrProvider defines sample provider for proto.ColStr
func defaultColStrProvider() proto.ColStr {
return proto.ColStr{}
}

// issue: https://github.com/ClickHouse/clickhouse-go/issues/1164
// WithAllocBufferColStrProvider allow pre alloc buffer cap for proto.ColStr
// It is more suitable for scenarios where a lot of data is written in batches
func WithAllocBufferColStrProvider(cap int) {
colStrProvider = func() proto.ColStr {
return proto.ColStr{Buf: make([]byte, 0, cap)}
}
}

// WithColStrProvider more flexible than WithAllocBufferColStrProvider, such as use sync.Pool
func WithColStrProvider(provider ColStrProvider) {
colStrProvider = provider
}
194 changes: 194 additions & 0 deletions tests/issues/1164_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package issues

import (
"context"
"fmt"
"github.com/ClickHouse/clickhouse-go/v2"
"github.com/ClickHouse/clickhouse-go/v2/lib/column"
clickhouse_tests "github.com/ClickHouse/clickhouse-go/v2/tests"
"github.com/stretchr/testify/require"
"testing"
)

func TestIssue1164(t *testing.T) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
require.NoError(t, err)
const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()"
err = conn.Exec(ctx, ddl)
require.NoError(t, err)
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164")
}()

column.WithAllocBufferColStrProvider(4096)

batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164")
require.NoError(t, err)

for i := 0; i < 10000; i++ {
appendErr := batch.Append(fmt.Sprintf("some_text_%d", i))
require.NoError(t, appendErr)
}

err = batch.Send()
require.NoError(t, err)
}

func BenchmarkIssue1164(b *testing.B) {
// result:
//cpu: Intel(R) Xeon(R) CPU E5-26xx v4
//BenchmarkIssue1164
//BenchmarkIssue1164/default-10000
//BenchmarkIssue1164/default-10000-8 100 11533744 ns/op 1992731 B/op 40129 allocs/op
//BenchmarkIssue1164/preAlloc-10000
//BenchmarkIssue1164/preAlloc-10000-8 104 11136623 ns/op 1991154 B/op 40110 allocs/op
//BenchmarkIssue1164/default-50000
//BenchmarkIssue1164/default-50000-8 22 49932579 ns/op 11592053 B/op 200150 allocs/op
//BenchmarkIssue1164/preAlloc-50000
//BenchmarkIssue1164/preAlloc-50000-8 24 49687163 ns/op 11573934 B/op 200148 allocs/op
b.Run("default-10000", func(b *testing.B) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
require.NoError(b, err)
const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()"
err = conn.Exec(ctx, ddl)
require.NoError(b, err)
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164")
}()

b.ReportAllocs()
for k := 0; k < b.N; k++ {
batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164")
require.NoError(b, err)

for i := 0; i < 10000; i++ {
appendErr := batch.Append(fmt.Sprintf("some_text_%d", i))
require.NoError(b, appendErr)
}

err = batch.Send()
require.NoError(b, err)
}

})
b.Run("preAlloc-10000", func(b *testing.B) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
require.NoError(b, err)
const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()"
err = conn.Exec(ctx, ddl)
require.NoError(b, err)
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164")
}()

column.WithAllocBufferColStrProvider(4096)

b.ReportAllocs()
for k := 0; k < b.N; k++ {
batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164")
require.NoError(b, err)

for i := 0; i < 10000; i++ {
appendErr := batch.Append(fmt.Sprintf("some_text_%d", i))
require.NoError(b, appendErr)
}

err = batch.Send()
require.NoError(b, err)
}

})
b.Run("default-50000", func(b *testing.B) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
require.NoError(b, err)
const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()"
err = conn.Exec(ctx, ddl)
require.NoError(b, err)
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164")
}()

b.ReportAllocs()
for k := 0; k < b.N; k++ {
batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164")
require.NoError(b, err)

for i := 0; i < 50000; i++ {
appendErr := batch.Append(fmt.Sprintf("some_text_%d", i))
require.NoError(b, appendErr)
}

err = batch.Send()
require.NoError(b, err)
}

})
b.Run("preAlloc-50000", func(b *testing.B) {
var (
conn, err = clickhouse_tests.GetConnection("issues", clickhouse.Settings{
"max_execution_time": 60,
"allow_experimental_object_type": true,
}, nil, &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
})
)
ctx := context.Background()
require.NoError(b, err)
const ddl = "CREATE TABLE test_1164 (Col1 String) Engine MergeTree() ORDER BY tuple()"
err = conn.Exec(ctx, ddl)
require.NoError(b, err)
defer func() {
conn.Exec(ctx, "DROP TABLE IF EXISTS test_1164")
}()

column.WithAllocBufferColStrProvider(4096)

b.ReportAllocs()
for k := 0; k < b.N; k++ {
batch, err := conn.PrepareBatch(ctx, "INSERT INTO test_1164")
require.NoError(b, err)

for i := 0; i < 50000; i++ {
appendErr := batch.Append(fmt.Sprintf("some_text_%d", i))
require.NoError(b, appendErr)
}

err = batch.Send()
require.NoError(b, err)
}

})

}
Loading