Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(projects): Treat fetch failures as pending #4140

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions relay-server/src/services/projects/cache.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use std::collections::{BTreeMap, BTreeSet};
use std::convert::Infallible;
use std::error::Error;
use std::sync::Arc;
use std::time::Duration;
Expand Down Expand Up @@ -493,12 +494,11 @@ impl ProjectSource {
project_key: ProjectKey,
no_cache: bool,
cached_state: ProjectFetchState,
) -> Result<ProjectFetchState, ()> {
) -> Result<ProjectFetchState, ProjectSourceError> {
let state_opt = self
.local_source
.send(FetchOptionalProjectState { project_key })
.await
.map_err(|_| ())?;
.await?;

if let Some(state) = state_opt {
return Ok(ProjectFetchState::new(state));
Expand All @@ -516,12 +516,11 @@ impl ProjectSource {
if let Some(redis_source) = self.redis_source {
let current_revision = current_revision.clone();

let redis_permit = self.redis_semaphore.acquire().await.map_err(|_| ())?;
let redis_permit = self.redis_semaphore.acquire().await?;
let state_fetch_result = tokio::task::spawn_blocking(move || {
redis_source.get_config_if_changed(project_key, current_revision.as_deref())
})
.await
.map_err(|_| ())?;
.await?;
drop(redis_permit);

match state_fetch_result {
Expand Down Expand Up @@ -553,8 +552,7 @@ impl ProjectSource {
current_revision,
no_cache,
})
.await
.map_err(|_| ())?;
.await?;

match state {
UpstreamProjectState::New(state) => Ok(ProjectFetchState::new(state.sanitized())),
Expand All @@ -563,6 +561,22 @@ impl ProjectSource {
}
}

/// Error returned by `ProjectSource::fetch` when a project state cannot be obtained.
///
/// Each variant wraps the underlying error via `#[from]`, so the `?` operator can
/// convert it automatically.
#[derive(Debug, thiserror::Error)]
enum ProjectSourceError {
/// Acquiring a permit from the Redis semaphore failed.
#[error("redis permit error {0}")]
RedisPermit(#[from] tokio::sync::AcquireError),
/// The blocking task that queries Redis could not be joined.
#[error("redis join error {0}")]
RedisJoin(#[from] tokio::task::JoinError),
/// Sending a request to a service failed.
// NOTE(review): presumably also covers the local source's `send`, not only the
// upstream one, since both use `?` in `fetch` — confirm.
#[error("upstream error {0}")]
Upstream(#[from] relay_system::SendError),
}

/// Conversion from the uninhabited [`Infallible`] type.
///
/// No value of `Infallible` can exist, so this conversion can never actually run.
// NOTE(review): presumably present so that `?` compiles on a `Result` whose error
// type is `Infallible` — confirm against the call sites in `fetch`.
impl From<Infallible> for ProjectSourceError {
    fn from(never: Infallible) -> Self {
        // An empty match is exhaustive because `Infallible` has no variants.
        match never {}
    }
}

/// Updates the cache with new project state information.
struct UpdateProjectState {
/// The public key to fetch the project by.
Expand Down Expand Up @@ -777,7 +791,13 @@ impl ProjectCacheBroker {
let state = source
.fetch(project_key, no_cache, cached_state)
.await
.unwrap_or_else(|()| ProjectFetchState::disabled());
.unwrap_or_else(|e| {
Copy link
Member

@Dav1dde Dav1dde Oct 15, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Possibly a bit nicer to read with:

    .inspect_err(|e| relay_log::error!(..))
    .unwrap_or(ProjectFetchState::pending())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how unwrap_or_else only creates a ProjectFetchState in the error case. ProjectFetchState::pending() calls Instant::now(), so it's not entirely free.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That part could be fixed by using unwrap_or_else, but I think the current version is fine as well.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I went overboard with my example, I mainly meant the split of logging into the inspect_err, keep the side effect separate from the conversion. But either way it's fine and possibly worse with how rustfmt wants to format it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That part could be fixed by using unwrap_or_else

Yes, but then you need to ignore the argument passed to unwrap_or_else (the error), which you previously inspected with inspect_err. Is that better?

                .inspect_err(|e| {
                    relay_log::error!(
                        error = e as &dyn Error,
                        "Failed to fetch project from source"
                    );
                })
                .unwrap_or_else(|_| ProjectFetchState::pending());
                
                // vs
                
                .unwrap_or_else(|e| {
                    relay_log::error!(
                        error = &e as &dyn Error,
                        "Failed to fetch project from source"
                    );
                    ProjectFetchState::pending()
                });

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, I think the current version is superior.

relay_log::error!(
error = &e as &dyn Error,
"Failed to fetch project from source"
);
ProjectFetchState::pending()
});

let message = UpdateProjectState {
project_key,
Expand Down
17 changes: 8 additions & 9 deletions tests/integration/test_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,17 +154,12 @@ def get_project_config():
mini_sentry.clear_test_failures()


def test_query_retry_maxed_out(mini_sentry, relay_with_processing, events_consumer):
def test_query_retry_maxed_out(mini_sentry, relay):
"""
Assert that a query is not retried an infinite amount of times.

This is not specific to processing or store, but here we have the outcomes
consumer which we can use to assert that an event has been dropped.
"""
request_count = 0

events_consumer = events_consumer()

original_get_project_config = mini_sentry.app.view_functions["get_project_config"]

@mini_sentry.app.endpoint("get_project_config")
Expand All @@ -184,9 +179,7 @@ def get_project_config():
for retry in range(RETRIES): # 1 retry
query_timeout += 1 * 1.5 ** (retry + 1)

relay = relay_with_processing(
{"limits": {"query_timeout": math.ceil(query_timeout)}}
)
relay = relay(mini_sentry, {"limits": {"query_timeout": math.ceil(query_timeout)}})

# No error messages yet
assert mini_sentry.test_failures.empty()
Expand All @@ -199,6 +192,12 @@ def get_project_config():
assert {str(e) for _, e in mini_sentry.current_test_failures()} == {
"Relay sent us event: error fetching project states: upstream request returned error 500 Internal Server Error: no error details",
}

time.sleep(1) # Wait for project to be cached

# Relay still accepts events for this project
next_response = relay.send_event(42)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

With the old version, this call raises a 403 error.

assert "id" in next_response
finally:
mini_sentry.clear_test_failures()

Expand Down
Loading