Skip to content

Commit

Permalink
fix: remove duplicate trace in op ctx (#845)
Browse files Browse the repository at this point in the history
<!-- Please make sure there is an issue that this PR is correlated to. -->

## Changes

<!-- If there are frontend changes, please include screenshots. -->
  • Loading branch information
MasterPtato committed Jun 4, 2024
1 parent 09f3ddc commit dc9812c
Show file tree
Hide file tree
Showing 38 changed files with 44 additions and 116 deletions.
1 change: 0 additions & 1 deletion lib/api-helper/build/src/macro_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,6 @@ pub async fn __with_ctx<A: auth::ApiAuth + Send>(
ts,
ts,
(),
Vec::new(),
);

// Create auth
Expand Down
15 changes: 4 additions & 11 deletions lib/chirp/client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,17 +131,6 @@ impl SharedClient {
ts,
)
}

pub fn wrap_with(
self: Arc<Self>,
parent_req_id: Uuid,
ray_id: Uuid,
ts: i64,
trace: Vec<chirp::TraceEntry>,
perf_ctx: chirp_perf::PerfCtxInner,
) -> Client {
Client::new(self, parent_req_id, ray_id, trace, Arc::new(perf_ctx), ts)
}
}

/// Used to communicate with other Chirp clients.
Expand Down Expand Up @@ -238,6 +227,10 @@ impl Client {
pub fn ts(&self) -> i64 {
self.ts
}

pub fn trace(&self) -> &[chirp::TraceEntry] {
&self.trace
}
}

impl Drop for Client {
Expand Down
29 changes: 12 additions & 17 deletions lib/chirp/worker/src/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -696,22 +696,16 @@ where
let worker_req = {
// Build client
let ts = rivet_util::timestamp::now();
let client = self.shared_client.clone().wrap_with(
req_id,
ray_id,
ts,
{
let mut x = trace.clone();
x.push(chirp::TraceEntry {
context_name: worker_name.clone(),
req_id: req_id_proto.clone(),
ts: rivet_util::timestamp::now(),
run_context: chirp::RunContext::Service as i32,
});
x
},
chirp_perf::PerfCtxInner::new(self.redis_cache.clone(), ts, req_id, ray_id),
);
let client = self.shared_client.clone().wrap(req_id, ray_id, {
let mut x = trace.clone();
x.push(chirp::TraceEntry {
context_name: worker_name.clone(),
req_id: req_id_proto.clone(),
ts: rivet_util::timestamp::now(),
run_context: chirp::RunContext::Service as i32,
});
x
});
let conn = Connection::new(client, self.pools.clone(), self.cache.clone());

let ts = req_debug
Expand Down Expand Up @@ -743,7 +737,6 @@ where
ts,
req_ts,
req_body,
trace,
),
dont_log_body,
allow_recursive,
Expand Down Expand Up @@ -838,6 +831,8 @@ where
.op_ctx
.trace()
.iter()
// Don't include the last trace event, which is the current request
.take(req.op_ctx.trace().len() - 1)
.any(|x| x.context_name == req.op_ctx.name());
let handle_res = if is_recursive {
Ok(Err(err_code!(
Expand Down
1 change: 0 additions & 1 deletion lib/chirp/worker/src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ impl TestCtx {
rivet_util::timestamp::now(),
rivet_util::timestamp::now(),
(),
Vec::new(),
);

Ok(TestCtx {
Expand Down
19 changes: 10 additions & 9 deletions lib/connection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,24 @@ impl Connection {
&self,
parent_req_id: Uuid,
ray_id: Uuid,
trace: Vec<chirp_client::TraceEntry>,
) -> GlobalResult<Connection> {
trace_entry: chirp_client::TraceEntry,
) -> Connection {
// Not the same as the operation ctx's ts because this cannot be overridden by debug start ts
let ts = rivet_util::timestamp::now();
let redis_cache = self.pools.redis("ephemeral")?;

Ok(Connection::new(
(*self.client).clone().wrap_with(
Connection::new(
(*self.client).clone().wrap(
parent_req_id,
ray_id,
ts,
trace,
chirp_perf::PerfCtxInner::new(redis_cache, ts, parent_req_id, ray_id),
{
let mut x = self.client.trace().to_vec();
x.push(trace_entry);
x
},
),
self.pools.clone(),
self.cache.clone(),
))
)
}

pub fn chirp(&self) -> &chirp_client::Client {
Expand Down
63 changes: 18 additions & 45 deletions lib/operation/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ where
ts: i64,
req_ts: i64,
body: B,
// Trace of all requests not including this request. The client does include
// this request in the trace, though.
trace: Vec<chirp_client::TraceEntry>,
}

impl<B> OperationContext<B>
Expand All @@ -58,7 +55,6 @@ where
ts: i64,
req_ts: i64,
body: B,
trace: Vec<chirp_client::TraceEntry>,
) -> Self {
OperationContext {
name,
Expand All @@ -69,7 +65,6 @@ where
ts,
req_ts,
body,
trace,
}
}

Expand All @@ -90,7 +85,7 @@ where

// TODO: Throw dedicated "timed out" error here
// Process the request
let req_op_ctx = self.wrap::<O>(body)?;
let req_op_ctx = self.wrap::<O>(body);
let timeout_fut = tokio::time::timeout(O::TIMEOUT, O::handle(req_op_ctx).in_current_span());
let res = tokio::task::Builder::new()
.name("operation::handle")
Expand Down Expand Up @@ -134,49 +129,28 @@ where
}

/// Adds trace and correctly wraps `Connection` (and subsequently `chirp_client::Client`).
fn wrap<O: Operation>(&self, body: O::Request) -> GlobalResult<OperationContext<O::Request>> {
let ray_id = Uuid::new_v4();
// Add self to new operation's trace
let trace = {
let mut x = self.trace.clone();
x.push(chirp_client::TraceEntry {
context_name: self.name.clone(),
req_id: Some(self.req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
});
x
fn wrap<O: Operation>(&self, body: O::Request) -> OperationContext<O::Request> {
let req_id = Uuid::new_v4();
let trace_entry = chirp_client::TraceEntry {
context_name: O::NAME.to_string(),
req_id: Some(req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
};

Ok(OperationContext {
OperationContext {
name: O::NAME.to_string(),
timeout: O::TIMEOUT,
conn: self.conn.wrap(self.req_id, ray_id, {
let mut x = trace.clone();

// Add new operation's trace to its connection (and chirp client)
x.push(chirp_client::TraceEntry {
context_name: O::NAME.to_string(),
req_id: Some(self.req_id.into()),
ts: rivet_util::timestamp::now(),
run_context: match rivet_util::env::run_context() {
rivet_util::env::RunContext::Service => chirp_client::RunContext::Service,
rivet_util::env::RunContext::Test => chirp_client::RunContext::Test,
} as i32,
});

x
})?,
req_id: self.req_id,
ray_id,
conn: self.conn.wrap(req_id, self.ray_id, trace_entry),
req_id,
ray_id: self.ray_id,
ts: util::timestamp::now(),
req_ts: self.req_ts,
body,
trace,
})
}
}

/// Clones everything but the body. This should always be used over `.clone()` unless you need to
Expand All @@ -191,7 +165,6 @@ where
ts: self.ts,
req_ts: self.req_ts,
body: (),
trace: self.trace.clone(),
}
}

Expand Down Expand Up @@ -231,11 +204,11 @@ where
}

pub fn trace(&self) -> &[chirp_client::TraceEntry] {
&self.trace
self.conn.trace()
}

pub fn test(&self) -> bool {
self.trace
self.trace()
.iter()
.any(|x| x.run_context == chirp_client::RunContext::Test as i32)
}
Expand Down
1 change: 0 additions & 1 deletion svc/api/admin/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

Ctx { op_ctx }
Expand Down
1 change: 0 additions & 1 deletion svc/api/auth/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/cf-verification/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let http_client = rivet_cf_verification::Config::builder()
Expand Down
1 change: 0 additions & 1 deletion svc/api/cloud/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// // Create temp team
Expand Down
1 change: 0 additions & 1 deletion svc/api/group/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (_user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/identity/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (primary_region_id, _) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/job/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let nomad_config = nomad_util::config_from_env().unwrap();
Expand Down
1 change: 0 additions & 1 deletion svc/api/kv/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let (primary_region_id, _) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/matchmaker/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (primary_region_id, primary_region_name_id) = Self::setup_region(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/portal/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let (_user_id, user_token) = Self::issue_user_token(&op_ctx).await;
Expand Down
1 change: 0 additions & 1 deletion svc/api/status/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
// util::timestamp::now(),
// util::timestamp::now(),
// (),
// Vec::new(),
// );

// let http_client = rivet_status::Config::builder()
Expand Down
1 change: 0 additions & 1 deletion svc/api/traefik-provider/tests/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ impl Ctx {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

Ctx { op_ctx }
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/build/standalone/default-create/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ pub async fn run_from_env() -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

for build in DEFAULT_BUILDS {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/datacenter-tls-renew/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ pub async fn run_from_env(pools: rivet_pools::Pools) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let updated_datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/default-update/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,6 @@ pub async fn run_from_env(use_autoscaler: bool) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

// Read config from env
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/fix-tls/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ pub async fn run_from_env(ts: i64) -> GlobalResult<()> {
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let datacenter_ids = vec!["5767a802-5c7c-4563-a266-33c014f7e244"]
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/gc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ pub async fn run_from_env(ts: i64, pools: rivet_pools::Pools) -> GlobalResult<()
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let datacenter_ids = rivet_pools::utils::crdb::tx(&ctx.crdb().await?, |tx| {
Expand Down
1 change: 0 additions & 1 deletion svc/pkg/cluster/standalone/metrics-publish/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ pub async fn run_from_env(_ts: i64, pools: rivet_pools::Pools) -> GlobalResult<(
util::timestamp::now(),
util::timestamp::now(),
(),
Vec::new(),
);

let servers = select_servers(&ctx).await?;
Expand Down
Loading

0 comments on commit dc9812c

Please sign in to comment.