Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add server logs endpoint #1005

Merged
merged 1 commit into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions fern/definition/servers/__package__.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ service:
type: uuid
response: GetServerResponse

list:
path: ""
method: GET
docs: >-
Lists all servers associated with the token used. Can be filtered by
tags in the query string.
request:
name: GetServersRequest
query-parameters:
tags: optional<string>
response: ListServersResponse

create:
path: ""
method: POST
Expand Down Expand Up @@ -44,18 +56,6 @@ service:
type: optional<long>
response: DestroyServerResponse

list:
path: /list
method: GET
docs: >-
Lists all servers associated with the token used. Can be filtered by
tags in the query string.
request:
name: GetServersRequest
query-parameters:
tags: optional<string>
response: ListServersResponse

types:
GetServerResponse:
properties:
Expand Down
43 changes: 43 additions & 0 deletions fern/definition/servers/logs.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/fern-api/fern/main/fern.schema.json

imports:
commons: ../common.yml
uploadCommons: ../upload/common.yml
cloudCommons: ../cloud/common.yml

service:
auth: true
base-path: /servers
endpoints:
getServerLogs:
path: /{server_id}/logs
method: GET
docs: >-
Returns the logs for a given server.
path-parameters:
server_id:
type: uuid
request:
name: GetServerLogsRequest
query-parameters:
stream: LogStream
watch_index:
docs: A query parameter denoting the requests watch index.
type: optional<string>
response: GetServerLogsResponse

types:
GetServerLogsResponse:
properties:
lines:
docs: Sorted old to new.
type: list<string>
timestamps:
docs: Sorted old to new.
type: list<string>
watch: commons.WatchResponse

LogStream:
enum:
- std_out
- std_err
40 changes: 40 additions & 0 deletions infra/tf/vector/vector.tf
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,24 @@ resource "helm_release" "vector" {
}
}

dynamic_servers = {
type = "filter"
inputs = ["vector", "tcp_json"]
condition = {
type = "vrl"
source = ".source == \"dynamic_servers\""
}
}

ds_fix_id = {
type = "remap"
inputs = ["dynamic_servers"]
source = <<-EOF
.server_id = .run_id
del(.run_id)
EOF
}

opengb_worker = {
type = "filter"
inputs = ["http_json"]
Expand Down Expand Up @@ -120,6 +138,28 @@ resource "helm_release" "vector" {
}
}

clickhouse_ds_logs = {
type = "clickhouse"
inputs = ["ds_fix_id"]
compression = "gzip"
database = "db_ds_log"
endpoint = "https://${var.clickhouse_host}:${var.clickhouse_port_https}"
table = "server_logs"
auth = {
strategy = "basic"
user = "vector"
# Escape values for Vector
password = replace(module.secrets.values["clickhouse/users/vector/password"], "$", "$$")
}
tls = local.clickhouse_k8s ? {
ca_file = "/usr/local/share/ca-certificates/clickhouse-ca.crt"
} : {}
batch = {
# Speed up for realtime-ish logs
timeout_secs = 1.0
}
}

clickhouse_opengb_logs = {
type = "clickhouse"
inputs = ["opengb_worker"]
Expand Down
4 changes: 2 additions & 2 deletions lib/bolt/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions lib/job-runner/src/log_shipper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ pub struct LogShipper {

pub job_run_id: String,
pub nomad_task_name: String,
pub runner: String,
}

impl LogShipper {
Expand Down Expand Up @@ -89,12 +90,12 @@ impl LogShipper {

while let Result::Ok(message) = self.msg_rx.recv() {
let vector_message = VectorMessage {
source: "job_run",
source: self.runner.as_str(),
run_id: self.job_run_id.as_str(),
task: self.nomad_task_name.as_str(),
stream_type: message.stream_type as u8,
ts: message.ts,
message: &message.message,
message: message.message.as_str(),
};

serde_json::to_writer(&mut stream, &vector_message)?;
Expand All @@ -110,7 +111,7 @@ impl LogShipper {
/// Vector-compatible message format
#[derive(Serialize)]
struct VectorMessage<'a> {
source: &'static str,
source: &'a str,
run_id: &'a str,
task: &'a str,
stream_type: u8,
Expand Down
9 changes: 5 additions & 4 deletions lib/job-runner/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ fn main() -> anyhow::Result<()> {
let root_user_enabled = std::env::var("NOMAD_META_root_user_enabled")
.context("NOMAD_META_root_user_enabled")?
== "1";
let runner = std::env::var("NOMAD_META_runner").unwrap_or("job_run".to_string());
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should work for any existing usage of job-runner, while the new dynamic servers passes in a new var.


let (shutdown_tx, shutdown_rx) = mpsc::sync_channel(1);

Expand All @@ -39,6 +40,7 @@ fn main() -> anyhow::Result<()> {
msg_rx,
job_run_id,
nomad_task_name,
runner,
};
let log_shipper_thread = log_shipper.spawn();

Expand Down Expand Up @@ -226,15 +228,14 @@ fn ship_logs(
for line in stream.lines() {
// Throttle
if let Err(err) = throttle_short.tick() {
if err.first_throttle_in_window {
if send_message(
if err.first_throttle_in_window
&& send_message(
&msg_tx,
Some(&mut throttle_error),
stream_type,
format_rate_limit(err.time_remaining),
) {
break;
}
break;
}
continue;
} else if let Err(err) = throttle_long.tick() {
Expand Down
19 changes: 19 additions & 0 deletions proto/backend/ds/log.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
syntax = "proto3";

package rivet.backend.ds.log;

import "proto/common.proto";

enum StreamType {
STD_OUT = 0;
STD_ERR = 1;
}

message LogEntry {
// Timestamp the log was received (in nanoseconds).
int64 nts = 1;

// Message that was logged.
bytes message = 3;
}

1 change: 1 addition & 0 deletions scripts/openapi/gen_rust.sh
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ docker run --rm \
if [ "$FERN_GROUP" == "full" ]; then
# Fix OpenAPI bug (https://github.com/OpenAPITools/openapi-generator/issues/14171)
sed -i 's/CloudGamesLogStream/crate::models::CloudGamesLogStream/' "$GEN_PATH_RUST/src/apis/cloud_games_matchmaker_api.rs"
sed -i 's/ServersLogStream/crate::models::ServersLogStream/' "$GEN_PATH_RUST/src/apis/servers_logs_api.rs"
sed -i 's/AdminClustersPoolType/crate::models::AdminClustersPoolType/' "$GEN_PATH_RUST/src/apis/admin_clusters_servers_api.rs"

# Create variant specifically for the CLI
Expand Down
57 changes: 30 additions & 27 deletions sdks/full/go/servers/client/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading
Loading