Skip to content

Commit

Permalink
Service discovery logic rework
Browse files Browse the repository at this point in the history
Changed the ipMap to a SetMatrix to allow transient states
Compacted addSvc and deleteSvc into a single method
Updated the data structure for backends to store all the information needed
to clean up properly during cleanupServiceBindings
Removed the enable/disable service logic that was racing with the sbLeave/sbJoin logic
Added some debug logs to track further race conditions

Signed-off-by: Flavio Crisciani <flavio.crisciani@docker.com>
  • Loading branch information
Flavio Crisciani committed Jun 10, 2017
1 parent 8783788 commit 41278c3
Show file tree
Hide file tree
Showing 11 changed files with 315 additions and 146 deletions.
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,6 @@ run-tests:

check-local: check-format check-code run-tests


integration-tests: ./bin/dnet
@./test/integration/dnet/run-integration-tests.sh

Expand Down
74 changes: 43 additions & 31 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -583,7 +583,7 @@ func (ep *endpoint) deleteDriverInfoFromCluster() error {
return nil
}

func (ep *endpoint) addServiceInfoToCluster() error {
func (ep *endpoint) addServiceInfoToCluster(sb *sandbox) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 || ep.Iface().Address() == nil {
return nil
}
Expand All @@ -593,6 +593,26 @@ func (ep *endpoint) addServiceInfoToCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("addServiceInfoToCluster START for %s %s", ep.svcName, ep.ID())

// Check that the endpoint is still present on the sandbox before adding it to the service discovery.
// This handles a race between EnableService and sbLeave:
// it is possible that EnableService starts, fetches the list of endpoints, and
// by the time addServiceInfoToCluster is called the endpoint has been removed from the sandbox.
// The risk is that deleteServiceInfoFromCluster happens before addServiceInfoToCluster.
// This check, done under the Service lock of the sandbox, ensures the correct behavior.
// If addServiceInfoToCluster arrives first, it may or may not find the endpoint and will proceed or exit
// accordingly, but in either case deleteServiceInfoFromCluster will follow and do the cleanup if needed.
// If deleteServiceInfoFromCluster arrives first, it runs after the endpoint has been
// removed from the list; in this situation the delete will bail out, not finding any data to clean up,
// and the add will bail out, not finding the endpoint on the sandbox.
if e := sb.getEndpoint(ep.ID()); e == nil {
logrus.Warnf("addServiceInfoToCluster suppressing service resolution ep is not anymore in the sandbox %s", ep.ID())
return nil
}

c := n.getController()
agent := c.getAgent()

Expand All @@ -602,8 +622,7 @@ func (ep *endpoint) addServiceInfoToCluster() error {
if n.ingress {
ingressPorts = ep.ingressPorts
}

if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
if err := c.addServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "addServiceInfoToCluster"); err != nil {
return err
}
}
Expand Down Expand Up @@ -634,10 +653,12 @@ func (ep *endpoint) addServiceInfoToCluster() error {
}
}

logrus.Debugf("addServiceInfoToCluster END for %s %s", ep.svcName, ep.ID())

return nil
}

func (ep *endpoint) deleteServiceInfoFromCluster() error {
func (ep *endpoint) deleteServiceInfoFromCluster(sb *sandbox, method string) error {
if ep.isAnonymous() && len(ep.myAliases) == 0 {
return nil
}
Expand All @@ -647,6 +668,10 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
return nil
}

sb.Service.Lock()
defer sb.Service.Unlock()
logrus.Debugf("deleteServiceInfoFromCluster from %s START for %s %s", method, ep.svcName, ep.ID())

c := n.getController()
agent := c.getAgent()

Expand All @@ -655,8 +680,7 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
if n.ingress {
ingressPorts = ep.ingressPorts
}

if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.Iface().Address().IP); err != nil {
if err := c.rmServiceBinding(ep.svcName, ep.svcID, n.ID(), ep.ID(), ep.Name(), ep.virtualIP, ingressPorts, ep.svcAliases, ep.myAliases, ep.Iface().Address().IP, "deleteServiceInfoFromCluster"); err != nil {
return err
}
}
Expand All @@ -667,6 +691,8 @@ func (ep *endpoint) deleteServiceInfoFromCluster() error {
}
}

logrus.Debugf("deleteServiceInfoFromCluster from %s END for %s %s", method, ep.svcName, ep.ID())

return nil
}

Expand Down Expand Up @@ -814,58 +840,44 @@ func (c *controller) handleEpTableEvent(ev events.Event) {
value = event.Value
case networkdb.UpdateEvent:
logrus.Errorf("Unexpected update service table event = %#v", event)
}

nw, err := c.NetworkByID(nid)
if err != nil {
logrus.Errorf("Could not find network %s while handling service table event: %v", nid, err)
return
}
n := nw.(*network)

err = proto.Unmarshal(value, &epRec)
err := proto.Unmarshal(value, &epRec)
if err != nil {
logrus.Errorf("Failed to unmarshal service table value: %v", err)
return
}

name := epRec.Name
containerName := epRec.Name
svcName := epRec.ServiceName
svcID := epRec.ServiceID
vip := net.ParseIP(epRec.VirtualIP)
ip := net.ParseIP(epRec.EndpointIP)
ingressPorts := epRec.IngressPorts
aliases := epRec.Aliases
taskaliases := epRec.TaskAliases
serviceAliases := epRec.Aliases
taskAliases := epRec.TaskAliases

if name == "" || ip == nil {
if containerName == "" || ip == nil {
logrus.Errorf("Invalid endpoint name/ip received while handling service table event %s", value)
return
}

if isAdd {
logrus.Debugf("handleEpTableEvent ADD %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.addServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
if err := c.addServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed adding service binding for value %s: %v", value, err)
return
}
}

n.addSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.addSvcRecords(alias, ip, nil, true)
}
} else {
logrus.Debugf("handleEpTableEvent DEL %s R:%v", isAdd, eid, epRec)
if svcID != "" {
if err := c.rmServiceBinding(svcName, svcID, nid, eid, vip, ingressPorts, aliases, ip); err != nil {
logrus.Errorf("Failed adding service binding for value %s: %v", value, err)
if err := c.rmServiceBinding(svcName, svcID, nid, eid, containerName, vip, ingressPorts, serviceAliases, taskAliases, ip, "handleEpTableEvent"); err != nil {
logrus.Errorf("failed removing service binding for value %s: %v", value, err)
return
}
}

n.deleteSvcRecords(name, ip, nil, true)
for _, alias := range taskaliases {
n.deleteSvcRecords(alias, ip, nil, true)
}
}
}
2 changes: 1 addition & 1 deletion agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ option (gogoproto.goproto_stringer_all) = false;
// EndpointRecord specifies all the endpoint specific information that
// needs to gossiped to nodes participating in the network.
message EndpointRecord {
// Name of the endpoint
// Name of the container
string name = 1;

// Service name of the service to which this endpoint belongs.
Expand Down
16 changes: 11 additions & 5 deletions endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -597,8 +597,14 @@ func (ep *endpoint) rename(name string) error {

c := n.getController()

sb, ok := ep.getSandbox()
if !ok {
logrus.Warnf("rename for %s aborted, sandbox %s is not anymore present", ep.ID(), ep.sandboxID)
return nil
}

if c.isAgent() {
if err = ep.deleteServiceInfoFromCluster(); err != nil {
if err = ep.deleteServiceInfoFromCluster(sb, "rename"); err != nil {
return types.InternalErrorf("Could not delete service state for endpoint %s from cluster on rename: %v", ep.Name(), err)
}
} else {
Expand All @@ -617,15 +623,15 @@ func (ep *endpoint) rename(name string) error {
ep.anonymous = false

if c.isAgent() {
if err = ep.addServiceInfoToCluster(); err != nil {
if err = ep.addServiceInfoToCluster(sb); err != nil {
return types.InternalErrorf("Could not add service state for endpoint %s to cluster on rename: %v", ep.Name(), err)
}
defer func() {
if err != nil {
ep.deleteServiceInfoFromCluster()
ep.deleteServiceInfoFromCluster(sb, "rename")
ep.name = oldName
ep.anonymous = oldAnonymous
ep.addServiceInfoToCluster()
ep.addServiceInfoToCluster(sb)
}
}()
} else {
Expand Down Expand Up @@ -746,7 +752,7 @@ func (ep *endpoint) sbLeave(sb *sandbox, force bool, options ...EndpointOption)
return err
}

if e := ep.deleteServiceInfoFromCluster(); e != nil {
if e := ep.deleteServiceInfoFromCluster(sb, "sbLeave"); e != nil {
logrus.Errorf("Could not delete service state for endpoint %s from cluster: %v", ep.Name(), e)
}

Expand Down
3 changes: 2 additions & 1 deletion libnetwork_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"
"time"

"github.com/docker/libnetwork/common"
"github.com/docker/libnetwork/datastore"
"github.com/docker/libnetwork/discoverapi"
"github.com/docker/libnetwork/driverapi"
Expand Down Expand Up @@ -383,7 +384,7 @@ func TestSRVServiceQuery(t *testing.T) {
sr := svcInfo{
svcMap: make(map[string][]net.IP),
svcIPv6Map: make(map[string][]net.IP),
ipMap: make(map[string]*ipInfo),
ipMap: common.NewSetMatrix(),
service: make(map[string][]servicePorts),
}
// backing container for the service
Expand Down
Loading

0 comments on commit 41278c3

Please sign in to comment.