From afd4ed331d6c71a62b8fbdf061b5bcc78d915e81 Mon Sep 17 00:00:00 2001 From: Lionel Villard Date: Tue, 8 Jun 2021 17:23:41 -0400 Subject: [PATCH] revert to setting consumer.fetch options instead of MaxReponseSize (#694) --- config/source/multi/deployments/adapter.yaml | 16 ++- pkg/source/mtadapter/adapter.go | 133 ++++++++++++------- pkg/source/mtadapter/adapter_test.go | 44 +----- 3 files changed, 96 insertions(+), 97 deletions(-) diff --git a/config/source/multi/deployments/adapter.yaml b/config/source/multi/deployments/adapter.yaml index fd1e7bc4bd..ec1936dae3 100644 --- a/config/source/multi/deployments/adapter.yaml +++ b/config/source/multi/deployments/adapter.yaml @@ -47,9 +47,11 @@ spec: - name: VREPLICA_LIMITS_MPS value: '50' - # The pod requested memory (see resources.requests.memory) - - name: REQUESTS_MEMORY - value: '300Mi' + # The memory limit, per vreplica. Must be a quantity. + # see https://github.com/kubernetes/apimachinery/blob/master/pkg/api/resource/quantity.go#L31 + # Should be (pod requested memory - overhead) / pod capacity (see controller.yaml) + - name: VREPLICA_LIMITS_MEMORY + value: '6Mi' # DO NOT MODIFY: The values below are being filled by the kafka source controller # See 500-controller.yaml @@ -64,11 +66,11 @@ spec: resources: requests: - cpu: 100m - memory: 300Mi + cpu: 1000m + memory: 700Mi # 600Mi for vreplicas + 100Mi overhead limits: - cpu: 200m - memory: 500Mi + cpu: 2000m + memory: 1000Mi ports: - name: metrics diff --git a/pkg/source/mtadapter/adapter.go b/pkg/source/mtadapter/adapter.go index dba2aa0fe3..187ec09b4a 100644 --- a/pkg/source/mtadapter/adapter.go +++ b/pkg/source/mtadapter/adapter.go @@ -19,9 +19,10 @@ package mtadapter import ( "context" "fmt" + "math" + "strconv" "sync" - "github.com/Shopify/sarama" cloudevents "github.com/cloudevents/sdk-go/v2" "go.uber.org/zap" "golang.org/x/time/rate" @@ -42,17 +43,12 @@ import ( "knative.dev/eventing-kafka/pkg/source/client" ) -const ( - responseSizeCap = int32(50 * 1024 * 1024) - maxBrokers = 6 -) - type AdapterConfig struct { adapter.EnvConfig - PodName string `envconfig:"POD_NAME" required:"true"` - MPSLimit int `envconfig:"VREPLICA_LIMITS_MPS" required:"true"` - MemoryRequest string `envconfig:"REQUESTS_MEMORY" required:"true"` + PodName string `envconfig:"POD_NAME" required:"true"` + MPSLimit int `envconfig:"VREPLICA_LIMITS_MPS" required:"true"` + MemoryLimit string `envconfig:"VREPLICA_LIMITS_MEMORY" required:"true"` } func NewEnvConfig() adapter.EnvConfigAccessor { @@ -70,8 +66,7 @@ type Adapter struct { client cloudevents.Client adapterCtor adapter.MessageAdapterConstructor kubeClient kubernetes.Interface - - memoryRequest int64 + memLimit int32 sourcesMu sync.RWMutex sources map[string]cancelContext @@ -87,18 +82,16 @@ func newAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie logger := logging.FromContext(ctx) config := processed.(*AdapterConfig) - sarama.MaxResponseSize = responseSizeCap - - mr := resource.MustParse(config.MemoryRequest) + ml := resource.MustParse(config.MemoryLimit) return &Adapter{ - client: ceClient, - config: config, - logger: logger, - adapterCtor: adapterCtor, - kubeClient: kubeclient.Get(ctx), - memoryRequest: mr.Value(), - sourcesMu: sync.RWMutex{}, - sources: make(map[string]cancelContext), + client: ceClient, + config: config, + logger: logger, + adapterCtor: adapterCtor, + kubeClient: kubeclient.Get(ctx), + memLimit: int32(ml.Value()), + sourcesMu: sync.RWMutex{}, + sources: make(map[string]cancelContext), } } @@ -131,7 +124,6 @@ func (a 
*Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error { // Nothing to stop anymore delete(a.sources, key) - a.adjustResponseSize() } placement := scheduler.GetPlacementForPod(obj.GetPlacements(), a.config.PodName) @@ -189,6 +181,30 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error { }, } + // Enforce memory limits + if a.memLimit > 0 { + // TODO: periodically enforce limits as the number of partitions can dynamically change + fetchSizePerVReplica, err := a.partitionFetchSize(ctx, logger, &kafkaEnvConfig, obj.Spec.Topics, scheduler.GetPodCount(obj.Status.Placement)) + if err != nil { + return err + } + fetchSize := fetchSizePerVReplica * int(placement.VReplicas) + + // Must handle at least 64k messages to the compliant with the CloudEvent spec + maxFetchSize := fetchSize + if fetchSize < 64*1024 { + maxFetchSize = 64 * 1024 + } + a.logger.Infow("setting partition fetch sizes", zap.Int("min", fetchSize), zap.Int("default", fetchSize), zap.Int("max", maxFetchSize)) + + // TODO: find a better way to interact with the ST adapter. + bufferSizeStr := strconv.Itoa(fetchSize) + min := `\n Min: ` + bufferSizeStr + def := `\n Default: ` + bufferSizeStr + max := `\n Max: ` + strconv.Itoa(maxFetchSize) + kafkaEnvConfig.KafkaConfigJson = `{"SaramaYamlString": "Consumer:\n Fetch:` + min + def + max + `"}` + } + config := stadapter.AdapterConfig{ EnvConfig: adapter.EnvConfig{ Component: "kafkasource", @@ -232,7 +248,6 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error { } a.sources[key] = cancel - a.adjustResponseSize() go func(ctx context.Context) { err := adapter.Start(ctx) @@ -264,7 +279,6 @@ func (a *Adapter) Remove(obj *v1beta1.KafkaSource) { <-cancel.stopped delete(a.sources, key) - a.adjustResponseSize() a.logger.Infow("source removed", "name", obj.Name, "remaining", len(a.sources)) } @@ -288,32 +302,55 @@ func (a *Adapter) ResolveSecret(ctx context.Context, ns string, ref *corev1.Secr return "", fmt.Errorf("missing secret key or empty secret value (%s/%s)", ref.Name, ref.Key) } -// adjustResponseSize ensures the sum of all Kafka clients memory usage does not exceed the container memory request. -func (a *Adapter) adjustResponseSize() { - if a.memoryRequest > 0 { - maxResponseSize := responseSizeCap +// partitionFetchSize determines what should be the default fetch size (in bytes) +// so that the st adapter memory consumption does not exceed +// the allocated memory per vreplica (see MemoryLimit). +// Account for pod (consumer) partial outage by reducing the +// partition buffer size +func (a *Adapter) partitionFetchSize(ctx context.Context, + logger *zap.SugaredLogger, + kafkaEnvConfig *client.KafkaEnvConfig, + topics []string, + podCount int) (int, error) { + + // Compute the number of partitions handled by this source + // TODO: periodically check for # of resources. Need control-protocol. + adminClient, err := client.MakeAdminClient(ctx, kafkaEnvConfig) + if err != nil { + logger.Errorw("cannot create admin client", zap.Error(err)) + return 0, err + } - if len(a.sources) > 0 { - maxResponseSize = int32(float32(a.memoryRequest) / float32(len(a.sources))) - } - // cap the response size to 50MB. - if maxResponseSize > responseSizeCap { - maxResponseSize = 50 * 1024 * 1024 - } + metas, err := adminClient.DescribeTopics(topics) + if err != nil { + logger.Errorw("cannot describe topics", zap.Error(err)) + return 0, err + } - // maxResponseSize is per connected brokers. 
- // For now cap the number of connected brokers to maxBrokers until a better solution - // comes along - maxResponseSize = int32(float32(maxResponseSize) / float32(maxBrokers)) + totalPartitions := 0 + for _, meta := range metas { + totalPartitions += len(meta.Partitions) + } + adminClient.Close() - // Check for compliance. - if maxResponseSize < 64*1024 { - // Not CloudEvent compliant (https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#size-limits) - a.logger.Warnw("Kafka response size is lower than 64KB. Increase the pod memory request and/or lower the pod capacity.", - zap.Int32("responseSize", maxResponseSize)) - } + partitionsPerPod := int(math.Ceil(float64(totalPartitions) / float64(podCount))) - sarama.MaxResponseSize = maxResponseSize - a.logger.Infof("Setting MaxResponseSize to %d bytes", sarama.MaxResponseSize) + // Ideally, partitions are evenly spread across Kafka consumers. + // However, due to rebalancing or consumer (un)availability, a consumer + // might have to handle more partitions than expected. + // For now, account for 1 unavailable consumer. + handledPartitions := 2 * partitionsPerPod + if podCount < 3 { + handledPartitions = totalPartitions } + + logger.Infow("partition count", + zap.Int("total", totalPartitions), + zap.Int("averagePerPod", partitionsPerPod), + zap.Int("handled", handledPartitions)) + + // A partition consumes about 2 * fetch partition size + // Once by FetchResponse blocks and a second time when those blocks are converted to messages + // see https://github.com/Shopify/sarama/blob/83d633e6e4f71b402df5e9c53ad5c1c334b7065d/consumer.go#L649 + return int(math.Floor(float64(a.memLimit) / float64(handledPartitions) / 2.0)), nil } diff --git a/pkg/source/mtadapter/adapter_test.go b/pkg/source/mtadapter/adapter_test.go index 6250134cce..f29e6025f7 100644 --- a/pkg/source/mtadapter/adapter_test.go +++ b/pkg/source/mtadapter/adapter_test.go @@ -18,17 +18,14 @@ package mtadapter import ( "context" - "strconv" "testing" "time" - "github.com/Shopify/sarama" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake" - logtesting "knative.dev/pkg/logging/testing" pkgtesting "knative.dev/pkg/reconciler/testing" "knative.dev/pkg/source" @@ -54,7 +51,7 @@ func TestUpdateRemoveSources(t *testing.T) { ctx, _ := pkgtesting.SetupFakeContext(t) ctx, cancelAdapter := context.WithCancel(ctx) - env := &AdapterConfig{PodName: podName, MemoryRequest: "0"} + env := &AdapterConfig{PodName: podName, MemoryLimit: "0"} ceClient := adaptertest.NewTestClient() mtadapter := newAdapter(ctx, env, ceClient, newSampleAdapter).(*Adapter) @@ -355,7 +352,7 @@ func TestSourceMTAdapter(t *testing.T) { ctx, cancelAdapter := context.WithCancel(ctx) - env := &AdapterConfig{PodName: podName, MemoryRequest: "0"} + env := &AdapterConfig{PodName: podName, MemoryLimit: "0"} ceClient := adaptertest.NewTestClient() adapter := newAdapter(ctx, env, ceClient, newSampleAdapter).(*Adapter) @@ -408,40 +405,3 @@ func (d *sampleAdapter) Start(ctx context.Context) error { return nil } - -func TestAdjustResponseSize(t *testing.T) { - testCases := map[string]struct { - memoryRequest int64 - sourceCount int - want int32 - }{ - "no memory request": {memoryRequest: 0, sourceCount: 1, want: 100 * 1024 * 1024}, - "memory request, response size less 64k": {memoryRequest: 128 * 1024, sourceCount: 4, want: 32 * 1024 / maxBrokers}, - "memory request, response size more 64k": {memoryRequest: 512 * 1024, 
sourceCount: 4, want: 128 * 1024 / maxBrokers}, - "memory request, response size more than cap": {memoryRequest: 100 * 1024 * 1024, sourceCount: 1, want: responseSizeCap / maxBrokers}, - } - - for n, tc := range testCases { - t.Run(n, func(t *testing.T) { - // must run sequentially. - sarama.MaxResponseSize = 100 * 1024 * 1024 - - sources := make(map[string]cancelContext) - for i := 0; i < tc.sourceCount; i++ { - sources["s"+strconv.Itoa(i)] = cancelContext{} - } - - a := Adapter{ - memoryRequest: tc.memoryRequest, - sources: sources, - logger: logtesting.TestLogger(t), - } - a.adjustResponseSize() - - if sarama.MaxResponseSize != tc.want { - t.Errorf("Unexpected MaxResponseSize. wanted %d, got %d", tc.want, sarama.MaxResponseSize) - } - }) - } - -}
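-- 
A small, self-contained sketch of the sizing arithmetic introduced above, for reference. It only mirrors the math in partitionFetchSize and the Update handler; it is not part of the patch, and its inputs are illustrative assumptions (8 partitions, 3 scheduled source pods, 2 vreplicas placed on this pod, and the 6Mi VREPLICA_LIMITS_MEMORY default from adapter.yaml).

package main

import (
	"fmt"
	"math"
	"strconv"
)

// Illustrative inputs only; none of these come from a real cluster.
const (
	memLimitPerVReplica = 6 * 1024 * 1024 // 6Mi, the adapter.yaml default
	totalPartitions     = 8               // assumed partition count across the source's topics
	podCount            = 3               // assumed number of scheduled source pods
	vreplicas           = 2               // assumed vreplicas placed on this pod
)

func main() {
	// Average partitions per pod, rounded up.
	partitionsPerPod := int(math.Ceil(float64(totalPartitions) / float64(podCount)))

	// Account for one unavailable consumer: a pod may end up with twice the average.
	handledPartitions := 2 * partitionsPerPod
	if podCount < 3 {
		handledPartitions = totalPartitions
	}

	// A partition costs roughly 2x its fetch size (FetchResponse blocks + decoded messages).
	fetchPerVReplica := int(math.Floor(float64(memLimitPerVReplica) / float64(handledPartitions) / 2.0))
	fetchSize := fetchPerVReplica * vreplicas

	// Never drop below the 64KiB CloudEvents size limit.
	maxFetchSize := fetchSize
	if fetchSize < 64*1024 {
		maxFetchSize = 64 * 1024
	}

	// The same Sarama YAML fragment the adapter passes to the ST adapter via KafkaConfigJson.
	saramaYaml := `Consumer:\n Fetch:\n Min: ` + strconv.Itoa(fetchSize) +
		`\n Default: ` + strconv.Itoa(fetchSize) +
		`\n Max: ` + strconv.Itoa(maxFetchSize)

	fmt.Println(fetchPerVReplica, fetchSize, maxFetchSize) // 524288 1048576 1048576
	fmt.Println(saramaYaml)
}

With these assumed numbers each vreplica gets a 512KiB partition fetch size, the pod's consumer ends up with Fetch Min/Default/Max of 1MiB, and the 64KiB floor only takes effect when the per-vreplica memory limit is very small or the partition count is large.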