Skip to content

Commit

Permalink
fix: multiple small fixes for service runners
Browse files Browse the repository at this point in the history
It all started with debugging an issue where Talos gets stuck on reboot while
`talosctl logs -f kubelet` is running.

Fixes:

* abort goroutine runner even if the goroutine doesn't terminate - we
  have no way to force termination, so at least don't hang forever
* align timeouts for apid/trustd for graceful termination - so that at
  least the service is not SIGKILLed while it does its own graceful
  shutdown
* in stream chunker, act on canceled context immediately instead of
  relying on `Read` to return: with `logs -f` the reader will block
  forever waiting for new logs

Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com>
  • Loading branch information
smira committed Nov 29, 2024
1 parent e33d2f5 commit c7b2543
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 13 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ require (
golang.org/x/text v0.20.0
golang.org/x/time v0.8.0
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20230429144221-925a1e7659e6
google.golang.org/grpc v1.68.0 // do not update to 1.68.0 until we find a way around https://github.com/grpc/grpc-go/pull/7535
google.golang.org/grpc v1.68.0
google.golang.org/protobuf v1.35.2
gopkg.in/yaml.v3 v3.0.1
k8s.io/klog/v2 v2.130.1
Expand Down
32 changes: 29 additions & 3 deletions internal/app/machined/pkg/system/runner/goroutine/goroutine.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,16 @@ import (
"io"
stdlibruntime "runtime"
"sync"
"time"

"github.com/siderolabs/talos/internal/app/machined/pkg/runtime"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/events"
"github.com/siderolabs/talos/internal/app/machined/pkg/system/runner"
)

// ErrAborted is returned by the runner's Run when the service goroutine has
// not finished within the shutdown timeout after its context was done, so the
// runner gives up waiting for it instead of blocking forever.
var ErrAborted = errors.New("service aborted")

// goroutineRunner is a runner.Runner that runs a service in a goroutine.
type goroutineRunner struct {
main FuncMain
Expand Down Expand Up @@ -66,10 +70,32 @@ func (r *goroutineRunner) Run(eventSink events.Recorder) error {

eventSink(events.StateRunning, "Service started as goroutine")

return r.wrappedMain()
errCh := make(chan error)
ctx := r.ctx

go func() {
errCh <- r.wrappedMain(ctx)
}()

select {
case <-r.ctx.Done():
eventSink(events.StateStopping, "Service stopping")
case err := <-errCh:
// service finished on its own
return err
}

select {
case <-time.After(r.opts.GracefulShutdownTimeout * 2):
eventSink(events.StateStopping, "Service hasn't stopped gracefully on timeout, aborting")

return ErrAborted
case err := <-errCh:
return err
}
}

func (r *goroutineRunner) wrappedMain() (err error) {
func (r *goroutineRunner) wrappedMain(ctx context.Context) (err error) {
defer func() {
if r := recover(); r != nil {
buf := make([]byte, 8192)
Expand All @@ -87,7 +113,7 @@ func (r *goroutineRunner) wrappedMain() (err error) {

defer writerCloser() //nolint:errcheck

if err = r.main(r.ctx, r.runtime, w); !errors.Is(err, context.Canceled) {
if err = r.main(ctx, r.runtime, w); !errors.Is(err, context.Canceled) {
return err // return error if it's not context.Canceled (service was not aborted)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,38 @@ func (suite *GoroutineSuite) TestStop() {
suite.Assert().NoError(<-errCh)
}

// TestStuckOnStop checks that a runner whose service function never returns
// does not hang on stop: Run keeps running until Stop is called, and then
// returns goroutine.ErrAborted.
func (suite *GoroutineSuite) TestStuckOnStop() {
	// Service body that ignores its context and blocks forever.
	neverReturns := func(context.Context, runtime.Runtime, io.Writer) error {
		select {}
	}

	svc := goroutine.NewRunner(
		suite.r,
		"teststop",
		neverReturns,
		runner.WithLoggingManager(suite.loggingManager),
		runner.WithGracefulShutdownTimeout(10*time.Millisecond),
	)

	suite.Assert().NoError(svc.Open())

	defer func() {
		closeErr := svc.Close()
		suite.Assert().NoError(closeErr)
	}()

	runResult := make(chan error)

	go func() {
		runResult <- svc.Run(MockEventSink)
	}()

	// Let the runner get going before probing it.
	time.Sleep(20 * time.Millisecond)

	// Non-blocking probe: nothing asked the service to stop yet, so Run
	// must still be in progress.
	select {
	case <-runResult:
		suite.Require().Fail("should not return yet")
	default:
	}

	stopErr := svc.Stop()
	suite.Assert().NoError(stopErr)

	// The service never exits on its own, so Run is expected to report
	// the abort sentinel.
	runErr := <-runResult
	suite.Assert().ErrorIs(runErr, goroutine.ErrAborted)
}

func (suite *GoroutineSuite) TestRunLogs() {
r := goroutine.NewRunner(suite.r, "logtest",
func(ctx context.Context, data runtime.Runtime, logger io.Writer) error {
Expand Down
2 changes: 2 additions & 0 deletions internal/app/machined/pkg/system/services/apid.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/containerd/v2/pkg/cap"
"github.com/containerd/containerd/v2/pkg/oci"
Expand Down Expand Up @@ -200,6 +201,7 @@ func (o *APID) Runner(r runtime.Runtime) (runner.Runner, error) {
runner.WithLoggingManager(r.Logging()),
runner.WithContainerdAddress(constants.SystemContainerdAddress),
runner.WithEnv(env),
runner.WithGracefulShutdownTimeout(15*time.Second),
runner.WithCgroupPath(constants.CgroupApid),
runner.WithSelinuxLabel(constants.SelinuxLabelApid),
runner.WithOCISpecOpts(
Expand Down
2 changes: 2 additions & 0 deletions internal/app/machined/pkg/system/services/trustd.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"os"
"path/filepath"
"strconv"
"time"

"github.com/containerd/containerd/v2/pkg/cap"
"github.com/containerd/containerd/v2/pkg/oci"
Expand Down Expand Up @@ -164,6 +165,7 @@ func (t *Trustd) Runner(r runtime.Runtime) (runner.Runner, error) {
runner.WithContainerdAddress(constants.SystemContainerdAddress),
runner.WithEnv(env),
runner.WithCgroupPath(constants.CgroupTrustd),
runner.WithGracefulShutdownTimeout(15*time.Second),
runner.WithSelinuxLabel(constants.SelinuxLabelTrustd),
runner.WithOCISpecOpts(
oci.WithDroppedCapabilities(cap.Known()),
Expand Down
20 changes: 15 additions & 5 deletions pkg/chunker/stream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,10 @@ import (
"errors"
"fmt"
"io"
"os"

"github.com/siderolabs/gen/xslices"
"github.com/siderolabs/go-circular"

"github.com/siderolabs/talos/pkg/chunker"
)
Expand Down Expand Up @@ -61,27 +63,35 @@ func NewChunker(ctx context.Context, source Source, setters ...Option) chunker.C
}

// Read implements ChunkReader.
//
//nolint:gocyclo
func (c *Stream) Read() <-chan []byte {
// Create a buffered channel of length 1.
ch := make(chan []byte, 1)

go func(ch chan []byte) {
defer close(ch)
//nolint:errcheck
defer c.source.Close()

ctx, cancel := context.WithCancel(c.ctx)
defer cancel()

go func() {
<-ctx.Done()
c.source.Close() //nolint:errcheck
}()

buf := make([]byte, c.options.Size)

for {
select {
case <-c.ctx.Done():
case <-ctx.Done():
return
default:
}

n, err := c.source.Read(buf)
if err != nil {
if !(errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe)) {
if !(errors.Is(err, io.EOF) || errors.Is(err, io.ErrClosedPipe) || errors.Is(err, os.ErrClosed) || errors.Is(err, circular.ErrClosed)) {
fmt.Printf("read error: %s\n", err.Error())
}

Expand All @@ -93,7 +103,7 @@ func (c *Stream) Read() <-chan []byte {
b := xslices.CopyN(buf, n)

select {
case <-c.ctx.Done():
case <-ctx.Done():
return
case ch <- b:
}
Expand Down
4 changes: 0 additions & 4 deletions pkg/chunker/stream/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ func (suite *StreamChunkerSuite) TestStreamingCancel() {

ctxCancel()

// need any I/O for chunker to notice that context got canceled
//nolint:errcheck
suite.writer.Write([]byte(""))

suite.Require().Equal([]byte("abcdefghijklmno"), <-combinedCh)
}

Expand Down

0 comments on commit c7b2543

Please sign in to comment.