Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature - clickhouse driver #1616

Merged
merged 19 commits into from
Apr 12, 2022
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/gf.yml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,13 @@ jobs:
--health-timeout 5s
--health-retries 10

clickhouse-server:
image: yandex/clickhouse-server
ports:
- 9000:9000
- 8123:8123
- 9001:9001


# strategy set
strategy:
Expand Down
296 changes: 295 additions & 1 deletion contrib/drivers/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,5 +8,299 @@
package clickhouse

import (
_ "github.com/ClickHouse/clickhouse-go"
"context"
"database/sql"
"errors"
"fmt"
"github.com/ClickHouse/clickhouse-go"
"strings"

"github.com/gogf/gf/v2/container/gmap"
"github.com/gogf/gf/v2/database/gdb"
"github.com/gogf/gf/v2/errors/gcode"
"github.com/gogf/gf/v2/errors/gerror"
"github.com/gogf/gf/v2/text/gregex"
"github.com/gogf/gf/v2/text/gstr"
"github.com/gogf/gf/v2/util/gconv"
)

// Driver is the driver for postgresql database.
type Driver struct {
*gdb.Core
}

var (
// tableFieldsMap caches the table information retrieved from database.
tableFieldsMap = gmap.New(true)
ErrUnsupportedInsertIgnore = errors.New("unsupported method:InsertIgnore")
DGuang21 marked this conversation as resolved.
Show resolved Hide resolved
ErrUnsupportedInsertGetId = errors.New("unsupported method:InsertGetId")
ErrUnsupportedReplace = errors.New("unsupported method:Replace")
ErrUnsupportedBegin = errors.New("unsupported method:Begin")
ErrUnsupportedTransaction = errors.New("unsupported method:Transaction")
ErrSQLNull = errors.New("SQL cannot be null")
)

func init() {
if err := gdb.Register(`clickhouse`, New()); err != nil {
panic(err)
}
}

// New create and returns a driver that implements gdb.Driver, which supports operations for clickhouse.
func New() gdb.Driver {
return &Driver{}
}

// New creates and returns a database object for clickhouse.
// It implements the interface of gdb.Driver for extra database driver installation.
func (d *Driver) New(core *gdb.Core, node *gdb.ConfigNode) (gdb.DB, error) {
return &Driver{
Core: core,
}, nil
}

// Open creates and returns an underlying sql.DB object for clickhouse.
func (d *Driver) Open(config *gdb.ConfigNode) (*sql.DB, error) {
var (
source string
driver = "clickhouse"
)
if config.Link != "" {
source = config.Link
} else if config.Pass != "" {
source = fmt.Sprintf(
"clickhouse://%s:%s@%s:%s/%s?charset=%s&debug=%s",
config.User, config.Pass, config.Host, config.Port, config.Name, config.Charset, gconv.String(config.Debug))
} else {
source = fmt.Sprintf(
"clickhouse://%s@%s:%s/%s?charset=%s&debug=%s",
config.User, config.Host, config.Port, config.Name, config.Charset, gconv.String(config.Debug))
}
db, err := sql.Open(driver, source)
if err != nil {
return nil, err
}

return db, nil
}

// Tables retrieves and returns the tables of current schema.
// It's mainly used in cli tool chain for automatically generating the models.
func (d *Driver) Tables(ctx context.Context, schema ...string) (tables []string, err error) {
var result gdb.Result
link, err := d.SlaveLink(schema...)
if err != nil {
return nil, err
}
query := fmt.Sprintf("select name from `system`.tables where database = '%s'", d.GetConfig().Name)
result, err = d.DoGetAll(ctx, link, query)
if err != nil {
return
}
for _, m := range result {
tables = append(tables, m["name"].String())
}
return
}

// TableFields retrieves and returns the fields' information of specified table of current schema.
// Also see DriverMysql.TableFields.
func (d *Driver) TableFields(ctx context.Context, table string, schema ...string) (fields map[string]*gdb.TableField, err error) {
charL, charR := d.GetChars()
table = gstr.Trim(table, charL+charR)
if gstr.Contains(table, " ") {
return nil, gerror.NewCode(gcode.CodeInvalidParameter, "function TableFields supports only single table operations")
}
useSchema := d.GetSchema()
if len(schema) > 0 && schema[0] != "" {
useSchema = schema[0]
}
v := tableFieldsMap.GetOrSetFuncLock(
fmt.Sprintf(`clickhouse_table_fields_%s_%s@group:%s`, table, useSchema, d.GetGroup()),
func() interface{} {
var (
result gdb.Result
link gdb.Link
)
if link, err = d.SlaveLink(useSchema); err != nil {
return nil
}
getColumnsSql := fmt.Sprintf("select name,position,default_expression,comment from `system`.columns c where database = '%s' and `table` = '%s'", d.GetConfig().Name, table)
result, err = d.DoGetAll(ctx, link, getColumnsSql)
if err != nil {
return nil
}
fields = make(map[string]*gdb.TableField)
for _, m := range result {
var (
isNull = false
fieldType = m["type"].String()
)
// in clickhouse , filed type like is Nullable(int)
fieldsResult, _ := gregex.MatchString(`^Nullable\((.*?)\)`, fieldType)
if len(fieldsResult) == 2 {
isNull = true
fieldType = fieldsResult[1]
}
fields[m["name"].String()] = &gdb.TableField{
Index: m["position"].Int(),
Name: m["name"].String(),
Default: m["default_expression"].Val(),
Comment: m["comment"].String(),
//Key: m["Key"].String(),
Type: fieldType,
Null: isNull,
}
}
return fields
},
)
if v != nil {
fields = v.(map[string]*gdb.TableField)
}
return
}

// FilteredLink retrieves and returns filtered `linkInfo` that can be using for
// logging or tracing purpose.
func (d *Driver) FilteredLink() string {
linkInfo := d.GetConfig().Link
if linkInfo == "" {
return ""
}
s, _ := gregex.ReplaceString(
`(.+?):(.+)@tcp(.+)`,
`$1:xxx@tcp$3`,
linkInfo,
)
return s
}

// PingMaster pings the master node to check authentication or keeps the connection alive.
func (d *Driver) PingMaster() error {
conn, err := d.Master()
if err != nil {
return err
}
return d.ping(conn)
}

// PingSlave pings the slave node to check authentication or keeps the connection alive.
func (d *Driver) PingSlave() error {
conn, err := d.Slave()
if err != nil {
return err
}
return d.ping(conn)
}

// ping Returns the Clickhouse specific error.
func (d *Driver) ping(conn *sql.DB) error {
err := conn.Ping()
if exception, ok := err.(*clickhouse.Exception); ok {
return errors.New(fmt.Sprintf("[%d]%s", exception.Code, exception.Message))
}
return err
}

// DoFilter handles the sql before posts it to database.
func (d *Driver) DoFilter(ctx context.Context, link gdb.Link, sql string, args []interface{}) (newSql string, newArgs []interface{}, err error) {
// replace MySQL to Clickhouse SQL grammar
DGuang21 marked this conversation as resolved.
Show resolved Hide resolved
// MySQL eg: UPDATE visits SET xxx
// Clickhouse eg: ALTER TABLE visits UPDATE xxx
// MySQL eg: DELETE FROM VISIT
// Clickhouse eg: ALTER TABLE VISIT DELETE WHERE filter_expr
result, err := gregex.MatchString("(?i)^UPDATE|DELETE", sql)
if err != nil {
return "", nil, err
}
if len(result) != 0 {
sqlSlice := strings.Split(sql, " ")
if len(sqlSlice) < 3 {
return "", nil, ErrSQLNull
}
ck := []string{"ALTER", "TABLE"}
switch strings.ToUpper(result[0]) {
case "UPDATE":
sqlSlice = append(append(append(ck, sqlSlice[1]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
case "DELETE":
sqlSlice = append(append(append(ck, sqlSlice[2]), result[0]), sqlSlice[3:]...)
return strings.Join(sqlSlice, " "), args, nil
}
}
return sql, args, nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

虽然使用正则匹配方式进行替换效率比较高,但是比较严谨的还是使用SQL解析器来解析后替换。可以参考下develop分支中sqlparser的用法。

}

// DoCommit commits current sql and arguments to underlying sql driver.
func (d *Driver) DoCommit(ctx context.Context, in gdb.DoCommitInput) (out gdb.DoCommitOutput, err error) {
return d.Core.DoCommit(context.WithValue(ctx, "isIgnoreResult", true), in)
}

func (d *Driver) DoInsert(ctx context.Context, link gdb.Link, table string, list gdb.List, option gdb.DoInsertOption) (result sql.Result, err error) {
DGuang21 marked this conversation as resolved.
Show resolved Hide resolved
var (
keys []string // Field names.
valueHolder = make([]string, 0)
)
// Handle the field names and placeholders.
for k := range list[0] {
keys = append(keys, k)
valueHolder = append(valueHolder, "?")
}
// Prepare the batch result pointer.
var (
charL, charR = d.Core.GetChars()
keysStr = charL + strings.Join(keys, charR+","+charL) + charR
holderStr = strings.Join(valueHolder, ",")
tx = &gdb.TX{}
stdSqlResult sql.Result
stmt *gdb.Stmt
)
tx, err = d.Core.Begin(ctx)
if err != nil {
return
}
stmt, err = tx.Prepare(fmt.Sprintf(
"INSERT INTO %s(%s) VALUES (%s)",
d.QuotePrefixTableName(table), keysStr,
holderStr,
))
if err != nil {
return
}
for i := 0; i < len(list); i++ {
params := []interface{}{} // Values that will be committed to underlying database driver.
for _, k := range keys {
params = append(params, list[i][k])
}
// Prepare is allowed to execute only once in a transaction opened by clickhouse
stdSqlResult, err = stmt.ExecContext(ctx, params...)
if err != nil {
return stdSqlResult, err
}
}
return stdSqlResult, tx.Commit()
}

// InsertIgnore Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertIgnore(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
return nil, ErrUnsupportedInsertIgnore
}

// InsertAndGetId Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) InsertAndGetId(ctx context.Context, table string, data interface{}, batch ...int) (int64, error) {
return 0, ErrUnsupportedInsertGetId
}

// Replace Other queries for modifying data parts are not supported: REPLACE, MERGE, UPSERT, INSERT UPDATE.
func (d *Driver) Replace(ctx context.Context, table string, data interface{}, batch ...int) (sql.Result, error) {
return nil, ErrUnsupportedReplace
}

func (d *Driver) Begin(ctx context.Context) (tx *gdb.TX, err error) {
return nil, ErrUnsupportedBegin
}

func (d *Driver) Transaction(ctx context.Context, f func(ctx context.Context, tx *gdb.TX) error) error {
return ErrUnsupportedTransaction
}
Loading