Skip to content

Commit

Permalink
feature: use ringBuffer to make log not blocking and configurable
Browse files Browse the repository at this point in the history
Signed-off-by: Starnop <starnop@163.com>
  • Loading branch information
starnop committed Nov 7, 2018
1 parent 26b6d5f commit c6e6f14
Show file tree
Hide file tree
Showing 6 changed files with 167 additions and 33 deletions.
22 changes: 22 additions & 0 deletions daemon/containerio/io.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/alibaba/pouch/daemon/logger"
"github.com/alibaba/pouch/daemon/logger/crilog"
"github.com/alibaba/pouch/daemon/logger/logbuffer"
"github.com/alibaba/pouch/pkg/multierror"
"github.com/alibaba/pouch/pkg/streams"

Expand Down Expand Up @@ -47,6 +48,9 @@ type IO struct {
logdriver logger.LogDriver
logcopier *logger.LogCopier
criLog *crilog.Log

nonBlock bool
maxBufferSize int64
}

// NewIO return IO instance.
Expand Down Expand Up @@ -87,6 +91,16 @@ func (ctrio *IO) SetLogDriver(logdriver logger.LogDriver) {
ctrio.logdriver = logdriver
}

// SetMaxBufferSize set the max size of buffer.
func (ctrio *IO) SetMaxBufferSize(maxBufferSize int64) {
ctrio.maxBufferSize = maxBufferSize
}

// SetNonBlock whether to cache the container's logs with buffer.
func (ctrio *IO) SetNonBlock(nonBlock bool) {
ctrio.nonBlock = nonBlock
}

// Stream is used to export the stream field.
func (ctrio *IO) Stream() *streams.Stream {
return ctrio.stream
Expand Down Expand Up @@ -188,6 +202,14 @@ func (ctrio *IO) startLogging() error {
return nil
}

if ctrio.nonBlock {
logDriver, err := logbuffer.NewBufferLog(ctrio.logdriver, ctrio.maxBufferSize)
if err != nil {
return err
}
ctrio.logdriver = logDriver
}

ctrio.logcopier = logger.NewLogCopier(ctrio.logdriver, map[string]io.Reader{
"stdout": ctrio.stream.NewStdoutPipe(),
"stderr": ctrio.stream.NewStderrPipe(),
Expand Down
10 changes: 6 additions & 4 deletions pkg/ringbuffer/list.go → daemon/logger/logbuffer/list.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
package ringbuffer
package logbuffer

import (
"sync"

"github.com/alibaba/pouch/daemon/logger"
)

var elemPool = &sync.Pool{New: func() interface{} { return new(element) }}

type element struct {
next, prev *element
val interface{}
val *logger.LogMessage
}

func (e *element) reset() {
Expand All @@ -34,7 +36,7 @@ func (q *queue) size() int {
return q.count
}

func (q *queue) enqueue(val interface{}) {
func (q *queue) enqueue(val *logger.LogMessage) {
elem := elemPool.Get().(*element)
elem.val = val

Expand All @@ -47,7 +49,7 @@ func (q *queue) enqueue(val interface{}) {
q.count++
}

func (q *queue) dequeue() interface{} {
func (q *queue) dequeue() *logger.LogMessage {
if q.size() == 0 {
return nil
}
Expand Down
65 changes: 65 additions & 0 deletions daemon/logger/logbuffer/logbuff.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package logbuffer

import (
"github.com/alibaba/pouch/daemon/logger"

"github.com/sirupsen/logrus"
)

// BufferLog is uses to cache the container's logs with ringBuffer.
type LogBuffer struct {
ringBuffer *RingBuffer
logger logger.LogDriver
}

// NewBufferLog return a new BufferLog.
func NewBufferLog(logDriver logger.LogDriver, maxBytes int64) (logger.LogDriver, error) {
bl := &LogBuffer{
logger: logDriver,
ringBuffer: New(-1, maxBytes),
}

// use a goroutine to write logs continuously with specified log driver
go bl.run()
return bl, nil
}

// Name return the log driver's name.
func (bl *LogBuffer) Name() string {
return bl.logger.Name()
}

// WriteLogMessage will write the LogMessage to the ringBuffer.
func (bl *LogBuffer) WriteLogMessage(msg *logger.LogMessage) error {
covered, err := bl.ringBuffer.Push(msg)
if covered {
logrus.Debugf("data coverage occurs: %v", msg)
}
return err
}

// Close close the ringBuffer and drain the messages.
func (bl *LogBuffer) Close() error {
bl.ringBuffer.Close()
for _, msg := range bl.ringBuffer.Drain() {
if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v when closing with log driver %s", msg, bl.logger.Name())
}
}

return bl.logger.Close()
}

// write logs continuously with specified log driver from ringBuffer.
func (bl *LogBuffer) run() {
for {
msg, err := bl.ringBuffer.Pop()
if err != nil {
return
}

if err := bl.logger.WriteLogMessage(msg); err != nil {
logrus.Debugf("failed to write log %v with log driver %s", msg, bl.logger.Name())
}
}
}
39 changes: 29 additions & 10 deletions pkg/ringbuffer/ringbuff.go → daemon/logger/logbuffer/ringbuff.go
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
package ringbuffer
package logbuffer

import (
"fmt"
"github.com/alibaba/pouch/daemon/logger"
"sync"
)

// ErrClosed is used to indicate the ringbuffer has been closed.
var ErrClosed = fmt.Errorf("closed")

const defaultSize = 1024
const (
defaultSize = 1024
defaultMaxBytes = 1e6 //1MB
)

// RingBuffer implements a fixed-size buffer which will drop oldest data if full.
type RingBuffer struct {
Expand All @@ -18,26 +22,33 @@ type RingBuffer struct {
cap int
closed bool
q *queue

maxBytes int64
currentBytes int64
}

// New creates new RingBuffer.
func New(cap int) *RingBuffer {
func New(cap int, maxBytes int64) *RingBuffer {
if cap <= 0 {
cap = defaultSize
}
if maxBytes < 0 {
maxBytes = defaultMaxBytes
}

rb := &RingBuffer{
cap: cap,
closed: false,
q: newQueue(),
cap: cap,
closed: false,
q: newQueue(),
maxBytes: maxBytes,
}
rb.wait = sync.NewCond(&rb.mu)
return rb
}

// Push pushes value into buffer and return whether it covers the oldest data
// or not.
func (rb *RingBuffer) Push(val interface{}) (bool, error) {
func (rb *RingBuffer) Push(val *logger.LogMessage) (bool, error) {
rb.mu.Lock()
defer rb.mu.Unlock()

Expand All @@ -49,6 +60,12 @@ func (rb *RingBuffer) Push(val interface{}) (bool, error) {
return false, nil
}

msgLength := int64(len(val.Line))
if (rb.currentBytes + msgLength) > rb.maxBytes {
rb.wait.Broadcast()
return false, nil
}

// drop the oldest element if covered
covered := (rb.q.size() == rb.cap)
if covered {
Expand All @@ -63,7 +80,7 @@ func (rb *RingBuffer) Push(val interface{}) (bool, error) {
// Pop pops the value in the buffer.
//
// NOTE: it returns ErrClosed if the buffer has been closed.
func (rb *RingBuffer) Pop() (interface{}, error) {
func (rb *RingBuffer) Pop() (*logger.LogMessage, error) {
rb.mu.Lock()
for rb.q.size() == 0 && !rb.closed {
rb.wait.Wait()
Expand All @@ -75,23 +92,25 @@ func (rb *RingBuffer) Pop() (interface{}, error) {
}

val := rb.q.dequeue()
rb.currentBytes -= int64(len(val.Line))
rb.mu.Unlock()
return val, nil
}

// Drain returns all the data in the buffer.
//
// NOTE: it can be used after closed to make sure the data have been consumed.
func (rb *RingBuffer) Drain() []interface{} {
func (rb *RingBuffer) Drain() []*logger.LogMessage {
rb.mu.Lock()
defer rb.mu.Unlock()

size := rb.q.size()
vals := make([]interface{}, 0, size)
vals := make([]*logger.LogMessage, 0, size)

for i := 0; i < size; i++ {
vals = append(vals, rb.q.dequeue())
}
rb.currentBytes = 0
return vals
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,34 +1,36 @@
package ringbuffer
package logbuffer

import (
"github.com/alibaba/pouch/daemon/logger"
"reflect"
"strconv"
"sync"
"testing"
"time"
)

func TestPushNormal(t *testing.T) {
count := 5
rb := New(count)
rb := New(count, defaultMaxBytes)

// make the buffer full
for i := 0; i < count; i++ {
covered, err := rb.Push(i)
covered, err := rb.Push(wrapLog(i))
assertHelper(t, false, covered, "unexpected to drop data")
assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err)
}

// continue to push new data
for i := 0; i < count; i++ {
covered, err := rb.Push(i + count)
covered, err := rb.Push(wrapLog(i + count))
assertHelper(t, true, covered, "expected to drop data, but not")
assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err)
}

// check the buffer data
expectedDump := make([]interface{}, 0, count)
expectedDump := make([]*logger.LogMessage, 0, count)
for i := 0; i < count; i++ {
expectedDump = append(expectedDump, count+i)
expectedDump = append(expectedDump, wrapLog(count+i))
}

got := rb.Drain()
Expand All @@ -37,11 +39,11 @@ func TestPushNormal(t *testing.T) {

func TestPopNormal(t *testing.T) {
count := 5
rb := New(count)
rb := New(count, defaultMaxBytes)

// make the buffer full
for i := 0; i < count; i++ {
covered, err := rb.Push(i)
covered, err := rb.Push(wrapLog(i))
assertHelper(t, false, covered, "unexpected to drop data")
assertHelper(t, nil, err, "unexpected error during push non-closed queue: %v", err)
}
Expand All @@ -59,56 +61,56 @@ func TestPopNormal(t *testing.T) {

func TestPushAndPop(t *testing.T) {
count := 5
rb := New(count)
rb := New(count, defaultMaxBytes)

for _, v := range []int{1, 3, 5} {
rb.Push(v)
rb.Push(wrapLog(v))
}

{
// get 1 without error
val, err := rb.Pop()
assertHelper(t, val, 1, "expected to get 1, but got %v", val)
assertHelper(t, val, wrapLog(1), "expected to get 1, but got %v", val)
assertHelper(t, nil, err, "unexpected error during pop: %v", err)
}

// push 4, [3, 5, 4]
rb.Push(4)
rb.Push(wrapLog(4))

{
// get 3 without error
val, err := rb.Pop()
assertHelper(t, val, 3, "expected to get 3, but got %v", val)
assertHelper(t, val, wrapLog(3), "expected to get 3, but got %v", val)
assertHelper(t, nil, err, "unexpected error during pop: %v", err)
}

// push 2, [5, 4, 2]
rb.Push(2)
rb.Push(wrapLog(2))

{
// get 5 without error
val, err := rb.Pop()
assertHelper(t, val, 5, "expected to get 5, but got %v", val)
assertHelper(t, val, wrapLog(5), "expected to get 5, but got %v", val)
assertHelper(t, nil, err, "unexpected error during pop: %v", err)
}

rb.Close()

{
// get error if push data into closed buffer
_, err := rb.Push(0)
_, err := rb.Push(wrapLog(0))
assertHelper(t, ErrClosed, err,
"expected to get error(%v) when push data into closed buffer, but got error(%v)", ErrClosed, err)
}

// check the buffer data
expectedDump, got := []interface{}{4, 2}, rb.Drain()
expectedDump, got := []*logger.LogMessage{wrapLog(4), wrapLog(2)}, rb.Drain()
assertHelper(t, expectedDump, got, "expected return %v, but got %v", expectedDump, got)
}

func TestPopWaitWhenNotData(t *testing.T) {
count := 5
rb := New(count)
rb := New(count, defaultMaxBytes)

var (
wg sync.WaitGroup
Expand Down Expand Up @@ -144,3 +146,9 @@ func assertHelper(t *testing.T, expected, got interface{}, format string, args .
t.FailNow()
}
}

func wrapLog(num int) *logger.LogMessage {
return &logger.LogMessage{
Line: []byte(strconv.Itoa(num)),
}
}
Loading

0 comments on commit c6e6f14

Please sign in to comment.