Skip to content

Commit

Permalink
Sync InsertRows and config related PR to master (#532)
Browse files Browse the repository at this point in the history
Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
  • Loading branch information
congqixia authored Aug 1, 2023
1 parent 1d5c182 commit 65d9013
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 25 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,11 @@ type Client interface {

// CreateCollectionByRow create collection by row
CreateCollectionByRow(ctx context.Context, row entity.Row, shardNum int32) error
// DEPRECATED
// InsertByRows insert by rows
InsertByRows(ctx context.Context, collName string, paritionName string, rows []entity.Row) (entity.Column, error)
// InsertRows insert with row base data.
InsertRows(ctx context.Context, collName string, partitionName string, rows []interface{}) (entity.Column, error)

// ManualCompaction triggers a compaction on provided collection
ManualCompaction(ctx context.Context, collName string, toleranceDuration time.Duration) (int64, error)
Expand Down
31 changes: 28 additions & 3 deletions client/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ type Config struct {

parsedAddress *url.URL

RetryRateLimit *RetryRateLimitOption // option for retry on rate limit inteceptor

DisableConn bool

flags uint64 // internal flags
}

type RetryRateLimitOption struct {
MaxRetry uint
MaxBackoff time.Duration
}

// Copy a new config, dialOption may shared with old config.
func (c *Config) Copy() Config {
newConfig := Config{
Expand Down Expand Up @@ -105,6 +112,9 @@ func (c *Config) parse() error {
if remoteURL.Scheme == "https" {
c.EnableTLSAuth = true
}
if remoteURL.Port() == "" && c.EnableTLSAuth {
remoteURL.Host += ":443"
}
c.parsedAddress = remoteURL
return nil
}
Expand Down Expand Up @@ -147,9 +157,7 @@ func (c *Config) getDialOption() []grpc.DialOption {
return 60 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
}),
grpc_retry.WithCodes(codes.Unavailable, codes.ResourceExhausted)),
RetryOnRateLimitInterceptor(75, func(ctx context.Context, attempt uint) time.Duration {
return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
}),
c.getRetryOnRateLimitInterceptor(),
))

options = append(options, grpc.WithChainUnaryInterceptor(
Expand All @@ -158,6 +166,23 @@ func (c *Config) getDialOption() []grpc.DialOption {
return options
}

func (c *Config) getRetryOnRateLimitInterceptor() grpc.UnaryClientInterceptor {
if c.RetryRateLimit == nil {
c.RetryRateLimit = c.defaultRetryRateLimitOption()
}

return RetryOnRateLimitInterceptor(c.RetryRateLimit.MaxRetry, c.RetryRateLimit.MaxBackoff, func(ctx context.Context, attempt uint) time.Duration {
return 10 * time.Millisecond * time.Duration(math.Pow(3, float64(attempt)))
})
}

func (c *Config) defaultRetryRateLimitOption() *RetryRateLimitOption {
return &RetryRateLimitOption{
MaxRetry: 75,
MaxBackoff: 3 * time.Second,
}
}

// addFlags set internal flags
func (c *Config) addFlags(flags uint64) {
c.flags |= flags
Expand Down
2 changes: 1 addition & 1 deletion client/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestClientConfig(t *testing.T) {
&Config{
Address: "https://xxxx-xxxx-xxxxx.com",
},
"xxxx-xxxx-xxxxx.com", "", true, false,
"xxxx-xxxx-xxxxx.com:443", "", true, false,
)
// remote https host with dbname
assertConfig(
Expand Down
13 changes: 5 additions & 8 deletions client/rate_limit_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,24 +37,21 @@ const (
RetryOnRateLimit ctxKey = iota
)

var MaxBackOff = 3 * time.Second

// RetryOnRateLimitInterceptor returns a new retrying unary client interceptor.
func RetryOnRateLimitInterceptor(maxRetry uint, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor {
func RetryOnRateLimitInterceptor(maxRetry uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) grpc.UnaryClientInterceptor {
return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
if maxRetry == 0 {
return invoker(parentCtx, method, req, reply, cc, opts...)
}
var lastErr error
for attempt := uint(0); attempt < maxRetry; attempt++ {
_, err := waitRetryBackoff(parentCtx, attempt, backoffFunc)
_, err := waitRetryBackoff(parentCtx, attempt, maxBackoff, backoffFunc)
if err != nil {
return err
}
lastErr = invoker(parentCtx, method, req, reply, cc, opts...)
rspStatus := getResultStatus(reply)
if retryOnRateLimit(parentCtx) && rspStatus.GetErrorCode() == common.ErrorCode_RateLimit {
//log.Printf("rate limit retry attempt: %d, backoff for %v, reson: %v\n", attempt, backoff, rspStatus.GetReason())
continue
}
return lastErr
Expand Down Expand Up @@ -102,14 +99,14 @@ func contextErrToGrpcErr(err error) error {
}
}

func waitRetryBackoff(parentCtx context.Context, attempt uint, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) {
func waitRetryBackoff(parentCtx context.Context, attempt uint, maxBackoff time.Duration, backoffFunc grpc_retry.BackoffFuncContext) (time.Duration, error) {
var waitTime time.Duration
if attempt > 0 {
waitTime = backoffFunc(parentCtx, attempt)
}
if waitTime > 0 {
if waitTime > MaxBackOff {
waitTime = MaxBackOff
if waitTime > maxBackoff {
waitTime = maxBackoff
}
timer := time.NewTimer(waitTime)
select {
Expand Down
3 changes: 2 additions & 1 deletion client/rate_limit_interceptor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func resetMockInvokeTimes() {

func TestRateLimitInterceptor(t *testing.T) {
maxRetry := uint(3)
inter := RetryOnRateLimitInterceptor(maxRetry, func(ctx context.Context, attempt uint) time.Duration {
maxBackoff := 3 * time.Second
inter := RetryOnRateLimitInterceptor(maxRetry, maxBackoff, func(ctx context.Context, attempt uint) time.Duration {
return 60 * time.Millisecond * time.Duration(math.Pow(2, float64(attempt)))
})

Expand Down
14 changes: 13 additions & 1 deletion client/row.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,18 @@ func (c *GrpcClient) CreateCollectionByRow(ctx context.Context, row entity.Row,
// InsertByRows insert by rows
func (c *GrpcClient) InsertByRows(ctx context.Context, collName string, partitionName string,
rows []entity.Row) (entity.Column, error) {
anys := make([]interface{}, 0, len(rows))
for _, row := range rows {
anys = append(anys, row)
}

return c.InsertRows(ctx, collName, partitionName, anys)
}

// InsertRows allows insert with row based data
// rows could be struct or map.
func (c *GrpcClient) InsertRows(ctx context.Context, collName string, partitionName string,
rows []interface{}) (entity.Column, error) {
if c.Service == nil {
return nil, ErrClientNotReady
}
Expand All @@ -81,7 +93,7 @@ func (c *GrpcClient) InsertByRows(ctx context.Context, collName string, partitio
return nil, err
}
// 1. convert rows to columns
columns, err := entity.RowsToColumns(rows, coll.Schema)
columns, err := entity.AnyToColumns(rows, coll.Schema)
if err != nil {
return nil, err
}
Expand Down
52 changes: 41 additions & 11 deletions entity/rows.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,16 +89,20 @@ func (b RowBase) Description() string {
return ""
}

// ParseSchema parse Schema from row interface
func ParseSchema(r Row) (*Schema, error) {
sch := &Schema{
CollectionName: r.Collection(),
Description: r.Description(),
}
// ParseSchemaAny parses schema from interface{}.
func ParseSchemaAny(r interface{}) (*Schema, error) {
sch := &Schema{}
t := reflect.TypeOf(r)
if t.Kind() == reflect.Array || t.Kind() == reflect.Slice || t.Kind() == reflect.Ptr {
t = t.Elem()
}

// MapRow is not supported for schema definition
// TODO add PrimaryKey() interface later
if t.Kind() == reflect.Map {
return nil, fmt.Errorf("map row is not supported for schema definition")
}

if t.Kind() != reflect.Struct {
return nil, fmt.Errorf("unsupported data type: %+v", r)
}
Expand Down Expand Up @@ -208,6 +212,21 @@ func ParseSchema(r Row) (*Schema, error) {
return sch, nil
}

// ParseSchema parse Schema from row interface
func ParseSchema(r Row) (*Schema, error) {
schema, err := ParseSchemaAny(r)
if err != nil {
return nil, err
}
if r.Collection() != "" {
schema.CollectionName = r.Collection()
}
if schema.Description != "" {
schema.Description = r.Description()
}
return schema, nil
}

// ParseTagSetting parses struct tag into map settings
func ParseTagSetting(str string, sep string) map[string]string {
settings := map[string]string{}
Expand Down Expand Up @@ -240,8 +259,7 @@ func ParseTagSetting(str string, sep string) map[string]string {
return settings
}

// RowsToColumns rows to columns
func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
func AnyToColumns(rows []interface{}, schemas ...*Schema) ([]Column, error) {
rowsLen := len(rows)
if rowsLen == 0 {
return []Column{}, errors.New("0 length column")
Expand All @@ -251,7 +269,7 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
var err error
// if schema not provided, try to parse from row
if len(schemas) == 0 {
sch, err = ParseSchema(rows[0])
sch, err = ParseSchemaAny(rows[0])
if err != nil {
return []Column{}, err
}
Expand Down Expand Up @@ -298,7 +316,7 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
data := make([]float64, 0, rowsLen)
col := NewColumnDouble(field.Name, data)
nameColumns[field.Name] = col
case FieldTypeString:
case FieldTypeString, FieldTypeVarChar:
data := make([]string, 0, rowsLen)
col := NewColumnString(field.Name, data)
nameColumns[field.Name] = col
Expand Down Expand Up @@ -356,7 +374,10 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
delete(set, field.Name)
continue
}
column := nameColumns[field.Name]
column, ok := nameColumns[field.Name]
if !ok {
return nil, fmt.Errorf("expected unhandled field %s", field.Name)
}

candi, ok := set[field.Name]
if !ok {
Expand Down Expand Up @@ -394,6 +415,15 @@ func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
return columns, nil
}

// RowsToColumns rows to columns
func RowsToColumns(rows []Row, schemas ...*Schema) ([]Column, error) {
anys := make([]interface{}, 0, len(rows))
for _, row := range rows {
anys = append(anys, row)
}
return AnyToColumns(anys, schemas...)
}

type fieldCandi struct {
name string
v reflect.Value
Expand Down

0 comments on commit 65d9013

Please sign in to comment.