From 2efd0d67ac3f5f65690ded88b8c68836ad13dd28 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Michal=20Jure=C4=8Dko?= Date: Fri, 27 Sep 2024 11:22:57 +0200 Subject: [PATCH] feat: Enable request body streaming --- internal/pkg/service/stream/config/config_test.go | 2 ++ internal/pkg/service/stream/source/type/httpsource/config.go | 2 ++ .../pkg/service/stream/source/type/httpsource/httpsource.go | 4 ++-- .../stream/kubernetes/templates/config/config-map.yaml | 2 ++ 4 files changed, 8 insertions(+), 2 deletions(-) diff --git a/internal/pkg/service/stream/config/config_test.go b/internal/pkg/service/stream/config/config_test.go index b51d8ea4c3..c57f1c51b5 100644 --- a/internal/pkg/service/stream/config/config_test.go +++ b/internal/pkg/service/stream/config/config_test.go @@ -122,6 +122,8 @@ source: readBufferSize: 16KB # Write buffer size. Validation rules: required writeBufferSize: 4KB + # Size of the body loaded to memory before handler. Validation rules: required + prefetchBodySize: 64KB # Max size of the HTTP request body. Validation rules: required maxRequestBodySize: 1MB sink: diff --git a/internal/pkg/service/stream/source/type/httpsource/config.go b/internal/pkg/service/stream/source/type/httpsource/config.go index 9db0f52745..aa6ca695bd 100644 --- a/internal/pkg/service/stream/source/type/httpsource/config.go +++ b/internal/pkg/service/stream/source/type/httpsource/config.go @@ -15,6 +15,7 @@ type Config struct { MaxConnections int `configKey:"maxConnections" configUsage:"The maximum number of concurrent connections the server may serve." validate:"required"` ReadBufferSize datasize.ByteSize `configKey:"readBufferSize" configUsage:"Read buffer size, all HTTP headers must fit in" validate:"required"` WriteBufferSize datasize.ByteSize `configKey:"writeBufferSize" configUsage:"Write buffer size." validate:"required"` + PrefetchBodySize datasize.ByteSize `configKey:"prefetchBodySize" configUsage:"Size of the body loaded to memory before handler." validate:"required"` MaxRequestBodySize datasize.ByteSize `configKey:"maxRequestBodySize" configUsage:"Max size of the HTTP request body." validate:"required"` } @@ -27,6 +28,7 @@ func NewConfig() Config { MaxConnections: 200000, ReadBufferSize: 16 * datasize.KB, WriteBufferSize: 4 * datasize.KB, + PrefetchBodySize: 64 * datasize.KB, MaxRequestBodySize: 1 * datasize.MB, } } diff --git a/internal/pkg/service/stream/source/type/httpsource/httpsource.go b/internal/pkg/service/stream/source/type/httpsource/httpsource.go index 4070534fdd..3f7d6c6989 100644 --- a/internal/pkg/service/stream/source/type/httpsource/httpsource.go +++ b/internal/pkg/service/stream/source/type/httpsource/httpsource.go @@ -138,8 +138,8 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { ReadBufferSize: int(cfg.ReadBufferSize.Bytes()), WriteBufferSize: int(cfg.WriteBufferSize.Bytes()), ReduceMemoryUsage: true, // ctx.ResetBody frees the buffer for reuse (slightly higher CPU usage) - MaxRequestBodySize: int(cfg.MaxRequestBodySize.Bytes()), - StreamRequestBody: false, // true is slower + MaxRequestBodySize: int(cfg.PrefetchBodySize.Bytes()), + StreamRequestBody: true, // don't prefetch the full body TCPKeepalive: true, NoDefaultServerHeader: true, DisablePreParseMultipartForm: true, diff --git a/provisioning/stream/kubernetes/templates/config/config-map.yaml b/provisioning/stream/kubernetes/templates/config/config-map.yaml index fb8bc42b63..722f39a3fe 100644 --- a/provisioning/stream/kubernetes/templates/config/config-map.yaml +++ b/provisioning/stream/kubernetes/templates/config/config-map.yaml @@ -108,6 +108,8 @@ data: readBufferSize: 16KB # Write buffer size. Validation rules: required writeBufferSize: 4KB + # Size of the body loaded to memory before handler. Validation rules: required + prefetchBodySize: 64KB # Max size of the HTTP request body. Validation rules: required maxRequestBodySize: 1MB sink: