diff --git a/cmd/influx_inspect/buildtsi/buildtsi.go b/cmd/influx_inspect/buildtsi/buildtsi.go index 46d60b3d404..897472075a3 100644 --- a/cmd/influx_inspect/buildtsi/buildtsi.go +++ b/cmd/influx_inspect/buildtsi/buildtsi.go @@ -115,7 +115,7 @@ func (cmd *Command) processDatabase(dbName, dataDir, walDir string) error { } func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpName, dataDir, walDir string) error { - cmd.Logger.Info("rebuilding retention policy", zap.String("db", dbName), zap.String("rp", rpName)) + cmd.Logger.Info("rebuilding retention policy", logger.Database(dbName), logger.RetentionPolicy(rpName)) fis, err := ioutil.ReadDir(dataDir) if err != nil { @@ -142,7 +142,7 @@ func (cmd *Command) processRetentionPolicy(sfile *tsdb.SeriesFile, dbName, rpNam } func (cmd *Command) processShard(sfile *tsdb.SeriesFile, dbName, rpName string, shardID uint64, dataDir, walDir string) error { - cmd.Logger.Info("rebuilding shard", zap.String("db", dbName), zap.String("rp", rpName), zap.Uint64("shard", shardID)) + cmd.Logger.Info("rebuilding shard", logger.Database(dbName), logger.RetentionPolicy(rpName), logger.Shard(shardID)) // Check if shard already has a TSI index. indexPath := filepath.Join(dataDir, "index") diff --git a/logger/fields.go b/logger/fields.go new file mode 100644 index 00000000000..d40fef1f378 --- /dev/null +++ b/logger/fields.go @@ -0,0 +1,111 @@ +package logger + +import ( + "time" + + "github.com/influxdata/influxdb/pkg/snowflake" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +const ( + // TraceIDKey is the logging context key used for identifying unique traces. + TraceIDKey = "trace_id" + + // OperationNameKey is the logging context key used for identifying the name of an operation. + OperationNameKey = "op.name" + + // OperationEventKey is the logging context key used for identifying a notable + // event during the course of an operation. + OperationEventKey = "op.event" + + // OperationElapsedKey is the logging context key used for identifying the time elapsed to finish an operation. + OperationElapsedKey = "op.elapsed" + + // DBInstanceKey is the logging context key used for identifying the name of the relevant database. + DBInstanceKey = "db.instance" + + // DBRetentionKey is the logging context key used for identifying the name of the relevant retention policy. + DBRetentionKey = "db.rp" + + // DBShardGroupKey is the logging context key used for identifying the relevant shard group. + DBShardGroupKey = "db.shard_group" + + // DBShardIDKey is the logging context key used for identifying the ID of the relevant shard. + DBShardIDKey = "db.shard_id" +) +const ( + eventStart = "start" + eventEnd = "end" +) + +var ( + gen = snowflake.New(0) +) + +func nextID() string { + return gen.NextString() +} + +// TraceID returns a field for tracking the trace identifier. +func TraceID(id string) zapcore.Field { + return zap.String(TraceIDKey, id) +} + +// OperationName returns a field for tracking the name of an operation. +func OperationName(name string) zapcore.Field { + return zap.String(OperationNameKey, name) +} + +// OperationElapsed returns a field for tracking the duration of an operation. +func OperationElapsed(d time.Duration) zapcore.Field { + return zap.Duration(OperationElapsedKey, d) +} + +// OperationEventStart returns a field for tracking the start of an operation. +func OperationEventStart() zapcore.Field { + return zap.String(OperationEventKey, eventStart) +} + +// OperationEventEnd returns a field for tracking the end of an operation. 
+func OperationEventEnd() zapcore.Field { + return zap.String(OperationEventKey, eventEnd) +} + +// Database returns a field for tracking the name of a database. +func Database(name string) zapcore.Field { + return zap.String(DBInstanceKey, name) +} + +// RetentionPolicy returns a field for tracking the name of a retention policy. +func RetentionPolicy(name string) zapcore.Field { + return zap.String(DBRetentionKey, name) +} + +// ShardGroup returns a field for tracking the shard group identifier. +func ShardGroup(id uint64) zapcore.Field { + return zap.Uint64(DBShardGroupKey, id) +} + +// Shard returns a field for tracking the shard identifier. +func Shard(id uint64) zapcore.Field { + return zap.Uint64(DBShardIDKey, id) +} + +// NewOperation uses the existing logger to create a new logger with context +// containing a trace ID and the operation name. Prior to returning, a standardized message +// is logged indicating the operation has started. The returned function should be +// called when the operation concludes in order to log a corresponding message which +// includes an elapsed time and that the operation has ended. +func NewOperation(log *zap.Logger, msg, name string, fields ...zapcore.Field) (*zap.Logger, func()) { + f := []zapcore.Field{TraceID(nextID()), OperationName(name)} + if len(fields) > 0 { + f = append(f, fields...) + } + + now := time.Now() + log = log.With(f...) + log.Info(msg+" (start)", OperationEventStart()) + + return log, func() { log.Info(msg+" (end)", OperationEventEnd(), OperationElapsed(time.Since(now))) } +} diff --git a/logger/logger.go b/logger/logger.go index 8e61c4cb86b..44dc39cfbfa 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -45,7 +45,7 @@ func (c *Config) New(defaultOutput io.Writer) (*zap.Logger, error) { encoder, zapcore.Lock(zapcore.AddSync(w)), c.Level, - )), nil + ), zap.Fields(zap.String("log_id", nextID()))), nil } func newEncoder(format string) (zapcore.Encoder, error) { diff --git a/monitor/service.go b/monitor/service.go index 60aa95068b2..e88d446d064 100644 --- a/monitor/service.go +++ b/monitor/service.go @@ -390,7 +390,7 @@ func (m *Monitor) createInternalStorage() { } if _, err := m.MetaClient.CreateDatabaseWithRetentionPolicy(m.storeDatabase, &spec); err != nil { - m.Logger.Info("Failed to create storage", zap.String("db", m.storeDatabase), zap.Error(err)) + m.Logger.Info("Failed to create storage", logger.Database(m.storeDatabase), zap.Error(err)) return } } @@ -417,7 +417,7 @@ func (m *Monitor) waitUntilInterval(d time.Duration) error { // storeStatistics writes the statistics to an InfluxDB system. func (m *Monitor) storeStatistics() { defer m.wg.Done() - m.Logger.Info("Storing statistics", zap.String("db", m.storeDatabase), zap.String("rp", m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval)) + m.Logger.Info("Storing statistics", logger.Database(m.storeDatabase), logger.RetentionPolicy(m.storeRetentionPolicy), logger.DurationLiteral("interval", m.storeInterval)) // Wait until an even interval to start recording monitor statistics. // If we are interrupted before the interval for some reason, exit early. diff --git a/pkg/snowflake/README.md b/pkg/snowflake/README.md new file mode 100644 index 00000000000..92166b2361b --- /dev/null +++ b/pkg/snowflake/README.md @@ -0,0 +1,38 @@ +Snowflake ID generator +====================== + +This is a Go implementation of [Twitter Snowflake](https://blog.twitter.com/2010/announcing-snowflake). 
+ +The most useful aspect of these IDs is they are _roughly_ sortable and when generated +at roughly the same time, should have values in close proximity to each other. + +IDs +--- + +Each id will be a 64-bit number represented, structured as follows: + + +``` +6 6 5 4 3 2 1 +3210987654321098765432109876543210987654321098765432109876543210 + +ttttttttttttttttttttttttttttttttttttttttttmmmmmmmmmmssssssssssss +``` + +where + +* s (sequence) is a 12-bit integer that increments if called multiple times for the same millisecond +* m (machine id) is a 10-bit integer representing the server id +* t (time) is a 42-bit integer representing the current timestamp in milliseconds + the number of milliseconds to have elapsed since 1491696000000 or 2017-04-09T00:00:00Z + +### String Encoding + +The 64-bit unsigned integer is base-63 encoded using the following URL-safe characters, which are ordered +according to their ASCII value. + +``` +0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ_abcdefghijklmnopqrstuvwxyz~ +``` + +A binary sort of a list of encoded values will be correctly ordered according to the numerical representation. \ No newline at end of file diff --git a/pkg/snowflake/gen.go b/pkg/snowflake/gen.go new file mode 100644 index 00000000000..1e327fbd8fc --- /dev/null +++ b/pkg/snowflake/gen.go @@ -0,0 +1,107 @@ +package snowflake + +import ( + "fmt" + "sync" + "time" +) + +const ( + epoch = 1491696000000 + serverBits = 10 + sequenceBits = 12 + serverShift = sequenceBits + timeShift = sequenceBits + serverBits + serverMax = ^(-1 << serverBits) + sequenceMask = ^(-1 << sequenceBits) +) + +type Generator struct { + rw sync.Mutex + lastTimestamp uint64 + machineID int + sequence int32 +} + +func New(machineID int) *Generator { + if machineID < 0 || machineID > serverMax { + panic(fmt.Errorf("invalid machine id; must be 0 ≤ id < %d", serverMax)) + } + return &Generator{ + machineID: machineID, + lastTimestamp: 0, + sequence: 0, + } +} + +func (g *Generator) MachineID() int { + return g.machineID +} + +func (g *Generator) Next() uint64 { + t := now() + g.rw.Lock() + if t == g.lastTimestamp { + g.sequence = (g.sequence + 1) & sequenceMask + if g.sequence == 0 { + t = g.nextMillis() + } + } else if t < g.lastTimestamp { + t = g.nextMillis() + } else { + g.sequence = 0 + } + g.lastTimestamp = t + seq := g.sequence + g.rw.Unlock() + + tp := (t - epoch) << timeShift + sp := uint64(g.machineID << serverShift) + n := tp | sp | uint64(seq) + + return n +} + +func (g *Generator) NextString() string { + var s [11]byte + encode(&s, g.Next()) + return string(s[:]) +} + +func (g *Generator) AppendNext(s *[11]byte) { + encode(s, g.Next()) +} + +func (g *Generator) nextMillis() uint64 { + t := now() + for t <= g.lastTimestamp { + time.Sleep(100 * time.Microsecond) + t = now() + } + return t +} + +func now() uint64 { return uint64(time.Now().UnixNano() / 1e6) } + +var digits = [...]byte{ + '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', + 'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', + 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', + 'U', 'V', 'W', 'X', 'Y', 'Z', '_', 'a', 'b', 'c', + 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', + 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', + 'x', 'y', 'z', '~'} + +func encode(s *[11]byte, n uint64) { + s[10], n = digits[n&0x3f], n>>6 + s[9], n = digits[n&0x3f], n>>6 + s[8], n = digits[n&0x3f], n>>6 + s[7], n = digits[n&0x3f], n>>6 + s[6], n = digits[n&0x3f], n>>6 + s[5], n = digits[n&0x3f], n>>6 + s[4], n = digits[n&0x3f], n>>6 + s[3], n = digits[n&0x3f], n>>6 + s[2], n = 
digits[n&0x3f], n>>6 + s[1], n = digits[n&0x3f], n>>6 + s[0] = digits[n&0x3f] +} diff --git a/pkg/snowflake/gen_test.go b/pkg/snowflake/gen_test.go new file mode 100644 index 00000000000..bd1dd28d832 --- /dev/null +++ b/pkg/snowflake/gen_test.go @@ -0,0 +1,68 @@ +package snowflake + +import ( + "fmt" + "math/rand" + "sort" + "testing" + + "github.com/influxdata/influxdb/pkg/testing/assert" +) + +func TestEncode(t *testing.T) { + tests := []struct { + v uint64 + exp string + }{ + {0x000, "00000000000"}, + {0x001, "00000000001"}, + {0x03f, "0000000000~"}, + {0x07f, "0000000001~"}, + {0xf07f07f07f07f07f, "F1~1~1~1~1~"}, + } + for _, test := range tests { + t.Run(fmt.Sprintf("0x%03x→%s", test.v, test.exp), func(t *testing.T) { + var s [11]byte + encode(&s, test.v) + assert.Equal(t, string(s[:]), test.exp) + }) + } +} + +// TestSorting verifies numbers using base 63 encoding are ordered according to their numerical representation. +func TestSorting(t *testing.T) { + var ( + vals = make([]string, 1000) + exp = make([]string, 1000) + ) + + for i := 0; i < len(vals); i++ { + var s [11]byte + encode(&s, uint64(i*47)) + vals[i] = string(s[:]) + exp[i] = string(s[:]) + } + + // randomize them + shuffle(len(vals), func(i, j int) { + vals[i], vals[j] = vals[j], vals[i] + }) + + sort.Strings(vals) + assert.Equal(t, vals, exp) +} + +func BenchmarkEncode(b *testing.B) { + b.ReportAllocs() + var s [11]byte + for i := 0; i < b.N; i++ { + encode(&s, 100) + } +} + +func shuffle(n int, swap func(i, j int)) { + for i := n - 1; i > 0; i-- { + j := rand.Intn(i + 1) + swap(i, j) + } +} diff --git a/services/collectd/service.go b/services/collectd/service.go index e1c8fd89524..05d515ba5c0 100644 --- a/services/collectd/service.go +++ b/services/collectd/service.go @@ -15,6 +15,7 @@ import ( "collectd.org/api" "collectd.org/network" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" @@ -395,7 +396,7 @@ func (s *Service) writePoints() { // Will attempt to create database if not yet created. 
if err := s.createInternalStorage(); err != nil { s.Logger.Info("Required database not yet created", - zap.String("db", s.Config.Database), zap.Error(err)) + logger.Database(s.Config.Database), zap.Error(err)) continue } @@ -404,7 +405,7 @@ func (s *Service) writePoints() { atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { s.Logger.Info("Failed to write point batch to database", - zap.String("db", s.Config.Database), zap.Error(err)) + logger.Database(s.Config.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } } diff --git a/services/continuous_querier/service.go b/services/continuous_querier/service.go index 8feaf47715e..b0e019532fc 100644 --- a/services/continuous_querier/service.go +++ b/services/continuous_querier/service.go @@ -360,13 +360,20 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti return false, fmt.Errorf("unable to set time range: %s", err) } - var start time.Time + var ( + start time.Time + log = s.Logger + ) if s.loggingEnabled || s.queryStatsEnabled { start = time.Now() } if s.loggingEnabled { - s.Logger.Info("Executing continuous query", + var logEnd func() + log, logEnd = logger.NewOperation(s.Logger, "Continuous query execution", "continuous_querier.execute") + defer logEnd() + + log.Info("Executing continuous query", zap.String("name", cq.Info.Name), zap.Time("start", startTime), zap.Time("end", endTime)) @@ -391,7 +398,7 @@ func (s *Service) ExecuteContinuousQuery(dbi *meta.DatabaseInfo, cqi *meta.Conti } if s.loggingEnabled { - s.Logger.Info("Finished continuous query", + log.Info("Finished continuous query", zap.String("name", cq.Info.Name), zap.Int64("written", written), zap.Time("start", startTime), diff --git a/services/graphite/service.go b/services/graphite/service.go index b86aaccd679..dea3d8ddba4 100644 --- a/services/graphite/service.go +++ b/services/graphite/service.go @@ -463,7 +463,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { s.logger.Info("Failed to write point batch to database", - zap.String("db", s.database), zap.Error(err)) + logger.Database(s.database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } diff --git a/services/httpd/handler.go b/services/httpd/handler.go index 3a4d3e1fc4e..62b242f1b3d 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -24,6 +24,7 @@ import ( "github.com/gogo/protobuf/proto" "github.com/golang/snappy" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/monitor/diagnostics" @@ -421,7 +422,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U h.Logger.Info("Unauthorized request", zap.String("user", err.User), zap.Stringer("query", err.Query), - zap.String("db", err.Database)) + logger.Database(err.Database)) } h.httpError(rw, "error authorizing query: "+err.Error(), http.StatusForbidden) return @@ -981,7 +982,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met h.Logger.Info("Unauthorized request", zap.String("user", err.User), zap.Stringer("query", err.Query), - zap.String("db", err.Database)) + logger.Database(err.Database)) } h.httpError(w, "error authorizing query: "+err.Error(), http.StatusForbidden) return diff --git a/services/meta/client.go b/services/meta/client.go index 7bc4f032565..fa9a7289928 
100644 --- a/services/meta/client.go +++ b/services/meta/client.go @@ -18,6 +18,7 @@ import ( "time" "github.com/influxdata/influxdb" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxql" "go.uber.org/zap" @@ -800,9 +801,9 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error { // if it already exists, continue if sg, _ := data.ShardGroupByTimestamp(di.Name, rp.Name, nextShardGroupTime); sg != nil { c.logger.Info("Shard group already exists", - zap.Uint64("id", sg.ID), - zap.String("db", di.Name), - zap.String("rp", rp.Name)) + logger.ShardGroup(sg.ID), + logger.Database(di.Name), + logger.RetentionPolicy(rp.Name)) continue } newGroup, err := createShardGroup(data, di.Name, rp.Name, nextShardGroupTime) @@ -813,9 +814,9 @@ func (c *Client) PrecreateShardGroups(from, to time.Time) error { } changed = true c.logger.Info("New shard group successfully precreated", - zap.Uint64("group_id", newGroup.ID), - zap.String("db", di.Name), - zap.String("rp", rp.Name)) + logger.ShardGroup(newGroup.ID), + logger.Database(di.Name), + logger.RetentionPolicy(rp.Name)) } } } diff --git a/services/opentsdb/service.go b/services/opentsdb/service.go index 73c979af279..041d69fe7f0 100644 --- a/services/opentsdb/service.go +++ b/services/opentsdb/service.go @@ -15,6 +15,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" @@ -464,7 +465,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { case batch := <-batcher.Out(): // Will attempt to create database if not yet created. if err := s.createInternalStorage(); err != nil { - s.Logger.Info("Required database does not yet exist", zap.String("db", s.Database), zap.Error(err)) + s.Logger.Info("Required database does not yet exist", logger.Database(s.Database), zap.Error(err)) continue } @@ -473,7 +474,7 @@ func (s *Service) processBatches(batcher *tsdb.PointBatcher) { atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { s.Logger.Info("Failed to write point batch to database", - zap.String("db", s.Database), zap.Error(err)) + logger.Database(s.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } } diff --git a/services/retention/service.go b/services/retention/service.go index 0b191296aa3..8b8d140643d 100644 --- a/services/retention/service.go +++ b/services/retention/service.go @@ -80,7 +80,7 @@ func (s *Service) run() { return case <-ticker.C: - s.logger.Info("Shard deletion check commencing") + log, logEnd := logger.NewOperation(s.logger, "Retention policy deletion check", "retention.delete_check") type deletionInfo struct { db string @@ -98,19 +98,19 @@ func (s *Service) run() { for _, r := range d.RetentionPolicies { for _, g := range r.ExpiredShardGroups(time.Now().UTC()) { if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil { - s.logger.Info("Failed to delete shard group", - zap.Uint64("id", g.ID), - zap.String("db", d.Name), - zap.String("rp", r.Name), + log.Info("Failed to delete shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name), zap.Error(err)) retryNeeded = true continue } - s.logger.Info("Deleted shard group", - zap.Uint64("id", g.ID), - zap.String("db", d.Name), - zap.String("rp", r.Name)) + log.Info("Deleted shard group", + logger.Database(d.Name), + logger.ShardGroup(g.ID), + logger.RetentionPolicy(r.Name)) // Store all the shard IDs that may 
possibly need to be removed locally. for _, sh := range g.Shards { @@ -124,29 +124,31 @@ func (s *Service) run() { for _, id := range s.TSDBStore.ShardIDs() { if info, ok := deletedShardIDs[id]; ok { if err := s.TSDBStore.DeleteShard(id); err != nil { - s.logger.Info("Failed to delete shard", - zap.Uint64("id", id), - zap.String("db", info.db), - zap.String("rp", info.rp), + log.Info("Failed to delete shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp), zap.Error(err)) retryNeeded = true continue } - s.logger.Info("Deleted shard", - zap.Uint64("id", id), - zap.String("db", info.db), - zap.String("rp", info.rp)) + log.Info("Deleted shard", + logger.Database(info.db), + logger.Shard(id), + logger.RetentionPolicy(info.rp)) } } if err := s.MetaClient.PruneShardGroups(); err != nil { - s.logger.Info("Problem pruning shard groups", zap.Error(err)) + log.Info("Problem pruning shard groups", zap.Error(err)) retryNeeded = true } if retryNeeded { - s.logger.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) + log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval))) } + + logEnd() } } } diff --git a/services/subscriber/service.go b/services/subscriber/service.go index 74afc153cdf..a460cb8a61c 100644 --- a/services/subscriber/service.go +++ b/services/subscriber/service.go @@ -11,6 +11,7 @@ import ( "time" "github.com/influxdata/influxdb/coordinator" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/monitor" "github.com/influxdata/influxdb/services/meta" @@ -315,8 +316,8 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) { } s.subs[se] = cw s.Logger.Info("Added new subscription", - zap.String("db", se.db), - zap.String("rp", se.rp)) + logger.Database(se.db), + logger.RetentionPolicy(se.rp)) } } } @@ -330,8 +331,8 @@ func (s *Service) updateSubs(wg *sync.WaitGroup) { // Remove it from the set delete(s.subs, se) s.Logger.Info("Deleted old subscription", - zap.String("db", se.db), - zap.String("rp", se.rp)) + logger.Database(se.db), + logger.RetentionPolicy(se.rp)) } } } diff --git a/services/udp/service.go b/services/udp/service.go index 69e7bad9a05..8db1332849b 100644 --- a/services/udp/service.go +++ b/services/udp/service.go @@ -8,6 +8,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/tsdb" @@ -161,7 +162,7 @@ func (s *Service) writer() { // Will attempt to create database if not yet created. 
if err := s.createInternalStorage(); err != nil { s.Logger.Info("Required database does not yet exist", - zap.String("db", s.config.Database), zap.Error(err)) + logger.Database(s.config.Database), zap.Error(err)) continue } @@ -170,7 +171,7 @@ func (s *Service) writer() { atomic.AddInt64(&s.stats.PointsTransmitted, int64(len(batch))) } else { s.Logger.Info("Failed to write point batch to database", - zap.String("db", s.config.Database), zap.Error(err)) + logger.Database(s.config.Database), zap.Error(err)) atomic.AddInt64(&s.stats.BatchesTransmitFail, 1) } diff --git a/tsdb/engine/tsm1/engine.go b/tsdb/engine/tsm1/engine.go index a349266f0ad..e3ee6b35735 100644 --- a/tsdb/engine/tsm1/engine.go +++ b/tsdb/engine/tsm1/engine.go @@ -20,6 +20,7 @@ import ( "sync/atomic" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" @@ -1558,24 +1559,22 @@ func (e *Engine) WriteSnapshot() error { // Lock and grab the cache snapshot along with all the closed WAL // filenames associated with the snapshot - var started *time.Time + started := time.Now() + log, logEnd := logger.NewOperation(e.logger, "Cache snapshot", "cache.snapshot") defer func() { - if started != nil { - e.Cache.UpdateCompactTime(time.Since(*started)) - e.logger.Info("Snapshot for path written", - zap.String("path", e.path), - zap.Duration("duration", time.Since(*started))) - } + elapsed := time.Since(started) + e.Cache.UpdateCompactTime(elapsed) + e.logger.Info("Snapshot for path written", + zap.String("path", e.path), + zap.Duration("duration", elapsed)) + logEnd() }() closedFiles, snapshot, err := func() ([]string, *Cache, error) { e.mu.Lock() defer e.mu.Unlock() - now := time.Now() - started = &now - if err := e.WAL.CloseSegment(); err != nil { return nil, nil, err } @@ -1611,7 +1610,7 @@ func (e *Engine) WriteSnapshot() error { zap.String("path", e.path), zap.Duration("duration", time.Since(dedup))) - return e.writeSnapshotAndCommit(closedFiles, snapshot) + return e.writeSnapshotAndCommit(log, closedFiles, snapshot) } // CreateSnapshot will create a temp directory that holds @@ -1633,7 +1632,7 @@ func (e *Engine) CreateSnapshot() (string, error) { } // writeSnapshotAndCommit will write the passed cache to a new TSM file and remove the closed WAL segments. 
-func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) (err error) { +func (e *Engine) writeSnapshotAndCommit(log *zap.Logger, closedFiles []string, snapshot *Cache) (err error) { defer func() { if err != nil { e.Cache.ClearSnapshot(false) @@ -1643,7 +1642,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // write the new snapshot files newFiles, err := e.Compactor.WriteSnapshot(snapshot) if err != nil { - e.logger.Info("Error writing snapshot from compactor", zap.Error(err)) + log.Info("Error writing snapshot from compactor", zap.Error(err)) return err } @@ -1652,7 +1651,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( // update the file store with these new files if err := e.FileStore.Replace(nil, newFiles); err != nil { - e.logger.Info("Error adding new TSM files from snapshot", zap.Error(err)) + log.Info("Error adding new TSM files from snapshot", zap.Error(err)) return err } @@ -1660,7 +1659,7 @@ func (e *Engine) writeSnapshotAndCommit(closedFiles []string, snapshot *Cache) ( e.Cache.ClearSnapshot(true) if err := e.WAL.Remove(closedFiles); err != nil { - e.logger.Info("Error removing closed WAL segments", zap.Error(err)) + log.Info("Error removing closed WAL segments", zap.Error(err)) } return nil @@ -1913,9 +1912,12 @@ func (s *compactionStrategy) Apply() { func (s *compactionStrategy) compactGroup() { group := s.group start := time.Now() - s.logger.Info("Beginning compaction", zap.Int("files", len(group))) + log, logEnd := logger.NewOperation(s.logger, "TSM compaction", "tsm1.compact_group") + defer logEnd() + + log.Info("Beginning compaction", zap.Int("files", len(group))) for i, f := range group { - s.logger.Info("Compacting file", zap.Int("index", i), zap.String("file", f)) + log.Info("Compacting file", zap.Int("index", i), zap.String("file", f)) } var ( @@ -1932,7 +1934,7 @@ func (s *compactionStrategy) compactGroup() { if err != nil { _, inProgress := err.(errCompactionInProgress) if err == errCompactionsDisabled || inProgress { - s.logger.Info("Aborted compaction", zap.Error(err)) + log.Info("Aborted compaction", zap.Error(err)) if _, ok := err.(errCompactionInProgress); ok { time.Sleep(time.Second) @@ -1940,23 +1942,23 @@ func (s *compactionStrategy) compactGroup() { return } - s.logger.Info("Error compacting TSM files", zap.Error(err)) + log.Info("Error compacting TSM files", zap.Error(err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) return } if err := s.fileStore.ReplaceWithCallback(group, files, nil); err != nil { - s.logger.Info("Error replacing new TSM files", zap.Error(err)) + log.Info("Error replacing new TSM files", zap.Error(err)) atomic.AddInt64(s.errorStat, 1) time.Sleep(time.Second) return } for i, f := range files { - s.logger.Info("Compacted file", zap.Int("index", i), zap.String("file", f)) + log.Info("Compacted file", zap.Int("index", i), zap.String("file", f)) } - s.logger.Info("Finished compacting files", + log.Info("Finished compacting files", zap.Int("groups", len(group)), zap.Int("files", len(files)), zap.Duration("duration", time.Since(start))) diff --git a/tsdb/index/tsi1/partition.go b/tsdb/index/tsi1/partition.go index 02f442dea20..3b7c4918bc8 100644 --- a/tsdb/index/tsi1/partition.go +++ b/tsdb/index/tsi1/partition.go @@ -1,7 +1,6 @@ package tsi1 import ( - "crypto/rand" "encoding/json" "errors" "fmt" @@ -14,6 +13,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" 
"github.com/influxdata/influxdb/pkg/bytesutil" "github.com/influxdata/influxdb/pkg/estimator" @@ -887,12 +887,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch assert(level > 0, "cannot compact level zero") // Build a logger for this compaction. - logger := i.logger.With(zap.String("token", generateCompactionToken())) + log, logEnd := logger.NewOperation(i.logger, "TSI level compaction", "index.tsi.compact_to_level", zap.Int("tsi_level", level)) + defer logEnd() // Check for cancellation. select { case <-interrupt: - logger.Error("cannot begin compaction", zap.Error(ErrCompactionInterrupted)) + log.Error("Cannot begin compaction", zap.Error(ErrCompactionInterrupted)) return default: } @@ -909,12 +910,12 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch path := filepath.Join(i.path, FormatIndexFileName(i.NextSequence(), level)) f, err := os.Create(path) if err != nil { - logger.Error("cannot create compaction files", zap.Error(err)) + log.Error("Cannot create compaction files", zap.Error(err)) return } defer f.Close() - logger.Info("performing full compaction", + log.Info("Performing full compaction", zap.String("src", joinIntSlice(IndexFiles(files).IDs(), ",")), zap.String("dst", path), ) @@ -923,13 +924,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch lvl := i.levels[level] n, err := IndexFiles(files).CompactTo(f, i.sfile, lvl.M, lvl.K, interrupt) if err != nil { - logger.Error("cannot compact index files", zap.Error(err)) + log.Error("Cannot compact index files", zap.Error(err)) return } // Close file. if err := f.Close(); err != nil { - logger.Error("error closing index file", zap.Error(err)) + log.Error("Error closing index file", zap.Error(err)) return } @@ -937,7 +938,7 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch file := NewIndexFile(i.sfile) file.SetPath(path) if err := file.Open(); err != nil { - logger.Error("cannot open new index file", zap.Error(err)) + log.Error("Cannot open new index file", zap.Error(err)) return } @@ -958,14 +959,14 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch i.manifestSize = manifestSize return nil }(); err != nil { - logger.Error("cannot write manifest", zap.Error(err)) + log.Error("Cannot write manifest", zap.Error(err)) return } elapsed := time.Since(start) - logger.Info("full compaction complete", + log.Info("Full compaction complete", zap.String("path", path), - zap.String("elapsed", elapsed.String()), + logger.DurationLiteral("elapsed", elapsed), zap.Int64("bytes", n), zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024), ) @@ -975,13 +976,13 @@ func (i *Partition) compactToLevel(files []*IndexFile, level int, interrupt <-ch // Close and delete all old index files. for _, f := range files { - logger.Info("removing index file", zap.String("path", f.Path())) + log.Info("Removing index file", zap.String("path", f.Path())) if err := f.Close(); err != nil { - logger.Error("cannot close index file", zap.Error(err)) + log.Error("Cannot close index file", zap.Error(err)) return } else if err := os.Remove(f.Path()); err != nil { - logger.Error("cannot remove index file", zap.Error(err)) + log.Error("Cannot remove index file", zap.Error(err)) return } } @@ -1048,16 +1049,14 @@ func (i *Partition) compactLogFile(logFile *LogFile) { assert(id != 0, "cannot parse log file id: %s", logFile.Path()) // Build a logger for this compaction. 
- logger := i.logger.With( - zap.String("token", generateCompactionToken()), - zap.Int("id", id), - ) + log, logEnd := logger.NewOperation(i.logger, "TSI log compaction", "index.tsi.compact_log_file", zap.Int("log_file_id", id)) + defer logEnd() // Create new index file. path := filepath.Join(i.path, FormatIndexFileName(id, 1)) f, err := os.Create(path) if err != nil { - logger.Error("cannot create index file", zap.Error(err)) + log.Error("Cannot create index file", zap.Error(err)) return } defer f.Close() @@ -1066,13 +1065,13 @@ func (i *Partition) compactLogFile(logFile *LogFile) { lvl := i.levels[1] n, err := logFile.CompactTo(f, lvl.M, lvl.K, interrupt) if err != nil { - logger.Error("cannot compact log file", zap.Error(err), zap.String("path", logFile.Path())) + log.Error("Cannot compact log file", zap.Error(err), zap.String("path", logFile.Path())) return } // Close file. if err := f.Close(); err != nil { - logger.Error("cannot close log file", zap.Error(err)) + log.Error("Cannot close log file", zap.Error(err)) return } @@ -1080,7 +1079,7 @@ func (i *Partition) compactLogFile(logFile *LogFile) { file := NewIndexFile(i.sfile) file.SetPath(path) if err := file.Open(); err != nil { - logger.Error("cannot open compacted index file", zap.Error(err), zap.String("path", file.Path())) + log.Error("Cannot open compacted index file", zap.Error(err), zap.String("path", file.Path())) return } @@ -1102,23 +1101,23 @@ func (i *Partition) compactLogFile(logFile *LogFile) { i.manifestSize = manifestSize return nil }(); err != nil { - logger.Error("cannot update manifest", zap.Error(err)) + log.Error("Cannot update manifest", zap.Error(err)) return } elapsed := time.Since(start) - logger.Info("log file compacted", - zap.String("elapsed", elapsed.String()), + log.Info("Log file compacted", + logger.DurationLiteral("elapsed", elapsed), zap.Int64("bytes", n), zap.Int("kb_per_sec", int(float64(n)/elapsed.Seconds())/1024), ) // Closing the log file will automatically wait until the ref count is zero. if err := logFile.Close(); err != nil { - logger.Error("cannot close log file", zap.Error(err)) + log.Error("Cannot close log file", zap.Error(err)) return } else if err := os.Remove(logFile.Path()); err != nil { - logger.Error("cannot remove log file", zap.Error(err)) + log.Error("Cannot remove log file", zap.Error(err)) return } } @@ -1277,11 +1276,3 @@ const MaxIndexMergeCount = 2 // MaxIndexFileSize is the maximum expected size of an index file. const MaxIndexFileSize = 4 * (1 << 30) - -// generateCompactionToken returns a short token to track an individual compaction. -// It is only used for logging so it doesn't need strong uniqueness guarantees. -func generateCompactionToken() string { - token := make([]byte, 3) - rand.Read(token) - return fmt.Sprintf("%x", token) -} diff --git a/tsdb/series_partition.go b/tsdb/series_partition.go index bebfd88e393..97c01ec6f12 100644 --- a/tsdb/series_partition.go +++ b/tsdb/series_partition.go @@ -8,8 +8,8 @@ import ( "os" "path/filepath" "sync" - "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/models" "github.com/influxdata/influxdb/pkg/rhh" "go.uber.org/zap" @@ -258,10 +258,8 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio // Check if we've crossed the compaction threshold. 
if p.compactionsEnabled() && !p.compacting && p.CompactThreshold != 0 && p.index.InMemCount() >= uint64(p.CompactThreshold) { p.compacting = true - logger := p.Logger.With(zap.String("path", p.path)) - logger.Info("beginning series partition compaction") + log, logEnd := logger.NewOperation(p.Logger, "Series partition compaction", "series_partition.compaction", zap.String("path", p.path)) - startTime := time.Now() p.wg.Add(1) go func() { defer p.wg.Done() @@ -269,10 +267,10 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio compactor := NewSeriesPartitionCompactor() compactor.cancel = p.closing if err := compactor.Compact(p); err != nil { - logger.With(zap.Error(err)).Error("series partition compaction failed") + log.Error("series partition compaction failed", zap.Error(err)) } - logger.With(zap.Duration("elapsed", time.Since(startTime))).Info("completed series partition compaction") + logEnd() // Clear compaction flag. p.mu.Lock() diff --git a/tsdb/store.go b/tsdb/store.go index f031fe81a98..0cde5e8c1e0 100644 --- a/tsdb/store.go +++ b/tsdb/store.go @@ -15,6 +15,7 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/logger" "github.com/influxdata/influxdb/pkg/estimator/hll" "github.com/influxdata/influxdb/models" @@ -203,6 +204,9 @@ func (s *Store) loadShards() error { s.Logger.Info("Compaction throughput limit disabled") } + log, logEnd := logger.NewOperation(s.Logger, "Open store", "tsdb.open") + defer logEnd() + t := limiter.NewFixed(runtime.GOMAXPROCS(0)) resC := make(chan *res) var n int @@ -214,8 +218,9 @@ func (s *Store) loadShards() error { } for _, db := range dbDirs { + dbPath := filepath.Join(s.path, db.Name()) if !db.IsDir() { - s.Logger.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) + log.Info("Skipping database dir", zap.String("name", db.Name()), zap.String("reason", "not a directory")) continue } @@ -232,14 +237,15 @@ func (s *Store) loadShards() error { } // Load each retention policy within the database directory. - rpDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name())) + rpDirs, err := ioutil.ReadDir(dbPath) if err != nil { return err } for _, rp := range rpDirs { + rpPath := filepath.Join(s.path, db.Name(), rp.Name()) if !rp.IsDir() { - s.Logger.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) + log.Info("Skipping retention policy dir", zap.String("name", rp.Name()), zap.String("reason", "not a directory")) continue } @@ -248,7 +254,7 @@ func (s *Store) loadShards() error { continue } - shardDirs, err := ioutil.ReadDir(filepath.Join(s.path, db.Name(), rp.Name())) + shardDirs, err := ioutil.ReadDir(rpPath) if err != nil { return err } @@ -266,6 +272,7 @@ func (s *Store) loadShards() error { // Shard file names are numeric shardIDs shardID, err := strconv.ParseUint(sh, 10, 64) if err != nil { + log.Info("invalid shard ID found at path", zap.String("path", path)) resC <- &res{err: fmt.Errorf("%s is not a valid ID. 
Skipping shard.", sh)} return } @@ -291,12 +298,13 @@ func (s *Store) loadShards() error { err = shard.Open() if err != nil { + log.Info("Failed to open shard", logger.Shard(shardID), zap.Error(err)) resC <- &res{err: fmt.Errorf("Failed to open shard: %d: %s", shardID, err)} return } resC <- &res{s: shard} - s.Logger.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start))) + log.Info("Opened shard", zap.String("path", path), zap.Duration("duration", time.Since(start))) }(db.Name(), rp.Name(), sh.Name()) } } @@ -307,7 +315,6 @@ func (s *Store) loadShards() error { for i := 0; i < n; i++ { res := <-resC if res.err != nil { - s.Logger.Info(res.err.Error()) continue } s.shards[res.s.id] = res.s @@ -1764,7 +1771,7 @@ func (s *Store) monitorShards() { zap.String("perc", fmt.Sprintf("%d%%", perc)), zap.Int("n", n), zap.Int("max", s.EngineOptions.Config.MaxValuesPerTag), - zap.String("db", db), + logger.Database(db), zap.ByteString("measurement", name), zap.ByteString("tag", k)) }
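
Usage note (illustrative): the sketch below shows how the new `logger.NewOperation` helper introduced in `logger/fields.go` is meant to be wired up, mirroring the call sites changed above (for example `services/retention` and the TSM/TSI compactions). The operation name, database, and retention policy values are placeholders chosen for illustration, not identifiers taken from this change.

```go
package main

import (
	"time"

	"github.com/influxdata/influxdb/logger"
	"go.uber.org/zap"
)

func main() {
	base := zap.NewExample() // stand-in for a service's configured *zap.Logger

	// NewOperation returns a child logger carrying trace_id and op.name fields
	// and immediately logs "<msg> (start)"; the returned func logs "<msg> (end)"
	// together with op.elapsed.
	log, logEnd := logger.NewOperation(base, "Example deletion check", "example.delete_check",
		logger.Database("db0"), logger.RetentionPolicy("autogen")) // placeholder db/rp names
	defer logEnd()

	// Every message logged through `log` shares the same trace_id, so a single
	// operation can be filtered out of the combined log stream.
	log.Info("Deleted shard group", logger.ShardGroup(1))
	time.Sleep(10 * time.Millisecond) // simulate work so op.elapsed is non-zero
}
```

Because `logger/logger.go` now also attaches a per-process `log_id` field at construction time, each operation's `trace_id` can additionally be correlated back to the process that emitted it.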