From 384739c78df4fe1df705bc65095ee94d888a5657 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1chym=20Tou=C5=A1ek?= Date: Wed, 20 Nov 2024 14:25:47 +0100 Subject: [PATCH] fix: Use context.WithoutCancel --- .../service/appsproxy/proxy/apphandler/upstream/upstream.go | 4 ++-- internal/pkg/service/appsproxy/proxy/server.go | 4 ++-- internal/pkg/service/appsproxy/proxy/transport/transport.go | 2 +- internal/pkg/service/cli/dialog/use_template.go | 2 +- internal/pkg/service/common/dependencies/distlock.go | 2 +- internal/pkg/service/common/distlock/mutex.go | 4 ++-- internal/pkg/service/common/distlock/mutex_test.go | 2 +- internal/pkg/service/common/httpserver/httpserver.go | 4 ++-- internal/pkg/service/stream/sink/router/router.go | 6 +++--- .../pkg/service/stream/source/type/httpsource/httpsource.go | 4 ++-- .../service/stream/storage/level/local/diskreader/volume.go | 2 +- .../service/stream/storage/level/local/diskwriter/volume.go | 2 +- 12 files changed, 19 insertions(+), 19 deletions(-) diff --git a/internal/pkg/service/appsproxy/proxy/apphandler/upstream/upstream.go b/internal/pkg/service/appsproxy/proxy/apphandler/upstream/upstream.go index 78222aa10b..01c00fbf56 100644 --- a/internal/pkg/service/appsproxy/proxy/apphandler/upstream/upstream.go +++ b/internal/pkg/service/appsproxy/proxy/apphandler/upstream/upstream.go @@ -190,7 +190,7 @@ func (u *AppUpstream) notify(ctx context.Context) { go func() { defer u.manager.wg.Done() - notificationCtx, cancel := context.WithTimeout(context.Background(), notifyRequestTimeout) + notificationCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), notifyRequestTimeout) defer cancel() notificationCtx = ctxattr.ContextWith(notificationCtx, ctxattr.Attributes(ctx).ToSlice()...) @@ -210,7 +210,7 @@ func (u *AppUpstream) wakeup(ctx context.Context, err error) { go func() { defer u.manager.wg.Done() - wakeupCtx, cancel := context.WithTimeout(context.Background(), wakeupRequestTimeout) + wakeupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), wakeupRequestTimeout) defer cancel() wakeupCtx = ctxattr.ContextWith(wakeupCtx, ctxattr.Attributes(ctx).ToSlice()...) diff --git a/internal/pkg/service/appsproxy/proxy/server.go b/internal/pkg/service/appsproxy/proxy/server.go index 74886404d5..83ff3535e0 100644 --- a/internal/pkg/service/appsproxy/proxy/server.go +++ b/internal/pkg/service/appsproxy/proxy/server.go @@ -90,8 +90,8 @@ func StartServer(ctx context.Context, d dependencies.ServiceScope) error { proc.Add(func(shutdown servicectx.ShutdownFn) { // Start HTTP server in a separate goroutine. logger.Infof(ctx, "HTTP server listening on %q", cfg.API.Listen) - serverErr := srv.ListenAndServe() // ListenAndServe blocks while the server is running - shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation + serverErr := srv.ListenAndServe() // ListenAndServe blocks while the server is running + shutdown(context.WithoutCancel(ctx), serverErr) }) // Register graceful shutdown diff --git a/internal/pkg/service/appsproxy/proxy/transport/transport.go b/internal/pkg/service/appsproxy/proxy/transport/transport.go index cb0551f2f2..5a43b46af2 100644 --- a/internal/pkg/service/appsproxy/proxy/transport/transport.go +++ b/internal/pkg/service/appsproxy/proxy/transport/transport.go @@ -92,7 +92,7 @@ func NewWithDNSServer(d dependencies, dnsServerAddress string) (http.RoundTrippe // Create context for DNS resolving // It separates the events/tracing of the connection to the DNS server, from the connection to the target server. - resolveCtx, cancel := context.WithTimeout(context.Background(), DNSResolveTimeout) + resolveCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), DNSResolveTimeout) defer cancel() // We are using custom DNS resolving to detect if the target host - data app, is running. diff --git a/internal/pkg/service/cli/dialog/use_template.go b/internal/pkg/service/cli/dialog/use_template.go index 476e43ba55..ad0d27c6fb 100644 --- a/internal/pkg/service/cli/dialog/use_template.go +++ b/internal/pkg/service/cli/dialog/use_template.go @@ -37,7 +37,7 @@ func (p *Dialogs) AskUseTemplateInputs(ctx context.Context, groups input.StepsGr Dialogs: p, groups: groups, inputs: groups.InputsMap(), - context: context.Background(), + context: context.WithoutCancel(ctx), inputsValues: make(map[string]any), } return dialog.ask(ctx, isForTest, inputsFileFlag) diff --git a/internal/pkg/service/common/dependencies/distlock.go b/internal/pkg/service/common/dependencies/distlock.go index fcb55b872e..57bc4d857a 100644 --- a/internal/pkg/service/common/dependencies/distlock.go +++ b/internal/pkg/service/common/dependencies/distlock.go @@ -25,7 +25,7 @@ func newDistributedLockScope(ctx context.Context, cfg distlock.Config, d distrib ctx, span := d.Telemetry().Tracer().Start(ctx, "keboola.go.common.dependencies.NewDistributedLockScope") defer span.End(&err) - provider, err := distlock.NewProvider(cfg, d) + provider, err := distlock.NewProvider(ctx, cfg, d) if err != nil { return nil, err } diff --git a/internal/pkg/service/common/distlock/mutex.go b/internal/pkg/service/common/distlock/mutex.go index 83e41bf859..33ebedadad 100644 --- a/internal/pkg/service/common/distlock/mutex.go +++ b/internal/pkg/service/common/distlock/mutex.go @@ -40,11 +40,11 @@ func NewConfig() Config { } } -func NewProvider(cfg Config, d dependencies) (*Provider, error) { +func NewProvider(ctx context.Context, cfg Config, d dependencies) (*Provider, error) { p := &Provider{} p.logger = d.Logger().WithComponent("distribution.mutex.provider") - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(context.WithoutCancel(ctx)) wg := &sync.WaitGroup{} d.Process().OnShutdown(func(_ context.Context) { p.logger.Info(ctx, "received shutdown request") diff --git a/internal/pkg/service/common/distlock/mutex_test.go b/internal/pkg/service/common/distlock/mutex_test.go index f8eb4d0c67..4faf16357b 100644 --- a/internal/pkg/service/common/distlock/mutex_test.go +++ b/internal/pkg/service/common/distlock/mutex_test.go @@ -22,7 +22,7 @@ func TestProvider_NewMutex(t *testing.T) { d := dependencies.NewMocked(t, ctx, dependencies.WithEnabledEtcdClient()) client := d.TestEtcdClient() - p, err := distlock.NewProvider(distlock.NewConfig(), d) + p, err := distlock.NewProvider(context.Background(), distlock.NewConfig(), d) require.NoError(t, err) mtx := p.NewMutex("foo/bar") diff --git a/internal/pkg/service/common/httpserver/httpserver.go b/internal/pkg/service/common/httpserver/httpserver.go index 2abbbecf67..feea01be08 100644 --- a/internal/pkg/service/common/httpserver/httpserver.go +++ b/internal/pkg/service/common/httpserver/httpserver.go @@ -74,8 +74,8 @@ func (h *HTTPServer) Start(ctx context.Context) error { // Start HTTP server in a separate goroutine. h.proc.Add(func(shutdown servicectx.ShutdownFn) { h.logger.Infof(ctx, "started HTTP server on %q", h.listenAddress) - serverErr := h.ListenAndServe() // ListenAndServe blocks while the server is running - shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation + serverErr := h.ListenAndServe() // ListenAndServe blocks while the server is running + shutdown(context.WithoutCancel(ctx), serverErr) }) // Register graceful shutdown diff --git a/internal/pkg/service/stream/sink/router/router.go b/internal/pkg/service/stream/sink/router/router.go index 3a25eb00ee..bf43dbc14e 100644 --- a/internal/pkg/service/stream/sink/router/router.go +++ b/internal/pkg/service/stream/sink/router/router.go @@ -284,12 +284,12 @@ func (r *Router) dispatchToSink(sink *sinkData, c recordctx.Context) *SinkResult error: err, } + finalizationCtx := context.WithoutCancel(c.Ctx()) + if result.StatusCode == http.StatusInternalServerError { - r.logger.Errorf(context.Background(), `write record error: %s`, err.Error()) + r.logger.Errorf(finalizationCtx, `write record error: %s`, err.Error()) } - finalizationCtx := context.WithoutCancel(c.Ctx()) - // Update telemetry attrs := append( sink.sinkKey.Telemetry(), diff --git a/internal/pkg/service/stream/source/type/httpsource/httpsource.go b/internal/pkg/service/stream/source/type/httpsource/httpsource.go index 4cd68069e4..34e231e563 100644 --- a/internal/pkg/service/stream/source/type/httpsource/httpsource.go +++ b/internal/pkg/service/stream/source/type/httpsource/httpsource.go @@ -182,7 +182,7 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { // Create connection conn, err := net.Listen("tcp4", cfg.Listen) if err != nil { - shutdown(context.Background(), err) // nolint: contextcheck // intentionally creating new context for the shutdown operation + shutdown(context.WithoutCancel(ctx), err) // nolint: contextcheck // intentionally creating new context for the shutdown operation return } // Serve requests @@ -190,7 +190,7 @@ func Start(ctx context.Context, d dependencies, cfg Config) error { serverErr := srv.Serve(conn) // blocks while the server is running // Server finished startDone() - shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation + shutdown(context.WithoutCancel(ctx), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation }) // Register graceful shutdown diff --git a/internal/pkg/service/stream/storage/level/local/diskreader/volume.go b/internal/pkg/service/stream/storage/level/local/diskreader/volume.go index 7cfb3168cb..a8fc0e31c7 100644 --- a/internal/pkg/service/stream/storage/level/local/diskreader/volume.go +++ b/internal/pkg/service/stream/storage/level/local/diskreader/volume.go @@ -66,7 +66,7 @@ func OpenVolume(ctx context.Context, logger log.Logger, clock clock.Clock, confi readers: make(map[string]*readerRef), } - v.ctx, v.cancel = context.WithCancel(context.Background()) + v.ctx, v.cancel = context.WithCancel(context.WithoutCancel(ctx)) v.logger.With(attribute.String("volume.path", spec.Path)).Infof(ctx, `opening volume`) diff --git a/internal/pkg/service/stream/storage/level/local/diskwriter/volume.go b/internal/pkg/service/stream/storage/level/local/diskwriter/volume.go index 55e439c175..1aa40194c4 100644 --- a/internal/pkg/service/stream/storage/level/local/diskwriter/volume.go +++ b/internal/pkg/service/stream/storage/level/local/diskwriter/volume.go @@ -90,7 +90,7 @@ func OpenVolume(ctx context.Context, logger log.Logger, clock clock.Clock, confi v.allocator = config.Allocation.OverrideAllocator } - v.ctx, v.cancel = context.WithCancel(context.Background()) + v.ctx, v.cancel = context.WithCancel(context.WithoutCancel(ctx)) v.logger = v.logger.WithComponent("volume").With(attribute.String("volume.path", spec.Path)) v.logger.Infof(ctx, `opening volume`)