Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move sync.Leader to marathon module #211

Merged
merged 1 commit into from
Mar 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ marathon-protocol | `http` | Marathon protocol (http or https
marathon-ssl-verify | `true` | Verify certificates when connecting via SSL
marathon-timeout | `30s` | Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout
marathon-username | | Marathon username for basic auth
marathon-leader | | Marathon cluster-wide node name (defaults to <hostname>:8080), the some leader specific calls will be made only if the specified node is the current Marathon-leader")
metrics-interval | `30s` | Metrics reporting interval
metrics-location | | Graphite URL (used when metrics-target is set to graphite)
metrics-prefix | `default` | Metrics prefix (default is resolved to <hostname>.<app_name>
Expand Down
2 changes: 1 addition & 1 deletion config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,14 +91,14 @@ func (config *Config) parseFlags() {
// Sync
flag.BoolVar(&config.Sync.Enabled, "sync-enabled", true, "Enable Marathon-consul scheduled sync")
flag.DurationVar(&config.Sync.Interval.Duration, "sync-interval", 15*time.Minute, "Marathon-consul sync interval")
flag.StringVar(&config.Sync.Leader, "sync-leader", "", "Marathon cluster-wide node name (defaults to <hostname>:8080), the sync will run only if the specified node is the current Marathon-leader")
flag.BoolVar(&config.Sync.Force, "sync-force", false, "Force leadership-independent Marathon-consul sync (run always)")

// Marathon
flag.StringVar(&config.Marathon.Location, "marathon-location", "localhost:8080", "Marathon URL")
flag.StringVar(&config.Marathon.Protocol, "marathon-protocol", "http", "Marathon protocol (http or https)")
flag.StringVar(&config.Marathon.Username, "marathon-username", "", "Marathon username for basic auth")
flag.StringVar(&config.Marathon.Password, "marathon-password", "", "Marathon password for basic auth")
flag.StringVar(&config.Marathon.Leader, "marathon-leader", "", "Marathon cluster-wide node name (defaults to <hostname>:8080), the some leader specific calls will be made only if the specified node is the current Marathon-leader")
flag.BoolVar(&config.Marathon.VerifySsl, "marathon-ssl-verify", true, "Verify certificates when connecting via SSL")
flag.DurationVar(&config.Marathon.Timeout.Duration, "marathon-timeout", 30*time.Second, "Time limit for requests made by the Marathon HTTP client. A Timeout of zero means no timeout")

Expand Down
2 changes: 1 addition & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func main() {

consulInstance := consul.New(config.Consul)
// TODO(tz) - move Leader from sync module to highest level config, access like config.Leader
remote, err := marathon.New(config.Marathon, config.Sync.Leader)
remote, err := marathon.New(config.Marathon)
if err != nil {
log.Fatal(err.Error())
}
Expand Down
1 change: 1 addition & 0 deletions marathon/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ type Config struct {
Protocol string
Username string
Password string
Leader string
VerifySsl bool
Timeout time.Interval
}
28 changes: 23 additions & 5 deletions marathon/marathon.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"io/ioutil"
"net/http"
"net/url"
"os"
"strings"
"time"

Expand All @@ -15,13 +16,15 @@ import (
"github.com/allegro/marathon-consul/metrics"
)

var hostname = os.Hostname

type Marathoner interface {
ConsulApps() ([]*apps.App, error)
App(apps.AppID) (*apps.App, error)
Tasks(apps.AppID) ([]apps.Task, error)
Leader() (string, error)
EventStream([]string, int, int) (*Streamer, error)
AmILeader() (bool, error)
IsLeader() (bool, error)
}

type Marathon struct {
Expand All @@ -36,7 +39,7 @@ type LeaderResponse struct {
Leader string `json:"leader"`
}

func New(config Config, leader string) (*Marathon, error) {
func New(config Config) (*Marathon, error) {
var auth *url.Userinfo
if len(config.Username) == 0 && len(config.Password) == 0 {
auth = nil
Expand All @@ -53,8 +56,8 @@ func New(config Config, leader string) (*Marathon, error) {
return &Marathon{
Location: config.Location,
Protocol: config.Protocol,
MyLeader: config.Leader,
Auth: auth,
MyLeader: leader,
client: &http.Client{
Transport: transport,
Timeout: config.Timeout.Duration,
Expand Down Expand Up @@ -142,7 +145,7 @@ func (m Marathon) leaderPoll() error {
retries := 5
i := 0
for range pollTicker {
leading, err := m.AmILeader()
leading, err := m.IsLeader()
if err != nil {
if i >= retries {
return fmt.Errorf("Failed to get a leader after %d retries", i)
Expand Down Expand Up @@ -244,7 +247,22 @@ func (m Marathon) urlWithQuery(path string, params params) string {
return marathon.String()
}

func (m Marathon) AmILeader() (bool, error) {
func (m *Marathon) IsLeader() (bool, error) {
if m.MyLeader == "" {
if err := m.resolveHostname(); err != nil {
return false, fmt.Errorf("Could not resolve hostname: %v", err)
}
}
leader, err := m.Leader()
return m.MyLeader == leader, err
}

func (m *Marathon) resolveHostname() error {
hostname, err := hostname()
if err != nil {
return err
}
m.MyLeader = fmt.Sprintf("%s:8080", hostname)
log.WithField("Leader", m.MyLeader).Info("Marathon Leader mode set to resolved hostname")
return nil
}
9 changes: 6 additions & 3 deletions marathon/marathon_stub.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ type MarathonerStub struct {
AppsStub []*apps.App
AppStub map[apps.AppID]*apps.App
TasksStub map[apps.AppID][]apps.Task
MyLeader string
leader string
interactionsMu sync.RWMutex
interactions bool
Expand Down Expand Up @@ -46,8 +47,8 @@ func (m *MarathonerStub) EventStream([]string, int, int) (*Streamer, error) {
return &Streamer{}, nil
}

func (m *MarathonerStub) AmILeader() (bool, error) {
return false, nil
func (m *MarathonerStub) IsLeader() (bool, error) {
return m.leader == m.MyLeader, nil
}

func (m *MarathonerStub) Interactions() bool {
Expand All @@ -62,9 +63,10 @@ func (m *MarathonerStub) noteInteraction() {
m.interactions = true
}

func MarathonerStubWithLeaderForApps(leader string, args ...*apps.App) *MarathonerStub {
func MarathonerStubWithLeaderForApps(leader, myLeader string, args ...*apps.App) *MarathonerStub {
stub := MarathonerStubForApps(args...)
stub.leader = leader
stub.MyLeader = myLeader
return stub
}

Expand All @@ -86,6 +88,7 @@ func MarathonerStubForApps(args ...*apps.App) *MarathonerStub {
AppsStub: args,
AppStub: appsMap,
TasksStub: tasksMap,
MyLeader: "localhost:8080",
leader: "localhost:8080",
}
}
2 changes: 1 addition & 1 deletion marathon/marathon_stub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
func TestMarathonStub(t *testing.T) {
t.Parallel()
// given
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", utils.ConsulApp("/test/app", 3))
m := marathon.MarathonerStubWithLeaderForApps("some.host:1234", "some.host:1234", utils.ConsulApp("/test/app", 3))
// then
assert.False(t, m.Interactions())
// when
Expand Down
Loading