Skip to content

Commit

Permalink
Parametrize multi-tenant example (#1109)
Browse files Browse the repository at this point in the history
  • Loading branch information
spetz committed Aug 5, 2024
1 parent 01e59e6 commit 7db0f52
Show file tree
Hide file tree
Showing 2 changed files with 280 additions and 192 deletions.
234 changes: 138 additions & 96 deletions examples/src/multi-tenant/consumer/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,114 +13,150 @@ use iggy::users::defaults::{DEFAULT_ROOT_PASSWORD, DEFAULT_ROOT_USERNAME};
use iggy::utils::duration::IggyDuration;
use iggy_examples::shared::args::Args;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::str::FromStr;
use tokio::task::JoinHandle;
use tracing::{error, info};

const TENANT1_STREAM: &str = "tenant_1";
const TENANT2_STREAM: &str = "tenant_2";
const TENANT3_STREAM: &str = "tenant_3";
const TENANT1_USER: &str = "tenant_1_consumer";
const TENANT2_USER: &str = "tenant_2_consumer";
const TENANT3_USER: &str = "tenant_3_consumer";
const PASSWORD: &str = "secret";
const TOPICS: &[&str] = &["events", "logs", "notifications"];
const CONSUMER_GROUP: &str = "multi-tenant-consumer";
const CONSUMER_GROUP: &str = "multi-tenant";
const PASSWORD: &str = "secret";

struct Tenant {
id: u32,
stream: String,
user: String,
client: IggyClient,
consumers: Vec<TenantConsumer>,
}

impl Tenant {
pub fn new(id: u32, stream: String, user: String, client: IggyClient) -> Self {
Self {
id,
stream,
user,
client,
consumers: Vec::new(),
}
}

pub fn add_consumers(&mut self, consumers: Vec<TenantConsumer>) {
self.consumers.extend(consumers);
}
}

struct TenantConsumer {
tenant: String,
id: u32,
stream: String,
topic: String,
consumer: IggyConsumer,
}

impl TenantConsumer {
pub fn new(id: u32, stream: String, topic: String, consumer: IggyConsumer) -> Self {
Self {
id,
stream,
topic,
consumer,
}
}
}

#[tokio::main]
async fn main() -> anyhow::Result<(), Box<dyn Error>> {
let args = Args::parse();
tracing_subscriber::fmt::init();
print_info("Multi-tenant consumer has started");
let tenants_count = env::var("TENANTS_COUNT")
.unwrap_or_else(|_| 3.to_string())
.parse::<u32>()
.expect("Invalid tenants count");

let consumers_count = env::var("CONSUMERS_COUNT")
.unwrap_or_else(|_| 1.to_string())
.parse::<u32>()
.expect("Invalid consumers count");

let ensure_access = env::var("ENSURE_ACCESS")
.unwrap_or_else(|_| "true".to_string())
.parse::<bool>()
.expect("Invalid ensure stream access");

print_info("Multi-tenant consumers has started, tenants: {tenants_count}, consumers: {producers_count}, partitions: {partitions_count}");
let address = args.tcp_server_address;

print_info("Creating root client to manage streams and users");
let root_client = create_client(&address, DEFAULT_ROOT_USERNAME, DEFAULT_ROOT_PASSWORD).await?;

print_info("Creating users with permissions for each tenant");
create_user(TENANT1_STREAM, TOPICS, TENANT1_USER, &root_client).await?;
create_user(TENANT2_STREAM, TOPICS, TENANT2_USER, &root_client).await?;
create_user(TENANT3_STREAM, TOPICS, TENANT3_USER, &root_client).await?;
print_info("Creating users with stream permissions for each tenant");
let mut streams_with_users = HashMap::new();
for i in 1..=tenants_count {
let name = format!("tenant_{i}");
let stream = format!("{name}_stream");
let user = format!("{name}_consumer");
create_user(&stream, TOPICS, &user, &root_client).await?;
streams_with_users.insert(stream, user);
}

print_info("Disconnecting root client");
root_client.disconnect().await?;

print_info("Creating clients for each tenant");
let tenant1_client = create_client(&address, TENANT1_USER, PASSWORD).await?;
let tenant2_client = create_client(&address, TENANT2_USER, PASSWORD).await?;
let tenant3_client = create_client(&address, TENANT3_USER, PASSWORD).await?;

print_info("Ensuring access to topics for each tenant");
ensure_topics_access(
&tenant1_client,
TOPICS,
TENANT1_STREAM,
&[TENANT2_STREAM, TENANT3_STREAM],
)
.await?;
ensure_topics_access(
&tenant2_client,
TOPICS,
TENANT2_STREAM,
&[TENANT1_STREAM, TENANT3_STREAM],
)
.await?;
ensure_topics_access(
&tenant3_client,
TOPICS,
TENANT3_STREAM,
&[TENANT1_STREAM, TENANT2_STREAM],
)
.await?;
let mut tenants = Vec::new();
let mut tenant_id = 1;
for (stream, user) in streams_with_users.into_iter() {
let client = create_client(&address, &user, PASSWORD).await?;
tenants.push(Tenant::new(tenant_id, stream, user, client));
tenant_id += 1;
}

print_info("Creating consumer for each tenant");
let consumers1 = create_consumers(
"tenant_1",
&tenant1_client,
TENANT1_STREAM,
TOPICS,
args.messages_per_batch,
&args.interval,
)
.await?;
let consumers2 = create_consumers(
"tenant_2",
&tenant2_client,
TENANT2_STREAM,
TOPICS,
args.messages_per_batch,
&args.interval,
)
.await?;
let consumers3 = create_consumers(
"tenant_3",
&tenant3_client,
TENANT3_STREAM,
TOPICS,
args.messages_per_batch,
&args.interval,
)
.await?;
if ensure_access {
print_info("Ensuring access to stream topics for each tenant");
for tenant in tenants.iter() {
let unavailable_streams = tenants
.iter()
.filter(|t| t.stream != tenant.stream)
.map(|t| t.stream.as_str())
.collect::<Vec<_>>();
ensure_stream_topics_access(
&tenant.client,
TOPICS,
&tenant.stream,
&unavailable_streams,
)
.await?;
}
}

print_info("Starting consumers for each tenant");
let consumer1_tasks = start_consumers(consumers1);
let consumer2_tasks = start_consumers(consumers2);
let consumer3_tasks = start_consumers(consumers3);
print_info("Creating {consumers_count} consumer(s) for each tenant");
for tenant in tenants.iter_mut() {
let consumers = create_consumers(
&tenant.client,
consumers_count,
&tenant.stream,
TOPICS,
args.messages_per_batch,
&args.interval,
)
.await?;
tenant.add_consumers(consumers);
info!(
"Created {consumers_count} consumer(s) for tenant stream: {}, username: {}",
tenant.stream, tenant.user
);
}

print_info("Starting {consumers_count} consumers(s) for each tenant");
let mut tasks = Vec::new();
tasks.extend(consumer1_tasks);
tasks.extend(consumer2_tasks);
tasks.extend(consumer3_tasks);
join_all(tasks).await;
for tenant in tenants.into_iter() {
let producers_tasks = start_consumers(tenant.id, tenant.consumers);
tasks.extend(producers_tasks);
}

join_all(tasks).await;
print_info("Disconnecting clients");

Ok(())
}

Expand Down Expand Up @@ -170,11 +206,13 @@ async fn create_user(
Ok(())
}

fn start_consumers(consumers: Vec<TenantConsumer>) -> Vec<JoinHandle<()>> {
fn start_consumers(tenant_id: u32, consumers: Vec<TenantConsumer>) -> Vec<JoinHandle<()>> {
let mut tasks = Vec::new();
for mut consumer in consumers {
let task = tokio::spawn(async move {
let tenant = consumer.tenant;
let consumer_id = consumer.id;
let stream = consumer.stream;
let topic = consumer.topic;
while let Some(message) = consumer.consumer.next().await {
if let Ok(message) = message {
let current_offset = message.current_offset;
Expand All @@ -188,9 +226,9 @@ fn start_consumers(consumers: Vec<TenantConsumer>) -> Vec<JoinHandle<()>> {
}

let payload = payload.unwrap();
info!("Tenant: {tenant} consumer received: {payload} from partition: {partition_id}, at offset: {offset}, current offset: {current_offset}");
info!("Tenant: {tenant_id} consumer: {consumer_id} received: {payload} from partition: {partition_id}, topic: {topic}, stream: {stream}, at offset: {offset}, current offset: {current_offset}");
} else if let Err(error) = message {
error!("Error while handling message: {error}, by: {tenant} consumer.");
error!("Error while handling message: {error} by tenant: {tenant_id} consumer: {consumer_id}, topic: {topic}, stream: {stream}");
continue;
}
}
Expand All @@ -201,33 +239,37 @@ fn start_consumers(consumers: Vec<TenantConsumer>) -> Vec<JoinHandle<()>> {
}

async fn create_consumers(
tenant: &str,
client: &IggyClient,
consumers_count: u32,
stream: &str,
topics: &[&str],
batch_size: u32,
interval: &str,
) -> Result<Vec<TenantConsumer>, IggyError> {
let mut consumers = Vec::new();
for topic in topics {
let mut consumer = client
.consumer_group(CONSUMER_GROUP, stream, topic)?
.batch_size(batch_size)
.poll_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
.polling_strategy(PollingStrategy::next())
.auto_join_consumer_group()
.auto_commit(AutoCommit::After(AutoCommitAfter::PollingMessages))
.build();
consumer.init().await?;
consumers.push(TenantConsumer {
tenant: tenant.to_owned(),
consumer,
});
for id in 1..=consumers_count {
let mut consumer = client
.consumer_group(CONSUMER_GROUP, stream, topic)?
.batch_size(batch_size)
.poll_interval(IggyDuration::from_str(interval).expect("Invalid duration"))
.polling_strategy(PollingStrategy::next())
.auto_join_consumer_group()
.auto_commit(AutoCommit::After(AutoCommitAfter::PollingMessages))
.build();
consumer.init().await?;
consumers.push(TenantConsumer::new(
id,
stream.to_owned(),
topic.to_string(),
consumer,
));
}
}
Ok(consumers)
}

async fn ensure_topics_access(
async fn ensure_stream_topics_access(
client: &IggyClient,
topics: &[&str],
available_stream: &str,
Expand Down
Loading

0 comments on commit 7db0f52

Please sign in to comment.