Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[breaking] Add err chan to log producer and don't panic on error #1971

Merged
merged 14 commits into from
Jan 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion container.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type Container interface {
Terminate(context.Context) error // terminate the container
Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
FollowOutput(LogConsumer)
StartLogProducer(context.Context) error
StartLogProducer(context.Context, ...LogProducerOption) error
StopLogProducer() error
Name(context.Context) (string, error) // get container name
State(context.Context) (*types.ContainerState, error) // returns container's running state
Expand All @@ -61,6 +61,7 @@ type Container interface {
CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error
CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error
CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error)
GetLogProducerErrorChannel() <-chan error
}

// ImageBuildInfo defines what is needed to build an image
Expand Down
89 changes: 73 additions & 16 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"path/filepath"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
Expand Down Expand Up @@ -67,6 +68,9 @@ type DockerContainer struct {
raw *types.ContainerJSON
stopProducer chan bool
producerDone chan bool
producerError chan error
producerMutex sync.Mutex
producerTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
}
Expand Down Expand Up @@ -612,19 +616,68 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt
return nil
}

type LogProducerOption func(*DockerContainer)

// WithLogProducerTimeout is a functional option that sets the timeout for the log producer.
// If the timeout is lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
return func(c *DockerContainer) {
c.producerTimeout = &timeout
}
}

// StartLogProducer will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer
func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
if c.stopProducer != nil {
return errors.New("log producer already started")
// from the container and will send them to each added LogConsumer.
// Default log producer timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// Use functional option WithLogProducerTimeout() to override default timeout. If it's
// lower than 5s and greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProducerOption) error {
{
c.producerMutex.Lock()
defer c.producerMutex.Unlock()

if c.stopProducer != nil {
return errors.New("log producer already started")
}
}

for _, opt := range opts {
opt(c)
}

minProducerTimeout := time.Duration(5 * time.Second)
maxProducerTimeout := time.Duration(60 * time.Second)

if c.producerTimeout == nil {
c.producerTimeout = &minProducerTimeout
}

if *c.producerTimeout < minProducerTimeout {
c.producerTimeout = &minProducerTimeout
}

if *c.producerTimeout > maxProducerTimeout {
c.producerTimeout = &maxProducerTimeout
}

c.stopProducer = make(chan bool)
c.producerDone = make(chan bool)
c.producerError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool) {
go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal the producer is done once go routine exits, this prevents race conditions around start/stop
defer close(done)
// set c.stopProducer to nil so that it can be started again
defer func() {
defer c.producerMutex.Unlock()
close(done)
close(errorCh)
{
c.producerMutex.Lock()
c.stopProducer = nil
}
}()

since := ""
// if the socket is closed we will make additional logs request with updated Since timestamp
Expand All @@ -636,25 +689,20 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
Since: since,
}

ctx, cancel := context.WithTimeout(ctx, time.Second*5)
ctx, cancel := context.WithTimeout(ctx, *c.producerTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
if err != nil {
// if we can't get the logs, panic, we can't return an error to anything
// from within this goroutine
panic(err)
errorCh <- err
return
}
defer c.provider.Close()

for {
select {
case <-stop:
err := r.Close()
if err != nil {
// we can't close the read closer, this should never happen
panic(err)
}
errorCh <- r.Close()
return
default:
h := make([]byte, 8)
Expand Down Expand Up @@ -710,24 +758,33 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context) error {
}
}
}
}(c.stopProducer, c.producerDone)
}(c.stopProducer, c.producerDone, c.producerError)

return nil
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.producerMutex.Lock()
defer c.producerMutex.Unlock()
if c.stopProducer != nil {
c.stopProducer <- true
// block until the producer is actually done in order to avoid strange races
<-c.producerDone
c.stopProducer = nil
c.producerDone = nil
return <-c.producerError
}
return nil
}

// GetLogProducerErrorChannel exposes the only way for the consumer
// to be able to listen to errors and react to them.
func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
mdelapenya marked this conversation as resolved.
Show resolved Hide resolved
return c.producerError
}

// DockerNetwork represents a network started using Docker
type DockerNetwork struct {
ID string // Network ID from Docker
Expand Down
57 changes: 56 additions & 1 deletion docs/features/follow_logs.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,59 @@ if err != nil {
```

`LogProducer` is stopped in `c.Terminate()`. It can be done manually during container lifecycle
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time
using `c.StopLogProducer()`. For a particular container, only one `LogProducer` can be active at time.

`StartLogProducer()` also accepts a functional parameter now used to set log producer timeout:
```golang
type LogProducerOption func(*DockerContainer)

func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
return func(c *DockerContainer) {
c.producerTimeout = &timeout
}
}

// usage
err := c.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))
if err != nil {
// do something with err
}
```

If no parameter is passed a default timeout of 5 seconds will be used. Values below 5 seconds and above 60 seconds will
be coerced to these boundary values.

## Listening to errors

When log producer fails to start within given timeout (causing a context deadline) or there's an error returned while closing the reader it will no longer panic, but instead will return an error over a channel. You can listen to it using `DockerContainer.GetLogProducerErrorChannel()` method:
```golang
func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
return c.producerError
}
```

This allows you to, for example, retry restarting log producer if it fails to start the first time. For example:

```golang
Tofel marked this conversation as resolved.
Show resolved Hide resolved
// start log producer normally
err = container.StartLogProducer(ctx, WithLogProducerTimeout(10*time.Second))

// listen to errors in a detached goroutine
go func(done chan struct{}, timeout time.Duration, retryLimit int) {
for {
select {
case logErr := <-container.GetLogProducerErrorChannel():
if logErr != nil {
// do something with error
// for example, retry starting log producer
// (here we retry it once, in real life you might want to retry it more times)
startErr := container.StartLogProducer(ctx, timeout)
if startErr != nil {
return
}
case <-done:
return
}
}
}(cons.logListeningDone, time.Duration(10*time.Second))
```
64 changes: 64 additions & 0 deletions logconsumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,3 +458,67 @@ func TestContainerLogsShouldBeWithoutStreamHeader(t *testing.T) {
}
assert.Equal(t, "0", strings.TrimSpace(string(b)))
}

func Test_StartLogProducerStillStartsWithTooLowTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)
terminateContainerOnEnd(t, ctx, c)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, WithLogProducerTimeout(4*time.Second))
require.NoError(t, err, "should still start with too low timeout")
}

func Test_StartLogProducerStillStartsWithTooHighTimeout(t *testing.T) {
ctx := context.Background()
req := ContainerRequest{
FromDockerfile: FromDockerfile{
Context: "./testdata/",
Dockerfile: "echoserver.Dockerfile",
},
ExposedPorts: []string{"8080/tcp"},
WaitingFor: wait.ForLog("ready"),
}

gReq := GenericContainerRequest{
ContainerRequest: req,
Started: true,
}

c, err := GenericContainer(ctx, gReq)
require.NoError(t, err)
terminateContainerOnEnd(t, ctx, c)

g := TestLogConsumer{
Msgs: []string{},
Done: make(chan bool),
Accepted: devNullAcceptorChan(),
}

c.FollowOutput(&g)

err = c.StartLogProducer(ctx, WithLogProducerTimeout(61*time.Second))
require.NoError(t, err, "should still start with too high timeout")
}
Loading