Skip to content

Commit

Permalink
Return not found resource for missing permissions (#1157)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz authored Aug 18, 2024
1 parent b7f64ab commit ecc0d94
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 52 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 4 additions & 6 deletions examples/src/multi-tenant/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,16 +288,14 @@ async fn ensure_stream_topics_access(
let topic_id = Identifier::named(topic)?;
client
.get_topic(&available_stream.try_into()?, &topic_id)
.await
.unwrap_or_else(|_| {
panic!("No access to topic: {topic} in stream: {available_stream}")
});
.await?
.unwrap_or_else(|| panic!("No access to topic: {topic} in stream: {available_stream}"));
info!("Ensured access to topic: {topic} in stream: {available_stream}");
for stream in unavailable_streams {
if client
.get_topic(&Identifier::named(stream)?, &topic_id)
.await
.is_err()
.await?
.is_none()
{
info!("Ensured no access to topic: {topic} in stream: {stream}");
} else {
Expand Down
8 changes: 4 additions & 4 deletions examples/src/multi-tenant/producer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -275,14 +275,14 @@ async fn ensure_stream_access(
) -> Result<(), IggyError> {
client
.get_stream(&available_stream.try_into()?)
.await
.unwrap_or_else(|_| panic!("No access to stream: {available_stream}"));
.await?
.unwrap_or_else(|| panic!("No access to stream: {available_stream}"));
info!("Ensured access to stream: {available_stream}");
for stream in unavailable_streams {
if client
.get_stream(&Identifier::named(stream)?)
.await
.is_err()
.await?
.is_none()
{
info!("Ensured no access to stream: {stream}");
} else {
Expand Down
2 changes: 1 addition & 1 deletion server/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "server"
version = "0.4.12"
version = "0.4.13"
edition = "2021"
build = "src/build.rs"

Expand Down
34 changes: 14 additions & 20 deletions server/src/streaming/systems/consumer_groups.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ impl System {
group_id: &Identifier,
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.get_consumer_group(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;

Expand All @@ -33,11 +32,10 @@ impl System {
topic_id: &Identifier,
) -> Result<Vec<&RwLock<ConsumerGroup>>, IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.get_consumer_groups(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;

Expand All @@ -54,11 +52,10 @@ impl System {
) -> Result<&RwLock<ConsumerGroup>, IggyError> {
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.create_consumer_group(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;
}
Expand All @@ -78,14 +75,13 @@ impl System {
let stream_id_value;
let topic_id_value;
{
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.delete_consumer_group(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;
stream_id_value = stream.stream_id;
stream_id_value = topic.stream_id;
topic_id_value = topic.topic_id;
}

Expand Down Expand Up @@ -124,14 +120,13 @@ impl System {
let stream_id_value;
let topic_id_value;
{
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.join_consumer_group(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;
stream_id_value = stream.stream_id;
stream_id_value = topic.stream_id;
topic_id_value = topic.topic_id;
}

Expand Down Expand Up @@ -167,11 +162,10 @@ impl System {
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
{
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.leave_consumer_group(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;
}
Expand Down
10 changes: 4 additions & 6 deletions server/src/streaming/systems/consumer_offsets.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@ impl System {
offset: u64,
) -> Result<(), IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.store_consumer_offset(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;

Expand All @@ -38,11 +37,10 @@ impl System {
partition_id: Option<u32>,
) -> Result<ConsumerOffsetInfo, IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
let topic = self.find_topic(session, stream_id, topic_id)?;
self.permissioner.get_consumer_offset(
session.get_user_id(),
stream.stream_id,
topic.stream_id,
topic.topic_id,
)?;

Expand Down
12 changes: 8 additions & 4 deletions server/src/streaming/systems/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,14 @@ impl System {
identifier: &Identifier,
) -> Result<&Stream, IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(identifier)?;
self.permissioner
.get_stream(session.get_user_id(), stream.stream_id)?;
Ok(stream)
let stream = self.get_stream(identifier);
if let Ok(stream) = stream {
self.permissioner
.get_stream(session.get_user_id(), stream.stream_id)?;
return Ok(stream);
}

stream
}

pub fn get_stream(&self, identifier: &Identifier) -> Result<&Stream, IggyError> {
Expand Down
14 changes: 9 additions & 5 deletions server/src/streaming/systems/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,15 @@ impl System {
topic_id: &Identifier,
) -> Result<&Topic, IggyError> {
self.ensure_authenticated(session)?;
let stream = self.get_stream(stream_id)?;
let topic = stream.get_topic(topic_id)?;
self.permissioner
.get_topic(session.get_user_id(), stream.stream_id, topic.topic_id)?;
Ok(topic)
let stream = self.find_stream(session, stream_id)?;
let topic = stream.get_topic(topic_id);
if let Ok(topic) = topic {
self.permissioner
.get_topic(session.get_user_id(), stream.stream_id, topic.topic_id)?;
return Ok(topic);
}

topic
}

pub fn find_topics(
Expand Down
14 changes: 9 additions & 5 deletions server/src/streaming/systems/users.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,13 +113,17 @@ impl System {

pub fn find_user(&self, session: &Session, user_id: &Identifier) -> Result<&User, IggyError> {
self.ensure_authenticated(session)?;
let user = self.get_user(user_id)?;
let session_user_id = session.get_user_id();
if user.id != session_user_id {
self.permissioner.get_user(session_user_id)?;
let user = self.get_user(user_id);
if let Ok(user) = user {
let session_user_id = session.get_user_id();
if user.id != session_user_id {
self.permissioner.get_user(session_user_id)?;
}

return Ok(user);
}

Ok(user)
user
}

pub fn get_user(&self, user_id: &Identifier) -> Result<&User, IggyError> {
Expand Down

0 comments on commit ecc0d94

Please sign in to comment.