Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: new write protocol implement #207

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions opengemini/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ import (
"context"
"crypto/tls"
"log/slog"
"strconv"
"time"

"github.com/prometheus/client_golang/prometheus"

"github.com/openGemini/opengemini-client-go/proto"
)

const (
Expand All @@ -33,6 +36,7 @@ const (
type Codec string

type ContentType string

type CompressMethod string

const (
Expand Down Expand Up @@ -69,6 +73,9 @@ type Client interface {
WriteBatchPoints(ctx context.Context, database string, bp []*Point) error
// WriteBatchPointsWithRp write batch points with retention policy
WriteBatchPointsWithRp(ctx context.Context, database string, rp string, bp []*Point) error
// WriteByGRPC write batch record to assigned database.retention_policy by gRPC.
// You'd better use NewRecordBuilder to build req.
WriteByGRPC(ctx context.Context, req *proto.WriteRequest) error

// CreateDatabase Create database
CreateDatabase(database string) error
Expand Down Expand Up @@ -166,6 +173,8 @@ type Config struct {
CustomMetricsLabels map[string]string
// Logger structured logger for logging operations
Logger *slog.Logger
// RPCConfig configuration information for write service by gRPC
RPCConfig *GRPCConfig
}

// Address configuration for providing service.
Expand All @@ -176,6 +185,10 @@ type Address struct {
Port int
}

func (a *Address) String() string {
return a.Host + ":" + strconv.Itoa(a.Port)
}

// AuthType type of identity authentication.
type AuthType int

Expand Down Expand Up @@ -212,6 +225,21 @@ type RpConfig struct {
IndexDuration string
}

// GRPCConfig represents the configuration information for write service by gRPC.
type GRPCConfig struct {
// Addresses Configure the service endpoints for the openGemini grpc write service.
// This parameter is required.
Addresses []Address
// AuthConfig configuration information for authentication.
AuthConfig *AuthConfig
// TlsConfig configuration information for tls.
TlsConfig *tls.Config
// CompressMethod determines the compress method used for data transmission.
CompressMethod CompressMethod
// Timeout default 30s
Timeout time.Duration
}

// NewClient Creates a openGemini client instance
func NewClient(config *Config) (Client, error) {
return newClient(config)
Expand Down
10 changes: 9 additions & 1 deletion opengemini/client_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type client struct {
prevIdx atomic.Int32
dataChanMap syncx.Map[dbRp, chan *sendBatchWithCB]
metrics *metrics
rpcClient *recordWriterClient

batchContext context.Context
batchContextCancel context.CancelFunc
Expand All @@ -48,7 +49,7 @@ type client struct {

func newClient(c *Config) (Client, error) {
if len(c.Addresses) == 0 {
return nil, errors.New("must have at least one address")
return nil, ErrEmptyAddress
}
if c.AuthConfig != nil {
if c.AuthConfig.AuthType == AuthTypeToken && len(c.AuthConfig.Token) == 0 {
Expand Down Expand Up @@ -91,6 +92,13 @@ func newClient(c *Config) (Client, error) {
} else {
dbClient.logger = slog.Default()
}
if c.RPCConfig != nil {
rc, err := newRecordWriterClient(c.RPCConfig)
if err != nil {
return nil, errors.New("failed to create rpc client: " + err.Error())
}
dbClient.rpcClient = rc
}
dbClient.prevIdx.Store(-1)
if len(c.Addresses) > 1 {
// if there are multiple addresses, start the health check
Expand Down
2 changes: 2 additions & 0 deletions opengemini/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ var (
ErrEmptyTagOrField = errors.New("empty tag or field")
ErrEmptyTagKey = errors.New("empty tag key")
ErrRetentionPolicy = errors.New("empty retention policy")
ErrEmptyRecord = errors.New("empty record")
ErrEmptyAddress = errors.New("empty address, must have at least one address")
)

// checkDatabaseName checks if the database name is empty and returns an error if it is.
Expand Down
1 change: 1 addition & 0 deletions opengemini/query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ func TestQueryWithEpoch(t *testing.T) {
assert.Equal(t, length, getTimestampLength(v))
}
}

func TestQueryWithMsgPack(t *testing.T) {
c := testNewClient(t, &Config{
Addresses: []Address{{
Expand Down
269 changes: 269 additions & 0 deletions opengemini/record_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,269 @@
// Copyright 2024 openGemini Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package opengemini

import (
"errors"
"fmt"
"sync"
"time"

"github.com/openGemini/opengemini-client-go/lib/record"
"github.com/openGemini/opengemini-client-go/proto"
)

var (
_ RecordBuilder = (*RecordBuilderImpl)(nil)
recordLinePool = &sync.Pool{New: func() any {
return &RecordLineBuilderImpl{}
}}
)

type RecordLine interface{}

type RecordLineBuilder interface {
// AddTag add a tag to the record.
// If the key exists, it will be overwritten.
// If the key is `time`, it will cause an error.
// If the key is empty or the value is empty, it will be ignored.
AddTag(key string, value string) RecordLineBuilder
// AddTags add multiple tags to the record.
// Each entry in the map represents a tag where the key is the tag name and the value is the tag value.
AddTags(tags map[string]string) RecordLineBuilder
// AddField add a field to the record.
// If the key is empty, it will be ignored.
// If the key is `time`, it will cause an error.
// If the key already exists, its value will be overwritten.
AddField(key string, value interface{}) RecordLineBuilder
// AddFields add multiple fields to the record.
// Each entry in the map represents a field where the key is the field name and the value is the field value.
AddFields(fields map[string]interface{}) RecordLineBuilder
CompressMethod(method CompressMethod) RecordLineBuilder
Error() error
// Build specifies the time of the record.
// If the time is not specified or zero value, the current time will be used.
Build(tt time.Time) RecordLine
}

type RecordBuilder interface {
Authenticate(username, password string) RecordBuilder
AddRecord(rlb ...RecordLine) RecordBuilder
Build() (*proto.WriteRequest, error)
}

type FieldTuple struct {
record.Field
Value interface{}
}

type RecordBuilderImpl struct {
database string
retentionPolicy string
username string
password string
transform transform
err error
}

func (r *RecordBuilderImpl) reset() {
r.transform.reset()
}

func (r *RecordBuilderImpl) Authenticate(username, password string) RecordBuilder {
r.username = username
r.password = password
return r
}

func NewRecordBuilder(database, retentionPolicy string) RecordBuilder {
return &RecordBuilderImpl{database: database, retentionPolicy: retentionPolicy, transform: make(transform)}
}

func (r *RecordBuilderImpl) AddRecord(rlb ...RecordLine) RecordBuilder {
for _, lineBuilder := range rlb {
lb, ok := lineBuilder.(*RecordLineBuilderImpl)
if !ok {
continue
}
err := r.transform.AppendRecord(lb)
recordLinePool.Put(lb)
if err != nil {
r.err = errors.Join(r.err, err)
continue
}
}
return r
}

func (r *RecordBuilderImpl) Build() (*proto.WriteRequest, error) {
defer r.reset()

if r.err != nil {
return nil, r.err
}

var req = &proto.WriteRequest{
Database: r.database,
RetentionPolicy: r.retentionPolicy,
Username: r.username,
Password: r.password,
}

for mst, rawRecord := range r.transform {
rec, err := rawRecord.ToSrvRecords()
if err != nil {
return nil, fmt.Errorf("failed to convert records: %v", err)
}
var buff []byte
buff, err = rec.Marshal(buff)
if err != nil {
return nil, fmt.Errorf("failed to marshal record: %v", err)
}

req.Records = append(req.Records, &proto.Record{
Measurement: mst,
MinTime: rawRecord.MinTime,
MaxTime: rawRecord.MaxTime,
Block: buff,
})
}

return req, nil
}

type RecordLineBuilderImpl struct {
measurement string
tags []*FieldTuple
fields []*FieldTuple
tt time.Time
compressMethod CompressMethod
built bool

err error
}

func (r *RecordLineBuilderImpl) CompressMethod(method CompressMethod) RecordLineBuilder {
r.compressMethod = method
return r
}

func newRecordLineBuilder(measurement string) *RecordLineBuilderImpl {
r := recordLinePool.Get().(*RecordLineBuilderImpl)
r.measurement = measurement
if len(r.tags) != 0 {
r.tags = r.tags[:0]
}
if len(r.fields) != 0 {
r.fields = r.fields[:0]
}
if !r.tt.IsZero() {
r.tt = time.Time{}
}
r.built = false
r.err = nil
return r
}

func NewRecordLineBuilder(measurement string) RecordLineBuilder {
return newRecordLineBuilder(measurement)
}

func (r *RecordLineBuilderImpl) Error() error {
return r.err
}

func (r *RecordLineBuilderImpl) AddTag(key string, value string) RecordLineBuilder {
if r.built {
r = newRecordLineBuilder(r.measurement)
}
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss tag name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("tag name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
r.tags = append(r.tags, &FieldTuple{
Field: record.Field{
Name: key,
Type: record.FieldTypeTag,
},
Value: value,
})
return r
}

func (r *RecordLineBuilderImpl) AddTags(tags map[string]string) RecordLineBuilder {
if r.built {
r = newRecordLineBuilder(r.measurement)
}
for key, value := range tags {
r.AddTag(key, value)
}
return r
}

func (r *RecordLineBuilderImpl) AddField(key string, value interface{}) RecordLineBuilder {
if r.built {
r = newRecordLineBuilder(r.measurement)
}
if key == "" {
r.err = errors.Join(r.err, fmt.Errorf("miss field name: %w", ErrEmptyName))
return r
}
if key == record.TimeField {
r.err = errors.Join(r.err, fmt.Errorf("field name %s invalid: %w", key, ErrInvalidTimeColumn))
return r
}
typ := record.FieldTypeUnknown
switch value.(type) {
case string:
typ = record.FieldTypeString
case float32, float64:
typ = record.FieldTypeFloat
case bool:
typ = record.FieldTypeBoolean
case int8, int16, int32, int64, uint8, uint16, uint32, uint64, int:
typ = record.FieldTypeInt
}
r.fields = append(r.fields, &FieldTuple{
Field: record.Field{
Name: key,
Type: typ,
},
Value: value,
})
return r
}

func (r *RecordLineBuilderImpl) AddFields(fields map[string]interface{}) RecordLineBuilder {
if r.built {
r = newRecordLineBuilder(r.measurement)
}
for key, value := range fields {
r.AddField(key, value)
}
return r
}

func (r *RecordLineBuilderImpl) Build(tt time.Time) RecordLine {
r.built = true
if err := checkMeasurementName(r.measurement); err != nil {
r.err = errors.Join(err, r.err)
}
r.tt = tt
return r
}
Loading
Loading