Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Generate trace logs for a number of important InfluxDB operations #9456

Merged
merged 1 commit into from
Feb 21, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/influx_inspect/buildtsi/buildtsi.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error {
}

func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpName, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding retention policy", zap.String("db", dbName), zap.String("rp", rpName))
cmd.Logger.Info("rebuilding retention policy", logger.Database(dbName), logger.RetentionPolicy(rpName))

fis, err := ioutil.ReadDir(dataDir)
if err != nil {
Expand All @@ -142,7 +142,7 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam
}

func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string, shardID uint64, dataDir, walDir string) error {
cmd.Logger.Info("rebuilding shard", zap.String("db", dbName), zap.String("rp", rpName), zap.Uint64("shard", shardID))
cmd.Logger.Info("rebuilding shard", logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID))

// Check if shard already has a TSI index.
indexPath := filepath.Join(dataDir, "index")
Expand Down
111 changes: 111 additions & 0 deletions logger/fields.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package logger

import (
"time"

"github.com/influxdata/influxdb/pkg/snowflake"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

const (
	// TraceIDKey is the logging context key used for identifying unique traces.
	TraceIDKey = "trace_id"

	// OperationNameKey is the logging context key used for identifying name of an operation.
	OperationNameKey = "op.name"

	// OperationEventKey is the logging context key used for identifying a notable
	// event during the course of an operation.
	OperationEventKey = "op.event"

	// OperationElapsedKey is the logging context key used for identifying time elapsed to finish an operation.
	OperationElapsedKey = "op.elapsed"

	// DBInstanceKey is the logging context key used for identifying name of the relevant database.
	DBInstanceKey = "db.instance"

	// DBRetentionKey is the logging context key used for identifying name of the relevant retention policy.
	DBRetentionKey = "db.rp"

	// DBShardGroupKey is the logging context key used for identifying relevant shard group.
	DBShardGroupKey = "db.shard_group"

	// DBShardIDKey is the logging context key used for identifying the id of the relevant shard.
	DBShardIDKey = "db.shard_id"
)

// Values logged under OperationEventKey to mark the boundaries of an operation.
const (
	eventStart = "start"
	eventEnd   = "end"
)

var (
	// gen is the package-level snowflake generator used to mint unique ids
	// for traces and logger instances. Machine id 0 is hard-coded here;
	// NOTE(review): ids generated by separate processes could therefore
	// collide — confirm uniqueness is only needed per process.
	gen = snowflake.New(0)
)

// nextID returns the next unique snowflake identifier as a sortable string.
func nextID() string {
	return gen.NextString()
}

// TraceID returns a field for tracking the trace identifier,
// logged under the TraceIDKey context key.
func TraceID(id string) zapcore.Field {
	return zap.String(TraceIDKey, id)
}

// OperationName returns a field for tracking the name of an operation,
// logged under the OperationNameKey context key.
func OperationName(name string) zapcore.Field {
	return zap.String(OperationNameKey, name)
}

// OperationElapsed returns a field for tracking the duration of an operation,
// logged under the OperationElapsedKey context key.
func OperationElapsed(d time.Duration) zapcore.Field {
	return zap.Duration(OperationElapsedKey, d)
}

// OperationEventStart returns a field marking the start of an operation,
// logged under the OperationEventKey context key.
func OperationEventStart() zapcore.Field {
	return zap.String(OperationEventKey, eventStart)
}

// OperationEventEnd returns a field marking the end of an operation,
// logged under the OperationEventKey context key.
func OperationEventEnd() zapcore.Field {
	return zap.String(OperationEventKey, eventEnd)
}

// Database returns a field for tracking the name of a database,
// logged under the DBInstanceKey context key.
func Database(name string) zapcore.Field {
	return zap.String(DBInstanceKey, name)
}

// RetentionPolicy returns a field for tracking the name of a retention policy,
// logged under the DBRetentionKey context key.
func RetentionPolicy(name string) zapcore.Field {
	return zap.String(DBRetentionKey, name)
}

// ShardGroup returns a field for tracking the shard group identifier,
// logged under the DBShardGroupKey context key.
func ShardGroup(id uint64) zapcore.Field {
	return zap.Uint64(DBShardGroupKey, id)
}

// Shard returns a field for tracking the shard identifier,
// logged under the DBShardIDKey context key.
func Shard(id uint64) zapcore.Field {
	return zap.Uint64(DBShardIDKey, id)
}

// NewOperation uses the existing logger to create a new logger with context
// containing a trace id and the operation name. Prior to returning, a
// standardized message is logged indicating the operation has started. The
// returned function should be called when the operation concludes in order to
// log a corresponding message which includes the elapsed time and that the
// operation has ended.
func NewOperation(log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) {
	// Prepend the trace id and operation name to any caller-supplied fields.
	f := make([]zapcore.Field, 0, 2+len(fields))
	f = append(f, TraceID(nextID()), OperationName(name))
	f = append(f, fields...)

	now := time.Now()
	log = log.With(f...)
	log.Info(msg+" (start)", OperationEventStart())

	// The closure logs the end-of-operation event with the elapsed time.
	return log, func() {
		log.Info(msg+" (end)", OperationEventEnd(), OperationElapsed(time.Since(now)))
	}
}
2 changes: 1 addition & 1 deletion logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) {
encoder,
zapcore.Lock(zapcore.AddSync(w)),
c.Level,
)), nil
), zap.Fields(zap.String("log_id", nextID()))), nil
}

func newEncoder(format string) (zapcore.Encoder, error) {
Expand Down
4 changes: 2 additions & 2 deletions monitor/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ func (m *Monitor) createInternalStorage() {
}

if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil {
m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err))
m.Logger.Info("Failed to create storage", logger.Database(m.storeDatabase), zap.Error(err))
return
}
}
Expand All @@ -417,7 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error {
// storeStatistics writes the statistics to an InfluxDB system.
func (m *Monitor) storeStatistics() {
defer m.wg.Done()
m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))
m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval))

// Wait until an even interval to start recording monitor statistics.
// If we are interrupted before the interval for some reason, exit early.
Expand Down
38 changes: 38 additions & 0 deletions pkg/snowflake/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
Snowflake ID generator
======================

This is a Go implementation of [Twitter Snowflake](https://blog.twitter.com/2010/announcing-snowflake).

The most useful aspect of these IDs is they are _roughly_ sortable and when generated
at roughly the same time, should have values in close proximity to each other.

IDs
---

Each id will be a 64-bit number represented, structured as follows:


```
6 6 5 4 3 2 1
3210987654321098765432109876543210987654321098765432109876543210

ttttttttttttttttttttttttttttttttttttttttttmmmmmmmmmmssssssssssss
```

where

* s (sequence) is a 12-bit integer that increments if called multiple times for the same millisecond
* m (machine id) is a 10-bit integer representing the server id
* t (time) is a 42-bit integer representing the number of milliseconds elapsed
  since the custom epoch 1491696000000 (2017-04-09T00:00:00Z)

### String Encoding

The 64-bit unsigned integer is base-64 encoded using the following 64 URL-safe characters, which are
ordered according to their ASCII value.

```
0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~
```

A binary sort of a list of encoded values will be correctly ordered according to the numerical representation.
107 changes: 107 additions & 0 deletions pkg/snowflake/gen.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package snowflake

import (
"fmt"
"sync"
"time"
)

// Bit layout of a 64-bit snowflake ID:
// 42 bits of time (ms since epoch) | 10 bits machine id | 12 bits sequence.
const (
	epoch        = 1491696000000 // custom epoch: 2017-04-09T00:00:00Z in Unix milliseconds
	serverBits   = 10
	sequenceBits = 12
	serverShift  = sequenceBits
	timeShift    = sequenceBits + serverBits
	serverMax    = ^(-1 << serverBits)   // 1023: largest valid machine id
	sequenceMask = ^(-1 << sequenceBits) // 4095: per-millisecond sequence wraps here
)

// Generator produces unique, roughly time-sortable 64-bit snowflake IDs.
// It is safe for concurrent use.
type Generator struct {
	rw            sync.Mutex // guards lastTimestamp and sequence
	lastTimestamp uint64     // millisecond timestamp used for the most recent ID
	machineID     int        // immutable after New; embedded in every ID
	sequence      int32      // counter within a single millisecond
}

// New returns a Generator that embeds machineID in every ID it produces.
// machineID must fit in serverBits bits; it panics for values outside
// [0, serverMax], since a misconfigured machine id would silently produce
// colliding IDs.
func New(machineID int) *Generator {
	if machineID < 0 || machineID > serverMax {
		// The check accepts machineID == serverMax, so the message uses ≤
		// (the original "< serverMax" message was off by one).
		panic(fmt.Errorf("invalid machine id; must be 0 ≤ id ≤ %d", serverMax))
	}
	return &Generator{
		machineID:     machineID,
		lastTimestamp: 0,
		sequence:      0,
	}
}

// MachineID returns the machine identifier this generator was created with.
func (g *Generator) MachineID() int {
	return g.machineID
}

// Next returns the next unique snowflake ID. It is safe for concurrent use.
func (g *Generator) Next() uint64 {
	t := now()
	g.rw.Lock()
	if t == g.lastTimestamp {
		// Same millisecond as the previous ID: bump the 12-bit sequence.
		// If it wraps to zero, block until the clock reaches the next
		// millisecond so the (timestamp, sequence) pair stays unique.
		g.sequence = (g.sequence + 1) & sequenceMask
		if g.sequence == 0 {
			t = g.nextMillis()
		}
	} else if t < g.lastTimestamp {
		// Clock went backwards; wait until it passes lastTimestamp again.
		t = g.nextMillis()
	} else {
		g.sequence = 0
	}
	g.lastTimestamp = t
	seq := g.sequence
	g.rw.Unlock()

	// Assemble outside the lock from local copies:
	// 42 bits of time | 10 bits machine id | 12 bits sequence.
	tp := (t - epoch) << timeShift
	sp := uint64(g.machineID << serverShift)
	n := tp | sp | uint64(seq)

	return n
}

// NextString returns the next ID as an 11-character base-64 string whose
// lexicographic order matches the numeric order of the IDs.
func (g *Generator) NextString() string {
	var s [11]byte
	encode(&s, g.Next())
	return string(s[:])
}

// AppendNext writes the encoded form of the next ID into s, allowing the
// caller to reuse a buffer and avoid the allocation made by NextString.
func (g *Generator) AppendNext(s *[11]byte) {
	encode(s, g.Next())
}

// nextMillis busy-waits (sleeping 100µs per check) until the wall clock
// advances past g.lastTimestamp, then returns the new millisecond timestamp.
// It reads g.lastTimestamp, and is only called from Next while g.rw is held.
func (g *Generator) nextMillis() uint64 {
	t := now()
	for t <= g.lastTimestamp {
		time.Sleep(100 * time.Microsecond)
		t = now()
	}
	return t
}

// now returns the current wall-clock time in milliseconds since the Unix epoch.
func now() uint64 { return uint64(time.Now().UnixNano() / 1e6) }

// digits is the 64-character URL-safe alphabet used to encode IDs. The
// characters are ordered by ASCII value so that a lexicographic sort of
// encoded strings matches the numeric order of the underlying IDs.
var digits = [...]byte{
	'0', '1', '2', '3', '4', '5', '6', '7', '8', '9',
	'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J',
	'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T',
	'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c',
	'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm',
	'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w',
	'x', 'y', 'z', '~'}

// encode writes the fixed-width, 11-character encoding of n into s,
// most-significant character first: ten 6-bit groups fill s[10]..s[1],
// and the remaining high 4 bits land in s[0].
func encode(s *[11]byte, n uint64) {
	for i := len(s) - 1; i > 0; i-- {
		s[i] = digits[n&0x3f]
		n >>= 6
	}
	s[0] = digits[n&0x3f]
}
68 changes: 68 additions & 0 deletions pkg/snowflake/gen_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package snowflake

import (
"fmt"
"math/rand"
"sort"
"testing"

"github.com/influxdata/influxdb/pkg/testing/assert"
)

// TestEncode checks encode against known value/string pairs, including the
// zero value, single-character boundaries, and a value using all 64 bits.
func TestEncode(t *testing.T) {
	tests := []struct {
		v   uint64
		exp string
	}{
		{0x000, "00000000000"},
		{0x001, "00000000001"},
		{0x03f, "0000000000~"},
		{0x07f, "0000000001~"},
		{0xf07f07f07f07f07f, "F1~1~1~1~1~"},
	}
	for _, test := range tests {
		t.Run(fmt.Sprintf("0x%03x→%s", test.v, test.exp), func(t *testing.T) {
			var s [11]byte
			encode(&s, test.v)
			assert.Equal(t, string(s[:]), test.exp)
		})
	}
}

// TestSorting verifies numbers using the base-64 encoding are ordered
// according to their numerical representation.
func TestSorting(t *testing.T) {
	var (
		vals = make([]string, 1000)
		exp  = make([]string, 1000)
	)

	// Encode an increasing sequence; exp keeps the expected sorted order.
	for i := 0; i < len(vals); i++ {
		var s [11]byte
		encode(&s, uint64(i*47))
		vals[i] = string(s[:])
		exp[i] = string(s[:])
	}

	// randomize them
	shuffle(len(vals), func(i, j int) {
		vals[i], vals[j] = vals[j], vals[i]
	})

	// A plain string sort must restore the original numeric order.
	sort.Strings(vals)
	assert.Equal(t, vals, exp)
}

// BenchmarkEncode measures the cost of a single encode call; ReportAllocs
// verifies the encoder itself does not allocate.
func BenchmarkEncode(b *testing.B) {
	b.ReportAllocs()
	var s [11]byte
	for i := 0; i < b.N; i++ {
		encode(&s, 100)
	}
}

// shuffle performs an in-place Fisher–Yates shuffle over n elements,
// delegating each exchange to the caller-supplied swap function.
func shuffle(n int, swap func(i, j int)) {
	for i := n - 1; i >= 1; i-- {
		swap(i, rand.Intn(i+1))
	}
}
5 changes: 3 additions & 2 deletions services/collectd/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"collectd.org/api"
"collectd.org/network"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
"github.com/influxdata/influxdb/services/meta"
"github.com/influxdata/influxdb/tsdb"
Expand Down Expand Up @@ -395,7 +396,7 @@ func (s *Service) writePoints() {
// Will attempt to create database if not yet created.
if err := s.createInternalStorage(); err != nil {
s.Logger.Info("Required database not yet created",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
continue
}

Expand All @@ -404,7 +405,7 @@ func (s *Service) writePoints() {
atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch)))
} else {
s.Logger.Info("Failed to write point batch to database",
zap.String("db", s.Config.Database), zap.Error(err))
logger.Database(s.Config.Database), zap.Error(err))
atomic.AddInt64(&s.stats.BatchesTransmitFail, 1)
}
}
Expand Down
Loading