Skip to content

Commit

Permalink
Merge pull request #100 from krancour/aggresive-pipelining
Browse files Browse the repository at this point in the history
Optimize redis storage adapter and make default
  • Loading branch information
krancour authored Jul 18, 2016
2 parents dcd4077 + 0c82466 commit 25fb250
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 30 deletions.
2 changes: 1 addition & 1 deletion config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
)

type config struct {
StorageType string `envconfig:"STORAGE_ADAPTER" default:"memory"`
StorageType string `envconfig:"STORAGE_ADAPTER" default:"redis"`
NumLines int `envconfig:"NUMBER_OF_LINES" default:"1000"`
AggregatorType string `envconfig:"AGGREGATOR_TYPE" default:"nsq"`
}
Expand Down
6 changes: 6 additions & 0 deletions log/aggregator_factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (
type stubStorageAdapter struct {
}

func (a *stubStorageAdapter) Start() {
}

func (a *stubStorageAdapter) Write(app string, message string) error {
return nil
}
Expand All @@ -25,6 +28,9 @@ func (a *stubStorageAdapter) Reopen() error {
return nil
}

func (a *stubStorageAdapter) Stop() {
}

func TestGetUsingInvalidValues(t *testing.T) {
_, err := NewAggregator("bogus", &stubStorageAdapter{})
if err == nil || err.Error() != fmt.Sprintf("Unrecognized aggregator type: '%s'", "bogus") {
Expand Down
2 changes: 2 additions & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ func main() {
if err != nil {
l.Fatal("Error creating storage adapter: ", err)
}
storageAdapter.Start()
defer storageAdapter.Stop()

aggregator, err := log.NewAggregator(cfg.AggregatorType, storageAdapter)
if err != nil {
Expand Down
7 changes: 5 additions & 2 deletions manifests/deis-logger-rc.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ spec:
image: quay.io/deis/logger:v2-beta
imagePullPolicy: Always
env:
- name: NSQ_HANDLER_COUNT
value: "100"
- name: DEIS_LOGGER_REDIS_PASSWORD
valueFrom:
secretKeyRef:
name: logger-redis-creds
key: password
ports:
- containerPort: 8088
name: http
Expand Down
2 changes: 2 additions & 0 deletions storage/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@ package storage

// Adapter is an interface for pluggable components that store log messages.
type Adapter interface {
Start()
Write(string, string) error
Read(string, int) ([]string, error)
Destroy(string) error
Reopen() error
Stop()
}
9 changes: 9 additions & 0 deletions storage/file_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ func NewFileAdapter() (Adapter, error) {
return &fileAdapter{files: make(map[string]*os.File)}, nil
}

// Start the storage adapter-- in the case of this implementation, a no-op
func (a *fileAdapter) Start() {
}

// Write adds a log message to to an app-specific log file
func (a *fileAdapter) Write(app string, message string) error {
// Check first if we might actually have to add to the map of file pointers so we can avoid
Expand Down Expand Up @@ -93,6 +97,7 @@ func (a *fileAdapter) Destroy(app string) error {
return nil
}

// Reopen every file referenced by this storage adapter
func (a *fileAdapter) Reopen() error {
// Ensure no other goroutine is trying to add a file pointer to the map of file pointers while
// we're trying to clear it out
Expand All @@ -102,6 +107,10 @@ func (a *fileAdapter) Reopen() error {
return nil
}

// Stop the storage adapter-- in the case of this implementation, a no-op
func (a *fileAdapter) Stop() {
}

func (a *fileAdapter) getFile(app string) (*os.File, error) {
filePath := a.getFilePath(app)
exists, err := fileExists(filePath)
Expand Down
135 changes: 111 additions & 24 deletions storage/redis_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,75 @@ package storage
import (
"fmt"
"log"
"time"

r "gopkg.in/redis.v3"
)

type message struct {
app string
messageBody string
}

func newMessage(app string, messageBody string) *message {
return &message{
app: app,
messageBody: messageBody,
}
}

type messagePipeliner struct {
bufferSize int
messageCount int
pipeline *r.Pipeline
timeoutTicker *time.Ticker
queuedApps map[string]bool
errCh chan error
}

func newMessagePipeliner(bufferSize int, redisClient *r.Client, errCh chan error) *messagePipeliner {
return &messagePipeliner{
bufferSize: bufferSize,
pipeline: redisClient.Pipeline(),
timeoutTicker: time.NewTicker(time.Second),
queuedApps: map[string]bool{},
errCh: errCh,
}
}

func (mp *messagePipeliner) addMessage(message *message) {
if err := mp.pipeline.RPush(message.app, message.messageBody).Err(); err == nil {
mp.queuedApps[message.app] = true
mp.messageCount++
} else {
mp.errCh <- fmt.Errorf("Error adding rpush to %s to the pipeline: %s", message.app, err)
}
}

func (mp messagePipeliner) execPipeline() {
for app := range mp.queuedApps {
if err := mp.pipeline.LTrim(app, int64(-1*mp.bufferSize), -1).Err(); err != nil {
mp.errCh <- fmt.Errorf("Error adding ltrim of %s to the pipeline: %s", app, err)
}
}
go func() {
defer mp.pipeline.Close()
if _, err := mp.pipeline.Exec(); err != nil {
mp.errCh <- fmt.Errorf("Error executing pipeline: %s", err)
}
}()
}

type redisAdapter struct {
bufferSize int
redisClient *r.Client
started bool
bufferSize int
redisClient *r.Client
messageChannel chan *message
stopCh chan struct{}
}

// NewRedisStorageAdapter returns a pointer to a new instance of a redis-based storage.Adapter.
func NewRedisStorageAdapter(bufferSize int) (*redisAdapter, error) {
func NewRedisStorageAdapter(bufferSize int) (Adapter, error) {
if bufferSize <= 0 {
return nil, fmt.Errorf("Invalid buffer size: %d", bufferSize)
}
Expand All @@ -24,35 +82,59 @@ func NewRedisStorageAdapter(bufferSize int) (*redisAdapter, error) {
if err != nil {
return nil, err
}
return &redisAdapter{
rsa := &redisAdapter{
bufferSize: bufferSize,
redisClient: r.NewClient(&r.Options{
Addr: fmt.Sprintf("%s:%d", cfg.RedisHost, cfg.RedisPort),
Password: cfg.RedisPassword, // "" == no password
DB: int64(cfg.RedisDB),
}),
}, nil
messageChannel: make(chan *message),
stopCh: make(chan struct{}),
}
return rsa, nil
}

// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics
func (a *redisAdapter) Write(app string, message string) error {
// Note: Deliberately NOT using MULTI / transactions here since in this implementation of the
// redis client, MULTI is not safe for concurrent use by multiple goroutines. It's been advised
// by the authors of the gopkg.in/redis.v3 package to just use pipelining when possible...
// and here that is technically possible. In the WORST case scenario, not having transactions
// means we may momentarily have more than the desired number of log entries in the list /
// buffer, but an LTRIM will eventually correct that, bringing the list / buffer back down to
// its desired max size.
pipeline := a.redisClient.Pipeline()
if err := pipeline.RPush(app, message).Err(); err != nil {
return err
}
if err := pipeline.LTrim(app, int64(-1*a.bufferSize), -1).Err(); err != nil {
return err
}
if _, err := pipeline.Exec(); err != nil {
return err
// Start the storage adapter. Invocations of this function are not concurrency safe and multiple
// serialized invocations have no effect.
func (a *redisAdapter) Start() {
if !a.started {
a.started = true
errCh := make(chan error)
mp := newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
go func() {
for {
select {
case err := <-errCh:
log.Println(err)
case <-a.stopCh:
return
}
}
}()
go func() {
for {
select {
case message := <-a.messageChannel:
mp.addMessage(message)
if mp.messageCount == 50 {
mp.execPipeline()
mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
}
case <-mp.timeoutTicker.C:
mp.execPipeline()
mp = newMessagePipeliner(a.bufferSize, a.redisClient, errCh)
case <-a.stopCh:
return
}
}
}()
}
}

// Write adds a log message to to an app-specific list in redis using ring-buffer-like semantics
func (a *redisAdapter) Write(app string, messageBody string) error {
a.messageChannel <- newMessage(app, messageBody)
return nil
}

Expand All @@ -77,7 +159,12 @@ func (a *redisAdapter) Destroy(app string) error {
return nil
}

// Reopen the storage adapter-- in the case of this implementation, a no-op
func (a *redisAdapter) Reopen() error {
// No-op
return nil
}

// Stop the storage adapter. Additional writes may not be performed after stopping.
func (a *redisAdapter) Stop() {
close(a.stopCh)
}
18 changes: 16 additions & 2 deletions storage/redis_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package storage
import (
"fmt"
"testing"
"time"
)

func TestRedisReadFromNonExistingApp(t *testing.T) {
Expand Down Expand Up @@ -42,12 +43,17 @@ func TestRedisLogs(t *testing.T) {
if err != nil {
t.Error(err)
}
a.Start()
defer a.Stop()
// And write a few logs to it, but do NOT fill it up
for i := 0; i < 5; i++ {
if err := a.Write(app, fmt.Sprintf("message %d", i)); err != nil {
t.Error(err)
}
}
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
// there are 50 queued up OR a 1 second timeout has been reached.
time.Sleep(time.Second * 2)
// Read more logs than there are
messages, err := a.Read(app, 8)
if err != nil {
Expand Down Expand Up @@ -78,6 +84,9 @@ func TestRedisLogs(t *testing.T) {
t.Error(err)
}
}
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
// there are 50 queued up OR a 1 second timeout has been reached.
time.Sleep(time.Second * 2)
// Read more logs than the buffer can hold
messages, err = a.Read(app, 20)
if err != nil {
Expand All @@ -101,12 +110,17 @@ func TestRedisDestroy(t *testing.T) {
if err != nil {
t.Error(err)
}
a.Start()
defer a.Stop()
// Write a log to create the file
if err := a.Write(app, "Hello, log!"); err != nil {
t.Error(err)
}
// Sleep for a bit because the adapter queues logs internally and writes them to Redis only when
// there are 50 queued up OR a 1 second timeout has been reached.
time.Sleep(time.Second * 2)
// A redis list should exist for the app
exists, err := a.redisClient.Exists(app).Result()
exists, err := a.(*redisAdapter).redisClient.Exists(app).Result()
if err != nil {
t.Error(err)
}
Expand All @@ -118,7 +132,7 @@ func TestRedisDestroy(t *testing.T) {
t.Error(err)
}
// Now check that the redis list no longer exists
exists, err = a.redisClient.Exists(app).Result()
exists, err = a.(*redisAdapter).redisClient.Exists(app).Result()
if err != nil {
t.Error(err)
}
Expand Down
10 changes: 9 additions & 1 deletion storage/ring_buffer_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,10 @@ func NewRingBufferAdapter(bufferSize int) (Adapter, error) {
return &ringBufferAdapter{bufferSize: bufferSize, ringBuffers: make(map[string]*ringBuffer)}, nil
}

// Start the storage adapter-- in the case of this implementation, a no-op
func (a *ringBufferAdapter) Start() {
}

// Write adds a log message to to an app-specific ringBuffer
func (a *ringBufferAdapter) Write(app string, message string) error {
// Check first if we might actually have to add to the map of ringBuffer pointers so we can avoid
Expand Down Expand Up @@ -110,7 +114,11 @@ func (a *ringBufferAdapter) Destroy(app string) error {
return nil
}

// Reopen the storage adapter-- in the case of this implementation, a no-op
func (a *ringBufferAdapter) Reopen() error {
// No-op
return nil
}

// Stop the storage adapter-- in the case of this implementation, a no-op
func (a *ringBufferAdapter) Stop() {
}

0 comments on commit 25fb250

Please sign in to comment.