Skip to content

Commit

Permalink
Fix log data node
Browse files Browse the repository at this point in the history
  • Loading branch information
desa committed Sep 8, 2017
1 parent 5b921ea commit 245be10
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 24 deletions.
19 changes: 2 additions & 17 deletions log.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,29 +62,14 @@ func (n *LogNode) EndBatch(end edge.EndBatchMessage) (edge.Message, error) {
}

func (n *LogNode) BufferedBatch(batch edge.BufferedBatchMessage) (edge.Message, error) {
n.buf.Reset()
if err := n.enc.Encode(batch); err != nil {
n.incrementErrorCount()
n.diag.Error("failed to encode batch data", err)
return batch, nil
}
// TODO: fix prefix and other loger here
// dont log log this as the type of string it currently is
n.diag.LogData(n.key, "", n.buf.String())
n.diag.LogBatchData(n.key, "batch", batch)
return batch, nil
}

func (n *LogNode) Point(p edge.PointMessage) (edge.Message, error) {
n.buf.Reset()
if err := n.enc.Encode(p); err != nil {
n.incrementErrorCount()
n.diag.Error("failed to encode stream data", err)
return p, nil
}

// TODO: fix prefix and other loger here
// dont log log this as the type of string it currently is
n.diag.LogData(n.key, "Prefix", n.buf.String())
n.diag.LogPointData(n.key, "point", p)
return p, nil
}

Expand Down
3 changes: 2 additions & 1 deletion node.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ type NodeDiagnostic interface {
StartingBatchQuery(q string)

// LogNode
LogData(key, prefix, data string)
LogPointData(key, prefix string, data edge.PointMessage)
LogBatchData(key, prefix string, data edge.BufferedBatchMessage)

//UDF
UDFLog(s string)
Expand Down
64 changes: 58 additions & 6 deletions services/diagnostic/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,13 +229,65 @@ func (h *KapacitorHandler) StartingBatchQuery(q string) {
h.l.Debug("starting next batch query", zap.String("query", q))
}

func (h *KapacitorHandler) LogData(level string, prefix, data string) {
switch level {
case "info":
h.l.Info("listing data", zap.String("prefix", prefix), zap.String("data", data))
default:
func TagPairs(tags models.Tags) []string {
ts := []string{}
for k, v := range tags {
ts = append(ts, k+"="+v)
}

return ts
}

func FieldPairs(tags models.Fields) []string {
ts := []string{}
for k, v := range tags {
ts = append(ts, fmt.Sprintf("%s=%v", k, v))
}
h.l.Info("listing data", zap.String("prefix", prefix), zap.String("data", data))

return ts
}

// TODO: deal with prefix and level
func (h *KapacitorHandler) LogPointData(level, prefix string, point edge.PointMessage) {

h.l.Info(prefix,
zap.String("name", point.Name()),
zap.String("db", point.Database()),
zap.String("rp", point.RetentionPolicy()),
zap.String("group", string(point.GroupID())),
zap.Strings("dimensions", point.Dimensions().TagNames),
zap.Strings("tags", TagPairs(point.Tags())),
zap.Strings("fields", FieldPairs(point.Fields())),
zap.Time("time", point.Time()),
)
}

// TODO: deal with prefix and level
func (h *KapacitorHandler) LogBatchData(level, prefix string, batch edge.BufferedBatchMessage) {
begin := batch.Begin()
h.l.Info("Begin batch",
zap.String("name", begin.Name()),
zap.String("group", string(begin.GroupID())),
zap.Strings("tags", TagPairs(begin.Tags())),
zap.Time("time", begin.Time()),
)

for _, p := range batch.Points() {
h.l.Info("batch point",
zap.String("name", begin.Name()),
zap.String("group", string(begin.GroupID())),
zap.Strings("tags", TagPairs(p.Tags())),
zap.Strings("fields", FieldPairs(p.Fields())),
zap.Time("time", p.Time()),
)
}

h.l.Info("End batch",
zap.String("name", begin.Name()),
zap.String("group", string(begin.GroupID())),
zap.Strings("tags", TagPairs(begin.Tags())),
zap.Time("time", begin.Time()),
)
}

func (h *KapacitorHandler) UDFLog(s string) {
Expand Down

0 comments on commit 245be10

Please sign in to comment.