Skip to content

Commit

Permalink
Unify the format of cdc http response
Browse files Browse the repository at this point in the history
Signed-off-by: SimFG <bang.fu@zilliz.com>
  • Loading branch information
SimFG committed Nov 1, 2023
1 parent 804d1a5 commit 0f76a8c
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 15 deletions.
14 changes: 7 additions & 7 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
name: Test

on:
push:
branches:
- main
pull_request:
branches:
- main
#on:
# push:
# branches:
# - main
# pull_request:
# branches:
# - main

jobs:
milvus-cdc-test:
Expand Down
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ coverage:
status:
project:
default:
threshold: 0% #Allow the coverage to drop by threshold%, and posting a success status.
threshold: 80% #Allow the coverage to drop by threshold%, and posting a success status.
branches:
- main
patch:
Expand Down
12 changes: 8 additions & 4 deletions server/model/request/base.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,20 +31,24 @@ const (
List = "list"
)

//go:generate easytags $GOFILE json,mapstructure

type CDCRequest struct {
RequestType string `json:"request_type" mapstructure:"request_type"`
RequestData map[string]any `json:"request_data" mapstructure:"request_data"`
}

type CDCResponse struct {
Code int `json:"code" mapstructure:"code"`
Message string `json:"message" mapstructure:"message"`
Data map[string]any `json:"data" mapstructure:"data"`
}

// Task some info can be showed about the task
type Task struct {
TaskID string `json:"task_id" mapstructure:"task_id"`
MilvusConnectParam model.MilvusConnectParam `json:"milvus_connect_param" mapstructure:"milvus_connect_param"`
CollectionInfos []model.CollectionInfo `json:"collection_infos" mapstructure:"collection_infos"`
State string `json:"state" mapstructure:"state"`
LastFailReason string `json:"reason,omitempty" mapstructure:"reason"`
LastPauseReason string `json:"reason" mapstructure:"reason"`
}

func GetTask(taskInfo *meta.TaskInfo) Task {
Expand All @@ -55,6 +59,6 @@ func GetTask(taskInfo *meta.TaskInfo) Task {
MilvusConnectParam: taskInfo.MilvusConnectParam,
CollectionInfos: taskInfo.CollectionInfos,
State: taskInfo.State.String(),
LastFailReason: taskInfo.Reason,
LastPauseReason: taskInfo.Reason,
}
}
21 changes: 18 additions & 3 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (c *CDCServer) Run(config *CDCServerConfig) {
func (c *CDCServer) getCDCHandler() http.Handler {
return http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
startTime := time.Now()
writer.Header().Set("Content-Type", "application/json")
if request.Method != http.MethodPost {
c.handleError(writer, "only support the POST method", http.StatusMethodNotAllowed,
zap.String("method", request.Method))
Expand All @@ -79,8 +80,18 @@ func (c *CDCServer) getCDCHandler() http.Handler {
response := c.handleRequest(cdcRequest, writer)

if response != nil {
writer.Header().Set("Content-Type", "application/json")
_ = json.NewEncoder(writer).Encode(response)
var m map[string]interface{}
err = mapstructure.Decode(response, &m)
if err != nil {
log.Warn("fail to decode the response", zap.Any("resp", response), zap.Error(err))
c.handleError(writer, err.Error(), http.StatusInternalServerError)
return
}
realResp := &modelrequest.CDCResponse{
Code: 200,
Data: m,
}
_ = json.NewEncoder(writer).Encode(realResp)
metrics.TaskRequestCountVec.WithLabelValues(cdcRequest.RequestType, metrics.SuccessStatusLabel).Inc()
metrics.TaskRequestLatencyVec.WithLabelValues(cdcRequest.RequestType).Observe(float64(time.Since(startTime).Milliseconds()))
}
Expand All @@ -89,7 +100,11 @@ func (c *CDCServer) getCDCHandler() http.Handler {

func (c *CDCServer) handleError(w http.ResponseWriter, error string, code int, fields ...zap.Field) {
log.Warn(error, fields...)
http.Error(w, error, code)
errResp := &modelrequest.CDCResponse{
Code: code,
Message: error,
}
_ = json.NewEncoder(w).Encode(errResp)
}

func (c *CDCServer) handleRequest(cdcRequest *modelrequest.CDCRequest, writer http.ResponseWriter) any {
Expand Down
47 changes: 47 additions & 0 deletions server/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,13 @@ import (
"testing"

"github.com/cockroachdb/errors"
"github.com/milvus-io/milvus/pkg/log"
"github.com/mitchellh/mapstructure"
"github.com/stretchr/testify/assert"
"go.uber.org/zap"

cdcerror "github.com/zilliztech/milvus-cdc/server/error"
"github.com/zilliztech/milvus-cdc/server/model"
"github.com/zilliztech/milvus-cdc/server/model/request"
)

Expand Down Expand Up @@ -209,3 +213,46 @@ func TestCDCHandler(t *testing.T) {
assert.Contains(t, string(responseWriter.resp), taskID)
})
}

func TestDecodeStruct(t *testing.T) {
t.Run("err", func(t *testing.T) {
buf := bytes.NewBufferString("")
errResp := &request.CDCResponse{
Code: 500,
Message: "error msg",
}
_ = json.NewEncoder(buf).Encode(errResp)
log.Warn("err", zap.Any("resp", buf.String()))
})

t.Run("success", func(t *testing.T) {
buf := bytes.NewBufferString("")
var m map[string]interface{}
response := &request.ListResponse{
Tasks: []request.Task{
{
TaskID: "123",
MilvusConnectParam: model.MilvusConnectParam{
Host: "localhost",
Port: 19530,
},
CollectionInfos: []model.CollectionInfo{
{
Name: "foo",
},
},
State: "Running",
LastPauseReason: "receive the pause request",
},
},
}

_ = mapstructure.Decode(response, &m)
realResp := &request.CDCResponse{
Code: 200,
Data: m,
}
_ = json.NewEncoder(buf).Encode(realResp)
log.Warn("err", zap.Any("resp", buf.String()))
})
}

0 comments on commit 0f76a8c

Please sign in to comment.