Skip to content

Commit

Permalink
feat(new source): Initial exec source (#6876)
Browse files Browse the repository at this point in the history
* feat(exec source): new `exec` source (#992) (#7)

Added exec source with streaming and scheduled capability.
Generates log events from stdout and stderr of a command execution.

Signed-off-by: Stuart Broad <stuart@moogsoft.com>
  • Loading branch information
moogstuart authored Apr 29, 2021
1 parent b11ac64 commit ffaa9e4
Show file tree
Hide file tree
Showing 12 changed files with 1,350 additions and 0 deletions.
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,7 @@ sources-logs = [
"sources-aws_s3",
"sources-datadog",
"sources-docker_logs",
"sources-exec",
"sources-file",
"sources-generator",
"sources-heroku_logs",
Expand Down Expand Up @@ -391,6 +392,7 @@ sources-aws_kinesis_firehose = ["base64", "sources-utils-tls", "warp"]
sources-aws_s3 = ["rusoto", "rusoto_s3", "rusoto_sqs", "semver", "uuid"]
sources-datadog = ["sources-utils-http"]
sources-docker_logs = ["bollard", "dirs-next"]
sources-exec = []
sources-file = ["bytesize", "file-source"]
sources-generator = ["fakedata"]
sources-heroku_logs = ["sources-utils-http"]
Expand Down
4 changes: 4 additions & 0 deletions docs/reference.cue
Original file line number Diff line number Diff line change
Expand Up @@ -383,13 +383,17 @@ _values: {

#TypeArray: {
_args: required: bool
_type: items: type: string
let Args = _args
let Type = _type

if !Args.required {
// `default` sets the default value.
default: [...] | null
}

examples?: [...[...Type.items.type]]

// Set `required` to `true` to force disable defaults. Defaults should
// be specified on the array level and not the type level.
items: type: #TypePrimitive & {_args: required: true}
Expand Down
227 changes: 227 additions & 0 deletions docs/reference/components/sources/exec.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package metadata

components: sources: exec: {
title: "Exec"

classes: {
commonly_used: false
delivery: "at_least_once"
deployment_roles: ["sidecar"]
development: "beta"
egress_method: "stream"
stateful: false
}

features: {
multiline: enabled: false
receive: {
from: {
service: services.exec
}

tls: enabled: false
}
}

support: {
targets: {
"aarch64-unknown-linux-gnu": true
"aarch64-unknown-linux-musl": true
"armv7-unknown-linux-gnueabihf": true
"armv7-unknown-linux-musleabihf": true
"x86_64-apple-darwin": true
"x86_64-pc-windows-msv": true
"x86_64-unknown-linux-gnu": true
"x86_64-unknown-linux-musl": true
}
requirements: []
warnings: []
notices: []
}

installation: {
platform_name: null
}

configuration: {
mode: {
description: "The type of exec mechanism."
required: true
type: string: {
enum: {
scheduled: "Scheduled exec mechanism."
streaming: "Streaming exec mechanism."
}
syntax: "literal"
}
}
command: {
required: true
description: "The command to be run, plus any arguments required."
type: array: {
examples: [["echo", "Hello World!"], ["ls", "-la"]]
items: type: string: {
syntax: "literal"
}
}
}
working_directory: {
common: false
required: false
description: "The directory in which to run the command."
warnings: []
type: string: {
default: null
syntax: "literal"
}
}
include_stderr: {
common: false
description: "Include the output of stderr when generating events."
required: false
type: bool: default: true
}
event_per_line: {
common: false
description: "Determine if events should be generated per line or buffered and output as a single event when script execution finishes."
required: false
type: bool: default: true
}
maximum_buffer_size_bytes: {
common: false
description: "The maximum buffer size allowed before a log event will be generated."
required: false
type: uint: {
default: 1000000
unit: "bytes"
}
}
scheduled: {
common: true
description: "The scheduled options."
required: false
warnings: []
type: object: {
examples: []
options: {
exec_interval_secs: {
common: true
description: "The interval in seconds between scheduled command runs. The command will be killed if it takes longer than exec_interval_secs to run."
relevant_when: "mode = `scheduled`"
required: false
type: uint: {
default: 60
unit: "seconds"
}
}
}
}
}
streaming: {
common: true
description: "The streaming options."
required: false
warnings: []
type: object: {
examples: []
options: {
respawn_on_exit: {
common: true
description: "Determine if a streaming command should be restarted if it exits."
relevant_when: "mode = `streaming`"
required: false
type: bool: default: true
}
respawn_interval_secs: {
common: false
description: "The interval in seconds between restarting streaming commands if needed."
relevant_when: "mode = `streaming`"
required: false
warnings: []
type: uint: {
default: 5
unit: "seconds"
}
}
}
}
}
}

output: logs: line: {
description: "An individual event from exec."
fields: {
host: fields._local_host
message: fields._raw_line
timestamp: fields._current_timestamp
data_stream: {
common: true
description: "The data stream from which the event originated."
required: false
type: string: {
examples: ["stdout", "stderr"]
default: null
syntax: "literal"
}
}
pid: {
description: "The process ID of the command."
required: true
type: uint: {
examples: [60085, 668]
unit: null
}
}
command: {
required: true
description: "The command that was run to generate this event."
type: array: {
items: type: string: {
examples: ["echo", "Hello World!", "ls", "-la"]
syntax: "literal"
}
}
}
}
}

examples: [
{
_line: "64 bytes from 127.0.0.1: icmp_seq=0 ttl=64 time=0.060 ms"
_timestamp: "2020-03-13T20:45:38.119Z"
title: "Exec line"
configuration: {}
input: """
```text
(_message)
```
"""
output: log: {
data_stream: "stdout"
pid: 5678
timestamp: _timestamp
host: _values.local_host
message: _line
}
},
]

how_it_works: {
line_delimiters: {
title: "Line Delimiters"
body: """
Each line is read until a new line delimiter, the `0xA` byte, is found or the end of the
maximum_buffer_size is reached.
"""
}
}

telemetry: metrics: {
events_in_total: components.sources.internal_metrics.output.metrics.events_in_total
processed_bytes_total: components.sources.internal_metrics.output.metrics.processed_bytes_total
processed_events_total: components.sources.internal_metrics.output.metrics.processed_events_total
processing_errors_total: components.sources.internal_metrics.output.metrics.processing_errors_total
command_executed_total: components.sources.internal_metrics.output.metrics.command_executed_total
command_execution_duration_ns: components.sources.internal_metrics.output.metrics.command_execution_duration_ns
}
}
12 changes: 12 additions & 0 deletions docs/reference/components/sources/internal_metrics.cue
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,18 @@ components: sources: internal_metrics: {
default_namespace: "vector"
tags: _internal_metrics_tags
}
command_executed_total: {
description: "The total number of times a command has been executed."
type: "counter"
default_namespace: "vector"
tags: _component_tags
}
command_execution_duration_ns: {
description: "The command execution duration in nanoseconds."
type: "histogram"
default_namespace: "vector"
tags: _component_tags
}
communication_errors_total: {
description: "The total number of errors stemming from communication with the Docker daemon."
type: "counter"
Expand Down
8 changes: 8 additions & 0 deletions docs/reference/services/exec.cue
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package metadata

services: exec: {
name: "Exec"
thing: "the \(name) stream"
url: urls.exec
versions: null
}
1 change: 1 addition & 0 deletions docs/reference/urls.cue
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,7 @@ urls: {
statsd_udp_protocol: "\(github)/b/statsd_spec"
stderr: "\(wikipedia)/wiki/Standard_streams#Standard_error_(stderr)"
stdin: "\(wikipedia)/wiki/Standard_streams#Standard_input_(stdin)"
exec: "\(wikipedia)/wiki/Exec_(system_call)"
stdout: "\(wikipedia)/wiki/Standard_streams#Standard_output_(stdout)"
stripe_blog_canonical_log_lines: "https://stripe.com/blog/canonical-log-lines"
strptime_specifiers: "https://docs.rs/chrono/latest/chrono/format/strftime/index.html#specifiers"
Expand Down
78 changes: 78 additions & 0 deletions src/async_read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,81 @@ where
}
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::shutdown::ShutdownSignal;
use crate::test_util::temp_file;
use futures::FutureExt;
use tokio::fs::{remove_file, File};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};

#[tokio::test]
async fn test_read_line_without_shutdown() {
let shutdown = ShutdownSignal::noop();
let temp_path = temp_file();
let write_file = File::create(temp_path.clone()).await.unwrap();
let read_file = File::open(temp_path.clone()).await.unwrap();

// Wrapper AsyncRead
let read_file = read_file.allow_read_until(shutdown.clone().map(|_| ()));

let mut reader = BufReader::new(read_file);
let mut writer = BufWriter::new(write_file);

writer.write_all("First line\n".as_bytes()).await.unwrap();
writer.flush().await.unwrap();

// Test one of the AsyncBufRead extension functions
let mut line_one = String::new();
let _ = reader.read_line(&mut line_one).await;

assert_eq!("First line\n", line_one);

writer.write_all("Second line\n".as_bytes()).await.unwrap();
writer.flush().await.unwrap();

let mut line_two = String::new();
let _ = reader.read_line(&mut line_two).await;

assert_eq!("Second line\n", line_two);

remove_file(temp_path).await.unwrap();
}

#[tokio::test]
async fn test_read_line_with_shutdown() {
let (trigger_shutdown, shutdown, _) = ShutdownSignal::new_wired();
let temp_path = temp_file();
let write_file = File::create(temp_path.clone()).await.unwrap();
let read_file = File::open(temp_path.clone()).await.unwrap();

// Wrapper AsyncRead
let read_file = read_file.allow_read_until(shutdown.clone().map(|_| ()));

let mut reader = BufReader::new(read_file);
let mut writer = BufWriter::new(write_file);

writer.write_all("First line\n".as_bytes()).await.unwrap();
writer.flush().await.unwrap();

// Test one of the AsyncBufRead extension functions
let mut line_one = String::new();
let _ = reader.read_line(&mut line_one).await;

assert_eq!("First line\n", line_one);

drop(trigger_shutdown);

writer.write_all("Second line\n".as_bytes()).await.unwrap();
writer.flush().await.unwrap();

let mut line_two = String::new();
let _ = reader.read_line(&mut line_two).await;

assert_eq!("", line_two);

remove_file(temp_path).await.unwrap();
}
}
Loading

0 comments on commit ffaa9e4

Please sign in to comment.