Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Use Docker event listener to detect microservices #1499

Merged
merged 1 commit into from
Oct 7, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
203 changes: 77 additions & 126 deletions plugins/linux/nsplugin/descriptor/microservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,11 @@ const (
// MicroserviceDescriptorName is the name of the descriptor for microservices.
MicroserviceDescriptorName = "microservice"

// how often in seconds to refresh the microservice state data
dockerRefreshPeriod = 3 * time.Second
dockerRetryPeriod = 5 * time.Second
// docker API keywords
dockerTypeContainer = "container"
dockerStateRunning = "running"
dockerActionStart = "start"
dockerActionStop = "stop"
)

// MicroserviceDescriptor watches Docker and notifies KVScheduler about newly
Expand Down Expand Up @@ -168,91 +170,6 @@ func (d *MicroserviceDescriptor) GetMicroserviceStateData(msLabel string) (ms *M
return ms, found
}

// handleMicroservices runs a single refresh pass over the Docker containers.
//
// The pass consists of three steps:
//  1. every tracked microservice whose container can no longer be inspected,
//     or is no longer running, is processed as terminated;
//  2. containers remembered in ctx.created are re-inspected - those that are
//     running now are checked by detectMicroservice, those still in the state
//     "created" are remembered for the next pass, the rest are dropped;
//  3. containers are listed (filtered by ctx.since when it is set) and every
//     running one created after ctx.lastInspected is checked by
//     detectMicroservice.
//
// ctx carries the state between passes; its fields created, since and
// lastInspected are updated here.
func (d *MicroserviceDescriptor) handleMicroservices(ctx *microserviceCtx) {
	// Step 1: look for terminated microservices. The lock is held for the
	// whole scan, including the calls to the Docker client.
	d.msStateLock.Lock()
	for id := range d.microServiceByID {
		info, inspectErr := d.dockerClient.InspectContainer(id)
		if inspectErr == nil && info.State.Running {
			continue
		}
		d.processTerminatedMicroservice(id)
	}
	d.msStateLock.Unlock()

	// Step 2: re-check containers that were only "created" during earlier passes.
	var stillCreated []string
	for _, id := range ctx.created {
		info, inspectErr := d.dockerClient.InspectContainer(id)
		if inspectErr != nil {
			d.log.Debugf("Inspect container ID %v failed: %v", id, inspectErr)
			continue
		}
		switch {
		case info.State.Running:
			d.detectMicroservice(info)
		case info.State.Status == "created":
			stillCreated = append(stillCreated, id)
		}
	}
	ctx.created = stillCreated

	// Step 3: list containers, restricted by the 'since' container ID if known.
	opts := docker.ListContainersOptions{
		All:     true,
		Filters: map[string][]string{},
	}
	if ctx.since != "" {
		opts.Filters["since"] = []string{ctx.since}
	}
	listed, err := d.dockerClient.ListContainers(opts)
	if err != nil {
		// The 'since' container may not exist anymore - in that case drop the
		// filter and list everything (404 is required to support older docker
		// version).
		if dockerErr, ok := err.(*docker.Error); ok && (dockerErr.Status == 500 || dockerErr.Status == 404) {
			d.log.Debugf("clearing 'since' %s", ctx.since)
			ctx.since = ""
			delete(opts.Filters, "since")
			listed, err = d.dockerClient.ListContainers(opts)
		}
		if err != nil {
			// Any other (or repeated) failure ends this pass.
			d.log.Errorf("Error listing docker containers: %v", err)
			return
		}
	}

	var newest int64
	for _, c := range listed {
		if ctx.lastInspected != 0 {
			d.log.Debugf("processing new container %v with state %v", c.ID, c.State)
		}
		isNewRunning := c.State == "running" && c.Created > ctx.lastInspected
		if isNewRunning {
			// Inspection is needed to obtain the environment variables of the container.
			info, inspectErr := d.dockerClient.InspectContainer(c.ID)
			if inspectErr != nil {
				d.log.Debugf("Inspect container %v failed: %v", c.ID, inspectErr)
				// Note: the container is skipped entirely, including the
				// 'newest'/'since' bookkeeping below.
				continue
			}
			d.detectMicroservice(info)
		}
		if c.State == "created" {
			ctx.created = append(ctx.created, c.ID)
		}
		// Remember the most recently created container for the next 'since' filter.
		if c.Created > newest {
			newest = c.Created
			ctx.since = c.ID
		}
	}

	if newest > ctx.lastInspected {
		ctx.lastInspected = newest
	}
}

// detectMicroservice inspects container to see if it is a microservice.
// If microservice is detected, processNewMicroservice() is called to process it.
func (d *MicroserviceDescriptor) detectMicroservice(container *docker.Container) {
Expand Down Expand Up @@ -301,7 +218,6 @@ func (d *MicroserviceDescriptor) processNewMicroservice(microserviceLabel string
Key: nsmodel.MicroserviceKey(ms.Label),
Value: &prototypes.Empty{},
Metadata: nil,

})
}
}
Expand Down Expand Up @@ -331,6 +247,36 @@ func (d *MicroserviceDescriptor) processTerminatedMicroservice(id string) {
}
}

// setStateInSync flips the msStateInSync flag to true while holding
// msStateLock and, once the lock has been released, broadcasts on
// msStateInSyncCond to signal the state transition.
func (d *MicroserviceDescriptor) setStateInSync() {
	// The flag update is scoped in a closure so that the lock is released
	// before the broadcast is issued.
	func() {
		d.msStateLock.Lock()
		defer d.msStateLock.Unlock()
		d.msStateInSync = true
	}()
	d.msStateInSyncCond.Broadcast()
}

// processStartedContainer handles a started Docker container identified by id.
// The container is inspected through the Docker client and the result is handed
// over to detectMicroservice, which decides whether it is a microservice.
// If the inspection fails, a warning is logged and nothing else is done.
func (d *MicroserviceDescriptor) processStartedContainer(id string) {
	if details, err := d.dockerClient.InspectContainer(id); err != nil {
		d.log.Warnf("Error by inspecting container %s: %v", id, err)
	} else {
		d.detectMicroservice(details)
	}
}

// processStoppedContainer handles a stopped Docker container identified by id.
// While holding msStateLock, it checks whether the container is present in
// microServiceByID; only in that case processTerminatedMicroservice is called
// for it. Containers that are not tracked are ignored.
func (d *MicroserviceDescriptor) processStoppedContainer(id string) {
	d.msStateLock.Lock()
	defer d.msStateLock.Unlock()

	_, tracked := d.microServiceByID[id]
	if !tracked {
		// Not a known microservice - nothing to do.
		return
	}
	d.processTerminatedMicroservice(id)
}

// trackMicroservices is running in the background and maintains a map of microservice labels to container info.
func (d *MicroserviceDescriptor) trackMicroservices(ctx context.Context) {
d.wg.Add(1)
Expand All @@ -339,51 +285,56 @@ func (d *MicroserviceDescriptor) trackMicroservices(ctx context.Context) {
d.log.Debugf("Microservice tracking ended")
}()

msCtx := &microserviceCtx{}
// subscribe to Docker events
listener := make(chan *docker.APIEvents, 10)
err := d.dockerClient.AddEventListener(listener)
if err != nil {
d.log.Warnf("Failed to add Docker event listener: %v", err)
d.setStateInSync() // empty set of microservices is considered
return
}

// list currently running containers
listOpts := docker.ListContainersOptions{
All: true,
}
containers, err := d.dockerClient.ListContainers(listOpts)
if err != nil {
d.log.Warnf("Failed to list Docker containers: %v", err)
d.setStateInSync() // empty set of microservices is considered
return
}
for _, container := range containers {
if container.State == dockerStateRunning {
details, err := d.dockerClient.InspectContainer(container.ID)
if err != nil {
d.log.Warnf("Error by inspecting container %s: %v", container.ID, err)
continue
}
d.detectMicroservice(details)
}
}

var clientOk bool
// mark state data as in-sync
d.setStateInSync()

timer := time.NewTimer(0)
// process Docker events
for {
select {
case <-timer.C:
if err := d.dockerClient.Ping(); err != nil {
if clientOk {
d.log.Errorf("Docker ping check failed: %v", err)
}
clientOk = false

// Sleep before another retry.
timer.Reset(dockerRetryPeriod)
break
case ev, ok := <-listener:
if !ok {
return
}

if !clientOk {
d.log.Infof("Docker ping check OK")
/*if info, err := d.dockerClient.Info(); err != nil {
d.log.Errorf("Retrieving docker info failed: %v", err)
timer.Reset(dockerRetryPeriod)
continue
} else {
d.log.Infof("Docker connection established: server version: %v (%v %v %v)",
info.ServerVersion, info.OperatingSystem, info.Architecture, info.KernelVersion)
}*/
if ev.Type == dockerTypeContainer {
if ev.Action == dockerActionStart {
d.processStartedContainer(ev.Actor.ID)
}
if ev.Action == dockerActionStop {
d.processStoppedContainer(ev.Actor.ID)
}
}
clientOk = true

d.handleMicroservices(msCtx)

// Sleep before another refresh.
timer.Reset(dockerRefreshPeriod)
case <-d.ctx.Done():
return
}

// mark state data as in-sync - if connection to docker is failing,
// empty set of microservices is considered
d.msStateLock.Lock()
d.msStateInSync = true
d.msStateLock.Unlock()
d.msStateInSyncCond.Broadcast()
}
}