Skip to content

Commit

Permalink
Check for explicitly defined Marathon port first.
Browse files Browse the repository at this point in the history
Previously, we did the check too late resulting in the traefik.port
label not being effective.

The change comes with additional refactorings in production and tests.
  • Loading branch information
timoreimann committed Apr 25, 2017
1 parent f1bc80c commit 099d605
Show file tree
Hide file tree
Showing 3 changed files with 280 additions and 218 deletions.
109 changes: 64 additions & 45 deletions provider/marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package marathon

import (
"errors"
"fmt"
"math"
"net"
"net/http"
Expand All @@ -22,6 +23,11 @@ import (
"github.com/gambol99/go-marathon"
)

const (
labelPort = "traefik.port"
labelPortIndex = "traefik.portIndex"
)

var _ provider.Provider = (*Provider)(nil)

// Provider holds configuration of the provider.
Expand Down Expand Up @@ -194,14 +200,23 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
application, err := getApplication(task, applications.Apps)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", task.AppID, task.ID)
return false
}
ports := processPorts(application, task)
if len(ports) == 0 {
log.Debug("Filtering marathon task without port %s", task.AppID)
if _, err = processPorts(application, task); err != nil {
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
return false
}

// Filter illegal port label specification.
_, hasPortIndexLabel := p.getLabel(application, labelPortIndex)
_, hasPortLabel := p.getLabel(application, labelPort)
if hasPortIndexLabel && hasPortLabel {
log.Debugf("Filtering Marathon task %s from application %s specifying both traefik.portIndex and traefik.port labels", task.ID, application.ID)
return false
}

// Filter by constraints.
label, _ := p.getLabel(application, "traefik.tags")
constraintTags := strings.Split(label, ",")
if p.MarathonLBCompatibility {
Expand All @@ -211,50 +226,29 @@ func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applica
}
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
if failingConstraint != nil {
log.Debugf("Application %v pruned by '%v' constraint", application.ID, failingConstraint.String())
log.Debugf("Filtering Marathon task %s from application %s pruned by '%v' constraint", task.ID, application.ID, failingConstraint.String())
}
return false
}

// Filter disabled application.
if !isApplicationEnabled(application, exposedByDefaultFlag) {
log.Debugf("Filtering disabled marathon task %s", task.AppID)
log.Debugf("Filtering disabled Marathon task %s from application %s", task.ID, application.ID)
return false
}

//filter indeterminable task port
portIndexLabel := (*application.Labels)["traefik.portIndex"]
portValueLabel := (*application.Labels)["traefik.port"]
if portIndexLabel != "" && portValueLabel != "" {
log.Debugf("Filtering marathon task %s specifying both traefik.portIndex and traefik.port labels", task.AppID)
return false
}
if portIndexLabel != "" {
index, err := strconv.Atoi((*application.Labels)["traefik.portIndex"])
if err != nil || index < 0 || index > len(ports)-1 {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.portIndex label", task.AppID)
return false
}
}
if portValueLabel != "" {
_, err := strconv.Atoi((*application.Labels)["traefik.port"])
if err != nil {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.port label", task.AppID)
return false
}
}

//filter healthchecks
// Filter task with existing, bad health check results.
if application.HasHealthChecks() {
if task.HasHealthCheckResults() {
for _, healthcheck := range task.HealthCheckResults {
// found one bad healthcheck, return false
if !healthcheck.Alive {
log.Debugf("Filtering marathon task %s with bad healthcheck", task.AppID)
log.Debugf("Filtering Marathon task %s from application %s with bad health check", task.ID, application.ID)
return false
}
}
}
}

return true
}

Expand Down Expand Up @@ -303,23 +297,16 @@ func (p *Provider) getLabel(application marathon.Application, label string) (str
func (p *Provider) getPort(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", application.ID, task.ID)
return ""
}
ports := processPorts(application, task)
if portIndexLabel, ok := p.getLabel(application, "traefik.portIndex"); ok {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(ports[index])
}
}
if portValueLabel, ok := p.getLabel(application, "traefik.port"); ok {
return portValueLabel
port, err := processPorts(application, task)
if err != nil {
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
return ""
}

for _, port := range ports {
return strconv.Itoa(port)
}
return ""
return strconv.Itoa(port)
}

func (p *Provider) getWeight(task marathon.Task, applications []marathon.Application) string {
Expand Down Expand Up @@ -473,7 +460,39 @@ func (p *Provider) getCircuitBreakerExpression(application marathon.Application)
return "NetworkErrorRatio() > 1"
}

func processPorts(application marathon.Application, task marathon.Task) []int {
func processPorts(application marathon.Application, task marathon.Task) (int, error) {
if portLabel, ok := (*application.Labels)[labelPort]; ok {
port, err := strconv.Atoi(portLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port label: %s", err)
case port <= 0:
return 0, fmt.Errorf("explicitly specified port %d must be larger than zero", port)
}
return port, nil
}

ports := retrieveAvailablePorts(application, task)
if len(ports) == 0 {
return 0, errors.New("no port found")
}

portIndex := 0
portIndexLabel, ok := (*application.Labels)[labelPortIndex]
if ok {
var err error
portIndex, err = strconv.Atoi(portIndexLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port index label: %s", err)
case portIndex < 0, portIndex > len(ports)-1:
return 0, fmt.Errorf("port index %d must be within port range (0, %d)", portIndex, len(ports)-1)
}
}
return ports[portIndex], nil
}

func retrieveAvailablePorts(application marathon.Application, task marathon.Task) []int {
// Using default port configuration
if task.Ports != nil && len(task.Ports) > 0 {
return task.Ports
Expand Down
Loading

0 comments on commit 099d605

Please sign in to comment.