Skip to content

Commit

Permalink
fix: Use context.WithoutCancel
Browse files Browse the repository at this point in the history
  • Loading branch information
jachym-tousek-keboola committed Nov 21, 2024
1 parent 204802a commit 384739c
Show file tree
Hide file tree
Showing 12 changed files with 19 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func (u *AppUpstream) notify(ctx context.Context) {
go func() {
defer u.manager.wg.Done()

notificationCtx, cancel := context.WithTimeout(context.Background(), notifyRequestTimeout)
notificationCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), notifyRequestTimeout)
defer cancel()

notificationCtx = ctxattr.ContextWith(notificationCtx, ctxattr.Attributes(ctx).ToSlice()...)
Expand All @@ -210,7 +210,7 @@ func (u *AppUpstream) wakeup(ctx context.Context, err error) {
go func() {
defer u.manager.wg.Done()

wakeupCtx, cancel := context.WithTimeout(context.Background(), wakeupRequestTimeout)
wakeupCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), wakeupRequestTimeout)
defer cancel()

wakeupCtx = ctxattr.ContextWith(wakeupCtx, ctxattr.Attributes(ctx).ToSlice()...)
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/service/appsproxy/proxy/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ func StartServer(ctx context.Context, d dependencies.ServiceScope) error {
proc.Add(func(shutdown servicectx.ShutdownFn) {
// Start HTTP server in a separate goroutine.
logger.Infof(ctx, "HTTP server listening on %q", cfg.API.Listen)
serverErr := srv.ListenAndServe() // ListenAndServe blocks while the server is running
shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation
serverErr := srv.ListenAndServe() // ListenAndServe blocks while the server is running
shutdown(context.WithoutCancel(ctx), serverErr)
})

// Register graceful shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func NewWithDNSServer(d dependencies, dnsServerAddress string) (http.RoundTrippe

// Create context for DNS resolving
// It separates the events/tracing of the connection to the DNS server, from the connection to the target server.
resolveCtx, cancel := context.WithTimeout(context.Background(), DNSResolveTimeout)
resolveCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), DNSResolveTimeout)
defer cancel()

// We are using custom DNS resolving to detect if the target host - data app, is running.
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/cli/dialog/use_template.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (p *Dialogs) AskUseTemplateInputs(ctx context.Context, groups input.StepsGr
Dialogs: p,
groups: groups,
inputs: groups.InputsMap(),
context: context.Background(),
context: context.WithoutCancel(ctx),
inputsValues: make(map[string]any),
}
return dialog.ask(ctx, isForTest, inputsFileFlag)
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/dependencies/distlock.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func newDistributedLockScope(ctx context.Context, cfg distlock.Config, d distrib
ctx, span := d.Telemetry().Tracer().Start(ctx, "keboola.go.common.dependencies.NewDistributedLockScope")
defer span.End(&err)

provider, err := distlock.NewProvider(cfg, d)
provider, err := distlock.NewProvider(ctx, cfg, d)
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/service/common/distlock/mutex.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ func NewConfig() Config {
}
}

func NewProvider(cfg Config, d dependencies) (*Provider, error) {
func NewProvider(ctx context.Context, cfg Config, d dependencies) (*Provider, error) {
p := &Provider{}
p.logger = d.Logger().WithComponent("distribution.mutex.provider")

ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithCancel(context.WithoutCancel(ctx))
wg := &sync.WaitGroup{}
d.Process().OnShutdown(func(_ context.Context) {
p.logger.Info(ctx, "received shutdown request")
Expand Down
2 changes: 1 addition & 1 deletion internal/pkg/service/common/distlock/mutex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestProvider_NewMutex(t *testing.T) {
d := dependencies.NewMocked(t, ctx, dependencies.WithEnabledEtcdClient())
client := d.TestEtcdClient()

p, err := distlock.NewProvider(distlock.NewConfig(), d)
p, err := distlock.NewProvider(context.Background(), distlock.NewConfig(), d)
require.NoError(t, err)

mtx := p.NewMutex("foo/bar")
Expand Down
4 changes: 2 additions & 2 deletions internal/pkg/service/common/httpserver/httpserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ func (h *HTTPServer) Start(ctx context.Context) error {
// Start HTTP server in a separate goroutine.
h.proc.Add(func(shutdown servicectx.ShutdownFn) {
h.logger.Infof(ctx, "started HTTP server on %q", h.listenAddress)
serverErr := h.ListenAndServe() // ListenAndServe blocks while the server is running
shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation
serverErr := h.ListenAndServe() // ListenAndServe blocks while the server is running
shutdown(context.WithoutCancel(ctx), serverErr)
})

// Register graceful shutdown
Expand Down
6 changes: 3 additions & 3 deletions internal/pkg/service/stream/sink/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,12 +284,12 @@ func (r *Router) dispatchToSink(sink *sinkData, c recordctx.Context) *SinkResult
error: err,
}

finalizationCtx := context.WithoutCancel(c.Ctx())

if result.StatusCode == http.StatusInternalServerError {
r.logger.Errorf(context.Background(), `write record error: %s`, err.Error())
r.logger.Errorf(finalizationCtx, `write record error: %s`, err.Error())
}

finalizationCtx := context.WithoutCancel(c.Ctx())

// Update telemetry
attrs := append(
sink.sinkKey.Telemetry(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,15 +182,15 @@ func Start(ctx context.Context, d dependencies, cfg Config) error {
// Create connection
conn, err := net.Listen("tcp4", cfg.Listen)
if err != nil {
shutdown(context.Background(), err) // nolint: contextcheck // intentionally creating new context for the shutdown operation
shutdown(context.WithoutCancel(ctx), err) // nolint: contextcheck // intentionally creating new context for the shutdown operation
return
}
// Serve requests
logger.Infof(ctx, "started HTTP source on %q", cfg.Listen)
serverErr := srv.Serve(conn) // blocks while the server is running
// Server finished
startDone()
shutdown(context.Background(), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation
shutdown(context.WithoutCancel(ctx), serverErr) // nolint: contextcheck // intentionally creating new context for the shutdown operation
})

// Register graceful shutdown
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func OpenVolume(ctx context.Context, logger log.Logger, clock clock.Clock, confi
readers: make(map[string]*readerRef),
}

v.ctx, v.cancel = context.WithCancel(context.Background())
v.ctx, v.cancel = context.WithCancel(context.WithoutCancel(ctx))

v.logger.With(attribute.String("volume.path", spec.Path)).Infof(ctx, `opening volume`)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ func OpenVolume(ctx context.Context, logger log.Logger, clock clock.Clock, confi
v.allocator = config.Allocation.OverrideAllocator
}

v.ctx, v.cancel = context.WithCancel(context.Background())
v.ctx, v.cancel = context.WithCancel(context.WithoutCancel(ctx))

v.logger = v.logger.WithComponent("volume").With(attribute.String("volume.path", spec.Path))
v.logger.Infof(ctx, `opening volume`)
Expand Down

0 comments on commit 384739c

Please sign in to comment.