Skip to content

Commit

Permalink
Merge pull request #37 from Juniper/flat-formatting
Browse files Browse the repository at this point in the history
added csv stats knob
  • Loading branch information
M-Vivek-Juniper authored May 31, 2024
2 parents cddec2a + 74e280b commit 7e291a8
Show file tree
Hide file tree
Showing 16 changed files with 319 additions and 1,577 deletions.
1,478 changes: 0 additions & 1,478 deletions coverage.out

This file was deleted.

4 changes: 4 additions & 0 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func getGPRCDialOptions(jctx *JCtx, vendor *vendor) ([]grpc.DialOption, error) {

if *stateHandler {
opts = append(opts, grpc.WithStatsHandler(&statshandler{jctx: jctx}))
if isCsvStatsEnabled(jctx) {
jctx.config.InternalJtimon.csvLogger.Printf(fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
}
}

switch *compression {
Expand Down
36 changes: 12 additions & 24 deletions internal_jtimon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,39 +3,24 @@ package main
import (
"encoding/json"
"fmt"
gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
"log"
"os"
"regexp"
"strings"

gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
)

// InternalJtimonConfig type
type InternalJtimonConfig struct {
DataLog string `json:"data-log-file"`
CsvLog string `json:"csv-log-file"`
out *os.File
preGnmiOut *os.File
csvOut *os.File
logger *log.Logger
preGnmiLogger *log.Logger
}

type InternalJtimonPathElem struct {
Name string `json:"name"`
}

type InternalJtimonPath struct {
Elems []InternalJtimonPathElem `json:"elem"`
}

type InternalJtimonVal struct {
StringVal string `json:"string_val"`
}

type InternalJtimonUpdate struct {
Path InternalJtimonPath `json:"path"`
Val InternalJtimonVal `json:"val"`
csvLogger *log.Logger
}

func internalJtimonLogInit(jctx *JCtx) {
Expand Down Expand Up @@ -77,6 +62,10 @@ func internalJtimonLogInit(jctx *JCtx) {
log.Printf("logging in %s_pre-gnmi for %s:%d [in the format of internal jtimon tool]\n",
jctx.config.InternalJtimon.DataLog, jctx.config.Host, jctx.config.Port)
}

if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
csvStatsLogInit(jctx)
}
}

func internalJtimonLogStop(jctx *JCtx) {
Expand All @@ -90,6 +79,9 @@ func internalJtimonLogStop(jctx *JCtx) {
jctx.config.InternalJtimon.preGnmiOut = nil
jctx.config.InternalJtimon.preGnmiLogger = nil
}
if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
csvStatsLogStop(jctx)
}
}

func isInternalJtimonLogging(jctx *JCtx) bool {
Expand Down Expand Up @@ -228,7 +220,3 @@ func jLogInternalJtimonForPreGnmi(jctx *JCtx, ocdata *na_pb.OpenConfigData, outS
// Log here in the format of internal jtimon
jctx.config.InternalJtimon.preGnmiLogger.Printf("%s", outString)
}

func jLogUpdateOnChange(jctx *JCtx, kv map[string]string) {
return
}
162 changes: 162 additions & 0 deletions statshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,16 @@ package main

import (
"fmt"
gnmi_ext1 "github.com/Juniper/jtimon/gnmi/gnmi_ext"
gnmi_juniper_header_ext "github.com/Juniper/jtimon/gnmi/gnmi_juniper_header_ext"
"log"
"os"
"sync"
"time"

gnmi_pb "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
proto "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/stats"
)
Expand All @@ -20,6 +26,20 @@ type statsCtx struct {
totalInHeaderWireLength uint64
}

type kpiStats struct {
SensorName string
Path string
Streamed_path string
Component string
SequenceNumber uint64
ComponentId uint32
SubComponentId uint32
Timestamp uint64
notif_timestamp int64
re_stream_creation_timestamp uint64
re_payload_get_timestamp uint64
}

type statshandler struct {
jctx *JCtx
}
Expand Down Expand Up @@ -52,12 +72,111 @@ func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
case *stats.InPayload:
h.jctx.stats.totalInPayloadLength += uint64(s.(*stats.InPayload).Length)
h.jctx.stats.totalInPayloadWireLength += uint64(s.(*stats.InPayload).WireLength)
if *stateHandler && h.jctx.config.InternalJtimon.CsvLog != "" {
switch v := (s.(*stats.InPayload).Payload).(type) {
case *na_pb.OpenConfigData:
updateStats(h.jctx, v, false)
for idx, kv := range v.Kv {
updateStatsKV(h.jctx, false, 0)
switch kvvalue := kv.Value.(type) {
case *na_pb.KeyValue_UintValue:
if kv.Key == "__timestamp__" {
var re_c_ts uint64 = 0
var re_p_get_ts uint64 = 0
if len(v.Kv) > idx+2 {
nextKV := v.Kv[idx+1]
if nextKV.Key == "__junos_re_stream_creation_timestamp__" {
re_c_ts = nextKV.GetUintValue()
}
nextnextKV := v.Kv[idx+2]
if nextnextKV.Key == "__junos_re_payload_get_timestamp__" {
re_p_get_ts = nextnextKV.GetUintValue()
}
}

//"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
h.jctx.config.InternalJtimon.csvLogger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
v.Path, v.SequenceNumber, v.ComponentId, v.SubComponentId, s.(*stats.InPayload).Length, v.Timestamp, kvvalue.UintValue, re_c_ts, re_p_get_ts))
}
}
}
case *gnmi_pb.SubscribeResponse:
stat := h.getKPIStats(v)
if stat != nil && stat.Timestamp != 0 {
path := stat.SensorName + ":" + stat.Streamed_path + ":" + stat.Path + ":" + stat.Component
h.jctx.config.InternalJtimon.csvLogger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
path, stat.SequenceNumber, stat.ComponentId, stat.SubComponentId,
s.(*stats.InPayload).Length, stat.notif_timestamp, int64(stat.Timestamp*uint64(1000000)),
int64(stat.re_stream_creation_timestamp*uint64(1000000)),
int64(stat.re_payload_get_timestamp*uint64(1000000)),
),
)
}
}
}
case *stats.InTrailer:
case *stats.End:
default:
}
}

func (h *statshandler) getKPIStats(subResponse *gnmi_pb.SubscribeResponse) *kpiStats {
var jHdrPresent bool
stats := new(kpiStats)
notfn := subResponse.GetUpdate()
if notfn == nil {
return nil
}
stats.notif_timestamp = notfn.Timestamp
extns := subResponse.GetExtension()

if extns != nil {
var extIds []gnmi_ext1.ExtensionID
for _, ext := range extns {
regExtn := ext.GetRegisteredExt()
if (regExtn.GetId()) != gnmi_ext1.ExtensionID_EID_JUNIPER_TELEMETRY_HEADER {
extIds = append(extIds, regExtn.GetId())
continue
}

jHdrPresent = true
var hdr gnmi_juniper_header_ext.GnmiJuniperTelemetryHeaderExtension
msg := regExtn.GetMsg()
err := proto.Unmarshal(msg, &hdr)
if err != nil {
log.Fatal("unmarshaling error: ", err)
}

stats.ComponentId = hdr.ComponentId
stats.SequenceNumber = hdr.SequenceNumber
stats.Path = hdr.SubscribedPath
stats.SubComponentId = hdr.SubComponentId
stats.Component = hdr.Component
stats.Streamed_path = hdr.StreamedPath
stats.SensorName = hdr.SensorName

if hdr.ExportTimestamp > 0 {
stats.Timestamp = uint64(hdr.ExportTimestamp)
}
if hdr.PayloadGetTimestamp > 0 {
stats.re_payload_get_timestamp = uint64(hdr.PayloadGetTimestamp)
}
if hdr.StreamCreationTimestamp > 0 {
stats.re_stream_creation_timestamp = uint64(hdr.StreamCreationTimestamp)
}
break
}
if !jHdrPresent {
jLog(h.jctx, fmt.Sprintf(
"Juniper header extension not present, available extensions: %v", extIds))
}
}
return stats

}

func updateStats(jctx *JCtx, ocData *na_pb.OpenConfigData, needLock bool) {
if !*stateHandler {
return
Expand Down Expand Up @@ -146,3 +265,46 @@ func printSummary(jctx *JCtx) {
s += fmt.Sprintf("\n")
jLog(jctx, fmt.Sprintf("\n%s\n", s))
}

func isCsvStatsEnabled(jctx *JCtx) bool {
if *stateHandler && jctx.config.InternalJtimon.CsvLog != "" {
return true
}
return false
}

func csvStatsLogInit(jctx *JCtx) {
if !*stateHandler && jctx.config.InternalJtimon.CsvLog == "" {
return
}
var out *os.File
var err error

csvStatsFile := "csv-stats.csv"
if jctx.config.InternalJtimon.CsvLog == "" {
jctx.config.InternalJtimon.CsvLog = csvStatsFile
}

out, err = os.OpenFile(jctx.config.InternalJtimon.CsvLog, os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
if err != nil {
log.Printf("Could not create csv stats file(%s): %v\n", csvStatsFile, err)
}

if out != nil {
flags := 0

jctx.config.InternalJtimon.csvLogger = log.New(out, "", flags)
jctx.config.InternalJtimon.csvOut = out

log.Printf("Writing stats in %s for %s:%d [in csv format]\n",
jctx.config.InternalJtimon.CsvLog, jctx.config.Host, jctx.config.Port)
}
}

func csvStatsLogStop(jctx *JCtx) {
if jctx.config.InternalJtimon.csvOut != nil {
jctx.config.InternalJtimon.csvOut.Close()
jctx.config.InternalJtimon.csvOut = nil
jctx.config.InternalJtimon.csvLogger = nil
}
}
31 changes: 18 additions & 13 deletions subscribe_gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -515,23 +515,28 @@ func subscribegNMI(conn *grpc.ClientConn, jctx *JCtx, cfg Config, paths []PathsC
// 3. Receive rsp
go func() {
var (
rsp *gnmi.SubscribeResponse
rsp *gnmi.SubscribeResponse
err1 error
)

jLog(jctx, fmt.Sprintf("gNMI host: %v, receiving data..", hostname))
for {
rsp, err = gNMISubHandle.Recv()
if err == io.EOF {
rsp, err1 = gNMISubHandle.Recv()
if err1 == io.EOF {
printSummary(jctx)
jLog(jctx, fmt.Sprintf("gNMI host: %v, received eof", hostname))
datach <- SubRcConnRetry
return
}

if err != nil {
jLog(jctx, fmt.Sprintf("gNMI host: %v, receive response failed: %v", hostname, err))
sc, _ := status.FromError(err)

if err1 != nil {
jLog(jctx, fmt.Sprintf("gNMI host: %v, receive response failed: %v", hostname, err1))
sc, sErr := status.FromError(err)
if !sErr {
jLog(jctx, fmt.Sprintf("Failed to retrieve status from error: %v", sErr))
datach <- SubRcConnRetry
return
}
/*
* Unavailable is just a cover-up for JUNOS, ideally the device is expected to return:
* 1. Unimplemented if RPC is not available yet
Expand All @@ -547,16 +552,16 @@ func subscribegNMI(conn *grpc.ClientConn, jctx *JCtx, cfg Config, paths []PathsC
}

if *noppgoroutines {
err = gnmiHandleResponse(jctx, rsp)
if err != nil && strings.Contains(err.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, err))
gnmiErr := gnmiHandleResponse(jctx, rsp)
if gnmiErr != nil && strings.Contains(gnmiErr.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, gnmiErr))
continue
}
} else {
go func() {
err = gnmiHandleResponse(jctx, rsp)
if err != nil && strings.Contains(err.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, err))
gnmiErr1 := gnmiHandleResponse(jctx, rsp)
if gnmiErr1 != nil && strings.Contains(gnmiErr1.Error(), gGnmiJtimonIgnoreErrorSubstr) {
jLog(jctx, fmt.Sprintf("gNMI host: %v, parsing response failed: %v", hostname, gnmiErr1))
}
}()
}
Expand Down
5 changes: 5 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-all-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": "",
"csv-log-file": "",
"csv-stats": false
},
"paths": [
{
"path": "SUB_JTIMON_ALL",
Expand Down
5 changes: 5 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-wdsysmon-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": "",
"csv-log-file": "",
"csv-stats": false
},
"paths": [
{
"path": "sub_wdsysmon-fd",
Expand Down
Loading

0 comments on commit 7e291a8

Please sign in to comment.