diff --git a/bind.go b/bind.go index 5fb7d14..18d4521 100644 --- a/bind.go +++ b/bind.go @@ -132,11 +132,11 @@ func format(tz *time.Location, v interface{}) string { case time.Time: switch v.Location().String() { case "Local": - return fmt.Sprintf("toDateTime(%d)", v.Unix()) + return fmt.Sprintf("to_datetime(%d)", v.Unix()) case tz.String(): - return v.Format("toDateTime('2006-01-02 15:04:05')") + return v.Format("to_datetime('2006-01-02 15:04:05')") } - return v.Format("toDateTime('2006-01-02 15:04:05', '" + v.Location().String() + "')") + return v.Format("to_datetime('2006-01-02 15:04:05', '" + v.Location().String() + "')") case []interface{}: // tuple elements := make([]string, 0, len(v)) for _, e := range v { diff --git a/bind_test.go b/bind_test.go index ca54a57..42da3e9 100644 --- a/bind_test.go +++ b/bind_test.go @@ -126,8 +126,8 @@ func TestFormatTime(t *testing.T) { tz, err = time.LoadLocation("Europe/London") ) if assert.NoError(t, err) { - if assert.Equal(t, "toDateTime('2022-01-12 15:00:00')", format(t1.Location(), t1)) { - assert.Equal(t, "toDateTime('2022-01-12 15:00:00', 'UTC')", format(tz, t1)) + if assert.Equal(t, "to_datetime('2022-01-12 15:00:00')", format(t1.Location(), t1)) { + assert.Equal(t, "to_datetime('2022-01-12 15:00:00', 'UTC')", format(tz, t1)) } } } diff --git a/examples/native/batch/main.go b/examples/native/batch/main.go index 2707b7e..f48296c 100644 --- a/examples/native/batch/main.go +++ b/examples/native/batch/main.go @@ -31,7 +31,7 @@ func example() error { var ( ctx = context.Background() conn, err = proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -47,30 +47,30 @@ func example() error { if err != nil { return err } - if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil { + if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil { return err } err = conn.Exec(ctx, ` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8 - , Col2 String - , Col3 FixedString(3) - , Col4 UUID - , Col5 Map(String, UInt8) - , Col6 Array(String) - , Col7 Tuple(String, UInt8, Array(Map(String, String))) + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8 + , Col2 string + , Col3 fixed_string(3) + , Col4 uuid + , Col5 map(string, uint8) + , Col6 array(string) + , Col7 tuple(string, uint8, array(map(string, string))) , Col8 DateTime - ) Engine = Null + ) `) if err != nil { return err } - batch, err := conn.PrepareBatch(ctx, "INSERT INTO example") + batch, err := conn.PrepareBatch(ctx, "INSERT INTO example (* except _tp_time)") if err != nil { return err } - for i := 0; i < 500_000; i++ { + for i := 0; i < 100; i++ { err := batch.Append( uint8(42), "ClickHouse", "Inc", diff --git a/examples/native/bind/main.go b/examples/native/bind/main.go index 3638643..f80a268 100644 --- a/examples/native/bind/main.go +++ b/examples/native/bind/main.go @@ -30,7 +30,7 @@ func example() error { var ( ctx = context.Background() conn, err = proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -42,18 +42,21 @@ func example() error { return err } const ddl = ` - CREATE TEMPORARY TABLE example ( - Col1 UInt8 - , Col2 String + CREATE STREAM example ( + Col1 uint8 + , Col2 string , Col3 DateTime ) ` + if err := conn.Exec(ctx, `drop stream if exists example`); err != nil { + return err + } if err := conn.Exec(ctx, ddl); err != nil { return err } datetime := time.Now() { - batch, err := conn.PrepareBatch(ctx, "INSERT INTO example") + batch, err := conn.PrepareBatch(ctx, "INSERT INTO example (* except _tp_time)") if err != nil { return err } @@ -73,13 +76,13 @@ func example() error { Col3 time.Time } { - if err := conn.QueryRow(ctx, `SELECT * FROM example WHERE Col1 = $1 AND Col3 = $2`, 2, datetime).ScanStruct(&result); err != nil { + if err := conn.QueryRow(ctx, `SELECT * except _tp_time FROM example WHERE _tp_time > earliest_ts() AND Col1 = $1 AND Col3 = $2 LIMIT 1`, 2, datetime).ScanStruct(&result); err != nil { return err } fmt.Println(result) } { - if err := conn.QueryRow(ctx, `SELECT * FROM example WHERE Col1 = @Col1 AND Col3 = @Col2`, + if err := conn.QueryRow(ctx, `SELECT * except _tp_time FROM example WHERE _tp_time > earliest_ts() AND Col1 = @Col1 AND Col3 = @Col2 LIMIT 1`, proton.Named("Col1", 4), proton.Named("Col2", datetime), ).ScanStruct(&result); err != nil { diff --git a/examples/native/dynamic-scan-types/main.go b/examples/native/dynamic-scan-types/main.go index 4932a2c..786500d 100644 --- a/examples/native/dynamic-scan-types/main.go +++ b/examples/native/dynamic-scan-types/main.go @@ -28,7 +28,7 @@ import ( func example() error { conn, err := proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", diff --git a/examples/native/main.go b/examples/native/main.go index 35619cb..37a18ac 100644 --- a/examples/native/main.go +++ b/examples/native/main.go @@ -28,7 +28,7 @@ import ( func main() { conn, err := proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -62,28 +62,28 @@ func main() { fmt.Printf("name: %s, value: %s, type=%s\n", s.Name, s.Value, s.Type) } - if err = conn.Exec(context.Background(), "TUNCATE TABLE X"); err == nil { + if err = conn.Exec(context.Background(), "TRUNCATE STREAM X"); err == nil { panic("unexpected") } if exception, ok := err.(*proton.Exception); ok { fmt.Printf("Catch exception [%d]\n", exception.Code) } const ddl = ` - CREATE TABLE example ( - Col1 UInt64 - , Col2 FixedString(2) - , Col3 Map(String, String) - , Col4 Array(String) + CREATE STREAM example ( + Col1 uint64 + , Col2 fixed_string(2) + , Col3 map(string, string) + , Col4 array(string) , Col5 DateTime64(3) - ) Engine Memory + ) ` - if err := conn.Exec(context.Background(), "DROP TABLE IF EXISTS example"); err != nil { + if err := conn.Exec(context.Background(), "DROP STREAM IF EXISTS example"); err != nil { log.Fatal(err) } if err := conn.Exec(context.Background(), ddl); err != nil { log.Fatal(err) } - batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example") + batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example (* except _tp_time)") if err != nil { log.Fatal(err) } @@ -107,9 +107,10 @@ func main() { ctx := proton.Context(context.Background(), proton.WithProgress(func(p *proton.Progress) { fmt.Println("progress: ", p) })) - + ctx, cancel := context.WithTimeout(ctx, time.Duration(10)*time.Second) + defer cancel() var count uint64 - if err := conn.QueryRow(ctx, "SELECT COUNT() FROM example").Scan(&count); err != nil { + if err := conn.QueryRow(ctx, "SELECT count() FROM example WHERE _tp_time > earliest_ts() LIMIT 1").Scan(&count); err != nil { log.Fatal(err) } fmt.Println("count", count) @@ -117,7 +118,7 @@ func main() { Col1 uint64 Count uint64 `ch:"count"` } - if err := conn.QueryRow(ctx, "SELECT Col1, COUNT() AS count FROM example WHERE Col1 = $1 GROUP BY Col1", 42).ScanStruct(&result); err != nil { + if err := conn.QueryRow(ctx, "SELECT Col1, count() AS count FROM example WHERE _tp_time > earliest_ts() AND Col1 = $1 GROUP BY Col1 LIMIT 1", 42).ScanStruct(&result); err != nil { log.Fatal(err) } fmt.Println("result", result) diff --git a/examples/native/scan_struct/main.go b/examples/native/scan_struct/main.go index 3370423..92fc201 100644 --- a/examples/native/scan_struct/main.go +++ b/examples/native/scan_struct/main.go @@ -28,7 +28,7 @@ import ( func example() error { conn, err := proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -46,7 +46,9 @@ func example() error { if err != nil { return err } - ctx := proton.Context(context.Background(), proton.WithSettings(proton.Settings{ + ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(3)) + defer cancel() + ctx = proton.Context(ctx, proton.WithSettings(proton.Settings{ "max_block_size": 10, }), proton.WithProgress(func(p *proton.Progress) { fmt.Println("progress: ", p) @@ -57,15 +59,15 @@ func example() error { } return err } - if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil { + if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil { return err } err = conn.Exec(ctx, ` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8, - Col2 String, + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8, + Col2 string, Col3 DateTime - ) engine=Memory + ) `) if err != nil { return err @@ -89,7 +91,7 @@ func example() error { ColumnWithName time.Time `ch:"Col3"` } - if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example"); err != nil { + if err = conn.Select(ctx, &result, "SELECT Col1, Col2, Col3 FROM example WHERE _tp_time > earliest_ts() LIMIT 10"); err != nil { return err } diff --git a/examples/native/simple/main.go b/examples/native/simple/main.go index 770b3d0..33066c4 100644 --- a/examples/native/simple/main.go +++ b/examples/native/simple/main.go @@ -28,7 +28,7 @@ import ( func example() error { conn, err := proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -53,21 +53,23 @@ func example() error { }), proton.WithProfileInfo(func(p *proton.ProfileInfo) { fmt.Println("profile info: ", p) })) + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(5)) + defer cancel() if err := conn.Ping(ctx); err != nil { if exception, ok := err.(*proton.Exception); ok { fmt.Printf("Catch exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) } return err } - if err := conn.Exec(ctx, `DROP TABLE IF EXISTS example`); err != nil { + if err := conn.Exec(ctx, `DROP STREAM IF EXISTS example`); err != nil { return err } err = conn.Exec(ctx, ` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8, - Col2 String, + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8, + Col2 string, Col3 DateTime - ) engine=Memory + ) `) if err != nil { return err @@ -84,8 +86,7 @@ func example() error { if err := batch.Send(); err != nil { return err } - - rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3", 0, "xxx", time.Now()) + rows, err := conn.Query(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE _tp_time > earliest_ts() AND Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3 LIMIT 12", 0, "xxx", time.Now()) if err != nil { return err } diff --git a/examples/native/write-async/main.go b/examples/native/write-async/main.go index b79c22c..feb0eb9 100644 --- a/examples/native/write-async/main.go +++ b/examples/native/write-async/main.go @@ -27,19 +27,19 @@ import ( ) const ddl = ` -CREATE TEMPORARY TABLE example ( - Col1 UInt64 - , Col2 String - , Col3 Array(UInt8) +CREATE STREAM example ( + Col1 uint64 + , Col2 string + , Col3 array(int) , Col4 DateTime -) +) ` func main() { var ( ctx = context.Background() conn, err = proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -55,6 +55,7 @@ func main() { if err != nil { log.Fatal(err) } + conn.Exec(ctx, `DROP STREAM IF EXISTS example`) if err := conn.Exec(ctx, ddl); err != nil { log.Fatal(err) } diff --git a/examples/native/write-columnar/main.go b/examples/native/write-columnar/main.go index d1a6ca9..33c8f82 100644 --- a/examples/native/write-columnar/main.go +++ b/examples/native/write-columnar/main.go @@ -26,16 +26,16 @@ import ( ) const ddl = ` -CREATE TEMPORARY TABLE example ( - Col1 UInt64 - , Col2 String - , Col3 Array(UInt8) +CREATE STREAM example ( + Col1 uint64 + , Col2 string + , Col3 array(uint8) , Col4 DateTime -) +) ` func example(conn proton.Conn) error { - batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example") + batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example (* except _tp_time)") if err != nil { return err } @@ -70,7 +70,7 @@ func main() { var ( ctx = context.Background() conn, err = proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -86,6 +86,9 @@ func main() { if err != nil { log.Fatal(err) } + if err := conn.Exec(context.Background(), `DROP STREAM IF EXISTS example`); err != nil { + log.Fatal(err) + } if err := conn.Exec(ctx, ddl); err != nil { log.Fatal(err) } diff --git a/examples/native/write-struct/main.go b/examples/native/write-struct/main.go index 45b5f5e..a3c9296 100644 --- a/examples/native/write-struct/main.go +++ b/examples/native/write-struct/main.go @@ -26,12 +26,12 @@ import ( ) const ddl = ` -CREATE TEMPORARY TABLE example ( - Col1 UInt64 - , Col2 String - , Col3 Array(UInt8) +CREATE STREAM example ( + Col1 uint64 + , Col2 string + , Col3 array(uint8) , Col4 DateTime -) +) ` type row struct { @@ -42,7 +42,7 @@ type row struct { } func example(conn proton.Conn) error { - batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example") + batch, err := conn.PrepareBatch(context.Background(), "INSERT INTO example (* except _tp_time)") if err != nil { return err } @@ -64,7 +64,7 @@ func main() { var ( ctx = context.Background() conn, err = proton.Open(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -80,6 +80,9 @@ func main() { if err != nil { log.Fatal(err) } + if err := conn.Exec(context.Background(), `DROP STREAM IF EXISTS example`); err != nil { + log.Fatal(err) + } if err := conn.Exec(ctx, ddl); err != nil { log.Fatal(err) } diff --git a/examples/std/batch/main.go b/examples/std/batch/main.go index 2532a1e..7321f1b 100644 --- a/examples/std/batch/main.go +++ b/examples/std/batch/main.go @@ -28,7 +28,7 @@ import ( ) func example() error { - conn, err := sql.Open("proton", "proton://127.0.0.1:9000?dial_timeout=1s&compress=true") + conn, err := sql.Open("proton", "proton://127.0.0.1:8463?dial_timeout=1s&compress=true") if err != nil { return err } @@ -36,20 +36,20 @@ func example() error { if err != nil { return err } - if _, err := conn.Exec(`DROP TABLE IF EXISTS example`); err != nil { + if _, err := conn.Exec(`DROP STREAM IF EXISTS example`); err != nil { return err } _, err = conn.Exec(` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8 - , Col2 String - , Col3 FixedString(3) - , Col4 UUID - , Col5 Map(String, UInt8) - , Col6 Array(String) - , Col7 Tuple(String, UInt8, Array(Map(String, String))) + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8 + , Col2 string + , Col3 fixed_string(3) + , Col4 uuid + , Col5 map(string, uint8) + , Col6 array(string) + , Col7 tuple(string, uint8, array(map(string, string))) , Col8 DateTime - ) Engine = Null + ) `) if err != nil { return err @@ -58,11 +58,11 @@ func example() error { if err != nil { return err } - batch, err := scope.Prepare("INSERT INTO example") + batch, err := scope.Prepare("INSERT INTO example (* except _tp_time)") if err != nil { return err } - for i := 0; i < 500_000; i++ { + for i := 0; i < 100; i++ { _, err := batch.Exec( uint8(42), "ClickHouse", "Inc", diff --git a/examples/std/bind/main.go b/examples/std/bind/main.go index 40fd785..73af225 100644 --- a/examples/std/bind/main.go +++ b/examples/std/bind/main.go @@ -27,17 +27,20 @@ import ( ) func example() error { - conn, err := sql.Open("proton", "proton://127.0.0.1:9000") + conn, err := sql.Open("proton", "proton://127.0.0.1:8463") if err != nil { return err } const ddl = ` - CREATE TEMPORARY TABLE example ( - Col1 UInt8 - , Col2 String + CREATE STREAM example ( + Col1 uint8 + , Col2 string , Col3 DateTime - ) + ) ` + if _, err := conn.Exec(`DROP STREAM IF EXISTS example`); err != nil { + return err + } if _, err := conn.Exec(ddl); err != nil { return err } @@ -47,7 +50,7 @@ func example() error { if err != nil { return err } - batch, err := scope.Prepare("INSERT INTO example") + batch, err := scope.Prepare("INSERT INTO example (* except _tp_time)") if err != nil { return err } @@ -67,7 +70,7 @@ func example() error { Col3 time.Time } { - if err := conn.QueryRow(`SELECT * FROM example WHERE Col1 = $1 AND Col3 = $2`, 2, datetime).Scan( + if err := conn.QueryRow(`SELECT * except _tp_time FROM example WHERE _tp_time > earliest_ts() AND Col1 = $1 AND Col3 = $2 LIMIT 1`, 2, datetime).Scan( &result.Col1, &result.Col2, &result.Col3, @@ -77,7 +80,7 @@ func example() error { fmt.Println(result) } { - if err := conn.QueryRow(`SELECT * FROM example WHERE Col1 = @Col1 AND Col3 = @Col2`, + if err := conn.QueryRow(`SELECT * except _tp_time FROM example WHERE _tp_time > earliest_ts() AND Col1 = @Col1 AND Col3 = @Col2 LIMIT 1`, sql.Named("Col1", 4), sql.Named("Col2", datetime), ).Scan( diff --git a/examples/std/open_db/main.go b/examples/std/open_db/main.go index a465718..f09c2ee 100644 --- a/examples/std/open_db/main.go +++ b/examples/std/open_db/main.go @@ -28,7 +28,7 @@ import ( func example() error { conn := proton.OpenDB(&proton.Options{ - Addr: []string{"127.0.0.1:9000"}, + Addr: []string{"127.0.0.1:8463"}, Auth: proton.Auth{ Database: "default", Username: "default", @@ -51,21 +51,23 @@ func example() error { }), proton.WithProgress(func(p *proton.Progress) { fmt.Println("progress: ", p) })) + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(3)) + defer cancel() if err := conn.PingContext(ctx); err != nil { if exception, ok := err.(*proton.Exception); ok { fmt.Printf("Catch exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) } return err } - if _, err := conn.ExecContext(ctx, `DROP TABLE IF EXISTS example`); err != nil { + if _, err := conn.ExecContext(ctx, `DROP STREAM IF EXISTS example`); err != nil { return err } _, err := conn.ExecContext(ctx, ` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8, - Col2 String, + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8, + Col2 string, Col3 DateTime - ) engine=Memory + ) `) if err != nil { return err @@ -88,7 +90,7 @@ func example() error { if err := scope.Commit(); err != nil { return err } - rows, err := conn.QueryContext(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3", 0, "xxx", time.Now()) + rows, err := conn.QueryContext(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE _tp_time > earliest_ts() AND Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3 LIMIT 10", 0, "xxx", time.Now()) if err != nil { return err } diff --git a/examples/std/simple/main.go b/examples/std/simple/main.go index f18e48b..96b065a 100644 --- a/examples/std/simple/main.go +++ b/examples/std/simple/main.go @@ -28,7 +28,7 @@ import ( ) func example() error { - conn, err := sql.Open("proton", "proton://127.0.0.1:9000?dial_timeout=1s&compress=true") + conn, err := sql.Open("proton", "proton://127.0.0.1:8463?dial_timeout=1s&compress=true") if err != nil { return err } @@ -40,21 +40,23 @@ func example() error { }), proton.WithProgress(func(p *proton.Progress) { fmt.Println("progress: ", p) })) + ctx, cancel := context.WithTimeout(ctx, time.Second*time.Duration(3)) + defer cancel() if err := conn.PingContext(ctx); err != nil { if exception, ok := err.(*proton.Exception); ok { fmt.Printf("Catch exception [%d] %s \n%s\n", exception.Code, exception.Message, exception.StackTrace) } return err } - if _, err := conn.ExecContext(ctx, `DROP TABLE IF EXISTS example`); err != nil { + if _, err := conn.ExecContext(ctx, `DROP STREAM IF EXISTS example`); err != nil { return err } _, err = conn.ExecContext(ctx, ` - CREATE TABLE IF NOT EXISTS example ( - Col1 UInt8, - Col2 String, + CREATE STREAM IF NOT EXISTS example ( + Col1 uint8, + Col2 string, Col3 DateTime - ) engine=Memory + ) `) if err != nil { return err @@ -77,7 +79,7 @@ func example() error { if err := scope.Commit(); err != nil { return err } - rows, err := conn.QueryContext(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3", 0, "xxx", time.Now()) + rows, err := conn.QueryContext(ctx, "SELECT Col1, Col2, Col3 FROM example WHERE _tp_time > earliest_ts() AND Col1 >= $1 AND Col2 <> $2 AND Col3 <= $3", 0, "xxx", time.Now()) if err != nil { return err } diff --git a/examples/std/write-async/main.go b/examples/std/write-async/main.go index 138a5cc..31b406a 100644 --- a/examples/std/write-async/main.go +++ b/examples/std/write-async/main.go @@ -27,19 +27,22 @@ import ( ) const ddl = ` -CREATE TEMPORARY TABLE example ( - Col1 UInt64 - , Col2 String - , Col3 Array(UInt8) +CREATE STREAM example ( + Col1 uint64 + , Col2 string + , Col3 array(uint8) , Col4 DateTime ) ` func main() { - conn, err := sql.Open("proton", "proton://127.0.0.1:9000") + conn, err := sql.Open("proton", "proton://127.0.0.1:8463") if err != nil { log.Fatal(err) } + if _, err := conn.Exec(`DROP STREAM IF EXISTS example`); err != nil { + log.Fatal(err) + } if _, err := conn.Exec(ddl); err != nil { log.Fatal(err) }