Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Check for explicitly defined Marathon port first. #1474

Merged
merged 2 commits into from
Apr 26, 2017
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Check for explicitly defined Marathon port first.
Previously, we did the check too late resulting in the traefik.port
label not being effective.

The change comes with additional refactorings in production and tests.
  • Loading branch information
timoreimann committed Apr 25, 2017
commit 099d605aedeb703480b565229faa56f4ca8994c8
109 changes: 64 additions & 45 deletions provider/marathon/marathon.go
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@ package marathon

import (
"errors"
"fmt"
"math"
"net"
"net/http"
@@ -22,6 +23,11 @@ import (
"github.com/gambol99/go-marathon"
)

const (
labelPort = "traefik.port"
labelPortIndex = "traefik.portIndex"
)

var _ provider.Provider = (*Provider)(nil)

// Provider holds configuration of the provider.
@@ -194,14 +200,23 @@ func (p *Provider) loadMarathonConfig() *types.Configuration {
func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applications, exposedByDefaultFlag bool) bool {
application, err := getApplication(task, applications.Apps)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", task.AppID, task.ID)
return false
}
ports := processPorts(application, task)
if len(ports) == 0 {
log.Debug("Filtering marathon task without port %s", task.AppID)
if _, err = processPorts(application, task); err != nil {
log.Errorf("Filtering Marathon task %s from application %s without port: %s", task.ID, application.ID, err)
return false
}

// Filter illegal port label specification.
_, hasPortIndexLabel := p.getLabel(application, labelPortIndex)
_, hasPortLabel := p.getLabel(application, labelPort)
if hasPortIndexLabel && hasPortLabel {
log.Debugf("Filtering Marathon task %s from application %s specifying both traefik.portIndex and traefik.port labels", task.ID, application.ID)
return false
}

// Filter by constraints.
label, _ := p.getLabel(application, "traefik.tags")
constraintTags := strings.Split(label, ",")
if p.MarathonLBCompatibility {
@@ -211,50 +226,29 @@ func (p *Provider) taskFilter(task marathon.Task, applications *marathon.Applica
}
if ok, failingConstraint := p.MatchConstraints(constraintTags); !ok {
if failingConstraint != nil {
log.Debugf("Application %v pruned by '%v' constraint", application.ID, failingConstraint.String())
log.Debugf("Filtering Marathon task %s from application %s pruned by '%v' constraint", task.ID, application.ID, failingConstraint.String())
}
return false
}

// Filter disabled application.
if !isApplicationEnabled(application, exposedByDefaultFlag) {
log.Debugf("Filtering disabled marathon task %s", task.AppID)
log.Debugf("Filtering disabled Marathon task %s from application %s", task.ID, application.ID)
return false
}

//filter indeterminable task port
portIndexLabel := (*application.Labels)["traefik.portIndex"]
portValueLabel := (*application.Labels)["traefik.port"]
if portIndexLabel != "" && portValueLabel != "" {
log.Debugf("Filtering marathon task %s specifying both traefik.portIndex and traefik.port labels", task.AppID)
return false
}
if portIndexLabel != "" {
index, err := strconv.Atoi((*application.Labels)["traefik.portIndex"])
if err != nil || index < 0 || index > len(ports)-1 {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.portIndex label", task.AppID)
return false
}
}
if portValueLabel != "" {
_, err := strconv.Atoi((*application.Labels)["traefik.port"])
if err != nil {
log.Debugf("Filtering marathon task %s with unexpected value for traefik.port label", task.AppID)
return false
}
}

//filter healthchecks
// Filter task with existing, bad health check results.
if application.HasHealthChecks() {
if task.HasHealthCheckResults() {
for _, healthcheck := range task.HealthCheckResults {
// found one bad healthcheck, return false
if !healthcheck.Alive {
log.Debugf("Filtering marathon task %s with bad healthcheck", task.AppID)
log.Debugf("Filtering Marathon task %s from application %s with bad health check", task.ID, application.ID)
return false
}
}
}
}

return true
}

@@ -303,23 +297,16 @@ func (p *Provider) getLabel(application marathon.Application, label string) (str
func (p *Provider) getPort(task marathon.Task, applications []marathon.Application) string {
application, err := getApplication(task, applications)
if err != nil {
log.Errorf("Unable to get marathon application from task %s", task.AppID)
log.Errorf("Unable to get Marathon application %s for task %s", application.ID, task.ID)
return ""
}
ports := processPorts(application, task)
if portIndexLabel, ok := p.getLabel(application, "traefik.portIndex"); ok {
if index, err := strconv.Atoi(portIndexLabel); err == nil {
return strconv.Itoa(ports[index])
}
}
if portValueLabel, ok := p.getLabel(application, "traefik.port"); ok {
return portValueLabel
port, err := processPorts(application, task)
if err != nil {
log.Errorf("Unable to process ports for Marathon application %s and task %s: %s", application.ID, task.ID, err)
return ""
}

for _, port := range ports {
return strconv.Itoa(port)
}
return ""
return strconv.Itoa(port)
}

func (p *Provider) getWeight(task marathon.Task, applications []marathon.Application) string {
@@ -473,7 +460,39 @@ func (p *Provider) getCircuitBreakerExpression(application marathon.Application)
return "NetworkErrorRatio() > 1"
}

func processPorts(application marathon.Application, task marathon.Task) []int {
func processPorts(application marathon.Application, task marathon.Task) (int, error) {
if portLabel, ok := (*application.Labels)[labelPort]; ok {
port, err := strconv.Atoi(portLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port label: %s", err)
case port <= 0:
return 0, fmt.Errorf("explicitly specified port %d must be larger than zero", port)
}
return port, nil
}

ports := retrieveAvailablePorts(application, task)
if len(ports) == 0 {
return 0, errors.New("no port found")
}

portIndex := 0
portIndexLabel, ok := (*application.Labels)[labelPortIndex]
if ok {
var err error
portIndex, err = strconv.Atoi(portIndexLabel)
switch {
case err != nil:
return 0, fmt.Errorf("failed to parse port index label: %s", err)
case portIndex < 0, portIndex > len(ports)-1:
return 0, fmt.Errorf("port index %d must be within port range (0, %d)", portIndex, len(ports)-1)
}
}
return ports[portIndex], nil
}

func retrieveAvailablePorts(application marathon.Application, task marathon.Task) []int {
// Using default port configuration
if task.Ports != nil && len(task.Ports) > 0 {
return task.Ports
Loading