Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

implement reading from loggregator v2 api #213

Merged
merged 1 commit into from
Mar 25, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
219 changes: 196 additions & 23 deletions Gopkg.lock

Large diffs are not rendered by default.

5 changes: 4 additions & 1 deletion Gopkg.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,7 @@

[[constraint]]
branch = "master"
name = "github.com/stvp/go-udp-testing"
name = "github.com/stvp/go-udp-testing"
[[constraint]]
name = "code.cloudfoundry.org/go-loggregator"
version = "7.3.0"
13 changes: 13 additions & 0 deletions authclient/authclient_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package authclient_test

import (
"testing"

. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
)

func TestAuthclient(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Authclient Suite")
}
48 changes: 48 additions & 0 deletions authclient/http.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package authclient

import (
"crypto/tls"
"net/http"
)

type tokenFetcher interface {
GetAuthToken(clientID, secret string, skipCertVerify bool) (string, error)
}

type AuthClient struct {
tokenFetcher tokenFetcher
clientID string
secret string
skipCertVerify bool
httpClient *http.Client
}

func NewHttp(tf tokenFetcher, clientID, secret string, skipCertVerify bool) *AuthClient {
return &AuthClient{
tokenFetcher: tf,
clientID: clientID,
secret: secret,
skipCertVerify: skipCertVerify,
httpClient: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{
InsecureSkipVerify: skipCertVerify,
},
},
},
}
}

func (c *AuthClient) Do(req *http.Request) (*http.Response, error) {
token, err := c.tokenFetcher.GetAuthToken(
c.clientID,
c.secret,
c.skipCertVerify,
)
if err != nil {
return nil, err
}

req.Header.Set("Authorization", token)
return c.httpClient.Do(req)
}
62 changes: 62 additions & 0 deletions authclient/http_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package authclient_test

import (
"errors"
"github.com/cloudfoundry-community/firehose-to-syslog/authclient"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"net/http"
"net/http/httptest"
)

var _ = Describe("Http", func() {
It("inserts an auth header for any calls it makes", func (){
var token string
h := http.HandlerFunc(func (w http.ResponseWriter, r *http.Request) {
token = r.Header.Get("Authorization")
})
s := httptest.NewServer(h)


tf := newStubTokenFetcher()
tf.token = "test-token"

c := authclient.NewHttp(tf, "test-client", "test-secret", true)

req, err := http.NewRequest(http.MethodGet, s.URL, nil)
Expect(err).ToNot(HaveOccurred())
c.Do(req)

Expect(token).To(Equal("test-token"))
Expect(tf.skipCertVerify).To(BeTrue())
})

It("fails when the token fetcher has an error", func() {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))

tf := newStubTokenFetcher()
tf.err = errors.New("an Error")
c := authclient.NewHttp(tf, "test-client", "test-secret", true)

req, err := http.NewRequest(http.MethodGet, s.URL, nil)
Expect(err).ToNot(HaveOccurred())

_, err = c.Do(req)
Expect(err).To(HaveOccurred())
})
})

type stubTokenFetcher struct {
token string
err error
skipCertVerify bool
}

func newStubTokenFetcher() *stubTokenFetcher {
return &stubTokenFetcher{}
}

func (s *stubTokenFetcher) GetAuthToken(username, password string, skipCertVerify bool) (string, error) {
s.skipCertVerify = skipCertVerify
return s.token, s.err
}
47 changes: 21 additions & 26 deletions cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package main
import (
"context"
"fmt"
"github.com/cloudfoundry-community/firehose-to-syslog/authclient"
"github.com/cloudfoundry-incubator/uaago"
"log"
"os"
"os/signal"
Expand All @@ -14,7 +16,6 @@ import (
"github.com/cloudfoundry-community/firehose-to-syslog/firehoseclient"
"github.com/cloudfoundry-community/firehose-to-syslog/logging"
"github.com/cloudfoundry-community/firehose-to-syslog/stats"
"github.com/cloudfoundry-community/firehose-to-syslog/uaatokenrefresher"
"github.com/cloudfoundry-community/go-cfclient"
"gopkg.in/alecthomas/kingpin.v2"
)
Expand Down Expand Up @@ -121,13 +122,16 @@ func (cli *CLI) Run(args []string) int {
}
defer cacheStore.Close()

cachingClient := caching.NewCacheLazyFill(&caching.CFClientAdapter{
CF: cfClient,
}, cacheStore, &caching.CacheLazyFillConfig{
IgnoreMissingApps: *ignoreMissingApps,
CacheInvalidateTTL: *tickerTime,
StripAppSuffixes: strings.Split(*stripAppSuffixes, ","),
})
cachingClient := caching.NewCacheLazyFill(
&caching.CFClientAdapter{
CF: cfClient,
},
cacheStore,
&caching.CacheLazyFillConfig{
IgnoreMissingApps: *ignoreMissingApps,
CacheInvalidateTTL: *tickerTime,
StripAppSuffixes: strings.Split(*stripAppSuffixes, ","),
})

if caching.IsNeeded(*wantedEvents) {
// Bootstrap cache
Expand Down Expand Up @@ -166,25 +170,10 @@ func (cli *CLI) Run(args []string) int {
//Set extrafields if needed
events.SetExtraFields(*extraFields)

uaaRefresher, err := uaatokenrefresher.NewUAATokenRefresher(
cfClient.Endpoint.AuthEndpoint,
*clientID,
*clientSecret,
*skipSSLValidation,
)

if err != nil {
logging.LogError(fmt.Sprint("Failed connecting to Get token from UAA..", err), "")
}

firehoseConfig := &firehoseclient.FirehoseConfig{
TrafficControllerURL: cfClient.Endpoint.DopplerEndpoint,
RLPAddr: strings.Replace(cfClient.Config.ApiAddress, "api", "log-stream", 1),
InsecureSSLSkipVerify: *skipSSLValidation,
IdleTimeoutSeconds: *keepAlive,
FirehoseSubscriptionID: *subscriptionId,
MinRetryDelay: *minRetryDelay,
MaxRetryDelay: *maxRetryDelay,
MaxRetryCount: *maxRetryCount,
BufferSize: *bufferSize,
}

Expand All @@ -195,7 +184,13 @@ func (cli *CLI) Run(args []string) int {
return ExitCodeError
}

firehoseClient := firehoseclient.NewFirehoseNozzle(uaaRefresher, events, firehoseConfig, statistic)
uaa, err := uaago.NewClient(cfClient.Endpoint.AuthEndpoint)
if err != nil {
logging.LogError(fmt.Sprint("Failed connecting to Get token from UAA..", err), "")
}

ac := authclient.NewHttp(uaa, *clientID, *clientSecret, *skipSSLValidation)
firehoseClient := firehoseclient.NewFirehoseNozzle(events, firehoseConfig, statistic, ac)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -205,7 +200,7 @@ func (cli *CLI) Run(args []string) int {
cleanupDone := make(chan bool)
signal.Notify(signalChan, os.Interrupt, os.Kill)
go func() {
for _ = range signalChan {
for range signalChan {
fmt.Println("\nSignal Received, Stop reading and starting Draining...")
firehoseClient.StopReading()
cctx, tcancel := context.WithTimeout(context.TODO(), 30*time.Second)
Expand Down
81 changes: 19 additions & 62 deletions firehoseclient/firehoseclient.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package firehoseclient

import (
"code.cloudfoundry.org/go-loggregator"
"context"
"crypto/tls"
"net/http"
"sync"
"time"

Expand All @@ -12,16 +13,11 @@ import (
"github.com/cloudfoundry-community/firehose-to-syslog/diodes"
"github.com/cloudfoundry-community/firehose-to-syslog/eventRouting"
"github.com/cloudfoundry-community/firehose-to-syslog/logging"
"github.com/cloudfoundry-community/firehose-to-syslog/uaatokenrefresher"
"github.com/cloudfoundry/noaa/consumer"
noaerrors "github.com/cloudfoundry/noaa/errors"
"github.com/cloudfoundry/sonde-go/events"

"github.com/gorilla/websocket"
)

type FirehoseNozzle struct {
errs <-chan error
Readerrs chan error
messages <-chan *events.Envelope
consumer *consumer.Consumer
Expand All @@ -32,15 +28,16 @@ type FirehoseNozzle struct {
stopReading chan struct{}
stopRouting chan struct{}
Stats *stats.Stats
httpClient doer
}

type doer interface {
Do(req *http.Request) (*http.Response, error)
}

type FirehoseConfig struct {
MinRetryDelay time.Duration
MaxRetryDelay time.Duration
MaxRetryCount int
TrafficControllerURL string
RLPAddr string
InsecureSSLSkipVerify bool
IdleTimeoutSeconds time.Duration
FirehoseSubscriptionID string
BufferSize int
}
Expand All @@ -49,23 +46,24 @@ var (
wg sync.WaitGroup
)

func NewFirehoseNozzle(uaaR *uaatokenrefresher.UAATokenRefresher,
func NewFirehoseNozzle(
eventRouting eventRouting.EventRouting,
firehoseconfig *FirehoseConfig,
stats *stats.Stats) *FirehoseNozzle {
stats *stats.Stats,
httpClient doer,
) *FirehoseNozzle {
return &FirehoseNozzle{
errs: make(<-chan error),
Readerrs: make(chan error),
messages: make(<-chan *events.Envelope),
eventRouting: eventRouting,
config: firehoseconfig,
uaaRefresher: uaaR,
envelopeBuffer: diodes.NewOneToOneEnvelope(firehoseconfig.BufferSize, gendiodes.AlertFunc(func(missed int) {
logging.LogError("Missed Logs ", missed)
})),
stopReading: make(chan struct{}),
stopRouting: make(chan struct{}),
Stats: stats,
httpClient: httpClient,
}
}

Expand All @@ -86,16 +84,12 @@ func (f *FirehoseNozzle) StopReading() {
}

func (f *FirehoseNozzle) consumeFirehose() {
f.consumer = consumer.New(
f.config.TrafficControllerURL,
&tls.Config{InsecureSkipVerify: f.config.InsecureSSLSkipVerify},
nil)
f.consumer.RefreshTokenFrom(f.uaaRefresher)
f.consumer.SetIdleTimeout(f.config.IdleTimeoutSeconds)
f.consumer.SetMinRetryDelay(f.config.MinRetryDelay)
f.consumer.SetMaxRetryDelay(f.config.MaxRetryDelay)
f.consumer.SetMaxRetryCount(f.config.MaxRetryCount)
f.messages, f.errs = f.consumer.Firehose(f.config.FirehoseSubscriptionID, "")
rlpGatewayClient := loggregator.NewRLPGatewayClient(
f.config.RLPAddr,
loggregator.WithRLPGatewayHTTPClient(f.httpClient),
)
a := NewV2Adapter(rlpGatewayClient)
f.messages = a.Firehose(f.config.FirehoseSubscriptionID)
}

func (f *FirehoseNozzle) ReadLogsBuffer(ctx context.Context) {
Expand All @@ -115,12 +109,10 @@ func (f *FirehoseNozzle) ReadLogsBuffer(ctx context.Context) {
time.Sleep(1 * time.Millisecond)
continue
}
f.handleMessage(envelope)
f.eventRouting.RouteEvent(envelope)
f.Stats.Dec(stats.SubInputBuffer)
}
}

}

func (f *FirehoseNozzle) Draining(ctx context.Context) {
Expand All @@ -136,7 +128,6 @@ func (f *FirehoseNozzle) Draining(ctx context.Context) {
logging.LogStd("Finishing Draining", true)
return
}
f.handleMessage(envelope)
f.eventRouting.RouteEvent(envelope)
f.Stats.Dec(stats.SubInputBuffer)
}
Expand All @@ -154,13 +145,6 @@ func (f *FirehoseNozzle) routeEvent(ctx context.Context) {
f.envelopeBuffer.Set(envelope)
f.Stats.Inc(stats.SubInputBuffer)
}
case err := <-f.errs:
f.handleError(err)
retryrerr := f.handleError(err)
if !retryrerr {
logging.LogError("RouteEvent Loop Error ", err)
return
}
case <-ctx.Done():
logging.LogStd("Closing routing event routine", true)
return
Expand All @@ -169,31 +153,4 @@ func (f *FirehoseNozzle) routeEvent(ctx context.Context) {
return
}
}

}

func (f *FirehoseNozzle) handleError(err error) bool {
logging.LogError("Error while reading from the Firehose: ", err)

switch err.(type) {
case noaerrors.RetryError:
switch noaRetryError := err.(noaerrors.RetryError).Err.(type) {
case *websocket.CloseError:
switch noaRetryError.Code {
case websocket.ClosePolicyViolation:
logging.LogError("Nozzle couldn't keep up. Please try scaling up the Nozzle.", err)
}
}
return true
}

logging.LogStd("Closing connection with Firehose...", true)
f.consumer.Close()
return false
}

func (f *FirehoseNozzle) handleMessage(envelope *events.Envelope) {
if envelope.GetEventType() == events.Envelope_CounterEvent && envelope.CounterEvent.GetName() == "TruncatingBuffer.DroppedMessages" && envelope.GetOrigin() == "doppler" {
logging.LogStd("We've intercepted an upstream message which indicates that the nozzle or the TrafficController is not keeping up. Please try scaling up the nozzle.", true)
}
}
Loading