diff --git a/cmd/batcher/main.go b/cmd/batcher/main.go
index dac3ec721ad..c4768fbd2c8 100644
--- a/cmd/batcher/main.go
+++ b/cmd/batcher/main.go
@@ -19,11 +19,11 @@ package main
 import (
 	"errors"
 	"flag"
-	"os"
-	"strconv"
 	"github.com/kubeflow/kfserving/pkg/batcher"
 	"github.com/kubeflow/kfserving/pkg/batcher/controllers"
+	"os"
 	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+	"strconv"
 )
 
 var (
diff --git a/config/default/crds/base/serving.kubeflow.org_inferenceservices.yaml b/config/default/crds/base/serving.kubeflow.org_inferenceservices.yaml
index 32d369eb33b..9925029c9b0 100644
--- a/config/default/crds/base/serving.kubeflow.org_inferenceservices.yaml
+++ b/config/default/crds/base/serving.kubeflow.org_inferenceservices.yaml
@@ -108,6 +108,19 @@ spec:
               required:
               - type
               type: object
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
             custom:
               description: Spec for a custom explainer
               properties:
@@ -1273,6 +1286,19 @@ spec:
         predictor:
           description: Predictor defines the model serving spec
           properties:
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
             custom:
               description: Spec for a custom predictor
              properties:
@@ -2692,6 +2718,19 @@
            and after the predictor call, transformer service calls
            to predictor service.
          properties:
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
            custom:
              description: Spec for a custom transformer
              properties:
@@ -3916,6 +3955,19 @@ spec:
              required:
              - type
              type: object
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
            custom:
              description: Spec for a custom explainer
              properties:
@@ -5081,6 +5133,19 @@ spec:
        predictor:
          description: Predictor defines the model serving spec
          properties:
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
            custom:
              description: Spec for a custom predictor
              properties:
@@ -6500,6 +6565,19 @@
            and after the predictor call, transformer service calls
            to predictor service.
          properties:
+            batcher:
+              description: Activate batcher
+              properties:
+                maxBatchSize:
+                  description: MaxBatchSize of batcher service
+                  type: integer
+                maxLatency:
+                  description: MaxLatency of batcher service
+                  type: integer
+                timeout:
+                  description: Timeout of batcher service
+                  type: integer
+              type: object
            custom:
              description: Spec for a custom transformer
              properties:
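The six hunks above add the same batcher schema under the explainer, predictor, and transformer sections of both the default and canary specs. A minimal sketch of an InferenceService that would exercise the new fields — the resource name and storageUri are illustrative, and the numeric values mirror the annotations asserted in the injector test further down:

```yaml
apiVersion: serving.kubeflow.org/v1alpha2
kind: InferenceService
metadata:
  name: sklearn-batcher            # illustrative name
spec:
  default:
    predictor:
      batcher:
        maxBatchSize: 32           # flush as soon as 32 instances are buffered
        maxLatency: 5000           # ...or once the first buffered instance has waited this long
        timeout: 60                # timeout for the batched predictor call
      sklearn:
        storageUri: gs://kfserving-samples/models/sklearn/iris   # illustrative model
```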
diff --git a/pkg/batcher/controllers/main_controller.go b/pkg/batcher/controllers/main_controller.go
index 824f2ec7ef7..fcce7d083fd 100644
--- a/pkg/batcher/controllers/main_controller.go
+++ b/pkg/batcher/controllers/main_controller.go
@@ -18,18 +18,18 @@ package controllers
 import (
 	"bytes"
-	"fmt"
-	"io/ioutil"
-	"net/http"
-	"sync"
-	"time"
-	"errors"
 	"context"
 	"encoding/json"
+	"errors"
+	"fmt"
 	"github.com/astaxie/beego"
 	"github.com/go-logr/logr"
 	"github.com/satori/go.uuid"
+	"io/ioutil"
+	"net/http"
 	logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
+	"sync"
+	"time"
 )
 
 const (
@@ -39,10 +39,10 @@
 )
 
 var (
-	log logr.Logger
-	channelIn = make(chan Input)
+	log         logr.Logger
+	channelIn   = make(chan Input)
 	batcherInfo BatcherInfo
-	mutex sync.Mutex
+	mutex       sync.Mutex
 )
 
 type MainController struct {
@@ -55,18 +55,18 @@ type Request struct {
 
 type Input struct {
 	ContextInput *context.Context
-	Instances *[]interface{}
-	ChannelOut *chan Response
+	Instances    *[]interface{}
+	ChannelOut   *chan Response
 }
 
 type InputInfo struct {
 	ChannelOut *chan Response
-	Index []int
+	Index      []int
 }
 
 type Response struct {
-	Message string `json:"message"`
-	BatchID string `json:"batchId"`
+	Message     string `json:"message"`
+	BatchID     string `json:"batchId"`
 	Predictions []interface{} `json:"predictions"`
 }
@@ -79,20 +79,20 @@ type Predictions struct {
 }
 
 type BatcherInfo struct {
-	MaxBatchSize int
-	MaxLatency int
-	Port string
-	SvcHost string
-	SvcPort string
-	Timeout time.Duration
-	Path string
-	ContentType string
-	BatchID string
-	Instances []interface{}
-	Predictions Predictions
-	Info map[*context.Context] InputInfo
-	Start time.Time
-	Now time.Time
+	MaxBatchSize    int
+	MaxLatency      int
+	Port            string
+	SvcHost         string
+	SvcPort         string
+	Timeout         time.Duration
+	Path            string
+	ContentType     string
+	BatchID         string
+	Instances       []interface{}
+	Predictions     Predictions
+	Info            map[*context.Context]InputInfo
+	Start           time.Time
+	Now             time.Time
 	CurrentInputLen int
 }
@@ -155,7 +155,7 @@ func (batcherInfo *BatcherInfo) InitializeInfo() {
 	batcherInfo.CurrentInputLen = 0
 	batcherInfo.Instances = make([]interface{}, 0)
 	batcherInfo.Predictions = Predictions{}
-	batcherInfo.Info = make(map[*context.Context] InputInfo)
+	batcherInfo.Info = make(map[*context.Context]InputInfo)
 	batcherInfo.Start = GetNowTime()
 	batcherInfo.Now = batcherInfo.Start
 }
@@ -166,8 +166,8 @@ func (batcherInfo *BatcherInfo) BatchPredict() {
 		log.Error(errors.New(*err), "")
 		for _, v := range batcherInfo.Info {
 			res := Response{
-				Message: *err,
-				BatchID: "",
+				Message:     *err,
+				BatchID:     "",
 				Predictions: nil,
 			}
 			*v.ChannelOut <- res
@@ -180,8 +180,8 @@
 			predictions = append(predictions, batcherInfo.Predictions.Predictions[index])
 		}
 		res := Response{
-			Message: "",
-			BatchID: batcherInfo.BatchID,
+			Message:     "",
+			BatchID:     batcherInfo.BatchID,
 			Predictions: predictions,
 		}
 		if jsonStr, err := json.Marshal(res); err == nil {
@@ -198,7 +198,7 @@ func (batcherInfo *BatcherInfo) Batcher() {
 	for {
 		select {
-		case req := <- channelIn:
+		case req := <-channelIn:
 			if len(batcherInfo.Instances) == 0 {
 				batcherInfo.Start = GetNowTime()
 			}
@@ -206,14 +206,14 @@
 			batcherInfo.Instances = append(batcherInfo.Instances, *req.Instances...)
 			var index = make([]int, 0)
 			for i := 0; i < len(*req.Instances); i++ {
-				index = append(index, batcherInfo.CurrentInputLen + i)
+				index = append(index, batcherInfo.CurrentInputLen+i)
 			}
 			batcherInfo.Info[req.ContextInput] = InputInfo{
 				req.ChannelOut,
 				index,
 			}
 			batcherInfo.CurrentInputLen = len(batcherInfo.Instances)
-		case <- time.After(SleepTime):
+		case <-time.After(SleepTime):
 		}
 		batcherInfo.Now = GetNowTime()
 		if batcherInfo.CurrentInputLen >= batcherInfo.MaxBatchSize ||
@@ -224,7 +224,7 @@
 	}
 }
 
-func (batcherInfo *BatcherInfo) Consume() { 
+func (batcherInfo *BatcherInfo) Consume() {
 	log.Info("Start Consume")
 	if batcherInfo.MaxBatchSize <= 0 {
 		batcherInfo.MaxBatchSize = MaxBatchSize
@@ -261,13 +261,13 @@ func (c *MainController) Post() {
 	var ctx = context.Background()
 	var chl = make(chan Response)
 
-	channelIn <- Input {
+	channelIn <- Input{
 		&ctx,
 		&req.Instances,
 		&chl,
 	}
 
-	response := <- chl
+	response := <-chl
 	close(chl)
 
 	c.Data["json"] = &response
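The Batcher() loop above flushes on whichever comes first: the buffered instance count reaching MaxBatchSize, or MaxLatency elapsing since the first buffered instance, with time.After serving as a periodic wake-up so a partial batch cannot wait forever. A self-contained sketch of that size-or-latency pattern — names and constants here are illustrative, not taken from the package:

```go
package main

import (
	"fmt"
	"time"
)

const (
	maxBatchSize = 4                      // flush when this many items are buffered
	maxLatency   = 500 * time.Millisecond // ...or when the first item is this old
	pollInterval = 100 * time.Millisecond // periodic wake-up, like SleepTime above
)

func batcher(in <-chan string) {
	var batch []string
	var start time.Time
	for {
		select {
		case item := <-in:
			if len(batch) == 0 {
				start = time.Now() // latency clock starts with the first item
			}
			batch = append(batch, item)
		case <-time.After(pollInterval):
			// no new item; fall through so the latency check below still runs
		}
		if len(batch) >= maxBatchSize ||
			(len(batch) > 0 && time.Since(start) >= maxLatency) {
			fmt.Printf("flush %d: %v\n", len(batch), batch)
			batch = nil
		}
	}
}

func main() {
	in := make(chan string)
	go batcher(in)
	for i := 0; i < 6; i++ {
		in <- fmt.Sprintf("req-%d", i)
	}
	time.Sleep(time.Second) // give the latency flush time to fire for the tail
}
```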
diff --git a/pkg/batcher/routers/router.go b/pkg/batcher/routers/router.go
index 142b05acfdd..fdbbbee628f 100644
--- a/pkg/batcher/routers/router.go
+++ b/pkg/batcher/routers/router.go
@@ -17,10 +17,10 @@ limitations under the License.
 package routers
 
 import (
-	"github.com/kubeflow/kfserving/pkg/batcher/controllers"
 	"github.com/astaxie/beego"
+	"github.com/kubeflow/kfserving/pkg/batcher/controllers"
 )
 
 func init() {
-	beego.Router("/*", &controllers.MainController{}) 
+	beego.Router("/*", &controllers.MainController{})
 }
diff --git a/pkg/batcher/server.go b/pkg/batcher/server.go
index 0f2ba58ce22..4741030df5b 100644
--- a/pkg/batcher/server.go
+++ b/pkg/batcher/server.go
@@ -17,9 +17,9 @@ limitations under the License.
 package batcher
 
 import (
+	"github.com/astaxie/beego"
 	_ "github.com/kubeflow/kfserving/pkg/batcher/routers"
 	"github.com/kubeflow/kfserving/pkg/constants"
-	"github.com/astaxie/beego"
 	"strconv"
 )
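server.go boots beego with the catch-all route above, so the batcher accepts the same JSON predict payload the predictor does and fans the batched predictions back out to each caller. A hedged example client, mirroring the Request and Response structs from main_controller.go — the port and path are illustrative:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

// Request and Response mirror the wire format in pkg/batcher/controllers.
type Request struct {
	Instances []interface{} `json:"instances"`
}

type Response struct {
	Message     string        `json:"message"`
	BatchID     string        `json:"batchId"`
	Predictions []interface{} `json:"predictions"`
}

func main() {
	body, _ := json.Marshal(Request{Instances: []interface{}{
		[]float64{6.8, 2.8, 4.8, 1.4}, // callers batched together share a batchId
	}})
	// Illustrative endpoint: the batcher sits in front of the predictor container.
	resp, err := http.Post("http://localhost:9082/v1/models/sklearn:predict",
		"application/json", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var res Response
	if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
		panic(err)
	}
	fmt.Println(res.BatchID, res.Predictions)
}
```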
diff --git a/pkg/batcher/server_test.go b/pkg/batcher/server_test.go
index 513979773e2..edaf7fe58da 100644
--- a/pkg/batcher/server_test.go
+++ b/pkg/batcher/server_test.go
@@ -20,10 +20,10 @@ import (
 	"bytes"
 	"encoding/json"
 	"fmt"
+	"github.com/astaxie/beego"
 	"github.com/kubeflow/kfserving/pkg/batcher/controllers"
 	"github.com/kubeflow/kfserving/pkg/constants"
 	"github.com/onsi/gomega"
-	"github.com/astaxie/beego"
 	"io/ioutil"
 	"net/http"
 	"net/http/httptest"
@@ -84,7 +84,7 @@ func TestBatcher(t *testing.T) {
 	b2, _ := ioutil.ReadAll(w.Result().Body)
 	var res controllers.Response
 	var predictions controllers.Predictions
-	_ = json.Unmarshal(b2, &res) 
+	_ = json.Unmarshal(b2, &res)
 	predictions.Predictions = res.Predictions
 	josnStr, _ := json.Marshal(predictions)
 	fmt.Println(string(josnStr))
diff --git a/pkg/webhook/admission/pod/batcher_injector.go b/pkg/webhook/admission/pod/batcher_injector.go
index 775b1d39776..e7edf7a031e 100644
--- a/pkg/webhook/admission/pod/batcher_injector.go
+++ b/pkg/webhook/admission/pod/batcher_injector.go
@@ -26,11 +26,11 @@ import (
 )
 
 const (
-	BatcherContainerName = "batcher"
-	BatcherConfigMapKeyName = "batcher"
-	BatcherArgumentMaxBatchSize = "--max-batchsize"
-	BatcherArgumentMaxLatency = "--max-latency"
-	BatcherArgumentTimeout = "--timeout"
+	BatcherContainerName        = "batcher"
+	BatcherConfigMapKeyName     = "batcher"
+	BatcherArgumentMaxBatchSize = "--max-batchsize"
+	BatcherArgumentMaxLatency   = "--max-latency"
+	BatcherArgumentTimeout      = "--timeout"
 )
 
 type BatcherConfig struct {
@@ -111,7 +111,7 @@ func (il *BatcherInjector) InjectBatcher(pod *v1.Pod) error {
 	batcherContainer := &v1.Container{
 		Name:  BatcherContainerName,
 		Image: il.config.Image,
-		Args: args,
+		Args:  args,
 		Resources: v1.ResourceRequirements{
 			Limits: map[v1.ResourceName]resource.Quantity{
 				v1.ResourceCPU: resource.MustParse(il.config.CpuLimit),
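Putting the injector together: when the batcher annotations are present on a pod (the test that follows asserts exactly this), InjectBatcher appends a sidecar container whose args are built from the constants above. Roughly, the injected container looks like the sketch below — the image placeholder stands in for il.config.Image, read from the batcher entry of the inferenceservice configmap, and the values mirror the test annotations:

```yaml
- name: batcher
  image: <batcher-image-from-configmap>   # il.config.Image
  args:
  - --max-batchsize
  - "32"
  - --max-latency
  - "5000"
  - --timeout
  - "60"
```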
diff --git a/pkg/webhook/admission/pod/batcher_injector_test.go b/pkg/webhook/admission/pod/batcher_injector_test.go
index bc89aacecb4..f7798b2182a 100644
--- a/pkg/webhook/admission/pod/batcher_injector_test.go
+++ b/pkg/webhook/admission/pod/batcher_injector_test.go
@@ -65,10 +65,10 @@ func TestBatcherInjector(t *testing.T) {
 				Name:      "deployment",
 				Namespace: "default",
 				Annotations: map[string]string{
-					constants.BatcherInternalAnnotationKey: "true",
+					constants.BatcherInternalAnnotationKey:             "true",
 					constants.BatcherMaxBatchSizeInternalAnnotationKey: "32",
-					constants.BatcherMaxLatencyInternalAnnotationKey: "5000",
-					constants.BatcherTimeoutInternalAnnotationKey: "60",
+					constants.BatcherMaxLatencyInternalAnnotationKey:   "5000",
+					constants.BatcherTimeoutInternalAnnotationKey:      "60",
 				},
 				Labels: map[string]string{
 					"serving.kubeflow.org/inferenceservice": "sklearn",
@@ -87,10 +87,10 @@
 			ObjectMeta: metav1.ObjectMeta{
 				Name: "deployment",
 				Annotations: map[string]string{
-					constants.BatcherInternalAnnotationKey: "true",
+					constants.BatcherInternalAnnotationKey:             "true",
 					constants.BatcherMaxBatchSizeInternalAnnotationKey: "32",
-					constants.BatcherMaxLatencyInternalAnnotationKey: "5000",
-					constants.BatcherTimeoutInternalAnnotationKey: "60",
+					constants.BatcherMaxLatencyInternalAnnotationKey:   "5000",
+					constants.BatcherTimeoutInternalAnnotationKey:      "60",
 				},
 			},
 			Spec: v1.PodSpec{
@@ -101,7 +101,7 @@
 				{
 					Name:  BatcherContainerName,
 					Image: batcherConfig.Image,
-					Args: []string {
+					Args: []string{
 						BatcherArgumentMaxBatchSize,
 						"32",
 						BatcherArgumentMaxLatency,