Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable gometalinter. #252

Merged
merged 1 commit into from
Jul 25, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ check-deps: deps
(go get github.com/alecthomas/gometalinter && gometalinter --install)

check: check-deps $(SOURCES) test
gometalinter . --deadline 720s --vendor -D dupl -D gotype -D errcheck -D gas -D golint -E gofmt
gometalinter ./... --deadline 720s --vendor -D dupl -D gotype -D errcheck -D gas -D golint -D aligncheck -E gofmt

format:
goimports -w -l $(APP_SOURCES)
Expand Down
10 changes: 5 additions & 5 deletions apps/app_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,12 +58,12 @@ func TestAppInt(t *testing.T) {
assert.Equal(t, 2, app.RegistrationIntentsNumber())

task := Task{Ports: []int{0, 1, 2, 3}}
intents := app.RegistrationIntents(&task,".")
intents := app.RegistrationIntents(&task, ".")

assert.Contains(t, intents[0].Tags, "Lorem ipsum dolor sit amet, consectetur adipiscing elit")
assert.Contains(t, intents[1].Tags, "secureConnection:true")
assert.NotContains(t, intents[0].Tags, "secureConnection:true")
assert.NotContains(t, intents[1].Tags, "Lorem ipsum dolor sit amet, consectetur adipiscing elit")
assert.Contains(t, intents[0].Tags, "Lorem ipsum dolor sit amet, consectetur adipiscing elit")
assert.Contains(t, intents[1].Tags, "secureConnection:true")
assert.NotContains(t, intents[0].Tags, "secureConnection:true")
assert.NotContains(t, intents[1].Tags, "Lorem ipsum dolor sit amet, consectetur adipiscing elit")
}

func TestParseApp(t *testing.T) {
Expand Down
7 changes: 5 additions & 2 deletions apps/task_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,10 @@ func TestId_AppId(t *testing.T) {
assert.Equal(t, AppID("/pl.allegro/test/app"), TaskID(id).AppID())
}

func TestId_AppIdForInvalid(t *testing.T) {
func TestId_AppIdForInvalidIdShouldPanic(t *testing.T) {
t.Parallel()
assert.Panics(t, func() { TaskID("id").AppID() })
assert.Panics(t, func() {
a := TaskID("id").AppID()
assert.Nil(t, a)
})
}
76 changes: 42 additions & 34 deletions consul/consul.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (c *Consul) registerMultipleServices(services []*consulapi.AgentServiceRegi
}
}

return utils.MergeErrorsOrNil(registerErrors, fmt.Sprint("registering services"))
return utils.MergeErrorsOrNil(registerErrors, "registering services")
}

func (c *Consul) register(service *consulapi.AgentServiceRegistration) error {
Expand Down Expand Up @@ -320,45 +320,53 @@ func (c *Consul) marathonToConsulChecks(task *apps.Task, healthChecks []apps.Hea
continue
}

consulCheck := consulapi.AgentServiceCheck{
Interval: fmt.Sprintf("%ds", check.IntervalSeconds),
Timeout: fmt.Sprintf("%ds", check.TimeoutSeconds),
Status: "passing",
if c := marathonToConsulCheck(task, check, serviceAddress, port); c != nil {
checks = append(checks, c)
}

switch check.Protocol {
case "HTTP", "HTTPS", "MESOS_HTTP", "MESOS_HTTPS":
path := "/"
if check.Path != "" {
path = check.Path
}
if parsedURL, err := url.ParseRequestURI(path); err == nil {
if check.Protocol == "HTTP" || check.Protocol == "MESOS_HTTP" {
parsedURL.Scheme = "http"
} else {
parsedURL.Scheme = "https"
}
parsedURL.Host = fmt.Sprintf("%s:%d", serviceAddress, port)
consulCheck.HTTP = parsedURL.String()
checks = append(checks, &consulCheck)
}
return checks
}

func marathonToConsulCheck(task *apps.Task, check apps.HealthCheck, serviceAddress string, port int) *consulapi.AgentServiceCheck {
consulCheck := &consulapi.AgentServiceCheck{
Interval: fmt.Sprintf("%ds", check.IntervalSeconds),
Timeout: fmt.Sprintf("%ds", check.TimeoutSeconds),
Status: "passing",
}

switch check.Protocol {
case "HTTP", "HTTPS", "MESOS_HTTP", "MESOS_HTTPS":
path := "/"
if check.Path != "" {
path = check.Path
}
if parsedURL, err := url.ParseRequestURI(path); err == nil {
if check.Protocol == "HTTP" || check.Protocol == "MESOS_HTTP" {
parsedURL.Scheme = "http"
} else {
log.WithError(err).
WithField("Id", task.AppID.String()).
WithField("Address", serviceAddress).
Warnf("Could not parse provided path: %s", path)
parsedURL.Scheme = "https"
}
case "TCP", "MESOS_TCP":
consulCheck.TCP = fmt.Sprintf("%s:%d", serviceAddress, port)
checks = append(checks, &consulCheck)
case "COMMAND":
consulCheck.Script = substituteEnvironment(check.Command.Value, *task)
checks = append(checks, &consulCheck)
default:
log.WithField("Id", task.AppID.String()).WithField("Address", serviceAddress).
Warnf("Unrecognized check protocol %s", check.Protocol)
parsedURL.Host = fmt.Sprintf("%s:%d", serviceAddress, port)
consulCheck.HTTP = parsedURL.String()
return consulCheck
} else {
log.WithError(err).
WithField("Id", task.AppID.String()).
WithField("Address", serviceAddress).
Warnf("Could not parse provided path: %s", path)
}
case "TCP", "MESOS_TCP":
consulCheck.TCP = fmt.Sprintf("%s:%d", serviceAddress, port)
return consulCheck
case "COMMAND":
consulCheck.Script = substituteEnvironment(check.Command.Value, *task)
return consulCheck
default:
log.WithField("Id", task.AppID.String()).WithField("Address", serviceAddress).
Warnf("Unrecognized check protocol %s", check.Protocol)
}
return checks
return nil
}

func getHealthCheckPort(check apps.HealthCheck, task apps.Task) (int, error) {
Expand Down
7 changes: 0 additions & 7 deletions events/event_handler_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package events

import (
"errors"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -405,12 +404,6 @@ func TestEventHandler_HandleHealthStatusEventWhenTaskIsNotInMarathon(t *testing.
assert.True(t, marathon.Interactions())
}

type BadReader struct{}

func (r BadReader) Read(p []byte) (int, error) {
return 0, errors.New("Some error")
}

func healthStatusChangeEventForTask(taskID string) []byte {
return []byte(`{
"appId":"/test/app",
Expand Down
11 changes: 5 additions & 6 deletions events/sse_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,10 @@ import (

// Event holds state of parsed fields from marathon EventStream
type SSEEvent struct {
Type string
Body []byte
ID string
Delay string
maxLineSize int64
Type string
Body []byte
ID string
Delay string
}

var (
Expand All @@ -29,7 +28,7 @@ func (e *SSEEvent) parseLine(line []byte) bool {
// or a single U+000D CARRIAGE RETURN (CR) character.

//If the line is empty (a blank line)
if len(line) == 0 || bytes.Compare(line, lineFeed) == 0 {
if len(line) == 0 || bytes.Equal(line, lineFeed) {
//Dispatch the event, as defined below.
return !e.isEmpty()
}
Expand Down
30 changes: 15 additions & 15 deletions events/sse_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,37 +137,37 @@ var parseEventMultipleDataCases = []struct {
}{
{"\n\n\n\n\n\n\n",
[]SSEEvent{
SSEEvent{},
{},
},
},
{"event: status_update_event\ndata: testData\n\nevent: some_event\ndata: someData",
[]SSEEvent{
SSEEvent{Type: "status_update_event", Body: []byte("testData\n")},
SSEEvent{Type: "some_event", Body: []byte("someData\n")},
{Type: "status_update_event", Body: []byte("testData\n")},
{Type: "some_event", Body: []byte("someData\n")},
},
},
{"event: status_update_event\ndata: testData\n\nid: 13\ndata: someData\n\nid: 14\ndata: abc\n\nid: 15\ndata: def\n",
[]SSEEvent{
SSEEvent{Type: "status_update_event", Body: []byte("testData\n")},
SSEEvent{ID: "13", Body: []byte("someData\n")},
SSEEvent{ID: "14", Body: []byte("abc\n")},
SSEEvent{ID: "15", Body: []byte("def\n")},
{Type: "status_update_event", Body: []byte("testData\n")},
{ID: "13", Body: []byte("someData\n")},
{ID: "14", Body: []byte("abc\n")},
{ID: "15", Body: []byte("def\n")},
},
},
{"data: testData\n\ndata: someData\n\ndata: abc\n\ndata: def\n",
[]SSEEvent{
SSEEvent{Body: []byte("testData\n")},
SSEEvent{Body: []byte("someData\n")},
SSEEvent{Body: []byte("abc\n")},
SSEEvent{Body: []byte("def\n")},
{Body: []byte("testData\n")},
{Body: []byte("someData\n")},
{Body: []byte("abc\n")},
{Body: []byte("def\n")},
},
},
{"data: testData\nretry: 10\ndummy: dummy field\n\ndata: someData\n\ndata: abc\n\ndata: def\n",
[]SSEEvent{
SSEEvent{Body: []byte("testData\n"), Delay: "10"},
SSEEvent{Body: []byte("someData\n")},
SSEEvent{Body: []byte("abc\n")},
SSEEvent{Body: []byte("def\n")},
{Body: []byte("testData\n"), Delay: "10"},
{Body: []byte("someData\n")},
{Body: []byte("abc\n")},
{Body: []byte("def\n")},
},
},
}
Expand Down
10 changes: 6 additions & 4 deletions sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ type Stop func()
type Handler func(w http.ResponseWriter, r *http.Request)

func NewHandler(config Config, webConfig web.Config, marathon marathon.Marathoner, serviceOperations service.ServiceRegistry) (Stop, error) {
stopChannels := make([]chan<- events.StopEvent, webConfig.WorkersCount, webConfig.WorkersCount)
stopChannels := make([]chan<- events.StopEvent, webConfig.WorkersCount)
stopFunc := stop(stopChannels)
eventQueue := make(chan events.Event, webConfig.QueueSize)
for i := 0; i < webConfig.WorkersCount; i++ {
Expand All @@ -38,7 +38,7 @@ func NewHandler(config Config, webConfig web.Config, marathon marathon.Marathone
guardQuit := leaderGuard(sse.Streamer, marathon)
stopChannels = append(stopChannels, dispatcherStop, guardQuit)

return stopFunc, nil
return stop(stopChannels), nil
}

func stop(channels []chan<- events.StopEvent) Stop {
Expand All @@ -59,12 +59,13 @@ func leaderGuard(s *marathon.Streamer, m marathon.Marathoner) chan<- events.Stop
quit := make(chan events.StopEvent)

go func() {
ticker := time.Tick(5 * time.Second)
ticker := time.NewTicker(5 * time.Second)
for {
select {
case <-ticker:
case <-ticker.C:
if iAMLeader, err := m.IsLeader(); !iAMLeader && err != nil {
// Leader changed, not revocerable.
ticker.Stop()
s.Stop()
log.Error("Tearing down SSE stream, marathon leader changed.")
return
Expand All @@ -73,6 +74,7 @@ func leaderGuard(s *marathon.Streamer, m marathon.Marathoner) chan<- events.Stop
}
case <-quit:
log.Info("Recieved quit notification. Quit checker")
ticker.Stop()
s.Stop()
return
}
Expand Down
6 changes: 0 additions & 6 deletions sse/sse_handler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
package sse

import (
"context"
"fmt"
"io"
"net/http"
"time"

log "github.com/Sirupsen/logrus"
Expand All @@ -18,10 +16,6 @@ import (
type SSEHandler struct {
config Config
eventQueue chan events.Event
loc string
client *http.Client
close context.CancelFunc
req *http.Request
Streamer *marathon.Streamer
maxLineSize int64
}
Expand Down
1 change: 0 additions & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ func (s *Sync) StartSyncServicesJob() {
}
}
}()
return
}

func (s *Sync) SyncServices() error {
Expand Down
2 changes: 1 addition & 1 deletion sync/sync_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func TestSync_WithDeregisteringFallback(t *testing.T) {
consulStub.Register(&task, marathonApp)
}
marathon.TasksStub = map[apps.AppID][]apps.Task{
apps.AppID("/test/app"): []apps.Task{marathonApp.Tasks[0]},
apps.AppID("/test/app"): {marathonApp.Tasks[0]},
}
sync := newSyncWithDefaultConfig(marathon, consulStub)

Expand Down
2 changes: 1 addition & 1 deletion time/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (t Timestamp) MarshalJSON() ([]byte, error) {
}

func (t *Timestamp) Delay() time.Duration {
return time.Now().Sub(t.Time)
return time.Since(t.Time)
}

func (t *Timestamp) String() string {
Expand Down
2 changes: 1 addition & 1 deletion web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ type Handler func(w http.ResponseWriter, r *http.Request)

func NewHandler(config Config, marathon marathon.Marathoner, serviceOperations service.ServiceRegistry) (Handler, Stop) {

stopChannels := make([]chan<- events.StopEvent, config.WorkersCount, config.WorkersCount)
stopChannels := make([]chan<- events.StopEvent, config.WorkersCount)
eventQueue := make(chan events.Event, config.QueueSize)
for i := 0; i < config.WorkersCount; i++ {
handler := events.NewEventHandler(i, serviceOperations, marathon, eventQueue)
Expand Down