Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tracing improvements #31

Merged
merged 4 commits into from
Mar 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
# PR [#31](https://github.com/theplant/appkit/pull/31)

* Log request id in trace log.
* Log trace id in InfluxDB.
* Add `service-name` config for InfluxDB.

# PR [#30](https://github.com/theplant/appkit/pull/30)

## Breaking Changes
Expand Down
21 changes: 20 additions & 1 deletion monitoring/influxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type influxMonitorCfg struct {
BatchWriteInterval time.Duration
BufferSize int
MaxBufferSize int
ServiceName string
}

const (
Expand All @@ -43,6 +44,8 @@ const (
batchWriteIntervalParamName = "batch-write-interval"
bufferSizeParamName = "buffer-size"
maxBufferSizeParamName = "max-buffer-size"

serviceNameParamName = "service-name"
junhuif marked this conversation as resolved.
Show resolved Hide resolved
)

func getBufferSize(values url.Values, key string, defaultValue int) (int, error) {
Expand Down Expand Up @@ -128,19 +131,22 @@ func parseInfluxMonitorConfig(config InfluxMonitorConfig) (*influxMonitorCfg, er
BatchWriteInterval: batchWriteInterval,
BufferSize: bufferSize,
MaxBufferSize: maxBufferSize,
ServiceName: values.Get(serviceNameParamName),
}, nil
}

// NewInfluxdbMonitor creates new monitoring influxdb
// client. config URL syntax is
// `https://<username>:<password>@<influxDB host>/<database>?batch-write-interval=timeDuration&buffer-size=number&max-buffer-size=number`
// `https://<username>:<password>@<influxDB host>/<database>?batch-write-interval=timeDuration&buffer-size=number&max-buffer-size=number&service-name=name`
// batch-write-interval is optional, default is 60s,
// valid time units are "ns", "us" (or "µs"), "ms", "s", "m", "h".
// exec batch write when we haven't sent data since batch-write-interval ago
// buffer-size is optional, default is 5000.
// if buffered size reach buffer size then exec batch write.
// max-buffer-size is optional, default is 10000, it must > buffer-size,
// if the batch write fails and buffered size reach max-buffer-size then clean up the buffer (mean the data is lost).
// service-name is optional
// if set then all points will add tag service=service-name.
//
// The second return value is a function that will cause the batching
// goroutine to write buffered points, then terminate. This function
Expand Down Expand Up @@ -184,6 +190,8 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
maxBufferSize: cfg.MaxBufferSize,

done: &sync.WaitGroup{},

serviceName: cfg.ServiceName,
}

running := make(chan struct{})
Expand Down Expand Up @@ -230,6 +238,7 @@ func NewInfluxdbMonitor(config InfluxMonitorConfig, logger log.Logger) (Monitor,
"batch-write-interval", cfg.BatchWriteInterval.String(),
"buffer-size", cfg.BufferSize,
"max-buffer-size", cfg.MaxBufferSize,
"service-name", cfg.ServiceName,
)

return monitor, func() {
Expand Down Expand Up @@ -259,6 +268,8 @@ type influxdbMonitor struct {
//
// https://godoc.org/sync#WaitGroup
done *sync.WaitGroup

serviceName string
}

func (im influxdbMonitor) batchWriteDaemon(running chan struct{}) {
Expand Down Expand Up @@ -389,6 +400,14 @@ func (im influxdbMonitor) newRecord(measurement string, value interface{}, tags

fields["value"] = value

if im.serviceName != "" {
if tags == nil {
tags = map[string]string{}
}

tags["service"] = im.serviceName
}

pt, err := influxdb.NewPoint(measurement, tags, fields, at)

if err != nil {
Expand Down
80 changes: 72 additions & 8 deletions monitoring/influxdb_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ func TestParseInfluxMonitorConfig(t *testing.T) {
}{
{
name: "default batch-write-interval, buffer-size, max-buffer-size",
config: "https://root:password@localhost:8086/local",
config: "https://root:password@localhost:8086/local?service-name=api",
expectedCfg: &influxMonitorCfg{
Scheme: "https",
Host: "localhost:8086",
Expand All @@ -69,6 +69,7 @@ func TestParseInfluxMonitorConfig(t *testing.T) {
BatchWriteInterval: defaultBatchWriteInterval,
BufferSize: defaultBufferSize,
MaxBufferSize: defaultMaxBufferSize,
ServiceName: "api",
},
},

Expand All @@ -85,6 +86,7 @@ func TestParseInfluxMonitorConfig(t *testing.T) {
BatchWriteInterval: time.Second * 30,
BufferSize: 1000,
MaxBufferSize: 5000,
ServiceName: "",
},
},

Expand Down Expand Up @@ -128,8 +130,8 @@ func TestParseInfluxMonitorConfig(t *testing.T) {
}
}

func newMonitor(client influxdb.Client, bufferSize int, maxBufferSize int) *influxdbMonitor {
monitor := &influxdbMonitor{
func newMonitor(client influxdb.Client, bufferSize int, maxBufferSize int, serviceName string) (monitor *influxdbMonitor, closeFunc func()) {
monitor = &influxdbMonitor{
database: "test_database",
client: client,
logger: log.NewNopLogger(),
Expand All @@ -140,11 +142,17 @@ func newMonitor(client influxdb.Client, bufferSize int, maxBufferSize int) *infl
maxBufferSize: maxBufferSize,

done: &sync.WaitGroup{},

serviceName: serviceName,
}

go monitor.batchWriteDaemon(nil)
running := make(chan struct{})
go monitor.batchWriteDaemon(running)

return monitor
return monitor, func() {
close(running)
monitor.done.Wait()
}
}

func insertRecords(monitor Monitor, callTimes int) {
Expand Down Expand Up @@ -174,7 +182,7 @@ func TestInfluxdbBatchWrite(t *testing.T) {
},
}

monitor := newMonitor(mockedClient, 5000, 10000)
monitor, _ := newMonitor(mockedClient, 5000, 10000, "")

insertRecords(monitor, 4000)

Expand Down Expand Up @@ -210,7 +218,7 @@ func TestInfluxdbBatchWrite__WriteFailed(t *testing.T) {
},
}

monitor := newMonitor(mockedClient, 5000, 16000)
monitor, _ := newMonitor(mockedClient, 5000, 16000, "")

insertRecords(monitor, 5000)

Expand Down Expand Up @@ -286,7 +294,7 @@ func TestInfluxdbBatchWrite__WriteFailed__BufferSizeAndMaxBufferSizeIsDefault(t
},
}

monitor := newMonitor(mockedClient, 5000, 10000)
monitor, _ := newMonitor(mockedClient, 5000, 10000, "")

insertRecords(monitor, 9000)

Expand All @@ -302,3 +310,59 @@ func TestInfluxdbBatchWrite__WriteFailed__BufferSizeAndMaxBufferSizeIsDefault(t

assertWriteCalls(t, mockedClient, 3, []int{5001, 10001, 1001})
}

func TestServiceName(t *testing.T) {
var bp influxdb.BatchPoints

mockedClient := &ClientMock{
WriteFunc: func(p influxdb.BatchPoints) error {
bp = p
return nil
},
}

// tag is nil

monitor, cf := newMonitor(mockedClient, 1, 1, "api")

monitor.InsertRecord("request", 100, nil, nil, time.Time{})
cf()

fatalassert.Equal(t, 1, len(mockedClient.WriteCalls()))
fatalassert.Equal(t, bp.Points()[0].Tags(), map[string]string{
"service": "api",
})
fatalassert.Equal(t, bp.Points()[1].Name(), "influxdb-queue-length")
fatalassert.Equal(t, bp.Points()[1].Tags(), map[string]string{
"service": "api",
})

// tag is not nil

monitor, cf = newMonitor(mockedClient, 1, 1, "api")

monitor.InsertRecord("request", 100, map[string]string{"tag1": "value1"}, nil, time.Time{})
cf()

fatalassert.Equal(t, 2, len(mockedClient.WriteCalls()))
fatalassert.Equal(t, bp.Points()[0].Tags(), map[string]string{
"tag1": "value1",
"service": "api",
})
fatalassert.Equal(t, bp.Points()[1].Tags(), map[string]string{
"service": "api",
})

// service name is empty

monitor, cf = newMonitor(mockedClient, 1, 1, "")

monitor.InsertRecord("request", 100, map[string]string{"tag1": "value1"}, nil, time.Time{})
cf()

fatalassert.Equal(t, 3, len(mockedClient.WriteCalls()))
fatalassert.Equal(t, bp.Points()[0].Tags(), map[string]string{
"tag1": "value1",
})
fatalassert.Equal(t, bp.Points()[1].Tags(), map[string]string{})
}
4 changes: 4 additions & 0 deletions monitoring/middleware.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"strconv"
"time"

"github.com/opentracing/opentracing-go"
"github.com/theplant/appkit/contexts"
"github.com/theplant/appkit/contexts/trace"
"github.com/theplant/appkit/log"
Expand Down Expand Up @@ -90,6 +91,9 @@ func fieldsForContext(ctx context.Context) map[string]interface{} {
if reqID, ok := trace.RequestTrace(ctx); ok {
fields["req_id"] = fmt.Sprintf("%v", reqID)
}
if span := opentracing.SpanFromContext(ctx); span != nil {
fields["span_context"] = fmt.Sprintf("%v", span.Context())
}

return fields
}
Expand Down
4 changes: 4 additions & 0 deletions tracing/tracing.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
opentracing "github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/ext"
"github.com/theplant/appkit/contexts"
ctxtrace "github.com/theplant/appkit/contexts/trace"
"github.com/theplant/appkit/log"
"github.com/theplant/appkit/server"
jaegercfg "github.com/uber/jaeger-client-go/config"
Expand Down Expand Up @@ -54,6 +55,9 @@ func trace(h http.Handler) http.Handler {
ext.SpanKind.Set(span, ext.SpanKindRPCServerEnum)
ext.HTTPMethod.Set(span, r.Method)
ext.HTTPUrl.Set(span, r.URL.String())
if ctxtraceID, ok := ctxtrace.RequestTrace(ctx); ok {
span.LogKV("req_id", ctxtraceID)
}

h.ServeHTTP(w, r.WithContext(ctx))
s, _ := contexts.HTTPStatus(ctx)
Expand Down