Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added csv stats knob #37

Merged
merged 9 commits into from
May 31, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Config struct {
Influx InfluxConfig `json:"influx"`
Kafka *KafkaConfig `json:"kafka"`
InternalJtimon InternalJtimonConfig `json:"internal-jtimon"`
CsvStatsJtimon CsvStatsLogging `json:"csv-stats-log"`
Paths []PathsConfig `json:"paths"`
Log LogConfig `json:"log"`
Vendor VendorConfig `json:"vendor"`
Expand Down Expand Up @@ -320,9 +321,11 @@ func HandleConfigChange(jctx *JCtx, config Config, restart *bool) error {
config.Password = value // Revert back to decoded password
logStop(jctx)
internalJtimonLogStop(jctx)
csvStatsLogStop(jctx)
jctx.config = config
logInit(jctx)
internalJtimonLogInit(jctx)
csvStatsLogInit(jctx)
if restart != nil {
jLog(jctx, fmt.Sprintf("Restarting worker process to spawn new device connection for: %s", jctx.file))
*restart = true
Expand All @@ -348,6 +351,7 @@ func ConfigRead(jctx *JCtx, init bool, restart *bool) error {
jctx.config = config
logInit(jctx)
internalJtimonLogInit(jctx)
csvStatsLogInit(jctx)
b, err := json.MarshalIndent(jctx.config, "", " ")
if err != nil {
return fmt.Errorf("config parsing error (json marshal) for %s: %v", jctx.file, err)
Expand Down
2,423 changes: 1,260 additions & 1,163 deletions coverage.out

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ func getGPRCDialOptions(jctx *JCtx, vendor *vendor) ([]grpc.DialOption, error) {

if *stateHandler {
opts = append(opts, grpc.WithStatsHandler(&statshandler{jctx: jctx}))
if isCsvStatsEnabled(jctx) {
jctx.config.CsvStatsJtimon.logger.Printf(fmt.Sprintf("%s,%s,%s,%s,%s,%s,%s,%s,%s\n",
"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
}
}

switch *compression {
Expand Down
26 changes: 2 additions & 24 deletions internal_jtimon.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@ package main
import (
"encoding/json"
"fmt"
gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
"log"
"os"
"regexp"
"strings"

gnmi "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
)

// InternalJtimonConfig type
Expand All @@ -21,23 +20,6 @@ type InternalJtimonConfig struct {
preGnmiLogger *log.Logger
}

type InternalJtimonPathElem struct {
Name string `json:"name"`
}

type InternalJtimonPath struct {
Elems []InternalJtimonPathElem `json:"elem"`
}

type InternalJtimonVal struct {
StringVal string `json:"string_val"`
}

type InternalJtimonUpdate struct {
Path InternalJtimonPath `json:"path"`
Val InternalJtimonVal `json:"val"`
}

func internalJtimonLogInit(jctx *JCtx) {
if jctx.config.InternalJtimon.DataLog == "" {
return
Expand Down Expand Up @@ -216,7 +198,3 @@ func jLogInternalJtimonForPreGnmi(jctx *JCtx, ocdata *na_pb.OpenConfigData, outS
// Log here in the format of internal jtimon
jctx.config.InternalJtimon.preGnmiLogger.Printf("%s", outString)
}

func jLogUpdateOnChange(jctx *JCtx, kv map[string]string) {
return
}
1 change: 1 addition & 0 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ var (
myCert = flag.String("cert", "./certs/self_signed/server-cert.pem", "Path of server cert")
myKey = flag.String("pem", "./certs/self_signed/server-key.pem", "Path of server key")
kafkaBroker = flag.String("kafka-broker", "kafka:9092", "Comma seperated list of Kafka brokers each in the form ip:port")
csvStats = flag.Bool("csv-stats", false, "Output telemetry data stats in CSV format")

jtimonVersion = "version-not-available"
buildTime = "build-time-not-available"
Expand Down
155 changes: 155 additions & 0 deletions statshandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@ package main

import (
"fmt"
gnmi_juniper_header_ext "github.com/Juniper/jtimon/gnmi/gnmi_juniper_header_ext"
"log"
"os"
"sync"
"time"

gnmi_pb "github.com/Juniper/jtimon/gnmi/gnmi"
na_pb "github.com/Juniper/jtimon/telemetry"
proto "github.com/golang/protobuf/proto"
"golang.org/x/net/context"
"google.golang.org/grpc/stats"
)
Expand All @@ -20,6 +25,27 @@ type statsCtx struct {
totalInHeaderWireLength uint64
}

// kpiStats holds the per-packet KPI fields extracted from a gNMI
// SubscribeResponse — the notification timestamp plus fields decoded from
// the Juniper telemetry header extension. One kpiStats becomes one CSV row
// in the csv-stats log.
//
// NOTE(review): several field names use underscores, which is not idiomatic
// Go; they are left as-is here because sibling code in this file references
// them directly.
type kpiStats struct {
	SensorName                   string // sensor name from the Juniper header extension
	Path                         string // subscribed path from the header extension
	Streamed_path                string // streamed path from the header extension
	Component                    string // producing component name
	SequenceNumber               uint64 // per-stream sequence number
	ComponentId                  uint32 // producing component id
	SubComponentId               uint32 // producing sub-component id
	Timestamp                    uint64 // export timestamp from the header (set only when > 0)
	notif_timestamp              int64  // gNMI Notification.Timestamp
	re_stream_creation_timestamp uint64 // RE stream-creation timestamp (set only when > 0)
	re_payload_get_timestamp     uint64 // RE payload-get timestamp (set only when > 0)
}

// CsvStatsLogging carries the csv-stats configuration for one device
// connection ("csv-stats-log" in the JSON config) together with the open
// output file and the logger that writes CSV rows to it. The unexported
// fields are populated by csvStatsLogInit and cleared by csvStatsLogStop.
type CsvStatsLogging struct {
	CsvLog string `json:"csv-log-file"` // output file name; defaulted when only the --csv-stats flag is set
	out    *os.File                     // open CSV file handle, nil when logging is disabled
	logger *log.Logger                  // writes CSV rows to out, nil when logging is disabled
}

// statshandler is the per-connection gRPC stats handler (installed via
// grpc.WithStatsHandler); it carries the connection context so RPC-level
// callbacks can update that connection's counters and CSV stats log.
type statshandler struct {
	jctx *JCtx
}
Expand Down Expand Up @@ -52,12 +78,101 @@ func (h *statshandler) HandleRPC(ctx context.Context, s stats.RPCStats) {
case *stats.InPayload:
h.jctx.stats.totalInPayloadLength += uint64(s.(*stats.InPayload).Length)
h.jctx.stats.totalInPayloadWireLength += uint64(s.(*stats.InPayload).WireLength)
if *csvStats {
switch v := (s.(*stats.InPayload).Payload).(type) {
case *na_pb.OpenConfigData:
updateStats(h.jctx, v, false)
for idx, kv := range v.Kv {
updateStatsKV(h.jctx, false, 0)
switch kvvalue := kv.Value.(type) {
case *na_pb.KeyValue_UintValue:
if kv.Key == "__timestamp__" {
var re_c_ts uint64 = 0
var re_p_get_ts uint64 = 0
if len(v.Kv) > idx+2 {
nextKV := v.Kv[idx+1]
if nextKV.Key == "__junos_re_stream_creation_timestamp__" {
re_c_ts = nextKV.GetUintValue()
}
nextnextKV := v.Kv[idx+2]
if nextnextKV.Key == "__junos_re_payload_get_timestamp__" {
re_p_get_ts = nextnextKV.GetUintValue()
}
}

//"sensor-path", "sequence-number", "component-id", "sub-component-id", "packet-size", "p-ts", "e-ts", "re-stream-creation-ts", "re-payload-get-ts"))
h.jctx.config.CsvStatsJtimon.logger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
v.Path, v.SequenceNumber, v.ComponentId, v.SubComponentId, s.(*stats.InPayload).Length, v.Timestamp, kvvalue.UintValue, re_c_ts, re_p_get_ts))
}
}
}
case *gnmi_pb.SubscribeResponse:
stat := getKPIStats(v)
if stat != nil && stat.Timestamp != 0 {
path := stat.SensorName + ":" + stat.Streamed_path + ":" + stat.Path + ":" + stat.Component
h.jctx.config.CsvStatsJtimon.logger.Printf(
fmt.Sprintf("%s,%d,%d,%d,%d,%d,%d,%d,%d\n",
path, stat.SequenceNumber, stat.ComponentId, stat.SubComponentId,
s.(*stats.InPayload).Length, stat.notif_timestamp, int64(stat.Timestamp*uint64(1000000)),
int64(stat.re_stream_creation_timestamp*uint64(1000000)),
int64(stat.re_payload_get_timestamp*uint64(1000000)),
),
)
}
}
}
case *stats.InTrailer:
case *stats.End:
default:
}
}

// getKPIStats extracts KPI fields from a gNMI SubscribeResponse for csv-stats
// logging. It returns nil when the response carries no update notification.
// Otherwise it returns a kpiStats with the notification timestamp set and,
// when the first extension holds a Juniper telemetry header, the header
// fields (sensor name, paths, component/sequence ids, timestamps) filled in.
func getKPIStats(subResponse *gnmi_pb.SubscribeResponse) *kpiStats {
	notfn := subResponse.GetUpdate()
	if notfn == nil {
		return nil
	}

	stats := new(kpiStats)
	stats.notif_timestamp = notfn.Timestamp

	// BUG FIX: the extension slice was only nil-checked before indexing
	// element 0; a non-nil empty slice would panic. Guard on length instead.
	extns := subResponse.GetExtension()
	if len(extns) == 0 || extns[0] == nil {
		return stats
	}

	regExtn := extns[0].GetRegisteredExt()
	var hdr gnmi_juniper_header_ext.GnmiJuniperTelemetryHeaderExtension
	if err := proto.Unmarshal(regExtn.GetMsg(), &hdr); err != nil {
		// Previously log.Fatal — one malformed header would kill the whole
		// collector. Report it and return what we have instead.
		log.Printf("unmarshaling error: %v", err)
		return stats
	}

	stats.ComponentId = hdr.ComponentId
	stats.SequenceNumber = hdr.SequenceNumber
	stats.Path = hdr.SubscribedPath
	stats.SubComponentId = hdr.SubComponentId
	stats.Component = hdr.Component
	stats.Streamed_path = hdr.StreamedPath
	stats.SensorName = hdr.SensorName

	// Only positive timestamps are meaningful; leave the zero value otherwise.
	if hdr.ExportTimestamp > 0 {
		stats.Timestamp = uint64(hdr.ExportTimestamp)
	}
	if hdr.PayloadGetTimestamp > 0 {
		stats.re_payload_get_timestamp = uint64(hdr.PayloadGetTimestamp)
	}
	if hdr.StreamCreationTimestamp > 0 {
		stats.re_stream_creation_timestamp = uint64(hdr.StreamCreationTimestamp)
	}
	return stats
}

func updateStats(jctx *JCtx, ocData *na_pb.OpenConfigData, needLock bool) {
if !*stateHandler {
return
Expand Down Expand Up @@ -146,3 +261,43 @@ func printSummary(jctx *JCtx) {
s += fmt.Sprintf("\n")
jLog(jctx, fmt.Sprintf("\n%s\n", s))
}

// isCsvStatsEnabled reports whether CSV stats logging is active for this
// connection, i.e. csvStatsLogInit has successfully attached a logger.
func isCsvStatsEnabled(jctx *JCtx) bool {
	enabled := jctx.config.CsvStatsJtimon.logger != nil
	return enabled
}

// csvStatsLogInit opens the CSV stats output file and attaches a logger to
// jctx so per-packet telemetry stats can be written in CSV format.
//
// Logging is enabled when either the --csv-stats flag is set or the config
// supplies a "csv-log-file" name; otherwise this is a no-op. When only the
// flag enables logging, a default file name is used. The file is truncated
// on every (re)init. On open failure the error is logged and csv-stats
// logging stays disabled (logger remains nil).
func csvStatsLogInit(jctx *JCtx) {
	if !*csvStats && jctx.config.CsvStatsJtimon.CsvLog == "" {
		return
	}

	// Fall back to the default file name when only the flag enabled us.
	if jctx.config.CsvStatsJtimon.CsvLog == "" {
		jctx.config.CsvStatsJtimon.CsvLog = "csv-stats.csv"
	}

	// BUG FIX: previously the hard-coded default name was opened regardless
	// of configuration, so a config-supplied "csv-log-file" was logged but
	// never actually written to. Open the configured file instead.
	out, err := os.OpenFile(jctx.config.CsvStatsJtimon.CsvLog,
		os.O_CREATE|os.O_WRONLY|os.O_TRUNC, 0600)
	if err != nil {
		log.Printf("Could not create csv stats file(%s): %v\n",
			jctx.config.CsvStatsJtimon.CsvLog, err)
		return
	}

	// No date/time prefix (flags 0): rows must stay raw CSV.
	jctx.config.CsvStatsJtimon.logger = log.New(out, "", 0)
	jctx.config.CsvStatsJtimon.out = out

	log.Printf("Writing stats in %s for %s:%d [in csv format]\n",
		jctx.config.CsvStatsJtimon.CsvLog, jctx.config.Host, jctx.config.Port)
}

// csvStatsLogStop closes the CSV stats output file, if one is open, and
// clears the handle and logger so csv-stats logging is disabled until the
// next csvStatsLogInit.
func csvStatsLogStop(jctx *JCtx) {
	cfg := &jctx.config.CsvStatsJtimon
	if cfg.out == nil {
		return
	}
	cfg.out.Close()
	cfg.out = nil
	cfg.logger = nil
}
8 changes: 6 additions & 2 deletions subscribe_gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -530,8 +530,12 @@ func subscribegNMI(conn *grpc.ClientConn, jctx *JCtx, cfg Config, paths []PathsC

if err != nil {
jLog(jctx, fmt.Sprintf("gNMI host: %v, receive response failed: %v", hostname, err))
sc, _ := status.FromError(err)

sc, sErr := status.FromError(err)
if !sErr {
jLog(jctx, fmt.Sprintf("Failed to retrieve status from error: %v", sErr))
datach <- SubRcConnRetry
return
}
/*
* Unavailable is just a cover-up for JUNOS, ideally the device is expected to return:
* 1. Unimplemented if RPC is not available yet
Expand Down
6 changes: 6 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-all-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": ""
},
"csv-stats-log": {
"csv-log-file": ""
},
"paths": [
{
"path": "SUB_JTIMON_ALL",
Expand Down
6 changes: 6 additions & 0 deletions tests/data/cisco-ios-xr/config/xr-wdsysmon-influx.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": ""
},
"csv-stats-log": {
"csv-log-file": ""
},
"paths": [
{
"path": "sub_wdsysmon-fd",
Expand Down
17 changes: 12 additions & 5 deletions tests/data/juniper-junos/config/jtisim-influx-alias.log
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,12 @@ Running config of JTIMON:
"write-per-measurement": false
},
"kafka": null,
"internal-jtimon": {
"data-log-file": ""
},
"csv-stats-log": {
"csv-log-file": ""
},
"paths": [
{
"path": "/interfaces",
Expand Down Expand Up @@ -75,6 +81,7 @@ invoking getInfluxClient
batch size: 102400 batch frequency: 2000
Accumulator frequency: 2000
Successfully initialized InfluxDB Client
127.0.0.1, jctx.config.Kafka.producer: <nil>
compression = none
Connecting to 127.0.0.1:50051
gRPC headers from host 127.0.0.1:50051
Expand All @@ -85,22 +92,22 @@ Receiving telemetry data from 127.0.0.1:50051
+------------------------------+--------------------+--------------------+--------------------+--------------------+
| Timestamp | KV | Packets | Bytes | Bytes(wire) |
+------------------------------+--------------------+--------------------+--------------------+--------------------+
| Wed Feb 14 14:19:54 PST 2024 | 1980 | 40 | 87418 | 87618 |
| Wed May 29 18:50:54 PDT 2024 | 1980 | 40 | 87418 | 87618 |

Batch processing: #packets:40 #points:40
Batch write successful! Post batch write available points: 0

| Wed Feb 14 14:19:56 PST 2024 | 1980 | 40 | 87418 | 87618 |
| Wed May 29 18:50:56 PDT 2024 | 1980 | 40 | 87418 | 87618 |


| Wed Feb 14 14:19:58 PST 2024 | 1980 | 40 | 87418 | 87618 |
| Wed May 29 18:50:58 PDT 2024 | 1980 | 40 | 87418 | 87618 |


| Wed Feb 14 14:20:00 PST 2024 | 1980 | 40 | 87418 | 87618 |
| Wed May 29 18:51:00 PDT 2024 | 1980 | 40 | 87418 | 87618 |



Collector Stats for 127.0.0.1:50051 (Run time : 8.006234208s)
Collector Stats for 127.0.0.1:50051 (Run time : 8.003391166s)
40 : in-packets
1980 : data points (KV pairs)
25 : in-header wirelength (bytes)
Expand Down
Loading