Skip to content

Commit

Permalink
Fix batcher format issue and generate CRD in serving.kubeflow.org_inf…
Browse files Browse the repository at this point in the history
…erenceservices.yaml. (kubeflow#896)

* Fix batcher format issue and generate CRD in serving.kubeflow.org_inferenceservices.yaml.

* Fix batcher format issue and generate CRD in serving.kubeflow.org_inferenceservices.yaml.
  • Loading branch information
zhangrongguo authored Jun 22, 2020
1 parent b4340c4 commit 2eb4f36
Show file tree
Hide file tree
Showing 8 changed files with 137 additions and 59 deletions.
4 changes: 2 additions & 2 deletions cmd/batcher/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ package main
import (
"errors"
"flag"
"os"
"strconv"
"github.com/kubeflow/kfserving/pkg/batcher"
"github.com/kubeflow/kfserving/pkg/batcher/controllers"
"os"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"strconv"
)

var (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ spec:
required:
- type
type: object
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom explainer
properties:
Expand Down Expand Up @@ -1273,6 +1286,19 @@ spec:
predictor:
description: Predictor defines the model serving spec
properties:
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom predictor
properties:
Expand Down Expand Up @@ -2692,6 +2718,19 @@ spec:
and after the predictor call, transformer service calls to predictor
service.
properties:
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom transformer
properties:
Expand Down Expand Up @@ -3916,6 +3955,19 @@ spec:
required:
- type
type: object
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom explainer
properties:
Expand Down Expand Up @@ -5081,6 +5133,19 @@ spec:
predictor:
description: Predictor defines the model serving spec
properties:
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom predictor
properties:
Expand Down Expand Up @@ -6500,6 +6565,19 @@ spec:
and after the predictor call, transformer service calls to predictor
service.
properties:
batcher:
description: Activate batcher
properties:
maxBatchSize:
description: MaxBatchSize of batcher service
type: integer
maxLatency:
description: MaxLatency of batcher service
type: integer
timeout:
description: Timeout of batcher service
type: integer
type: object
custom:
description: Spec for a custom transformer
properties:
Expand Down
78 changes: 39 additions & 39 deletions pkg/batcher/controllers/main_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ package controllers

import (
"bytes"
"fmt"
"io/ioutil"
"net/http"
"sync"
"time"
"errors"
"context"
"encoding/json"
"errors"
"fmt"
"github.com/astaxie/beego"
"github.com/go-logr/logr"
"github.com/satori/go.uuid"
"io/ioutil"
"net/http"
logf "sigs.k8s.io/controller-runtime/pkg/runtime/log"
"sync"
"time"
)

const (
Expand All @@ -39,10 +39,10 @@ const (
)

var (
log logr.Logger
channelIn = make(chan Input)
log logr.Logger
channelIn = make(chan Input)
batcherInfo BatcherInfo
mutex sync.Mutex
mutex sync.Mutex
)

type MainController struct {
Expand All @@ -55,18 +55,18 @@ type Request struct {

type Input struct {
ContextInput *context.Context
Instances *[]interface{}
ChannelOut *chan Response
Instances *[]interface{}
ChannelOut *chan Response
}

type InputInfo struct {
ChannelOut *chan Response
Index []int
Index []int
}

type Response struct {
Message string `json:"message"`
BatchID string `json:"batchId"`
Message string `json:"message"`
BatchID string `json:"batchId"`
Predictions []interface{} `json:"predictions"`
}

Expand All @@ -79,20 +79,20 @@ type Predictions struct {
}

type BatcherInfo struct {
MaxBatchSize int
MaxLatency int
Port string
SvcHost string
SvcPort string
Timeout time.Duration
Path string
ContentType string
BatchID string
Instances []interface{}
Predictions Predictions
Info map[*context.Context] InputInfo
Start time.Time
Now time.Time
MaxBatchSize int
MaxLatency int
Port string
SvcHost string
SvcPort string
Timeout time.Duration
Path string
ContentType string
BatchID string
Instances []interface{}
Predictions Predictions
Info map[*context.Context]InputInfo
Start time.Time
Now time.Time
CurrentInputLen int
}

Expand Down Expand Up @@ -155,7 +155,7 @@ func (batcherInfo *BatcherInfo) InitializeInfo() {
batcherInfo.CurrentInputLen = 0
batcherInfo.Instances = make([]interface{}, 0)
batcherInfo.Predictions = Predictions{}
batcherInfo.Info = make(map[*context.Context] InputInfo)
batcherInfo.Info = make(map[*context.Context]InputInfo)
batcherInfo.Start = GetNowTime()
batcherInfo.Now = batcherInfo.Start
}
Expand All @@ -166,8 +166,8 @@ func (batcherInfo *BatcherInfo) BatchPredict() {
log.Error(errors.New(*err), "")
for _, v := range batcherInfo.Info {
res := Response{
Message: *err,
BatchID: "",
Message: *err,
BatchID: "",
Predictions: nil,
}
*v.ChannelOut <- res
Expand All @@ -180,8 +180,8 @@ func (batcherInfo *BatcherInfo) BatchPredict() {
predictions = append(predictions, batcherInfo.Predictions.Predictions[index])
}
res := Response{
Message: "",
BatchID: batcherInfo.BatchID,
Message: "",
BatchID: batcherInfo.BatchID,
Predictions: predictions,
}
if jsonStr, err := json.Marshal(res); err == nil {
Expand All @@ -198,22 +198,22 @@ func (batcherInfo *BatcherInfo) BatchPredict() {
func (batcherInfo *BatcherInfo) Batcher() {
for {
select {
case req := <- channelIn:
case req := <-channelIn:
if len(batcherInfo.Instances) == 0 {
batcherInfo.Start = GetNowTime()
}
batcherInfo.CurrentInputLen = len(batcherInfo.Instances)
batcherInfo.Instances = append(batcherInfo.Instances, *req.Instances...)
var index = make([]int, 0)
for i := 0; i < len(*req.Instances); i++ {
index = append(index, batcherInfo.CurrentInputLen + i)
index = append(index, batcherInfo.CurrentInputLen+i)
}
batcherInfo.Info[req.ContextInput] = InputInfo{
req.ChannelOut,
index,
}
batcherInfo.CurrentInputLen = len(batcherInfo.Instances)
case <- time.After(SleepTime):
case <-time.After(SleepTime):
}
batcherInfo.Now = GetNowTime()
if batcherInfo.CurrentInputLen >= batcherInfo.MaxBatchSize ||
Expand All @@ -224,7 +224,7 @@ func (batcherInfo *BatcherInfo) Batcher() {
}
}

func (batcherInfo *BatcherInfo) Consume() {
func (batcherInfo *BatcherInfo) Consume() {
log.Info("Start Consume")
if batcherInfo.MaxBatchSize <= 0 {
batcherInfo.MaxBatchSize = MaxBatchSize
Expand Down Expand Up @@ -261,13 +261,13 @@ func (c *MainController) Post() {

var ctx = context.Background()
var chl = make(chan Response)
channelIn <- Input {
channelIn <- Input{
&ctx,
&req.Instances,
&chl,
}

response := <- chl
response := <-chl
close(chl)

c.Data["json"] = &response
Expand Down
4 changes: 2 additions & 2 deletions pkg/batcher/routers/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ limitations under the License.
package routers

import (
"github.com/kubeflow/kfserving/pkg/batcher/controllers"
"github.com/astaxie/beego"
"github.com/kubeflow/kfserving/pkg/batcher/controllers"
)

func init() {
beego.Router("/*", &controllers.MainController{})
beego.Router("/*", &controllers.MainController{})
}
2 changes: 1 addition & 1 deletion pkg/batcher/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ limitations under the License.
package batcher

import (
"github.com/astaxie/beego"
_ "github.com/kubeflow/kfserving/pkg/batcher/routers"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/astaxie/beego"
"strconv"
)

Expand Down
4 changes: 2 additions & 2 deletions pkg/batcher/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ import (
"bytes"
"encoding/json"
"fmt"
"github.com/astaxie/beego"
"github.com/kubeflow/kfserving/pkg/batcher/controllers"
"github.com/kubeflow/kfserving/pkg/constants"
"github.com/onsi/gomega"
"github.com/astaxie/beego"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestBatcher(t *testing.T) {
b2, _ := ioutil.ReadAll(w.Result().Body)
var res controllers.Response
var predictions controllers.Predictions
_ = json.Unmarshal(b2, &res)
_ = json.Unmarshal(b2, &res)
predictions.Predictions = res.Predictions
josnStr, _ := json.Marshal(predictions)
fmt.Println(string(josnStr))
Expand Down
12 changes: 6 additions & 6 deletions pkg/webhook/admission/pod/batcher_injector.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,11 @@ import (
)

const (
BatcherContainerName = "batcher"
BatcherConfigMapKeyName = "batcher"
BatcherArgumentMaxBatchSize = "--max-batchsize"
BatcherArgumentMaxLatency = "--max-latency"
BatcherArgumentTimeout = "--timeout"
BatcherContainerName = "batcher"
BatcherConfigMapKeyName = "batcher"
BatcherArgumentMaxBatchSize = "--max-batchsize"
BatcherArgumentMaxLatency = "--max-latency"
BatcherArgumentTimeout = "--timeout"
)

type BatcherConfig struct {
Expand Down Expand Up @@ -111,7 +111,7 @@ func (il *BatcherInjector) InjectBatcher(pod *v1.Pod) error {
batcherContainer := &v1.Container{
Name: BatcherContainerName,
Image: il.config.Image,
Args: args,
Args: args,
Resources: v1.ResourceRequirements{
Limits: map[v1.ResourceName]resource.Quantity{
v1.ResourceCPU: resource.MustParse(il.config.CpuLimit),
Expand Down
Loading

0 comments on commit 2eb4f36

Please sign in to comment.