diff --git a/config.go b/config.go index 0036d2c..740bc44 100644 --- a/config.go +++ b/config.go @@ -9,7 +9,7 @@ const ( ) type config struct { - StorageType string `envconfig:"STORAGE_ADAPTER" default:"memory"` + StorageType string `envconfig:"STORAGE_ADAPTER" default:"redis"` NumLines int `envconfig:"NUMBER_OF_LINES" default:"1000"` AggregatorType string `envconfig:"AGGREGATOR_TYPE" default:"nsq"` } diff --git a/log/aggregator_factory_test.go b/log/aggregator_factory_test.go index 5e2583b..4466487 100644 --- a/log/aggregator_factory_test.go +++ b/log/aggregator_factory_test.go @@ -9,6 +9,9 @@ import ( type stubStorageAdapter struct { } +func (a *stubStorageAdapter) Start() { +} + func (a *stubStorageAdapter) Write(app string, message string) error { return nil } @@ -25,6 +28,9 @@ func (a *stubStorageAdapter) Reopen() error { return nil } +func (a *stubStorageAdapter) Stop() { +} + func TestGetUsingInvalidValues(t *testing.T) { _, err := NewAggregator("bogus", &stubStorageAdapter{}) if err == nil || err.Error() != fmt.Sprintf("Unrecognized aggregator type: '%s'", "bogus") { diff --git a/main.go b/main.go index 00e30f1..767cf44 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,8 @@ func main() { if err != nil { l.Fatal("Error creating storage adapter: ", err) } + storageAdapter.Start() + defer storageAdapter.Stop() aggregator, err := log.NewAggregator(cfg.AggregatorType, storageAdapter) if err != nil { diff --git a/manifests/deis-logger-rc.yaml b/manifests/deis-logger-rc.yaml index 764f9f2..1dbfc07 100644 --- a/manifests/deis-logger-rc.yaml +++ b/manifests/deis-logger-rc.yaml @@ -19,8 +19,11 @@ spec: image: quay.io/deis/logger:v2-beta imagePullPolicy: Always env: - - name: NSQ_HANDLER_COUNT - value: "100" + - name: DEIS_LOGGER_REDIS_PASSWORD + valueFrom: + secretKeyRef: + name: logger-redis-creds + key: password ports: - containerPort: 8088 name: http diff --git a/storage/adapter.go b/storage/adapter.go index e83e437..207777d 100644 --- a/storage/adapter.go +++ b/storage/adapter.go @@ -2,8 +2,10 @@ package storage // Adapter is an interface for pluggable components that store log messages. type Adapter interface { + Start() Write(string, string) error Read(string, int) ([]string, error) Destroy(string) error Reopen() error + Stop() } diff --git a/storage/file_adapter.go b/storage/file_adapter.go index 675a87f..52de145 100644 --- a/storage/file_adapter.go +++ b/storage/file_adapter.go @@ -22,6 +22,10 @@ func NewFileAdapter() (Adapter, error) { return &fileAdapter{files: make(map[string]*os.File)}, nil } +// Start the storage adapter-- in the case of this implementation, a no-op +func (a *fileAdapter) Start() { +} + // Write adds a log message to to an app-specific log file func (a *fileAdapter) Write(app string, message string) error { // Check first if we might actually have to add to the map of file pointers so we can avoid @@ -93,6 +97,7 @@ func (a *fileAdapter) Destroy(app string) error { return nil } +// Reopen every file referenced by this storage adapter func (a *fileAdapter) Reopen() error { // Ensure no other goroutine is trying to add a file pointer to the map of file pointers while // we're trying to clear it out @@ -102,6 +107,10 @@ func (a *fileAdapter) Reopen() error { return nil } +// Stop the storage adapter-- in the case of this implementation, a no-op +func (a *fileAdapter) Stop() { +} + func (a *fileAdapter) getFile(app string) (*os.File, error) { filePath := a.getFilePath(app) exists, err := fileExists(filePath) diff --git a/storage/redis_adapter.go b/storage/redis_adapter.go index a535127..b02592e 100644 --- a/storage/redis_adapter.go +++ b/storage/redis_adapter.go @@ -3,17 +3,75 @@ package storage import ( "fmt" "log" + "time" r "gopkg.in/redis.v3" ) +type message struct { + app string + messageBody string +} + +func newMessage(app string, messageBody string) *message { + return &message{ + app: app, + messageBody: messageBody, + } +} + +type messagePipeliner struct { + bufferSize int + messageCount int + pipeline *r.Pipeline + timeoutTicker *time.Ticker + queuedApps map[string]bool + errCh chan error +} + +func newMessagePipeliner(bufferSize int, redisClient *r.Client, errCh chan error) *messagePipeliner { + return &messagePipeliner{ + bufferSize: bufferSize, + pipeline: redisClient.Pipeline(), + timeoutTicker: time.NewTicker(time.Second), + queuedApps: map[string]bool{}, + errCh: errCh, + } +} + +func (mp *messagePipeliner) addMessage(message *message) { + if err := mp.pipeline.RPush(message.app, message.messageBody).Err(); err == nil { + mp.queuedApps[message.app] = true + mp.messageCount++ + } else { + mp.errCh <- fmt.Errorf("Error adding rpush to %s to the pipeline: %s", message.app, err) + } +} + +func (mp messagePipeliner) execPipeline() { + for app := range mp.queuedApps { + if err := mp.pipeline.LTrim(app, int64(-1*mp.bufferSize), -1).Err(); err != nil { + mp.errCh <- fmt.Errorf("Error adding ltrim of %s to the pipeline: %s", app, err) + } + } + go func() { + defer mp.pipeline.Close() + if _, err := mp.pipeline.Exec(); err != nil { + mp.errCh <- fmt.Errorf("Error executing pipeline: %s", err) + } + }() +} + type redisAdapter struct { - bufferSize int - redisClient *r.Client + started bool + bufferSize int + redisClient *r.Client + messageChannel chan *message + stopCh chan struct{} } // NewRedisStorageAdapter returns a pointer to a new instance of a redis-based storage.Adapter. -func NewRedisStorageAdapter(bufferSize int) (*redisAdapter, error) { +func NewRedisStorageAdapter(bufferSize int) (Adapter, error) { if bufferSize <= 0 { return nil, fmt.Errorf("Invalid buffer size: %d", bufferSize) } @@ -24,35 +82,59 @@ func NewRedisStorageAdapter(bufferSize int) (*redisAdapter, error) { if err != nil { return nil, err } - return &redisAdapter{ + rsa := &redisAdapter{ bufferSize: bufferSize, redisClient: r.NewClient(&r.Options{ Addr: fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort), Password: cfg.RedisPassword, // "" == no password DB: int64(cfg.RedisDB), }), - }, nil + messageChannel: make(chan *message), + stopCh: make(chan struct{}), + } + return rsa, nil } -// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics -func (a *redisAdapter) Write(app string, message string) error { - // Note: Deliberately NOT using MULTI / transactions here since in this implementation of the - // redis client, MULTI is not safe for concurrent use by multiple goroutines. It's been advised - // by the authors of the gopkg.in/redis.v3 package to just use pipelining when possible... - // and here that is technically possible. In the WORST case scenario, not having transactions - // means we may momentarily have more than the desired number of log entries in the list / - // buffer, but an LTRIM will eventually correct that, bringing the list / buffer back down to - // its desired max size. - pipeline := a.redisClient.Pipeline() - if err := pipeline.RPush(app, message).Err(); err != nil { - return err - } - if err := pipeline.LTrim(app, int64(-1*a.bufferSize), -1).Err(); err != nil { - return err - } - if _, err := pipeline.Exec(); err != nil { - return err +// Start the storage adapter. Invocations of this function are not concurrency safe and multiple +// serialized invocations have no effect. +func (a *redisAdapter) Start() { + if !a.started { + a.started = true + errCh := make(chan error) + mp := newMessagePipeliner(a.bufferSize, a.redisClient, errCh) + go func() { + for { + select { + case err := <-errCh: + log.Println(err) + case <-a.stopCh: + return + } + } + }() + go func() { + for { + select { + case message := <-a.messageChannel: + mp.addMessage(message) + if mp.messageCount == 50 { + mp.execPipeline() + mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh) + } + case <-mp.timeoutTicker.C: + mp.execPipeline() + mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh) + case <-a.stopCh: + return + } + } + }() } +} + +// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics +func (a *redisAdapter) Write(app string, messageBody string) error { + a.messageChannel <- newMessage(app, messageBody) return nil } @@ -77,7 +159,12 @@ func (a *redisAdapter) Destroy(app string) error { return nil } +// Reopen the storage adapter-- in the case of this implementation, a no-op func (a *redisAdapter) Reopen() error { - // No-op return nil } + +// Stop the storage adapter. Additional writes may not be performed after stopping. +func (a *redisAdapter) Stop() { + close(a.stopCh) +} diff --git a/storage/redis_adapter_test.go b/storage/redis_adapter_test.go index 492926e..f0f5e02 100644 --- a/storage/redis_adapter_test.go +++ b/storage/redis_adapter_test.go @@ -5,6 +5,7 @@ package storage import ( "fmt" "testing" + "time" ) func TestRedisReadFromNonExistingApp(t *testing.T) { @@ -42,12 +43,17 @@ func TestRedisLogs(t *testing.T) { if err != nil { t.Error(err) } + a.Start() + defer a.Stop() // And write a few logs to it, but do NOT fill it up for i := 0; i < 5; i++ { if err := a.Write(app, fmt.Sprintf("message %d", i)); err != nil { t.Error(err) } } + // Sleep for a bit because the adapter queues logs internally and writes them to Redis only when + // there are 50 queued up OR a 1 second timeout has been reached. + time.Sleep(time.Second * 2) // Read more logs than there are messages, err := a.Read(app, 8) if err != nil { @@ -78,6 +84,9 @@ func TestRedisLogs(t *testing.T) { t.Error(err) } } + // Sleep for a bit because the adapter queues logs internally and writes them to Redis only when + // there are 50 queued up OR a 1 second timeout has been reached. + time.Sleep(time.Second * 2) // Read more logs than the buffer can hold messages, err = a.Read(app, 20) if err != nil { @@ -101,12 +110,17 @@ func TestRedisDestroy(t *testing.T) { if err != nil { t.Error(err) } + a.Start() + defer a.Stop() // Write a log to create the file if err := a.Write(app, "Hello, log!"); err != nil { t.Error(err) } + // Sleep for a bit because the adapter queues logs internally and writes them to Redis only when + // there are 50 queued up OR a 1 second timeout has been reached. + time.Sleep(time.Second * 2) // A redis list should exist for the app - exists, err := a.redisClient.Exists(app).Result() + exists, err := a.(*redisAdapter).redisClient.Exists(app).Result() if err != nil { t.Error(err) } @@ -118,7 +132,7 @@ func TestRedisDestroy(t *testing.T) { t.Error(err) } // Now check that the redis list no longer exists - exists, err = a.redisClient.Exists(app).Result() + exists, err = a.(*redisAdapter).redisClient.Exists(app).Result() if err != nil { t.Error(err) } diff --git a/storage/ring_buffer_adapter.go b/storage/ring_buffer_adapter.go index 0ccc827..0b9d5cf 100644 --- a/storage/ring_buffer_adapter.go +++ b/storage/ring_buffer_adapter.go @@ -63,6 +63,10 @@ func NewRingBufferAdapter(bufferSize int) (Adapter, error) { return &ringBufferAdapter{bufferSize: bufferSize, ringBuffers: make(map[string]*ringBuffer)}, nil } +// Start the storage adapter-- in the case of this implementation, a no-op +func (a *ringBufferAdapter) Start() { +} + // Write adds a log message to to an app-specific ringBuffer func (a *ringBufferAdapter) Write(app string, message string) error { // Check first if we might actually have to add to the map of ringBuffer pointers so we can avoid @@ -110,7 +114,11 @@ func (a *ringBufferAdapter) Destroy(app string) error { return nil } +// Reopen the storage adapter-- in the case of this implementation, a no-op func (a *ringBufferAdapter) Reopen() error { - // No-op return nil } + +// Stop the storage adapter-- in the case of this implementation, a no-op +func (a *ringBufferAdapter) Stop() { +}