Skip to content

Commit

Permalink
V2 Control Protocol (#29)
Browse files Browse the repository at this point in the history
* Add proposed v2 protocol.

* Update elastic-agent-client.proto

Co-authored-by: Anderson Queiroz <contato@andersonq.eti.br>

* Update elastic-agent-client.proto

Co-authored-by: Anderson Queiroz <contato@andersonq.eti.br>

* Update elastic-agent-client.proto

Co-authored-by: Anderson Queiroz <contato@andersonq.eti.br>

* Update to CheckinV2. Split the new methods into separate services. Finish metrics reporting.

* Fix some typos.

* Add version_info for initial check-in for v2. Add log service. Comment fixes.

* Update elastic-agent-client.proto

Co-authored-by: Anderson Queiroz <contato@andersonq.eti.br>

* Change payload of UnitObserved to bytes.

* Switch to log message bulk only.

* Add defined diagnostic action and result for the actions protocol.

* Remove metrics.

* Add all the client work.

* Add unit tests.

* Fix lint.

* More lint fixes.

* Changes from code review.

Co-authored-by: Anderson Queiroz <contato@andersonq.eti.br>
  • Loading branch information
blakerouse and AndersonQ authored May 24, 2022
1 parent 4803740 commit 43bacbe
Show file tree
Hide file tree
Showing 17 changed files with 6,172 additions and 235 deletions.
344 changes: 338 additions & 6 deletions elastic-agent-client.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,22 @@ package proto;

option cc_enable_arenas = true;
option go_package = "pkg/proto;proto";
import "google/protobuf/empty.proto";
import "google/protobuf/timestamp.proto";

// Services that the client is allowed to use over the connection.
enum ConnInfoServices {
  // Checkin service of the V1 protocol.
  Checkin = 0;
  // Checkin service of the V2 protocol.
  CheckinV2 = 1;
  // Service exposing the key-value store.
  Store = 2;
  // Service exposing the artifact store.
  Artifact = 3;
  // Service used for logging.
  Log = 4;
}

// Connection information sent to the application on startup so it knows how to connect back to the Elastic Agent.
//
Expand All @@ -25,6 +41,8 @@ message ConnInfo {
bytes peer_cert = 5;
// Peer private key.
bytes peer_key = 6;
// Allowed services that spawned process can use. (only used in V2)
repeated ConnInfoServices services = 7;
}

// A status observed message is streamed from the application to Elastic Agent.
Expand All @@ -49,7 +67,7 @@ message StateObserved {
// Application is stopping.
STOPPING = 5;
}
// Token that is used to uniquely identify the application to agent. When agent started this
// application it would have provided it this token.
string token = 1;
// Current index of the applied configuration.
Expand Down Expand Up @@ -83,12 +101,40 @@ message StateExpected {
// An action request is streamed from the Elastic Agent to the application so an action can be performed
// by the connected application.
message ActionRequest {
  // Kind of action being requested.
  enum Type {
    // Custom action (one registered by the unit).
    CUSTOM = 0;
    // Pre-defined diagnostics action.
    DIAGNOSTICS = 1;
  }
  // Identifier that is unique to this action.
  string id = 1;
  // Action name; ignored when `type` is `DIAGNOSTICS`.
  string name = 2;
  // Parameters for the action, JSON encoded.
  bytes params = 3;
  // Unique ID of the unit (V2 protocol only).
  string unit_id = 4;
  // Type of the unit (V2 protocol only).
  UnitType unit_type = 5;
  // Kind of action to perform (V2 protocol only).
  Type type = 6;
}

// One diagnostic result returned for a `DIAGNOSTICS` action; carried in
// `ActionResponse.diagnostic`.
message ActionDiagnosticUnitResult {
  // Human-readable name of this diagnostic content.
  string name = 1;
  // File name to use when the diagnostic is stored on disk.
  string filename = 2;
  // Human-readable description of what information this diagnostic provides.
  string description = 3;
  // Content-Type of `content`.
  string content_type = 4;
  // The file content itself.
  bytes content = 5;
  // Time at which the content was generated.
  google.protobuf.Timestamp generated = 6;
}

// An action response is streamed from the application back to the Elastic Agent to provide a result to
Expand All @@ -101,26 +147,312 @@ message ActionResponse {
// Action has failed.
FAILED = 1;
}
// Token that is used to uniquely identify the application to agent. When agent started this
// application it would have provided it this token.
string token = 1;
// Unique ID of the action.
string id = 2;
// Status of the action.
Status status = 3;
// JSON encoded result for the action (empty when diagnostic action response).
bytes result = 4;
// Specific result for diagnostics action. (only used in V2)
repeated ActionDiagnosticUnitResult diagnostic = 5;
}

// State codes for the current state.
// Used both for the expected state (`UnitExpected.state`) and the observed
// state (`UnitObserved.state`) of a unit.
enum State {
  // Also assigned by the Elastic Agent to a unit missing from a `CheckinObserved`.
  STARTING = 0;
  // Also assigned when the observed `config_state_idx` differs from the expected one.
  CONFIGURING = 1;
  HEALTHY = 2;
  DEGRADED = 3;
  // Also assigned to the set of units when no `CheckinObserved` arrives within 30 seconds.
  FAILED = 4;
  STOPPING = 5;
  STOPPED = 6;
}

// Type of unit.
enum UnitType {
  // Unit is an input.
  INPUT = 0;
  // Unit is an output.
  OUTPUT = 1;
}

// A unit that is part of a collector/shipper.
message UnitExpected {
  // Identifier that is unique to the unit.
  string id = 1;
  // Type of the unit.
  UnitType type = 2;
  // State the unit is expected to be in.
  State state = 3;
  // Index of either the current configuration or the new configuration provided.
  uint64 config_state_idx = 4;
  // Resulting configuration. Empty when the application already has the
  // configuration identified by the current `config_state_idx`.
  string config = 5;
}

// A set of units and their expected states and configuration.
message CheckinExpected {
  // Units together with their expected state and configuration.
  repeated UnitExpected units = 1;
}

// Observed status for a unit.
//
// Contains the currently applied `config_state_idx` (0 in the case of initial start, 1 is the first
// applied config index) along with the status of the application. In the case that the sent `config_state_idx`
// doesn't match the expected `config_state_idx` that Elastic Agent expects, the unit is always marked as
// `CONFIGURING` and a new `UnitExpected` will be sent so it can have the latest configuration.
message UnitObserved {
  // Identifier that is unique to the unit.
  string id = 1;
  // Type of the unit.
  UnitType type = 2;
  // Index of the configuration that is currently applied.
  uint64 config_state_idx = 3;
  // Current state of the unit.
  State state = 4;
  // Human-readable message describing the state of the unit.
  // Shown to users to give more detail about the state of this single unit.
  string message = 5;
  // Payload for the state, JSON encoded.
  bytes payload = 6;
}

// Observed version information for the running program.
message CheckinObservedVersionInfo {
  // Binary name.
  string name = 1;
  // Binary version.
  string version = 2;
  // Extra metadata describing the binary.
  map<string, string> meta = 3;
}

// Observed statuses and configuration for defined units.
//
// In the case that a unit is missing from the observation then the Elastic Agent will mark that missing unit
// as `STARTING` and send a new `UnitExpected` for the missing unit.
message CheckinObserved {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // Observed state of each unit.
  repeated UnitObserved units = 2;
  // Version information about the running program. Should always be included
  // on the first checkin, and not again unless one of the values has changed.
  optional CheckinObservedVersionInfo version_info = 3;
}

service ElasticAgent {
  // Called by the client to provide the Elastic Agent the state of the application.
  //
  // A `StateObserved` must be streamed at least every 30 seconds; otherwise the
  // application is automatically marked as FAILED, and after 60 seconds the
  // Elastic Agent will force kill the entire process and restart it.
  rpc Checkin(stream StateObserved) returns (stream StateExpected);

  // Called by the client to provide the Elastic Agent the state of the
  // application over the V2 protocol.
  //
  // A `CheckinObserved` must be streamed at least every 30 seconds; otherwise the
  // set of units is automatically marked as FAILED, and after 60 seconds the
  // Elastic Agent will force kill the entire process and restart it.
  rpc CheckinV2(stream CheckinObserved) returns (stream CheckinExpected);

  // Called by the client on connection to the GRPC, allowing the Elastic Agent
  // to stream action requests to the application and the application to stream
  // back responses to those requests.
  //
  // Request and response are swapped here because the Elastic Agent sends the
  // requests in a stream to the connected process. The order of responses from
  // the process does not matter; it is acceptable for the response order to be
  // different than the request order.
  rpc Actions(stream ActionResponse) returns (stream ActionRequest);
}

// Type of transaction to start.
enum StoreTxType {
  // Read-only transaction.
  READ_ONLY = 0;
  // Read-write transaction; `SetKey` and `DeleteKey` require this type.
  READ_WRITE = 1;
}

// Begins a new transaction.
//
// A started transaction must either have commit or discard called.
message StoreBeginTxRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // Unit ID.
  string unit_id = 2;
  // Unit type.
  UnitType unit_type = 3;
  // Kind of transaction to begin.
  StoreTxType type = 4;
}

// Response for a started transaction.
message StoreBeginTxResponse {
  // ID of the transaction.
  string id = 1;
}

// Gets a key from the store.
message StoreGetKeyRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
}

// Response of the retrieved key.
message StoreGetKeyResponse {
  // Outcome of the key lookup.
  enum Status {
    // The key was found.
    FOUND = 0;
    // The key was not found.
    NOT_FOUND = 1;
  }
  // Result of the get.
  Status status = 1;
  // Value of the key when `status` is `FOUND`.
  bytes value = 2;
}

// Sets a key into the store.
//
// `tx_id` must be an ID of a transaction that was started with `READ_WRITE`.
message StoreSetKeyRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
  // Key value.
  bytes value = 4;
  // Time-to-live of the key, in milliseconds.
  uint64 ttl = 5;
}

// Response from `SetKey`.
message StoreSetKeyResponse {
  // Currently empty; defined so that return values can be added in the future.
}

// Deletes a key in the store.
//
// `tx_id` must be an ID of a transaction that was started with `READ_WRITE`.
//
// Does not error in the case that a key does not exist.
message StoreDeleteKeyRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
  // Key name.
  string name = 3;
}

// Response from `DeleteKey`.
message StoreDeleteKeyResponse {
  // Currently empty; defined so that return values can be added in the future.
}

// Commits the transaction in the store.
//
// Upon error the whole transaction is discarded so no need to call discard after error.
message StoreCommitTxRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
}

// Response from `CommitTx`.
message StoreCommitTxResponse {
  // Currently empty; defined so that return values can be added in the future.
}

// Discards the transaction in the store.
message StoreDiscardTxRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // ID of the transaction.
  string tx_id = 2;
}

// Response from `DiscardTx`.
message StoreDiscardTxResponse {
  // Currently empty; defined so that return values can be added in the future.
}

service ElasticAgentStore {
  // Key-value state storage is provided for each unit.
  //
  // The store is transactional so that multiple key operations can occur
  // before a commit, ensuring consistent state when multiple keys make up a
  // unit's persistent state.

  // Begins a new transaction. A started transaction must either have commit or
  // discard called.
  rpc BeginTx(StoreBeginTxRequest) returns (StoreBeginTxResponse);
  // Gets a key from the store.
  rpc GetKey(StoreGetKeyRequest) returns (StoreGetKeyResponse);
  // Sets a key into the store; the transaction must have been started with
  // `READ_WRITE`.
  rpc SetKey(StoreSetKeyRequest) returns (StoreSetKeyResponse);
  // Deletes a key in the store; the transaction must have been started with
  // `READ_WRITE`. Does not error when the key does not exist.
  rpc DeleteKey(StoreDeleteKeyRequest) returns (StoreDeleteKeyResponse);
  // Commits the transaction. Upon error the whole transaction is discarded, so
  // there is no need to call discard after an error.
  rpc CommitTx(StoreCommitTxRequest) returns (StoreCommitTxResponse);
  // Discards the transaction.
  rpc DiscardTx(StoreDiscardTxRequest) returns (StoreDiscardTxResponse);
}

// Requests an artifact from the Elastic Agent.
message ArtifactFetchRequest {
  // Token that uniquely identifies the collection of inputs to the agent.
  // It is provided in the `ConnInfo` when started.
  string token = 1;
  // Artifact ID.
  string id = 2;
  // SHA256 of the artifact.
  string sha256 = 3;
}

// Content of the artifact.
message ArtifactFetchResponse {
  // Either artifact content or the end-of-file marker.
  oneof content_eof {
    // Content of the artifact.
    bytes content = 1;
    // End-of-file marker.
    google.protobuf.Empty eof = 2;
  }
}

service ElasticAgentArtifact {
  // Fetches an artifact from the artifact store.
  //
  // For very large responses the result can be chunked over multiple
  // `ArtifactFetchResponse` messages. At least two responses are always
  // returned, and the last response has eof set.
  rpc Fetch(ArtifactFetchRequest) returns (stream ArtifactFetchResponse);
}

// A single log message associated with a unit.
message LogMessage {
  // Unit ID.
  string unit_id = 1;
  // Unit type.
  UnitType unit_type = 2;
  // Body of the ECS log message, JSON encoded.
  bytes message = 3;
}

// Request for `Log`, carrying one or more log messages.
message LogMessageRequest {
  // Token that uniquely identifies the connection to the Elastic Agent.
  string token = 1;
  // Multiple messages to report at the same time.
  repeated LogMessage messages = 2;
}

// Response from `Log`.
message LogMessageResponse {
  // Currently empty; defined so that return values can be added in the future.
}

// Log service is only exposed to programs that are not started as sub-processes by Elastic Agent.
//
// This allows services that are not started as sub-processes to write to the same stdout as programs that are
// started as sub-processes. A program that is started as a sub-process with stdout connected does not have the
// ability to use this service.
service ElasticAgentLog {
  // Sends log messages to the Elastic Agent.
  rpc Log(LogMessageRequest) returns (LogMessageResponse);
}
Loading

0 comments on commit 43bacbe

Please sign in to comment.