This repository has been archived by the owner on Sep 2, 2024. It is now read-only.

Commit

revert to setting consumer.fetch options instead of MaxResponseSize (#694)
lionelvillard authored Jun 8, 2021
1 parent 000dd9c commit afd4ed3
Showing 3 changed files with 96 additions and 97 deletions.
16 changes: 9 additions & 7 deletions config/source/multi/deployments/adapter.yaml
@@ -47,9 +47,11 @@ spec:
- name: VREPLICA_LIMITS_MPS
value: '50'

# The pod requested memory (see resources.requests.memory)
- name: REQUESTS_MEMORY
value: '300Mi'
# The memory limit, per vreplica. Must be a quantity.
# see https://github.com/kubernetes/apimachinery/blob/master/pkg/api/resource/quantity.go#L31
# Should be (pod requested memory - overhead) / pod capacity (see controller.yaml)
- name: VREPLICA_LIMITS_MEMORY
value: '6Mi'

# DO NOT MODIFY: The values below are being filled by the kafka source controller
# See 500-controller.yaml
@@ -64,11 +66,11 @@ spec:

resources:
requests:
cpu: 100m
memory: 300Mi
cpu: 1000m
memory: 700Mi # 600Mi for vreplicas + 100Mi overhead
limits:
cpu: 200m
memory: 500Mi
cpu: 2000m
memory: 1000Mi

ports:
- name: metrics
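Note: the new VREPLICA_LIMITS_MEMORY value follows the "(pod requested memory - overhead) / pod capacity" rule stated in the comment above. A minimal Go sketch of that arithmetic, using the same Kubernetes quantity parser the comment links to; the pod capacity of 100 vreplicas is an assumption (it lives in controller.yaml, which is not part of this diff) inferred from the 600Mi / 6Mi ratio:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
)

func main() {
	// Values from adapter.yaml above; podCapacity is assumed (see controller.yaml).
	requested := resource.MustParse("700Mi")
	overhead := resource.MustParse("100Mi")
	podCapacity := int64(100)

	perVReplica := (requested.Value() - overhead.Value()) / podCapacity
	fmt.Printf("VREPLICA_LIMITS_MEMORY ≈ %dMi\n", perVReplica/(1024*1024))
	// (700Mi - 100Mi) / 100 = 6Mi, matching the '6Mi' value set above.
}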
133 changes: 85 additions & 48 deletions pkg/source/mtadapter/adapter.go
@@ -19,9 +19,10 @@ package mtadapter
import (
"context"
"fmt"
"math"
"strconv"
"sync"

"github.com/Shopify/sarama"
cloudevents "github.com/cloudevents/sdk-go/v2"
"go.uber.org/zap"
"golang.org/x/time/rate"
@@ -42,17 +43,12 @@ import (
"knative.dev/eventing-kafka/pkg/source/client"
)

const (
responseSizeCap = int32(50 * 1024 * 1024)
maxBrokers = 6
)

type AdapterConfig struct {
adapter.EnvConfig

PodName string `envconfig:"POD_NAME" required:"true"`
MPSLimit int `envconfig:"VREPLICA_LIMITS_MPS" required:"true"`
MemoryRequest string `envconfig:"REQUESTS_MEMORY" required:"true"`
PodName string `envconfig:"POD_NAME" required:"true"`
MPSLimit int `envconfig:"VREPLICA_LIMITS_MPS" required:"true"`
MemoryLimit string `envconfig:"VREPLICA_LIMITS_MEMORY" required:"true"`
}

func NewEnvConfig() adapter.EnvConfigAccessor {
@@ -70,8 +66,7 @@ type Adapter struct {
client cloudevents.Client
adapterCtor adapter.MessageAdapterConstructor
kubeClient kubernetes.Interface

memoryRequest int64
memLimit int32

sourcesMu sync.RWMutex
sources map[string]cancelContext
@@ -87,18 +82,16 @@ func newAdapter(ctx context.Context, processed adapter.EnvConfigAccessor, ceClie
logger := logging.FromContext(ctx)
config := processed.(*AdapterConfig)

sarama.MaxResponseSize = responseSizeCap

mr := resource.MustParse(config.MemoryRequest)
ml := resource.MustParse(config.MemoryLimit)
return &Adapter{
client: ceClient,
config: config,
logger: logger,
adapterCtor: adapterCtor,
kubeClient: kubeclient.Get(ctx),
memoryRequest: mr.Value(),
sourcesMu: sync.RWMutex{},
sources: make(map[string]cancelContext),
client: ceClient,
config: config,
logger: logger,
adapterCtor: adapterCtor,
kubeClient: kubeclient.Get(ctx),
memLimit: int32(ml.Value()),
sourcesMu: sync.RWMutex{},
sources: make(map[string]cancelContext),
}
}

@@ -131,7 +124,6 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error {

// Nothing to stop anymore
delete(a.sources, key)
a.adjustResponseSize()
}

placement := scheduler.GetPlacementForPod(obj.GetPlacements(), a.config.PodName)
@@ -189,6 +181,30 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error {
},
}

// Enforce memory limits
if a.memLimit > 0 {
// TODO: periodically enforce limits as the number of partitions can dynamically change
fetchSizePerVReplica, err := a.partitionFetchSize(ctx, logger, &kafkaEnvConfig, obj.Spec.Topics, scheduler.GetPodCount(obj.Status.Placement))
if err != nil {
return err
}
fetchSize := fetchSizePerVReplica * int(placement.VReplicas)

// Must handle messages of at least 64KB to be compliant with the CloudEvents spec
maxFetchSize := fetchSize
if fetchSize < 64*1024 {
maxFetchSize = 64 * 1024
}
a.logger.Infow("setting partition fetch sizes", zap.Int("min", fetchSize), zap.Int("default", fetchSize), zap.Int("max", maxFetchSize))

// TODO: find a better way to interact with the ST adapter.
bufferSizeStr := strconv.Itoa(fetchSize)
min := `\n Min: ` + bufferSizeStr
def := `\n Default: ` + bufferSizeStr
max := `\n Max: ` + strconv.Itoa(maxFetchSize)
kafkaEnvConfig.KafkaConfigJson = `{"SaramaYamlString": "Consumer:\n Fetch:` + min + def + max + `"}`
}
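Note: KafkaConfigJson above is a JSON wrapper around a Sarama YAML fragment whose newlines are kept as literal "\n" escapes. A small sketch, assuming a hypothetical per-pod fetch size of 314572 bytes (one vreplica), that mirrors the string-building in the block above and shows the resulting value:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	fetchSize := 314572 // hypothetical: fetchSizePerVReplica * VReplicas
	maxFetchSize := fetchSize
	if fetchSize < 64*1024 {
		maxFetchSize = 64 * 1024 // never go below 64KB, per the CloudEvents size limits
	}
	bufferSizeStr := strconv.Itoa(fetchSize)
	min := `\n Min: ` + bufferSizeStr
	def := `\n Default: ` + bufferSizeStr
	max := `\n Max: ` + strconv.Itoa(maxFetchSize)
	fmt.Println(`{"SaramaYamlString": "Consumer:\n Fetch:` + min + def + max + `"}`)
	// Output:
	// {"SaramaYamlString": "Consumer:\n Fetch:\n Min: 314572\n Default: 314572\n Max: 314572"}
}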

config := stadapter.AdapterConfig{
EnvConfig: adapter.EnvConfig{
Component: "kafkasource",
@@ -232,7 +248,6 @@ func (a *Adapter) Update(ctx context.Context, obj *v1beta1.KafkaSource) error {
}

a.sources[key] = cancel
a.adjustResponseSize()

go func(ctx context.Context) {
err := adapter.Start(ctx)
@@ -264,7 +279,6 @@ func (a *Adapter) Remove(obj *v1beta1.KafkaSource) {
<-cancel.stopped

delete(a.sources, key)
a.adjustResponseSize()

a.logger.Infow("source removed", "name", obj.Name, "remaining", len(a.sources))
}
@@ -288,32 +302,55 @@ func (a *Adapter) ResolveSecret(ctx context.Context, ns string, ref *corev1.Secr
return "", fmt.Errorf("missing secret key or empty secret value (%s/%s)", ref.Name, ref.Key)
}

// adjustResponseSize ensures the sum of all Kafka clients memory usage does not exceed the container memory request.
func (a *Adapter) adjustResponseSize() {
if a.memoryRequest > 0 {
maxResponseSize := responseSizeCap
// partitionFetchSize determines what should be the default fetch size (in bytes)
// so that the st adapter memory consumption does not exceed
// the allocated memory per vreplica (see MemoryLimit).
// Account for pod (consumer) partial outage by reducing the
// partition buffer size
func (a *Adapter) partitionFetchSize(ctx context.Context,
logger *zap.SugaredLogger,
kafkaEnvConfig *client.KafkaEnvConfig,
topics []string,
podCount int) (int, error) {

// Compute the number of partitions handled by this source
// TODO: periodically check for # of resources. Need control-protocol.
adminClient, err := client.MakeAdminClient(ctx, kafkaEnvConfig)
if err != nil {
logger.Errorw("cannot create admin client", zap.Error(err))
return 0, err
}

if len(a.sources) > 0 {
maxResponseSize = int32(float32(a.memoryRequest) / float32(len(a.sources)))
}
// cap the response size to 50MB.
if maxResponseSize > responseSizeCap {
maxResponseSize = 50 * 1024 * 1024
}
metas, err := adminClient.DescribeTopics(topics)
if err != nil {
logger.Errorw("cannot describe topics", zap.Error(err))
return 0, err
}

// maxResponseSize is per connected brokers.
// For now cap the number of connected brokers to maxBrokers until a better solution
// comes along
maxResponseSize = int32(float32(maxResponseSize) / float32(maxBrokers))
totalPartitions := 0
for _, meta := range metas {
totalPartitions += len(meta.Partitions)
}
adminClient.Close()

// Check for compliance.
if maxResponseSize < 64*1024 {
// Not CloudEvent compliant (https://github.com/cloudevents/spec/blob/v1.0.1/spec.md#size-limits)
a.logger.Warnw("Kafka response size is lower than 64KB. Increase the pod memory request and/or lower the pod capacity.",
zap.Int32("responseSize", maxResponseSize))
}
partitionsPerPod := int(math.Ceil(float64(totalPartitions) / float64(podCount)))

sarama.MaxResponseSize = maxResponseSize
a.logger.Infof("Setting MaxResponseSize to %d bytes", sarama.MaxResponseSize)
// Ideally, partitions are evenly spread across Kafka consumers.
// However, due to rebalancing or consumer (un)availability, a consumer
// might have to handle more partitions than expected.
// For now, account for 1 unavailable consumer.
handledPartitions := 2 * partitionsPerPod
if podCount < 3 {
handledPartitions = totalPartitions
}

logger.Infow("partition count",
zap.Int("total", totalPartitions),
zap.Int("averagePerPod", partitionsPerPod),
zap.Int("handled", handledPartitions))

// A partition consumes about 2 * the partition fetch size:
// once for FetchResponse blocks and a second time when those blocks are converted to messages
// see https://github.com/Shopify/sarama/blob/83d633e6e4f71b402df5e9c53ad5c1c334b7065d/consumer.go#L649
return int(math.Floor(float64(a.memLimit) / float64(handledPartitions) / 2.0)), nil
}
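Note: a worked example of partitionFetchSize above, with hypothetical inputs (6Mi per vreplica, 20 partitions, 4 source pods); in the adapter the partition count comes from the Kafka admin client and the pod count from the scheduler placement:

package main

import (
	"fmt"
	"math"
)

func main() {
	// Hypothetical numbers: 6Mi per vreplica, 20 partitions spread over 4 pods.
	memLimit := float64(6 * 1024 * 1024)
	totalPartitions, podCount := 20, 4

	partitionsPerPod := int(math.Ceil(float64(totalPartitions) / float64(podCount))) // 5
	handledPartitions := 2 * partitionsPerPod                                        // 10: tolerate one unavailable consumer
	if podCount < 3 {
		handledPartitions = totalPartitions
	}

	// Each partition costs roughly twice its fetch size (FetchResponse blocks + decoded messages).
	fetchSize := int(math.Floor(memLimit / float64(handledPartitions) / 2.0))
	fmt.Println(fetchSize) // 314572 bytes (~307KiB) per partition, per vreplica
}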
44 changes: 2 additions & 42 deletions pkg/source/mtadapter/adapter_test.go
@@ -18,17 +18,14 @@ package mtadapter

import (
"context"
"strconv"
"testing"
"time"

"github.com/Shopify/sarama"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"

fakekubeclient "knative.dev/pkg/client/injection/kube/client/fake"
logtesting "knative.dev/pkg/logging/testing"
pkgtesting "knative.dev/pkg/reconciler/testing"
"knative.dev/pkg/source"

@@ -54,7 +51,7 @@ func TestUpdateRemoveSources(t *testing.T) {
ctx, _ := pkgtesting.SetupFakeContext(t)
ctx, cancelAdapter := context.WithCancel(ctx)

env := &AdapterConfig{PodName: podName, MemoryRequest: "0"}
env := &AdapterConfig{PodName: podName, MemoryLimit: "0"}
ceClient := adaptertest.NewTestClient()

mtadapter := newAdapter(ctx, env, ceClient, newSampleAdapter).(*Adapter)
@@ -355,7 +352,7 @@ func TestSourceMTAdapter(t *testing.T) {

ctx, cancelAdapter := context.WithCancel(ctx)

env := &AdapterConfig{PodName: podName, MemoryRequest: "0"}
env := &AdapterConfig{PodName: podName, MemoryLimit: "0"}
ceClient := adaptertest.NewTestClient()

adapter := newAdapter(ctx, env, ceClient, newSampleAdapter).(*Adapter)
@@ -408,40 +405,3 @@ func (d *sampleAdapter) Start(ctx context.Context) error {

return nil
}

func TestAdjustResponseSize(t *testing.T) {
testCases := map[string]struct {
memoryRequest int64
sourceCount int
want int32
}{
"no memory request": {memoryRequest: 0, sourceCount: 1, want: 100 * 1024 * 1024},
"memory request, response size less 64k": {memoryRequest: 128 * 1024, sourceCount: 4, want: 32 * 1024 / maxBrokers},
"memory request, response size more 64k": {memoryRequest: 512 * 1024, sourceCount: 4, want: 128 * 1024 / maxBrokers},
"memory request, response size more than cap": {memoryRequest: 100 * 1024 * 1024, sourceCount: 1, want: responseSizeCap / maxBrokers},
}

for n, tc := range testCases {
t.Run(n, func(t *testing.T) {
// must run sequentially.
sarama.MaxResponseSize = 100 * 1024 * 1024

sources := make(map[string]cancelContext)
for i := 0; i < tc.sourceCount; i++ {
sources["s"+strconv.Itoa(i)] = cancelContext{}
}

a := Adapter{
memoryRequest: tc.memoryRequest,
sources: sources,
logger: logtesting.TestLogger(t),
}
a.adjustResponseSize()

if sarama.MaxResponseSize != tc.want {
t.Errorf("Unexpected MaxResponseSize. wanted %d, got %d", tc.want, sarama.MaxResponseSize)
}
})
}

}

