Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Native Clickhouse driver #3

Merged
merged 9 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/semantic.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,7 @@ on:

jobs:
semantic:
with:
CHECK_PR_TITLE_OR_ONE_COMMIT: true
uses: influxdata/validate-semantic-github-messages/.github/workflows/semantic.yml@main

13 changes: 7 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/gofrs/uuid v3.3.0+incompatible
github.com/golang/geo v0.0.0-20190916061304-5b978397cfec
github.com/google/flatbuffers v22.9.30-0.20221019131441-5792623df42e+incompatible
github.com/google/go-cmp v0.5.7
github.com/google/go-cmp v0.5.8
github.com/influxdata/gosnowflake v1.6.9
github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839
Expand All @@ -51,7 +51,7 @@ require (
github.com/uber/jaeger-client-go v2.28.0+incompatible
github.com/uber/jaeger-lib v2.4.1+incompatible // indirect
github.com/vertica/vertica-sql-go v1.1.1
go.uber.org/zap v1.16.0
go.uber.org/zap v1.22.0
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756
golang.org/x/net v0.7.0
golang.org/x/tools v0.1.12
Expand Down Expand Up @@ -88,6 +88,7 @@ require (
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.9.0 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.19.0 // indirect
github.com/aws/smithy-go v1.9.0 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deepmap/oapi-codegen v1.6.0 // indirect
github.com/dimchansky/utfbom v1.1.0 // indirect
Expand All @@ -102,21 +103,21 @@ require (
github.com/inconshreveable/mousetrap v1.0.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/klauspost/compress v1.14.2 // indirect
github.com/klauspost/compress v1.15.9 // indirect
github.com/mattn/go-colorable v0.1.9 // indirect
github.com/mattn/go-ieproxy v0.0.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
github.com/miekg/dns v1.1.25 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/pierrec/lz4/v4 v4.1.12 // indirect
github.com/pierrec/lz4/v4 v4.1.15 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/sirupsen/logrus v1.8.1 // indirect
github.com/spf13/pflag v1.0.3 // indirect
github.com/uber-go/tally v3.3.15+incompatible // indirect
go.opencensus.io v0.23.0 // indirect
go.uber.org/atomic v1.7.0 // indirect
go.uber.org/multierr v1.6.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4 // indirect
Expand Down
26 changes: 16 additions & 10 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+Z
github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM=
Expand Down Expand Up @@ -144,6 +143,8 @@ github.com/aws/aws-sdk-go-v2/service/sts v1.10.0 h1:1jh8J+JjYRp+QWKOsaZt7rGUgoyr
github.com/aws/aws-sdk-go-v2/service/sts v1.10.0/go.mod h1:jLKCFqS+1T4i7HDqCP9GM4Uk75YW1cS0o82LdxpMyOE=
github.com/aws/smithy-go v1.9.0 h1:c7FUdEqrQA1/UVKKCNDFQPNKGp4FQg3YW4Ck5SLTG58=
github.com/aws/smithy-go v1.9.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.3.0 h1:TVRhuZx2wG9SZ0LRdqlbs9S5BZ6Y24hJEHTCgWHZEIw=
github.com/benbjohnson/immutable v0.3.0/go.mod h1:uc6OHo6PN2++n98KHLxW8ef4W42ylHiQSENghE1ezxI=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
Expand Down Expand Up @@ -328,8 +329,8 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8 h1:e6P7q2lk1O+qJJb4BtCQXlK8vWEO8V1ZeuEdJNOqZyg=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
Expand Down Expand Up @@ -435,8 +436,8 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o
github.com/klauspost/asmfmt v1.3.1/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE=
github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg=
github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.14.2 h1:S0OHlFk/Gbon/yauFJ4FfJJF5V0fc5HbBTJazi28pRw=
github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk=
github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY=
github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc=
Expand Down Expand Up @@ -537,8 +538,9 @@ github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.8/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.11/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8=
github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0=
github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down Expand Up @@ -662,21 +664,25 @@ go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.5.1/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw=
go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ=
go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0=
go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI=
go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0=
go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.4.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4=
go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU=
go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4=
go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU=
go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8=
go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA=
go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.14.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.16.0 h1:uFRZXykJGK9lLY4HtgSw44DnIcAM+kRBP7x5m+NpAOM=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.22.0 h1:Zcye5DUgBloQ9BaT4qc9BnjOFog5TvBSAGkJ3Nf70c0=
go.uber.org/zap v1.22.0/go.mod h1:H4siCOZOrAolnUPJEkfaSjDqyP+BDS0DdDWzwcgt3+U=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
Expand Down Expand Up @@ -1153,6 +1159,7 @@ gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand All @@ -1163,7 +1170,6 @@ honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.0.1-2020.1.4/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
honnef.co/go/tools v0.1.3 h1:qTakTkI6ni6LFD5sBwwsdSO+AQqbSIxOauHTTQKZ/7o=
honnef.co/go/tools v0.1.3/go.mod h1:NgwopIslSNH47DimFoV78dnkksY2EFtX0ajyb3K/las=
rsc.io/binaryregexp v0.2.0 h1:HfqmD5MEmC0zvwBuF187nq9mdnXjXsSivRiXN7SmRkE=
rsc.io/binaryregexp v0.2.0/go.mod h1:qTv7/COck+e2FymRvadv62gMdZztPaShugOCi3I+8D8=
Expand Down
177 changes: 177 additions & 0 deletions stdlib/sql/clickhouse.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
package sql

import (
"database/sql"
"fmt"
"time"

"github.com/InfluxCommunity/flux"
"github.com/InfluxCommunity/flux/codes"
"github.com/InfluxCommunity/flux/execute"
"github.com/InfluxCommunity/flux/internal/errors"
"github.com/InfluxCommunity/flux/values"
"github.com/lib/pq"
)

type ClickhouseRowReader struct {
Cursor *sql.Rows
columns []interface{}
columnTypes []flux.ColType
columnNames []string
}

func (m *ClickhouseRowReader) Next() bool {
next := m.Cursor.Next()
if next {
columnNames, err := m.Cursor.Columns()
if err != nil {
return false
}
m.columns = make([]interface{}, len(columnNames))
columnPointers := make([]interface{}, len(columnNames))
for i := 0; i < len(columnNames); i++ {
columnPointers[i] = &m.columns[i]
}
if err := m.Cursor.Scan(columnPointers...); err != nil {
return false
}
}
return next
}

func (m *ClickhouseRowReader) GetNextRow() ([]values.Value, error) {
row := make([]values.Value, len(m.columns))
for i, col := range m.columns {
switch col := col.(type) {
case bool, int64, uint64, float64, string:
row[i] = values.New(col)
case *bool:
row[i] = values.NewBool(*col)
case *string:
row[i] = values.NewString(*col)
case *int64:
row[i] = values.NewInt(*col)
case *uint64:
row[i] = values.NewUInt(*col)
case *float64:
row[i] = values.NewFloat(*col)
case time.Time:
row[i] = values.NewTime(values.ConvertTime(col))
case []uint8:
switch m.columnTypes[i] {
case flux.TInt:
newInt, err := UInt8ToInt64(col)
if err != nil {
return nil, err
}
row[i] = values.NewInt(newInt)
case flux.TFloat:
newFloat, err := UInt8ToFloat(col)
if err != nil {
return nil, err
}
row[i] = values.NewFloat(newFloat)
case flux.TTime:
t, err := time.Parse(layout, string(col))
if err != nil {
fmt.Print(err)
}
row[i] = values.NewTime(values.ConvertTime(t))
default:
row[i] = values.NewString(string(col))
}
case nil:
row[i] = values.NewNull(flux.SemanticType(m.columnTypes[i]))
default:
execute.PanicUnknownType(flux.TInvalid)
}
}
return row, nil
}

func (m *ClickhouseRowReader) InitColumnNames(n []string) {
m.columnNames = n
}

func (m *ClickhouseRowReader) InitColumnTypes(types []*sql.ColumnType) {
stringTypes := make([]flux.ColType, len(types))
for i := 0; i < len(types); i++ {
switch types[i].DatabaseTypeName() {
case "INT", "BIGINT", "SMALLINT", "TINYINT", "INT2", "INT4", "INT8", "SERIAL2", "SERIAL4", "SERIAL8":
stringTypes[i] = flux.TInt
case "FLOAT4", "FLOAT8":
stringTypes[i] = flux.TFloat
case "DATE", "TIME", "TIMESTAMP":
stringTypes[i] = flux.TTime
case "BOOL":
stringTypes[i] = flux.TBool
case "TEXT":
stringTypes[i] = flux.TString
case "Nullable(UInt64)":
stringTypes[i] = flux.TUInt
default:
stringTypes[i] = flux.TString
}
}
m.columnTypes = stringTypes
}

func (m *ClickhouseRowReader) ColumnNames() []string {
return m.columnNames
}

func (m *ClickhouseRowReader) ColumnTypes() []flux.ColType {
return m.columnTypes
}

func (m *ClickhouseRowReader) SetColumns(i []interface{}) {
m.columns = i
}

func (m *ClickhouseRowReader) Close() error {
if err := m.Cursor.Err(); err != nil {
return err
}
return m.Cursor.Close()
}

func NewClickhouseRowReader(r *sql.Rows) (execute.RowReader, error) {
reader := &ClickhouseRowReader{
Cursor: r,
}
cols, err := r.Columns()
if err != nil {
return nil, err
}
reader.InitColumnNames(cols)

types, err := r.ColumnTypes()
if err != nil {
return nil, err
}
reader.InitColumnTypes(types)
return reader, nil
}

// ClickhouseTranslateColumn translates flux colTypes into their corresponding Clickhouse column type
func ClickhouseColumnTranslateFunc() translationFunc {
c := map[string]string{
flux.TFloat.String(): "FLOAT",
flux.TInt.String(): "BIGINT",
flux.TUInt.String(): "BIGINT",
flux.TString.String(): "TEXT",
flux.TTime.String(): "TIMESTAMP",
flux.TBool.String(): "BOOL",
}
return func(f flux.ColType, colName string) (string, error) {
s, found := c[f.String()]
if !found {
return "", errors.Newf(codes.Internal, "ClickHouseQL does not support column type %s", f.String())
}
return clickhouseQuoteIdent(colName) + " " + s, nil
}
}

func clickhouseQuoteIdent(name string) string {
return pq.QuoteIdentifier(name)
}
2 changes: 2 additions & 0 deletions stdlib/sql/from.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ func createFromSQLSource(prSpec plan.ProcedureSpec, dsid execute.DatasetID, a ex
newRowReader = NewSnowflakeRowReader
case "mssql", "sqlserver":
newRowReader = NewMssqlRowReader
case "clickhouse":
newRowReader = NewClickhouseRowReader
case "awsathena":
newRowReader = NewAwsAthenaRowReader
case "bigquery":
Expand Down
8 changes: 8 additions & 0 deletions stdlib/sql/from_private_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,14 @@ func TestFromSqlUrlValidation(t *testing.T) {
Query: "",
},
ErrMsg: "",
}, {
Name: "ok clickhouse",
Spec: &FromSQLProcedureSpec{
DriverName: "clickhouse",
DataSourceName: "clickhouse://username:password@localhost:12345/default?secure=true",
Query: "",
},
ErrMsg: "",
}, {
Name: "ok vertica",
Spec: &FromSQLProcedureSpec{
Expand Down
7 changes: 7 additions & 0 deletions stdlib/sql/source_validator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,13 @@ func validateDataSource(validator url.Validator, driverName string, dataSourceNa
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "clickhouse":
// an example for clickhouse data source is: clickhouse://username:password@host1:9000/database?dial_timeout=200ms&max_execution_time=60
// this follows the URI semantics
u, err = neturl.Parse(dataSourceName)
if err != nil {
return errors.Newf(codes.Invalid, "invalid data source url: %v", err)
}
case "vertica", "vertigo":
// an example for vertica data source is: vertica://dbadmin:password@localhost:5433/VMart
// this follows the URI semantics
Expand Down
1 change: 1 addition & 0 deletions stdlib/sql/sql_test.flux
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mysqlDsn = "flux:flux@tcp(127.0.0.1:3306)/flux"
mariaDbDsn = "flux:flux@tcp(127.0.0.1:3307)/flux"
pgDsn = "postgresql://postgres@127.0.0.1:5432/postgres?sslmode=disable"
verticaDsn = "vertica://dbadmin@localhost:5433/flux"
clickhouseDsn = "clickhouse://clickhouse@127.0.0.1:9000/clickhouse?secure=false"
sqliteDsn = "file:///tmp/flux-integ-tests-sqlite.db"

// Some db engines will UPPERCASE table/column names when the identifiers are unquoted.
Expand Down
Loading
Loading