Skip to content

Commit

Permalink
Adding LogConsumers start as part of the ContainerRequest (#2073)
Browse files Browse the repository at this point in the history
* Adding LogConsumers and LogProducer start as part of the GenericRequest

* Apply suggestions from code review

Co-authored-by: Manuel de la Peña <social.mdelapenya@gmail.com>

* Addressing PR review comments

* Adding docs and global option

* Moving deprecation comments to the right place

* chore: define a StdoutLogConsumer to be exposed

* feat: support configuring the producer at the container request

* chore: deprecate StartLogProducer

* chore: deprecate StopLogProducer and FollowOutput

* docs: document the follow logs feature

* chore: update tests

* doc: add warning regarding timeouts

* chore: move to the container lifecycle

* chore: typo

* chore: remove StartStop test

The lifecycle will be removed

* docs: wording

* fix: handle error

* docs: replace producer word with production

* chore: remove the concept of log producer

* fix: lint

* fix: wrong assertion

* chore: use fixed mysql 8.0.36 version in test

* chore: use NotEmpty assertion

---------

Co-authored-by: Manuel de la Peña <mdelapenya@gmail.com>
Co-authored-by: Manuel de la Peña <social.mdelapenya@gmail.com>
  • Loading branch information
3 people authored Jan 25, 2024
1 parent 6297ec1 commit 16c689a
Show file tree
Hide file tree
Showing 12 changed files with 435 additions and 308 deletions.
25 changes: 13 additions & 12 deletions container.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,25 @@ type Container interface {
Ports(context.Context) (nat.PortMap, error) // get all exposed ports
SessionID() string // get session id
IsRunning() bool
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container
Terminate(context.Context) error // terminate the container
Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
FollowOutput(LogConsumer)
StartLogProducer(context.Context, ...LogProducerOption) error
StopLogProducer() error
Name(context.Context) (string, error) // get container name
State(context.Context) (*types.ContainerState, error) // returns container's running state
Networks(context.Context) ([]string, error) // get container networks
NetworkAliases(context.Context) (map[string][]string, error) // get container network aliases for a network
Start(context.Context) error // start the container
Stop(context.Context, *time.Duration) error // stop the container
Terminate(context.Context) error // terminate the container
Logs(context.Context) (io.ReadCloser, error) // Get logs of the container
FollowOutput(LogConsumer) // Deprecated: it will be removed in the next major release
StartLogProducer(context.Context, ...LogProductionOption) error // Deprecated: Use the ContainerRequest instead
StopLogProducer() error // Deprecated: it will be removed in the next major release
Name(context.Context) (string, error) // get container name
State(context.Context) (*types.ContainerState, error) // returns container's running state
Networks(context.Context) ([]string, error) // get container networks
NetworkAliases(context.Context) (map[string][]string, error) // get container network aliases for a network
Exec(ctx context.Context, cmd []string, options ...tcexec.ProcessOption) (int, io.Reader, error)
ContainerIP(context.Context) (string, error) // get container ip
ContainerIPs(context.Context) ([]string, error) // get all container IPs
CopyToContainer(ctx context.Context, fileContent []byte, containerFilePath string, fileMode int64) error
CopyDirToContainer(ctx context.Context, hostDirPath string, containerParentPath string, fileMode int64) error
CopyFileToContainer(ctx context.Context, hostFilePath string, containerFilePath string, fileMode int64) error
CopyFileFromContainer(ctx context.Context, filePath string) (io.ReadCloser, error)
GetLogProducerErrorChannel() <-chan error
GetLogProductionErrorChannel() <-chan error
}

// ImageBuildInfo defines what is needed to build an image
Expand Down Expand Up @@ -143,6 +143,7 @@ type ContainerRequest struct {
HostConfigModifier func(*container.HostConfig) // Modifier for the host config before container creation
EnpointSettingsModifier func(map[string]*network.EndpointSettings) // Modifier for the network settings before container creation
LifecycleHooks []ContainerLifecycleHooks // define hooks to be executed during container lifecycle
LogConsumerCfg *LogConsumerConfig // define the configuration for the log producer and its log consumers to follow the logs
}

// containerOptions functional options for a container
Expand Down
215 changes: 130 additions & 85 deletions docker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,19 @@ type DockerContainer struct {
isRunning bool
imageWasBuilt bool
// keepBuiltImage makes Terminate not remove the image if imageWasBuilt.
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
stopProducer chan bool
producerDone chan bool
producerError chan error
producerMutex sync.Mutex
producerTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
keepBuiltImage bool
provider *DockerProvider
sessionID string
terminationSignal chan bool
consumers []LogConsumer
raw *types.ContainerJSON
stopLogProductionCh chan bool
logProductionDone chan bool
logProductionError chan error
logProductionMutex sync.Mutex
logProductionTimeout *time.Duration
logger Logging
lifecycleHooks []ContainerLifecycleHooks
}

// SetLogger sets the logger for the container
Expand Down Expand Up @@ -258,11 +258,6 @@ func (c *DockerContainer) Terminate(ctx context.Context) error {
return err
}

err = c.StopLogProducer()
if err != nil {
return err
}

err = c.provider.client.ContainerRemove(ctx, c.GetContainerID(), container.RemoveOptions{
RemoveVolumes: true,
Force: true,
Expand Down Expand Up @@ -367,9 +362,14 @@ func (c *DockerContainer) Logs(ctx context.Context) (io.ReadCloser, error) {
return pr, nil
}

// FollowOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
// Deprecated: use the ContainerRequest.LogConsumerConfig field instead.
func (c *DockerContainer) FollowOutput(consumer LogConsumer) {
c.followOutput(consumer)
}

// followOutput adds a LogConsumer to be sent logs from the container's
// STDOUT and STDERR
func (c *DockerContainer) followOutput(consumer LogConsumer) {
c.consumers = append(c.consumers, consumer)
}

Expand Down Expand Up @@ -615,66 +615,71 @@ func (c *DockerContainer) CopyToContainer(ctx context.Context, fileContent []byt
return nil
}

type LogProducerOption func(*DockerContainer)
type LogProductionOption func(*DockerContainer)

// WithLogProducerTimeout is a functional option that sets the timeout for the log producer.
// WithLogProductionTimeout is a functional option that sets the timeout for the log production.
// If the timeout is lower than 5s or greater than 60s it will be set to 5s or 60s respectively.
func WithLogProducerTimeout(timeout time.Duration) LogProducerOption {
func WithLogProductionTimeout(timeout time.Duration) LogProductionOption {
return func(c *DockerContainer) {
c.producerTimeout = &timeout
c.logProductionTimeout = &timeout
}
}

// StartLogProducer will start a concurrent process that will continuously read logs
// Deprecated: use the ContainerRequest.LogConsumerConfig field instead.
func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProductionOption) error {
return c.startLogProduction(ctx, opts...)
}

// startLogProduction will start a concurrent process that will continuously read logs
// from the container and will send them to each added LogConsumer.
// Default log producer timeout is 5s. It is used to set the context timeout
// Default log production timeout is 5s. It is used to set the context timeout
// which means that each log-reading loop will last at least the specified timeout
// and that it cannot be cancelled earlier.
// Use functional option WithLogProducerTimeout() to override default timeout. If it's
// Use functional option WithLogProductionTimeout() to override default timeout. If it's
// lower than 5s and greater than 60s it will be set to 5s or 60s respectively.
func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProducerOption) error {
func (c *DockerContainer) startLogProduction(ctx context.Context, opts ...LogProductionOption) error {
{
c.producerMutex.Lock()
defer c.producerMutex.Unlock()
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()

if c.stopProducer != nil {
return errors.New("log producer already started")
if c.stopLogProductionCh != nil {
return errors.New("log production already started")
}
}

for _, opt := range opts {
opt(c)
}

minProducerTimeout := time.Duration(5 * time.Second)
maxProducerTimeout := time.Duration(60 * time.Second)
minLogProductionTimeout := time.Duration(5 * time.Second)
maxLogProductionTimeout := time.Duration(60 * time.Second)

if c.producerTimeout == nil {
c.producerTimeout = &minProducerTimeout
if c.logProductionTimeout == nil {
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.producerTimeout < minProducerTimeout {
c.producerTimeout = &minProducerTimeout
if *c.logProductionTimeout < minLogProductionTimeout {
c.logProductionTimeout = &minLogProductionTimeout
}

if *c.producerTimeout > maxProducerTimeout {
c.producerTimeout = &maxProducerTimeout
if *c.logProductionTimeout > maxLogProductionTimeout {
c.logProductionTimeout = &maxLogProductionTimeout
}

c.stopProducer = make(chan bool)
c.producerDone = make(chan bool)
c.producerError = make(chan error, 1)
c.stopLogProductionCh = make(chan bool)
c.logProductionDone = make(chan bool)
c.logProductionError = make(chan error, 1)

go func(stop <-chan bool, done chan<- bool, errorCh chan error) {
// signal the producer is done once go routine exits, this prevents race conditions around start/stop
// set c.stopProducer to nil so that it can be started again
// signal the log production is done once go routine exits, this prevents race conditions around start/stop
// set c.stopLogProductionCh to nil so that it can be started again
defer func() {
defer c.producerMutex.Unlock()
defer c.logProductionMutex.Unlock()
close(done)
close(errorCh)
{
c.producerMutex.Lock()
c.stopProducer = nil
c.logProductionMutex.Lock()
c.stopLogProductionCh = nil
}
}()

Expand All @@ -688,7 +693,7 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu
Since: since,
}

ctx, cancel := context.WithTimeout(ctx, *c.producerTimeout)
ctx, cancel := context.WithTimeout(ctx, *c.logProductionTimeout)
defer cancel()

r, err := c.provider.client.ContainerLogs(ctx, c.GetContainerID(), options)
Expand Down Expand Up @@ -757,31 +762,36 @@ func (c *DockerContainer) StartLogProducer(ctx context.Context, opts ...LogProdu
}
}
}
}(c.stopProducer, c.producerDone, c.producerError)
}(c.stopLogProductionCh, c.logProductionDone, c.logProductionError)

return nil
}

// Deprecated: it will be removed in the next major release.
func (c *DockerContainer) StopLogProducer() error {
return c.stopLogProduction()
}

// StopLogProducer will stop the concurrent process that is reading logs
// and sending them to each added LogConsumer
func (c *DockerContainer) StopLogProducer() error {
c.producerMutex.Lock()
defer c.producerMutex.Unlock()
if c.stopProducer != nil {
c.stopProducer <- true
// block until the producer is actually done in order to avoid strange races
<-c.producerDone
c.stopProducer = nil
c.producerDone = nil
return <-c.producerError
func (c *DockerContainer) stopLogProduction() error {
c.logProductionMutex.Lock()
defer c.logProductionMutex.Unlock()
if c.stopLogProductionCh != nil {
c.stopLogProductionCh <- true
// block until the log production is actually done in order to avoid strange races
<-c.logProductionDone
c.stopLogProductionCh = nil
c.logProductionDone = nil
return <-c.logProductionError
}
return nil
}

// GetLogProducerErrorChannel exposes the only way for the consumer
// GetLogProductionErrorChannel exposes the only way for the consumer
// to be able to listen to errors and react to them.
func (c *DockerContainer) GetLogProducerErrorChannel() <-chan error {
return c.producerError
func (c *DockerContainer) GetLogProductionErrorChannel() <-chan error {
return c.logProductionError
}

// DockerNetwork represents a network started using Docker
Expand Down Expand Up @@ -1076,7 +1086,25 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
},
},
PostStarts: []ContainerHook{
// first post-start hook is to wait for the container to be ready
// first post-start hook is to produce logs and start log consumers
func(ctx context.Context, c Container) error {
dockerContainer := c.(*DockerContainer)

logConsumerConfig := req.LogConsumerCfg
if logConsumerConfig == nil {
return nil
}

for _, consumer := range logConsumerConfig.Consumers {
dockerContainer.followOutput(consumer)
}

if len(logConsumerConfig.Consumers) > 0 {
return dockerContainer.startLogProduction(ctx, logConsumerConfig.Opts...)
}
return nil
},
// second post-start hook is to wait for the container to be ready
func(ctx context.Context, c Container) error {
dockerContainer := c.(*DockerContainer)

Expand All @@ -1096,6 +1124,23 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
return nil
},
},
PreTerminates: []ContainerHook{
// first pre-terminate hook is to stop the log production
func(ctx context.Context, c Container) error {
logConsumerConfig := req.LogConsumerCfg

if logConsumerConfig == nil {
return nil
}
if len(logConsumerConfig.Consumers) == 0 {
return nil
}

dockerContainer := c.(*DockerContainer)

return dockerContainer.stopLogProduction()
},
},
},
}

Expand Down Expand Up @@ -1131,17 +1176,17 @@ func (p *DockerProvider) CreateContainer(ctx context.Context, req ContainerReque
}

c := &DockerContainer{
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
stopProducer: nil,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
ID: resp.ID,
WaitingFor: req.WaitingFor,
Image: imageName,
imageWasBuilt: req.ShouldBuildImage(),
keepBuiltImage: req.ShouldKeepBuiltImage(),
sessionID: core.SessionID(),
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
lifecycleHooks: req.LifecycleHooks,
}

err = c.createdHook(ctx)
Expand Down Expand Up @@ -1200,15 +1245,15 @@ func (p *DockerProvider) ReuseOrCreateContainer(ctx context.Context, req Contain
}

dc := &DockerContainer{
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
stopProducer: nil,
logger: p.Logger,
isRunning: c.State == "running",
ID: c.ID,
WaitingFor: req.WaitingFor,
Image: c.Image,
sessionID: sessionID,
provider: p,
terminationSignal: termSignal,
stopLogProductionCh: nil,
logger: p.Logger,
isRunning: c.State == "running",
}

return dc, nil
Expand Down Expand Up @@ -1498,7 +1543,7 @@ func containerFromDockerResponse(ctx context.Context, response types.Container)

container.sessionID = core.SessionID()
container.consumers = []LogConsumer{}
container.stopProducer = nil
container.stopLogProductionCh = nil
container.isRunning = response.State == "running"

// the termination signal should be obtained from the reaper
Expand Down
Loading

0 comments on commit 16c689a

Please sign in to comment.