-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added a new Telegraf input for events and session-records (t128_tank) #58
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this compile? I see some things that I question. It's pretty difficult to review incremental changes like this with no context about what is expected to work and what isn't or about what has and has yet to be addressed.
WAN-2371 #time 7h
6a14e84
to
4b0627a
Compare
WAN-2371 #time 2h
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lgtm! need @gregschrock's approval before merge though!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
close
WAN-2371 #time 20m
WAN-2371 #time 30m
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
would still like to see unit tests for this input
plugins/inputs/t128_tank/reader.go
Outdated
var errBoundaryFault *boundaryFault | ||
if err != nil && errors.As(err, &errBoundaryFault) { | ||
r.log.Debugf("detected boundary fault, restarting %s tank read from index %d", r.topic, errBoundaryFault.nextAvailableIndex.value) | ||
go r.read(mainCtx, readCtx, errBoundaryFault.nextAvailableIndex) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I missed it earlier. read
shouldn't take two contexts. Since the read context is created from the main context (readCtx, readCtxCancel := context.WithCancel(mainCtx)
), canceling the main context will also cancel the read context. So you don't need to pass both.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oh actually missed this just now!! didn't realize this while pushing! thanks for catching.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
P.S. working on unit tests!
plugins/inputs/t128_tank/reader.go
Outdated
} | ||
} | ||
|
||
err = os.WriteFile(indexPath, []byte(index.string()), 0644) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use 0600
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
want to keep it accessible by owner I am assuming?
plugins/inputs/t128_tank/reader.go
Outdated
var collectedMessages []IndexedMessage | ||
for _, message := range messages { | ||
if message.IsValid() { | ||
collectedMessages = append(collectedMessages, *message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not great that we're copying all these messages here. Can we drop invalid messages inside parseLines
and just pass everything back to sendChan
here?
@@ -143,6 +144,12 @@ def parse_args(): | |||
action="store_true", | |||
) | |||
|
|||
build_parser.add_argument( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the clean
flag could be used for this purpose too. nbd though
WAN-2371 #time 30m
plugins/inputs/t128_tank/reader.go
Outdated
} | ||
|
||
func (r *Reader) read(readCtx context.Context, startingIndex index) { | ||
r.log.Errorf("starting %s tank read from index %d", r.topic, startingIndex.value) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't an error log and will be hit a lot. I'd say this is a debug log
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch; maybe useful as an info log? I'm not sure
err := os.WriteFile(plugin.IndexFile, []byte(testcase.IndexFileContent), 0755) | ||
assert.NoError(t, err) | ||
} | ||
reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, testcase.DefaultIndex, testutil.Logger{}, &acc).withTankReadCommandContext(testcase.TankReadCommandContext) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reformat this line
NewReader(
plugin.ServerAddress,
plugin.PortNumber,
plugin.Topic,
plugin.IndexFile,
testcase.DefaultIndex,
testutil.Logger{},
&acc,
).withTankReadCommandContext(testcase.TankReadCommandContext)
select { | ||
case <-time.After(5 * time.Second): | ||
t.Log("no messages received") | ||
case <-ctx.Done(): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This isn't right. Nothing cancels this context, so you'll always end up waiting for 5 seconds.
err := os.WriteFile(plugin.IndexFile, []byte(testcase.IndexFileContent), 0755) | ||
assert.NoError(t, err) | ||
} | ||
reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, testcase.DefaultIndex, testutil.Logger{}, &acc).withTankReadCommandContext(testcase.TankReadCommandContext) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
reformat
}{ | ||
{ | ||
Name: "input-message-with-newline", | ||
InputMessage: "seq=8:type=LINK_UP, Timestamp=2022-03-25T00:00:00Z,Raw=hello\nworld", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is fine for the test I guess, but it's not actually a valid Influx line
} | ||
reader := NewReader(plugin.ServerAddress, plugin.PortNumber, plugin.Topic, plugin.IndexFile, testcase.DefaultIndex, testutil.Logger{}, &acc).withTankReadCommandContext(testcase.TankReadCommandContext) | ||
var receivedErrorMessage string | ||
go func() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why go routine? and why sleep?
WAN-2371 #time 1h
WAN-2371 #time 1m
f8e7cf1
into
feature/new-plugins-to-reduce-python-overhead
* Sqaushed commits WAN-2321 #time 10m * addressed pr comments and added a testcase WAN-2321 #time 10m * Added a new Telegraf input for LTE (#56) * Added a new Telegraf input for events and session-records (#58) * Updated T128_peer_path Tags (#64) * Updated t128_tank to parse the messages (#63) * added a parser to parse data and send as metrics WAN-2496 #time 30m * added a new config option and updated the logic WAN-2496 #time 30m * addressed more comments and simplified logic WAN-2496 #time 20m * addressed comments WAN-2496 #time 5m * updated the index-file config WAN-2496 #time 5m * fixed an issue with last index not being saved WAN-2496 #time 15m * addressed pr comments WAN-2496 #time 30m * added data precision WAN-2496 #time 30m * addressed more comments WAN-2496 #time 1h * addressed comments and test for lastobserved value WAN-2496 #time 20m * added testcase WAN-2496 #time 20m * addressed comments WAN-2496 #time 15m * Fix timestamp precision adjustments WAN-2496 #time 1h --------- Co-authored-by: Greg Schrock <gschrock@juniper.net> * Update README.md for lte-collector * Update README.md for peer path collector --------- Co-authored-by: Shriyansh Kothari <shriyanshk@juniper.net> Co-authored-by: Kaushik Agrawal <60372242+agrawalkaushik@users.noreply.github.com> Co-authored-by: Shriyansh Kothari <100544034+shriyanshk128T@users.noreply.github.com>
Description
In order to translate
session-records
andevents
to native telegraf we decided to create a new input telegraf calledt128_tank
Required for all PRs:
Command
Testing
Testcase 1 config & output
Testcase 2 config & output
Testcase 3 config & output
Testcase 4 config & output
Testcase 5
This is what the stdout file stopped at -
Testcase 6
Stopped at
Started at after restart
Testcase 7
Testcase for TagPass
config without tagpass
Output
config with tagpass
Output