diff --git a/Cargo.lock b/Cargo.lock index 1210702..4d97554 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -244,15 +244,6 @@ version = "1.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8c3c1a368f70d6cf7302d78f8f7093da241fb8e8807c05cc9e51a125895a6d5b" -[[package]] -name = "bincode" -version = "1.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" -dependencies = [ - "serde", -] - [[package]] name = "bitflags" version = "1.3.2" @@ -310,6 +301,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" + [[package]] name = "chrono" version = "0.4.38" @@ -721,15 +718,17 @@ dependencies = [ "anyhow", "async-trait", "base64 0.22.1", - "bincode", "chrono", "config", "csv", + "futures", + "hex", "http 1.1.0", "indoc", "ldap-poller", "ldap3", - "reqwest 0.11.27", + "native-tls", + "reqwest 0.12.8", "serde", "serde_json", "serde_yaml", @@ -1231,6 +1230,7 @@ dependencies = [ "hyper 1.4.1", "hyper-util", "rustls 0.23.14", + "rustls-native-certs 0.8.0", "rustls-pki-types", "tokio", "tokio-rustls 0.26.0", @@ -1249,19 +1249,6 @@ dependencies = [ "tokio-io-timeout", ] -[[package]] -name = "hyper-tls" -version = "0.5.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" -dependencies = [ - "bytes", - "hyper 0.14.30", - "native-tls", - "tokio", - "tokio-native-tls", -] - [[package]] name = "hyper-tls" version = "0.6.0" @@ -2193,6 +2180,55 @@ dependencies = [ "ipnet", ] +[[package]] +name = "quinn" +version = "0.11.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c7c5fdde3cdae7203427dc4f0a68fe0ed09833edc525a03456b153b79828684" +dependencies = [ + "bytes", + "pin-project-lite", + "quinn-proto", + "quinn-udp", + "rustc-hash", + "rustls 0.23.14", + "socket2", + "thiserror", + "tokio", + "tracing", +] + +[[package]] +name = "quinn-proto" +version = "0.11.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fadfaed2cd7f389d0161bb73eeb07b7b78f8691047a6f3e73caaeae55310a4a6" +dependencies = [ + "bytes", + "rand", + "ring 0.17.8", + "rustc-hash", + "rustls 0.23.14", + "slab", + "thiserror", + "tinyvec", + "tracing", +] + +[[package]] +name = "quinn-udp" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e346e016eacfff12233c243718197ca12f148c84e1e84268a896699b41c71780" +dependencies = [ + "cfg_aliases", + "libc", + "once_cell", + "socket2", + "tracing", + "windows-sys 0.59.0", +] + [[package]] name = "quote" version = "1.0.37" @@ -2301,17 +2337,14 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.30", "hyper-rustls 0.24.2", - "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", - "native-tls", "once_cell", "percent-encoding", "pin-project-lite", "rustls 0.21.12", - "rustls-native-certs 0.6.3", "rustls-pemfile 1.0.4", "serde", "serde_json", @@ -2319,7 +2352,6 @@ dependencies = [ "sync_wrapper 0.1.2", "system-configuration 0.5.1", "tokio", - "tokio-native-tls", "tokio-rustls 0.24.1", "tower-service", "url", @@ -2347,7 +2379,7 @@ dependencies = [ "http-body-util", "hyper 1.4.1", "hyper-rustls 0.27.3", - "hyper-tls 0.6.0", + "hyper-tls", "hyper-util", "ipnet", "js-sys", @@ -2357,7 +2389,11 @@ dependencies = [ "once_cell", "percent-encoding", "pin-project-lite", + "quinn", + "rustls 0.23.14", + "rustls-native-certs 0.8.0", "rustls-pemfile 2.2.0", + "rustls-pki-types", "serde", "serde_json", "serde_urlencoded", @@ -2365,6 +2401,7 @@ dependencies = [ "system-configuration 0.6.1", "tokio", "tokio-native-tls", + "tokio-rustls 0.26.0", "tower-service", "url", "wasm-bindgen", @@ -2461,6 +2498,12 @@ version = "0.1.24" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc_version" version = "0.4.1" @@ -2529,6 +2572,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "415d9944693cb90382053259f89fbb077ea730ad7273047ec63b19bc9b160ba8" dependencies = [ "once_cell", + "ring 0.17.8", "rustls-pki-types", "rustls-webpki 0.102.8", "subtle", @@ -2549,21 +2593,22 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.6.3" +version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" dependencies = [ "openssl-probe", - "rustls-pemfile 1.0.4", + "rustls-pemfile 2.2.0", + "rustls-pki-types", "schannel", "security-framework", ] [[package]] name = "rustls-native-certs" -version = "0.7.3" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", "rustls-pemfile 2.2.0", diff --git a/Cargo.toml b/Cargo.toml index 6395d7e..1b530e9 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -9,7 +9,6 @@ publish = false anyhow = { version = "1.0.81", features = ["backtrace"] } async-trait = "0.1.82" base64 = "0.22.1" -bincode = "1.3.3" chrono = "0.4.19" config = { version = "0.14.0" } http = "1.1.0" @@ -27,6 +26,10 @@ zitadel-rust-client = { git = "https://github.com/famedly/zitadel-rust-client", wiremock = "0.6.2" csv = "1.3.0" tempfile = "3.12.0" +futures = "0.3.31" +ldap3 = { version = "0.11.1", default-features = false, features = ["tls-native"] } +native-tls = "0.2.12" +hex = "0.4.3" [dependencies.tonic] version = "*" diff --git a/sample-configs/csv-config.sample.yaml b/sample-configs/csv-config.sample.yaml new file mode 100644 index 0000000..bbba672 --- /dev/null +++ b/sample-configs/csv-config.sample.yaml @@ -0,0 +1,30 @@ +# Configuration for Famedly's Zitadel - has to be provided by Famedly +zitadel: + # The Famedly user endpoint to sync to. + url: https://auth.famedly.de + # The Famedly-provided service user credentials. + key_file: /opt/famedly-sync-agent/service-user.json + # The organization whose users to sync. + organization_id: 278274756195721220 + # The project to grant users access to. + project_id: 278274945274880004 + # The identity provider ID to enable SSO login for + idp_id: 281430143275106308 + +feature_flags: + - verify_email # Whether to ask users to verify their email addresses post sync + - verify_phone # Whether to ask users to verify their phone numbers post sync + # - sso_login # Whether to enable SSO login - Please note that his has some drawbacks and limitations, see the help center article for more information + # - dry_run # Disable syncing users to Zitadel - Intended to ensure syncs are working before productive deployment + # - deactivate_only # Only deactivate users, do not create or update them. + +# Configuration for the sources to sync from. +sources: + # Configuration for the CSV sources + # Updates Zitadel to match the CSV file. + #! DANGER: This will delete all users that are not in the CSV file! + csv: + # Path to the CSV file to read from. + # Expected structure of the CSV file is as follows: + # email,first_name,last_name,phone + file_path: ./tests/environment/files/test-users.csv diff --git a/config.sample.yaml b/sample-configs/ldap-config.sample.yaml similarity index 75% rename from config.sample.yaml rename to sample-configs/ldap-config.sample.yaml index 5ee56bb..679ccfb 100644 --- a/config.sample.yaml +++ b/sample-configs/ldap-config.sample.yaml @@ -16,7 +16,7 @@ feature_flags: - verify_phone # Whether to ask users to verify their phone numbers post sync # - sso_login # Whether to enable SSO login - Please note that his has some drawbacks and limitations, see the help center article for more information # - dry_run # Disable syncing users to Zitadel - Intended to ensure syncs are working before productive deployment - # - deactivate_only # Only deactivate users, do not create or update them. Keep in mind LDAP is cached and all the changes made on LDAP will be written to the cache as if they where applied. Therefore, only the deactivation changes will be applied to Zitadel but **all the other changes will be lost**. + # - deactivate_only # Only deactivate users, do not create or update them. # Configuration for the sources to sync from. sources: @@ -87,31 +87,3 @@ sources: # needed with the `ldaps` scheme, as the server will already be # hosting TLS. danger_use_start_tls: false - - # Path to the file that keeps track of previously synced LDAP entries. - # This file should be persisted, otherwise users may become out of sync. - cache_path: /opt/famedly-sync-agent/famedly-sync.cache - - # Configuration for the UKT source - a custom endpoint provided by UKT, - # which gives a list of emails of users that should be deleted from Zitadel. - ukt: - # Endpoint URL to fetch the list of users from. - endpoint_url: https://list.example.invalid/usersync4chat/maillist - # OAuth2 URL to fetch the token from. - oauth2_url: https://list.example.invalid/token - # Client ID - client_id: mock_client_id - # Client Secret - client_secret: mock_client_secret - # Scope of what to fetch - scope: "openid read-maillist" - # Grant type - grant_type: client_credentials - - # Configuration for the CSV source - reads a CSV file - # and creates **new** users in Famedly's Zitadel. - # Expected structure of the CSV file is as follows: - # email,first_name,last_name,phone - csv: - # Path to the CSV file to read from. - file_path: ./tests/environment/files/test-users.csv diff --git a/sample-configs/ukt-config.sample.yaml b/sample-configs/ukt-config.sample.yaml new file mode 100644 index 0000000..64458ca --- /dev/null +++ b/sample-configs/ukt-config.sample.yaml @@ -0,0 +1,37 @@ +# Configuration for Famedly's Zitadel - has to be provided by Famedly +zitadel: + # The Famedly user endpoint to sync to. + url: https://auth.famedly.de + # The Famedly-provided service user credentials. + key_file: /opt/famedly-sync-agent/service-user.json + # The organization whose users to sync. + organization_id: 278274756195721220 + # The project to grant users access to. + project_id: 278274945274880004 + # The identity provider ID to enable SSO login for + idp_id: 281430143275106308 + +feature_flags: + - verify_email # Whether to ask users to verify their email addresses post sync + - verify_phone # Whether to ask users to verify their phone numbers post sync + # - sso_login # Whether to enable SSO login - Please note that his has some drawbacks and limitations, see the help center article for more information + # - dry_run # Disable syncing users to Zitadel - Intended to ensure syncs are working before productive deployment + # - deactivate_only # Only deactivate users, do not create or update them. + +# Configuration for the sources to sync from. +sources: + # Configuration for the UKT source - a custom endpoint provided by UKT, + # which gives a list of emails of users that should be deleted from Zitadel. + ukt: + # Endpoint URL to fetch the list of users from. + endpoint_url: https://list.example.invalid/usersync4chat/maillist + # OAuth2 URL to fetch the token from. + oauth2_url: https://list.example.invalid/token + # Client ID + client_id: mock_client_id + # Client Secret + client_secret: mock_client_secret + # Scope of what to fetch + scope: "openid read-maillist" + # Grant type + grant_type: client_credentials diff --git a/src/config.rs b/src/config.rs index a1f7737..ccbe822 100644 --- a/src/config.rs +++ b/src/config.rs @@ -6,18 +6,10 @@ use std::{ use anyhow::{bail, Result}; use serde::Deserialize; -use tracing::{error, warn}; use url::Url; -use crate::{ - sources::{ - csv::{CsvSource, CsvSourceConfig}, - ldap::{LdapSource, LdapSourceConfig}, - ukt::{UktSource, UktSourceConfig}, - Source, - }, - zitadel::{Zitadel, ZitadelConfig}, -}; +pub use crate::sources::{csv::CsvSourceConfig, ldap::LdapSourceConfig, ukt::UktSourceConfig}; +use crate::zitadel::ZitadelConfig; /// App prefix for env var configuration const ENV_VAR_CONFIG_PREFIX: &str = "FAMEDLY_SYNC"; @@ -76,62 +68,6 @@ impl Config { Ok(self) } - - /// Perform a sync operation - pub async fn perform_sync(&self) -> Result<()> { - if !self.feature_flags.is_enabled(FeatureFlag::SsoLogin) { - anyhow::bail!("Non-SSO configuration is currently not supported"); - } - - let mut sources: Vec> = Vec::new(); - - if let Some(ldap_config) = &self.sources.ldap { - let ldap = LdapSource::new( - ldap_config.clone(), - self.feature_flags.is_enabled(FeatureFlag::DryRun), - ); - sources.push(Box::new(ldap)); - } - - if let Some(ukt_config) = &self.sources.ukt { - let ukt = UktSource::new(ukt_config.clone()); - sources.push(Box::new(ukt)); - } - - if let Some(csv_config) = &self.sources.csv { - let csv = CsvSource::new(csv_config.clone()); - sources.push(Box::new(csv)); - } - - // Setup Zitadel client - let zitadel = Zitadel::new(self).await?; - - // Sync from each available source - for source in sources.iter() { - let diff = match source.get_diff().await { - Ok(diff) => diff, - Err(e) => { - error!("Failed to get diff from {}: {:?}", source.get_name(), e); - continue; - } - }; - - if !self.feature_flags.is_enabled(FeatureFlag::DeactivateOnly) { - if let Err(e) = zitadel.import_new_users(diff.new_users).await { - warn!("Failed to import new users from {}: {:?}", source.get_name(), e); - } - if let Err(e) = zitadel.delete_users_by_id(diff.deleted_user_ids).await { - warn!("Failed to delete users from {}: {:?}", source.get_name(), e); - } - } - - if let Err(e) = zitadel.update_users(diff.changed_users).await { - warn!("Failed to update users from {}: {:?}", source.get_name(), e); - } - } - - Ok(()) - } } /// Opt-in features @@ -301,14 +237,17 @@ mod tests { #[tokio::test] async fn test_sample_config() { - let config = Config::new(Path::new("./config.sample.yaml")); - + let config = Config::new(Path::new("./sample-configs/csv-config.sample.yaml")); + assert!(config.is_ok(), "Invalid config: {:?}", config); + let config = Config::new(Path::new("./sample-configs/ldap-config.sample.yaml")); + assert!(config.is_ok(), "Invalid config: {:?}", config); + let config = Config::new(Path::new("./sample-configs/ukt-config.sample.yaml")); assert!(config.is_ok(), "Invalid config: {:?}", config); } #[test] fn test_config_from_file() { - let tempdir = TempDir::new().expect("failed to initialize cache dir"); + let tempdir = TempDir::new().expect("failed to initialize tempdir"); let file_path = create_config_file(tempdir.path()); let config = Config::new(file_path.as_path()).expect("Failed to create config object"); @@ -317,7 +256,7 @@ mod tests { #[test] fn test_config_env_var_override() { - let tempdir = TempDir::new().expect("failed to initialize cache dir"); + let tempdir = TempDir::new().expect("failed to initialize tempdir"); let file_path = create_config_file(tempdir.path()); let env_var_name = format!("{ENV_VAR_CONFIG_PREFIX}__FEATURE_FLAGS"); @@ -354,7 +293,7 @@ mod tests { #[test] fn test_config_env_var_feature_flag() { - let tempdir = TempDir::new().expect("failed to initialize cache dir"); + let tempdir = TempDir::new().expect("failed to initialize tempdir"); let file_path = create_config_file(tempdir.path()); let env_var_name = format!("{ENV_VAR_CONFIG_PREFIX}__FEATURE_FLAGS"); diff --git a/src/lib.rs b/src/lib.rs index 9ded581..939b603 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,12 +1,254 @@ //! Sync tool between other sources and our infrastructure based on Zitadel. +use anyhow::{Context, Result}; +use futures::{Stream, StreamExt}; +use user::User; +use zitadel::Zitadel; mod config; mod sources; mod user; mod zitadel; -pub use config::{Config, FeatureFlag}; +use std::collections::VecDeque; + +pub use config::{Config, FeatureFlag, LdapSourceConfig}; pub use sources::{ csv::test_helpers as csv_test_helpers, ldap::AttributeMapping, ukt::test_helpers as ukt_test_helpers, }; +use sources::{csv::CsvSource, ldap::LdapSource, ukt::UktSource, Source}; + +/// Helper function to add metadata to streamed zitadel users +// TODO: If async closures become a reality, this should be factored +// into the `zitadel::search_result_to_user` function +async fn get_next_zitadel_user( + stream: &mut (impl Stream> + Send + Unpin), + zitadel: &mut Zitadel, +) -> Result> { + match stream.next().await.transpose()? { + Some(mut zitadel_user) => { + let preferred_username = zitadel + .zitadel_client + .get_user_metadata(&zitadel_user.1, "preferred_username") + .await + .ok() + .and_then(|metadata| metadata.metadata().value()); + + zitadel_user.0.preferred_username = preferred_username; + + Ok(Some(zitadel_user)) + } + None => Ok(None), + } +} + +/// Perform a sync operation +pub async fn perform_sync(config: &Config) -> Result<()> { + /// Get users from a source + async fn get_users_from_source(source: impl Source + Send) -> Result> { + source + .get_sorted_users() + .await + .map(VecDeque::from) + .context(format!("Failed to query users from {}", source.get_name())) + } + + let csv = config.sources.csv.clone().map(CsvSource::new); + let ldap = config.sources.ldap.clone().map(LdapSource::new); + let ukt = config.sources.ukt.clone().map(UktSource::new); + + // The ukt source is handled specially, since it doesn't behave as + // the others + if let Some(ukt) = ukt { + match ukt.get_removed_user_emails().await { + Ok(users) => delete_users_by_email(config, users).await?, + Err(err) => { + anyhow::bail!("Failed to query users from ukt: {:?}", err); + } + } + + return Ok(()); + } + + let mut users = match (csv, ldap, ukt) { + (Some(csv), None, None) => get_users_from_source(csv).await?, + (None, Some(ldap), None) => get_users_from_source(ldap).await?, + (None, None, Some(_)) => VecDeque::new(), + _ => { + anyhow::bail!("Exactly one source must be defined"); + } + }; + + if config.feature_flags.is_enabled(FeatureFlag::DeactivateOnly) { + disable_users(config, &mut users).await?; + } else { + sync_users(config, &mut users).await?; + } + + Ok(()) +} + +/// Delete a list of users given their email addresses +async fn delete_users_by_email(config: &Config, emails: Vec) -> Result<()> { + let mut zitadel = Zitadel::new(config).await?; + let mut stream = zitadel.get_users_by_email(emails)?; + + while let Some(zitadel_user) = get_next_zitadel_user(&mut stream, &mut zitadel).await? { + zitadel.delete_user(&zitadel_user.1).await?; + } + + Ok(()) +} + +/// Only disable users +async fn disable_users(config: &Config, users: &mut VecDeque) -> Result<()> { + // We only care about disabled users for this flow + users.retain(|user| !user.enabled); + + let mut zitadel = Zitadel::new(config).await?; + let mut stream = zitadel.list_users()?; + + while let Some(zitadel_user) = get_next_zitadel_user(&mut stream, &mut zitadel).await? { + if users.front().map(|user| user.external_user_id.clone()) + == Some(zitadel_user.0.external_user_id) + { + zitadel.delete_user(&zitadel_user.1).await?; + users.pop_front(); + } + } + + Ok(()) +} + +/// Fully sync users +async fn sync_users(config: &Config, sync_users: &mut VecDeque) -> Result<()> { + // Treat any disabled users as deleted, so we simply pretend they + // are not in the list + sync_users.retain(|user| user.enabled); + + let mut zitadel = Zitadel::new(config).await?; + let mut stream = zitadel.list_users()?; + + let mut source_user = sync_users.pop_front(); + let mut zitadel_user = get_next_zitadel_user(&mut stream, &mut zitadel).await?; + + loop { + tracing::debug!("Comparing users {:?} and {:?}", source_user, zitadel_user); + + match (source_user.clone(), zitadel_user.clone()) { + (None, None) => { + tracing::info!("Sync completed successfully"); + break; + } + + // Excess Zitadel users are not present in the sync + // source, so we delete them + (None, Some((_, zitadel_id))) => { + let res = zitadel.delete_user(&zitadel_id).await; + if let Err(error) = res { + tracing::error!( + "Failed to delete user with Zitadel ID `{}`: {}", + zitadel_id, + error + ); + } + + zitadel_user = get_next_zitadel_user(&mut stream, &mut zitadel).await?; + } + + // Excess sync source users are not yet in Zitadel, so + // we import them + (Some(new_user), None) => { + let res = zitadel.import_user(&new_user).await; + if let Err(error) = res { + tracing::error!( + "Failed to import user `{}`: {}", + new_user.external_user_id, + error + ); + } + + source_user = sync_users.pop_front(); + } + + // If the sync source user matches the Zitadel user, the + // user is already synced and we can move on + (Some(new_user), Some((existing_user, _))) if new_user == existing_user => { + zitadel_user = get_next_zitadel_user(&mut stream, &mut zitadel).await?; + source_user = sync_users.pop_front(); + } + + // If the user ID of the user to be synced to Zitadel is < + // the user ID of the current Zitadel user, we found a new + // user which we should be importing + (Some(new_user), Some((existing_user, _))) + if new_user.external_user_id < existing_user.external_user_id => + { + let res = zitadel.import_user(&new_user).await; + if let Err(error) = res { + tracing::error!( + "Failed to import user `{}`: {}", + new_user.external_user_id, + error + ); + } + + source_user = sync_users.pop_front(); + // Don't fetch the next zitadel user yet + } + + // If the user ID of the user to be synced to Zitadel is > + // the user ID of the current Zitadel user, the Zitadel + // user needs to be deleted + (Some(new_user), Some((existing_user, zitadel_id))) + if new_user.external_user_id > existing_user.external_user_id => + { + let res = zitadel.delete_user(&zitadel_id).await; + if let Err(error) = res { + tracing::error!( + "Failed to delete user with Zitadel ID `{}`: {}", + zitadel_id, + error + ); + } + + zitadel_user = get_next_zitadel_user(&mut stream, &mut zitadel).await?; + // Don't move to the next source user yet + } + + // If the users don't match (since we've failed the former + // checks), but the user IDs are the same, the user has + // been updated + (Some(new_user), Some((existing_user, zitadel_id))) + if new_user.external_user_id == existing_user.external_user_id => + { + let res = zitadel.update_user(&zitadel_id, &existing_user, &new_user).await; + if let Err(error) = res { + tracing::error!( + "Failed to update user `{}`: {}", + new_user.external_user_id, + error + ); + } + + zitadel_user = get_next_zitadel_user(&mut stream, &mut zitadel).await?; + source_user = sync_users.pop_front(); + } + + // Since the user IDs form a partial order, they must be + // either equal, less than, or greater than, one another. + // + // Since all other possible conditions are checked in the + // first case, this particular case is unreachable. + (Some(new_user), Some((existing_user, _))) => { + tracing::error!( + "Unreachable condition met for users `{}` and `{}`", + new_user.external_user_id, + existing_user.external_user_id + ); + } + } + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index f748f10..bb90779 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,7 @@ use std::{path::Path, process::ExitCode, str::FromStr}; use anyhow::{Context, Result}; -use famedly_sync::Config; +use famedly_sync::{perform_sync, Config}; use tracing::level_filters::LevelFilter; #[tokio::main] @@ -44,5 +44,5 @@ async fn run_sync() -> Result<()> { tracing::subscriber::set_global_default(subscriber) .context("Setting default tracing subscriber failed")?; - config.perform_sync().await + perform_sync(&config).await } diff --git a/src/sources.rs b/src/sources.rs index 048825e..32c5b87 100644 --- a/src/sources.rs +++ b/src/sources.rs @@ -3,18 +3,29 @@ use anyhow::Result; use async_trait::async_trait; -use crate::zitadel::SourceDiff; - pub mod csv; pub mod ldap; pub mod ukt; +use crate::user::User; + /// A source of data we want to sync from. #[async_trait] pub trait Source { /// Get source name for debugging. fn get_name(&self) -> &'static str; - /// Get changes from the source. - async fn get_diff(&self) -> Result; + /// Get a stream of the sources' users, sorted by external user ID + // Ideally we would return a `Stream` here, as this would allow us + // to cut down significantly on memory use, however none of our + // sources currently support returning results sorted, so we would + // need to buffer the results to sort them anyway. + // + // In addition, `async_trait` does not currently support returning + // `impl` traits, making that technically infeasible with Rust. + // + // TODO: If we do get sources which *do* support sorting, and Rust + // gains this feature, we should probably switch to a stream here, + // though (and update existing sources to return sorted streams). + async fn get_sorted_users(&self) -> Result>; } diff --git a/src/sources/csv.rs b/src/sources/csv.rs index ae8a694..356da04 100644 --- a/src/sources/csv.rs +++ b/src/sources/csv.rs @@ -8,7 +8,7 @@ use csv::Reader; use serde::Deserialize; use super::Source; -use crate::{user::User, zitadel::SourceDiff}; +use crate::user::User; /// CSV Source pub struct CsvSource { @@ -22,12 +22,10 @@ impl Source for CsvSource { "CSV" } - async fn get_diff(&self) -> Result { - let new_users = self.read_csv()?; - // TODO: Implement changed and deleted users - // Holding off on this until we get rid of the cache concept - // https://github.com/famedly/famedly-sync/issues/53 - return Ok(SourceDiff { new_users, changed_users: vec![], deleted_user_ids: vec![] }); + async fn get_sorted_users(&self) -> Result> { + let mut new_users = self.read_csv()?; + new_users.sort_by(|a, b| a.external_user_id.cmp(&b.external_user_id)); + return Ok(new_users); } } @@ -76,12 +74,12 @@ impl CsvData { /// Convert CsvData to User data fn to_user(csv_data: CsvData) -> User { User { - email: csv_data.email.clone().into(), - first_name: csv_data.first_name.into(), - last_name: csv_data.last_name.into(), - phone: if csv_data.phone.is_empty() { None } else { Some(csv_data.phone.into()) }, - preferred_username: csv_data.email.clone().into(), - external_user_id: csv_data.email.into(), + email: csv_data.email.clone(), + first_name: csv_data.first_name, + last_name: csv_data.last_name, + phone: if csv_data.phone.is_empty() { None } else { Some(csv_data.phone) }, + preferred_username: Some(csv_data.email.clone()), + external_user_id: hex::encode(csv_data.email), enabled: true, } } @@ -116,7 +114,7 @@ mod tests { use indoc::indoc; use super::*; - use crate::{user::StringOrBytes, Config}; + use crate::Config; const EXAMPLE_CONFIG: &str = indoc! {r#" zitadel: @@ -157,27 +155,11 @@ mod tests { let users = result.expect("Failed to get users"); assert_eq!(users.len(), 4, "Unexpected number of users"); - assert_eq!( - users[0].first_name, - StringOrBytes::String("John".to_owned()), - "Unexpected first name at index 0" - ); - assert_eq!( - users[0].email, - StringOrBytes::String("john.doe@example.com".to_owned()), - "Unexpected email at index 0" - ); - assert_eq!( - users[3].last_name, - StringOrBytes::String("Williams".to_owned()), - "Unexpected last name at index 3" - ); + assert_eq!(users[0].first_name, "John", "Unexpected first name at index 0"); + assert_eq!(users[0].email, "john.doe@example.com", "Unexpected email at index 0"); + assert_eq!(users[3].last_name, "Williams", "Unexpected last name at index 3"); assert_eq!(users[2].phone, None, "Unexpected phone at index 2"); - assert_eq!( - users[3].phone, - Some(StringOrBytes::String("+4444444444".to_owned())), - "Unexpected phone at index 3" - ); + assert_eq!(users[3].phone, Some("+4444444444".to_owned()), "Unexpected phone at index 3"); } #[test] @@ -252,15 +234,7 @@ mod tests { let users = result.expect("Failed to get users"); assert_eq!(users.len(), 1, "Unexpected number of users"); - assert_eq!( - users[0].email, - StringOrBytes::String("jane.smith@example.com".to_owned()), - "Unexpected email at index 0" - ); - assert_eq!( - users[0].last_name, - StringOrBytes::String("Smith".to_owned()), - "Unexpected last name at index 0" - ); + assert_eq!(users[0].email, "jane.smith@example.com", "Unexpected email at index 0"); + assert_eq!(users[0].last_name, "Smith", "Unexpected last name at index 0"); } } diff --git a/src/sources/ldap.rs b/src/sources/ldap.rs index f085fdd..9016178 100644 --- a/src/sources/ldap.rs +++ b/src/sources/ldap.rs @@ -1,14 +1,11 @@ //! LDAP source for syncing with Famedly's Zitadel. -use std::{ - fmt::Display, - path::{Path, PathBuf}, -}; +use std::{fmt::Display, path::PathBuf}; use anyhow::{anyhow, bail, Context, Result}; use async_trait::async_trait; use ldap_poller::{ - config::TLSConfig, ldap::EntryStatus, ldap3::SearchEntry, AttributeConfig, Cache, CacheMethod, + config::TLSConfig, ldap::EntryStatus, ldap3::SearchEntry, AttributeConfig, CacheMethod, ConnectionConfig, Ldap, SearchEntryExt, Searches, }; use serde::Deserialize; @@ -17,17 +14,12 @@ use tokio_stream::{wrappers::ReceiverStream, StreamExt}; use url::Url; use super::Source; -use crate::{ - user::{StringOrBytes, User}, - zitadel::{ChangedUser, SourceDiff, UserId}, -}; +use crate::user::User; /// LDAP sync source pub struct LdapSource { /// LDAP configuration ldap_config: LdapSourceConfig, - /// Dry run flag (prevents writing cache) - is_dry_run: bool, } #[async_trait] @@ -36,74 +28,44 @@ impl Source for LdapSource { "LDAP" } - async fn get_diff(&self) -> Result { - let cache = read_cache(&self.ldap_config.cache_path).await?; - let (mut ldap_client, ldap_receiver) = Ldap::new(self.ldap_config.clone().into(), cache); - - let is_dry_run = self.is_dry_run; - let cache_path = self.ldap_config.cache_path.clone(); + async fn get_sorted_users(&self) -> Result> { + let (mut ldap_client, ldap_receiver) = Ldap::new(self.ldap_config.clone().into(), None); let sync_handle: tokio::task::JoinHandle> = tokio::spawn(async move { ldap_client.sync_once(None).await.context("failed to sync/fetch data from LDAP")?; - - if is_dry_run { - tracing::warn!("Not writing ldap cache during a dry run"); - } else { - let cache = ldap_client.persist_cache().await; - tokio::fs::write( - &cache_path, - bincode::serialize(&cache).context("failed to serialize cache")?, - ) - .await - .context("failed to write cache")?; - } - tracing::info!("Finished syncing LDAP data"); - Ok(()) }); - let (added, changed, removed) = self.get_user_changes(ldap_receiver).await?; - + let mut added = self.get_user_changes(ldap_receiver).await?; sync_handle.await??; - Ok(SourceDiff { - new_users: added, - changed_users: changed.into_iter().map(|(old, new)| ChangedUser { old, new }).collect(), - deleted_user_ids: removed, - }) + // TODO: Find out if we can use the AD extension for receiving sorted data + added.sort_by(|a, b| a.external_user_id.cmp(&b.external_user_id)); + + Ok(added) } } impl LdapSource { /// Create a new LDAP source - pub fn new(ldap_config: LdapSourceConfig, is_dry_run: bool) -> Self { - Self { ldap_config, is_dry_run } + pub fn new(ldap_config: LdapSourceConfig) -> Self { + Self { ldap_config } } /// Get user changes from an ldap receiver pub async fn get_user_changes( &self, ldap_receiver: Receiver, - ) -> Result<(Vec, Vec<(User, User)>, Vec)> { + ) -> Result> { ReceiverStream::new(ldap_receiver) - .fold(Ok((vec![], vec![], vec![])), |acc, entry_status| { - let (mut added, mut changed, mut removed) = acc?; - match entry_status { - EntryStatus::New(entry) => { - tracing::debug!("New entry: {:?}", entry); - added.push(self.parse_user(entry)?); - } - EntryStatus::Changed { old, new } => { - tracing::debug!("Changes found for {:?} -> {:?}", old, new); - changed.push((self.parse_user(old)?, self.parse_user(new)?)); - } - EntryStatus::Removed(entry) => { - tracing::debug!("Deleted user {}", String::from_utf8_lossy(&entry)); - removed.push(UserId::Nick(String::from_utf8(entry.clone())?)); - } + .fold(Ok(vec![]), |acc, entry_status| { + let mut added = acc?; + if let EntryStatus::New(entry) = entry_status { + tracing::debug!("New entry: {:?}", entry); + added.push(self.parse_user(entry)?); }; - Ok((added, changed, removed)) + Ok(added) }) .await } @@ -137,18 +99,29 @@ impl LdapSource { bail!("Binary status without disable_bitmasks"); }; - let first_name = read_search_entry(&entry, &self.ldap_config.attributes.first_name)?; - let last_name = read_search_entry(&entry, &self.ldap_config.attributes.last_name)?; - let preferred_username = - read_search_entry(&entry, &self.ldap_config.attributes.preferred_username)?; - let email = read_search_entry(&entry, &self.ldap_config.attributes.email)?; - let ldap_user_id = read_search_entry(&entry, &self.ldap_config.attributes.user_id)?; - let phone = read_search_entry(&entry, &self.ldap_config.attributes.phone).ok(); + let ldap_user_id = match read_search_entry(&entry, &self.ldap_config.attributes.user_id)? { + // Use hex encoding instead of base64 for consistent alphabetical order + StringOrBytes::Bytes(byte_id) => hex::encode(byte_id), + StringOrBytes::String(string_id) => hex::encode(string_id.as_bytes()), + }; + + let first_name = + read_string_entry(&entry, &self.ldap_config.attributes.first_name, &ldap_user_id)?; + let last_name = + read_string_entry(&entry, &self.ldap_config.attributes.last_name, &ldap_user_id)?; + let preferred_username = read_string_entry( + &entry, + &self.ldap_config.attributes.preferred_username, + &ldap_user_id, + )?; + let email = read_string_entry(&entry, &self.ldap_config.attributes.email, &ldap_user_id)?; + let phone = + read_string_entry(&entry, &self.ldap_config.attributes.phone, &ldap_user_id).ok(); Ok(User { first_name, last_name, - preferred_username, + preferred_username: Some(preferred_username), email, external_user_id: ldap_user_id, phone, @@ -157,44 +130,41 @@ impl LdapSource { } } +/// Read an an attribute, but assert that it is a string +fn read_string_entry( + entry: &SearchEntry, + attribute: &AttributeMapping, + id: &str, +) -> Result { + match read_search_entry(entry, attribute)? { + StringOrBytes::String(entry) => Ok(entry), + StringOrBytes::Bytes(_) => Err(anyhow!( + "Binary values are not accepted: attribute `{}` of user `{}`", + attribute, + id + )), + } +} + /// Read an attribute from the entry fn read_search_entry(entry: &SearchEntry, attribute: &AttributeMapping) -> Result { match attribute { AttributeMapping::OptionalBinary { name, is_binary: false } | AttributeMapping::NoBinaryOption(name) => { - if let Some(attr) = entry.attr_first(name) { - return Ok(StringOrBytes::String(attr.to_owned())); - }; - } - AttributeMapping::OptionalBinary { name, is_binary: true } => { - if let Some(binary_attr) = entry.bin_attr_first(name) { - return Ok(StringOrBytes::Bytes(binary_attr.to_vec())); - }; - - // If attributes encode as valid UTF-8, they will - // not be in the bin_attr list - if let Some(attr) = entry.attr_first(name) { - return Ok(StringOrBytes::Bytes(attr.as_bytes().to_vec())); - }; + entry.attr_first(name).map(|entry| StringOrBytes::String(entry.to_owned())) } + AttributeMapping::OptionalBinary { name, is_binary: true } => entry + .bin_attr_first(name) + // If an entry encodes as UTF-8, it will still only be + // available from the `.attr_first` function, even if ldap + // presents it with the `::` delimiter. + // + // Hence the configuration, we just treat it as binary + // data if this is requested. + .or_else(|| entry.attr_first(name).map(str::as_bytes)) + .map(|entry| StringOrBytes::Bytes(entry.to_vec())), } - - bail!("missing `{}` values for `{}`", attribute, entry.dn) -} - -/// Read the ldap sync cache -pub async fn read_cache(path: &Path) -> Result> { - Ok(match tokio::fs::read(path).await { - Ok(data) => Some(bincode::deserialize(&data).context("cache deserialization failed")?), - Err(err) => { - if err.kind() == std::io::ErrorKind::NotFound { - tracing::info!("LDAP sync cache missing"); - None - } else { - bail!(err) - } - } - }) + .ok_or(anyhow!("missing `{}` values for `{}`", attribute, entry.dn)) } /// LDAP-specific configuration @@ -224,8 +194,6 @@ pub struct LdapSourceConfig { pub use_attribute_filter: bool, /// TLS-related configuration pub tls: Option, - /// Where to cache the last known LDAP state - pub cache_path: PathBuf, } impl From for ldap_poller::Config { @@ -275,7 +243,7 @@ impl From for ldap_poller::Config { attributes.phone.get_name(), ], }, - cache_method: CacheMethod::ModificationTime, + cache_method: CacheMethod::Disabled, check_for_deleted_entries: cfg.check_for_deleted_entries, } } @@ -374,6 +342,15 @@ pub struct LdapTlsConfig { pub danger_use_start_tls: bool, } +/// A structure that can either be a string or bytes +#[derive(Clone, Debug)] +enum StringOrBytes { + /// A string + String(String), + /// A byte string + Bytes(Vec), +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -383,7 +360,7 @@ mod tests { use ldap_poller::ldap::EntryStatus; use tokio::sync::mpsc; - use crate::{sources::ldap::LdapSource, user::StringOrBytes, Config}; + use crate::{sources::ldap::LdapSource, Config}; const EXAMPLE_CONFIG: &str = indoc! {r#" zitadel: @@ -420,7 +397,6 @@ mod tests { server_certificate: ./tests/environment/certs/server.crt danger_disable_tls_verify: false danger_use_start_tls: false - cache_path: ./test feature_flags: [] "#}; @@ -471,8 +447,7 @@ mod tests { async fn test_get_user_changes_new_and_changed() { let (tx, rx) = mpsc::channel(32); let config = load_config(); - let ldap_source = - LdapSource { ldap_config: config.sources.ldap.unwrap(), is_dry_run: false }; + let ldap_source = LdapSource { ldap_config: config.sources.ldap.unwrap() }; let mut user = new_user(); @@ -511,26 +486,15 @@ mod tests { let result = ldap_source.get_user_changes(rx).await; assert!(result.is_ok(), "Failed to get user changes: {:?}", result); - let (added, changed, removed) = result.unwrap(); + let added = result.unwrap(); assert_eq!(added.len(), 1, "Unexpected number of added users"); - assert_eq!(changed.len(), 1, "Unexpected number of changed users"); - assert_eq!(removed.len(), 0, "Unexpected number of removed users"); - - // Verify the changes - let changed_user_entry = &changed[0].1; - assert_eq!( - changed_user_entry.email, - StringOrBytes::String("newemail@example.com".to_owned()) - ); - assert_eq!(changed_user_entry.phone, Some(StringOrBytes::String("987654321".to_owned()))); } #[tokio::test] async fn test_get_user_changes_removed() { let (tx, rx) = mpsc::channel(32); let config = load_config(); - let ldap_source = - LdapSource { ldap_config: config.sources.ldap.unwrap(), is_dry_run: false }; + let ldap_source = LdapSource { ldap_config: config.sources.ldap.unwrap() }; let user = new_user(); @@ -552,17 +516,14 @@ mod tests { let result = ldap_source.get_user_changes(rx).await; assert!(result.is_ok(), "Failed to get user changes: {:?}", result); - let (added, changed, removed) = result.unwrap(); + let added = result.unwrap(); assert_eq!(added.len(), 1, "Unexpected number of added users"); - assert_eq!(changed.len(), 0, "Unexpected number of changed users"); - assert_eq!(removed.len(), 1, "Unexpected number of removed users"); } #[tokio::test] async fn test_parse_user() { let config = load_config(); - let ldap_source = - LdapSource { ldap_config: config.sources.ldap.unwrap(), is_dry_run: false }; + let ldap_source = LdapSource { ldap_config: config.sources.ldap.unwrap() }; let entry = SearchEntry { dn: "uid=testuser,ou=testorg,dc=example,dc=org".to_owned(), @@ -573,13 +534,13 @@ mod tests { let result = ldap_source.parse_user(entry); assert!(result.is_ok(), "Failed to parse user: {:?}", result); let user = result.unwrap(); - assert_eq!(user.first_name, StringOrBytes::String("Test".to_owned())); - assert_eq!(user.last_name, StringOrBytes::String("User".to_owned())); - assert_eq!(user.preferred_username, StringOrBytes::String("testuser".to_owned())); - assert_eq!(user.email, StringOrBytes::String("testuser@example.com".to_owned())); - assert_eq!(user.phone, Some(StringOrBytes::String("123456789".to_owned()))); - assert_eq!(user.preferred_username, StringOrBytes::String("testuser".to_owned())); - assert_eq!(user.external_user_id, StringOrBytes::String("testuser".to_owned())); + assert_eq!(user.first_name, "Test"); + assert_eq!(user.last_name, "User"); + assert_eq!(user.preferred_username, Some("testuser".to_owned())); + assert_eq!(user.email, "testuser@example.com"); + assert_eq!(user.phone, Some("123456789".to_owned())); + assert_eq!(user.preferred_username, Some("testuser".to_owned())); + assert_eq!(user.external_user_id, hex::encode("testuser")); assert!(user.enabled); } @@ -588,8 +549,7 @@ mod tests { let mut config = load_config(); config.sources.ldap.as_mut().unwrap().attributes.disable_bitmasks = serde_yaml::from_str("[0]").expect("invalid config fragment"); - let ldap_source = - LdapSource { ldap_config: config.sources.ldap.unwrap(), is_dry_run: false }; + let ldap_source = LdapSource { ldap_config: config.sources.ldap.unwrap() }; for (attr, parsed) in [("TRUE", true), ("FALSE", false)] { let entry = SearchEntry { diff --git a/src/sources/ukt.rs b/src/sources/ukt.rs index 79b4c01..31acd25 100644 --- a/src/sources/ukt.rs +++ b/src/sources/ukt.rs @@ -3,15 +3,11 @@ use std::collections::HashMap; use anyhow::{Context, Result}; -use async_trait::async_trait; use chrono::Utc; use reqwest::Client; use serde::Deserialize; use url::Url; -use super::Source; -use crate::zitadel::{SourceDiff, UserId}; - /// UKT Source pub struct UktSource { /// UKT Source configuration @@ -20,19 +16,6 @@ pub struct UktSource { client: Client, } -#[async_trait] -impl Source for UktSource { - fn get_name(&self) -> &'static str { - "UKT" - } - - async fn get_diff(&self) -> Result { - let deleted_user_emails = self.get_removed_user_emails().await?; - let deleted_user_ids = deleted_user_emails.into_iter().map(UserId::Login).collect(); - return Ok(SourceDiff { new_users: vec![], changed_users: vec![], deleted_user_ids }); - } -} - impl UktSource { /// Create a new UKT source pub fn new(ukt_config: UktSourceConfig) -> Self { diff --git a/src/user.rs b/src/user.rs index 9a255d8..19d2abd 100644 --- a/src/user.rs +++ b/src/user.rs @@ -1,148 +1,107 @@ //! User data helpers -use std::fmt::Display; +use anyhow::{anyhow, Context, Result}; +use uuid::{uuid, Uuid}; +use zitadel_rust_client::v2::users::HumanUser; -use base64::prelude::{Engine, BASE64_STANDARD}; -use zitadel_rust_client::v1::{Email, Gender, Idp, ImportHumanUserRequest, Phone, Profile}; - -use crate::{config::FeatureFlags, FeatureFlag}; +/// The Famedly UUID namespace to use to generate v5 UUIDs. +const FAMEDLY_NAMESPACE: Uuid = uuid!("d9979cff-abee-4666-bc88-1ec45a843fb8"); /// Source-agnostic representation of a user -#[derive(Clone, Debug)] +#[derive(Clone)] pub(crate) struct User { /// The user's first name - pub(crate) first_name: StringOrBytes, + pub(crate) first_name: String, /// The user's last name - pub(crate) last_name: StringOrBytes, + pub(crate) last_name: String, /// The user's email address - pub(crate) email: StringOrBytes, + pub(crate) email: String, /// The user's phone number - pub(crate) phone: Option, + pub(crate) phone: Option, /// Whether the user is enabled pub(crate) enabled: bool, /// The user's preferred username - pub(crate) preferred_username: StringOrBytes, - /// The user's LDAP ID - pub(crate) external_user_id: StringOrBytes, + pub(crate) preferred_username: Option, + /// The user's external (non-Zitadel) ID + pub(crate) external_user_id: String, } impl User { - /// Convert the agnostic user to a Zitadel user - pub fn to_zitadel_user(&self, feature_flags: &FeatureFlags, idp_id: &str) -> ZitadelUser { - ZitadelUser { - user_data: self.clone(), - needs_email_verification: feature_flags.is_enabled(FeatureFlag::VerifyEmail), - needs_phone_verification: feature_flags.is_enabled(FeatureFlag::VerifyPhone), - idp_id: feature_flags.contains(&FeatureFlag::SsoLogin).then(|| idp_id.to_owned()), - } - } -} + /// Convert a Zitadel user to our internal representation + pub fn try_from_zitadel_user(user: HumanUser, external_id: String) -> Result { + let first_name = user + .profile() + .and_then(|profile| profile.given_name()) + .ok_or(anyhow!("Missing first name for {}", external_id))? + .clone(); -/// Crate-internal representation of a Zitadel user -#[derive(Clone, Debug)] -pub struct ZitadelUser { - /// Details about the user - pub(crate) user_data: User, + let last_name = user + .profile() + .and_then(|profile| profile.family_name()) + .ok_or(anyhow!("Missing last name for {}", external_id))? + .clone(); - /// Whether the user should be prompted to verify their email - pub(crate) needs_email_verification: bool, - /// Whether the user should be prompted to verify their phone number - pub(crate) needs_phone_verification: bool, - /// The ID of the identity provider to link with, if any - pub(crate) idp_id: Option, -} + let email = user + .email() + .and_then(|human_email| human_email.email()) + .ok_or(anyhow!("Missing email address for {}", external_id))? + .clone(); -impl ZitadelUser { - /// Get a display name for the user - pub(crate) fn get_display_name(&self) -> String { - format!("{}, {}", self.user_data.last_name, self.user_data.first_name) - } + let phone = user.phone().and_then(|human_phone| human_phone.phone()); - /// Return the name to be used in logs to identify this user - pub(crate) fn log_name(&self) -> String { - format!("email={}", &self.user_data.email) + Ok(Self { + first_name, + last_name, + email, + phone: phone.cloned(), + preferred_username: None, + external_user_id: external_id, + enabled: true, + }) } - /// Get idp link as required by Zitadel - fn get_idps(&self) -> Vec { - if let Some(idp_id) = self.idp_id.clone() { - vec![Idp { - config_id: idp_id, - external_user_id: self.user_data.external_user_id.clone().to_string(), - display_name: self.get_display_name(), - }] - } else { - vec![] - } + /// Get a display name for this user + pub fn get_display_name(&self) -> String { + format!("{}, {}", self.last_name, self.first_name) } -} -impl From for ImportHumanUserRequest { - fn from(user: ZitadelUser) -> Self { - Self { - user_name: user.user_data.email.clone().to_string(), - profile: Some(Profile { - first_name: user.user_data.first_name.clone().to_string(), - last_name: user.user_data.last_name.clone().to_string(), - display_name: user.get_display_name(), - gender: Gender::Unspecified.into(), // 0 means "unspecified", - nick_name: user.user_data.external_user_id.clone().to_string(), - preferred_language: String::default(), - }), - email: Some(Email { - email: user.user_data.email.clone().to_string(), - is_email_verified: !user.needs_email_verification, - }), - phone: user.user_data.phone.as_ref().map(|phone| Phone { - phone: phone.to_owned().to_string(), - is_phone_verified: !user.needs_phone_verification, - }), - password: String::default(), - hashed_password: None, - password_change_required: false, - request_passwordless_registration: false, - otp_code: String::default(), - idps: user.get_idps(), - } + /// Get the external user ID in raw byte form + pub fn get_external_id_bytes(&self) -> Result> { + // This looks ugly at a glance, since we get the original + // bytes at some point, however some users will be retrieved + // from Zitadel at a later point, so we cannot assume that we + // know the original bytes, and must always decode the + // external user ID to get those. + hex::decode(&self.external_user_id).context("Invalid external user ID") } -} -impl Display for ZitadelUser { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "email={}", &self.user_data.email) + /// Get the famedly UUID of this user + pub fn get_famedly_uuid(&self) -> Result { + Ok(Uuid::new_v5(&FAMEDLY_NAMESPACE, self.get_external_id_bytes()?.as_slice()).to_string()) } } -/// A structure that can either be a string or bytes -#[derive(Clone, Debug)] -pub(crate) enum StringOrBytes { - /// A string - String(String), - /// A byte string - Bytes(Vec), -} - -impl PartialEq for StringOrBytes { +impl PartialEq for User { fn eq(&self, other: &Self) -> bool { - match (self, other) { - (Self::String(s), Self::String(o)) => s == o, - (Self::String(s), Self::Bytes(o)) => s.as_bytes() == o, - (Self::Bytes(s), Self::String(o)) => s == o.as_bytes(), - (Self::Bytes(s), Self::Bytes(o)) => s == o, - } + self.first_name == other.first_name + && self.last_name == other.last_name + && self.email == other.email + && self.phone == other.phone + && self.enabled == other.enabled + && self.preferred_username == other.preferred_username + && self.external_user_id == other.external_user_id } } -impl Display for StringOrBytes { +impl std::fmt::Debug for User { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - StringOrBytes::String(value) => write!(f, "{}", value), - StringOrBytes::Bytes(value) => write!(f, "{}", BASE64_STANDARD.encode(value)), - } - } -} - -impl From for StringOrBytes { - fn from(value: String) -> Self { - Self::String(value) + f.debug_struct("User") + .field("first_name", &"***") + .field("last_name", &"***") + .field("email", &"***") + .field("phone", &"***") + .field("preferred_username", &"***") + .field("external_user_id", &self.external_user_id) + .field("enabled", &self.enabled) + .finish() } } diff --git a/src/zitadel.rs b/src/zitadel.rs index a7d2406..7bd64e1 100644 --- a/src/zitadel.rs +++ b/src/zitadel.rs @@ -1,24 +1,29 @@ //! Helper functions for submitting data to Zitadel use std::path::PathBuf; -use anyhow::{bail, Context, Result}; +use anyhow::{anyhow, Context, Result}; +use base64::prelude::{Engine, BASE64_STANDARD}; +use futures::{Stream, StreamExt}; use serde::Deserialize; use url::Url; -use uuid::{uuid, Uuid}; -use zitadel_rust_client::v1::{ - error::{Error as ZitadelError, TonicErrorCode}, - Zitadel as ZitadelClient, +use zitadel_rust_client::{ + v1::Zitadel as ZitadelClientV1, + v2::{ + users::{ + AddHumanUserRequest, IdpLink, InUserEmailsQuery, ListUsersRequest, Organization, + SearchQuery, SetHumanEmail, SetHumanPhone, SetHumanProfile, SetMetadataEntry, + TypeQuery, UpdateHumanUserRequest, User as ZitadelUser, UserFieldName, Userv2Type, + }, + Zitadel as ZitadelClient, + }, }; use crate::{ config::{Config, FeatureFlags}, - user::{StringOrBytes, User, ZitadelUser}, + user::User, FeatureFlag, }; -/// The Famedly UUID namespace to use to generate v5 UUIDs. -const FAMEDLY_NAMESPACE: Uuid = uuid!("d9979cff-abee-4666-bc88-1ec45a843fb8"); - /// The Zitadel project role to assign to users. const FAMEDLY_USER_ROLE: &str = "User"; @@ -30,7 +35,10 @@ pub(crate) struct Zitadel { /// Optional set of features feature_flags: FeatureFlags, /// The backing Zitadel zitadel_client - zitadel_client: ZitadelClient, + pub zitadel_client: ZitadelClient, + /// The backing Ztiadel client, but for v1 API requests - some are + /// still required since the v2 API doesn't cover everything + zitadel_client_v1: ZitadelClientV1, } impl Zitadel { @@ -41,450 +49,283 @@ impl Zitadel { .await .context("failed to configure zitadel_client")?; + let zitadel_client_v1 = + ZitadelClientV1::new(config.zitadel.url.clone(), config.zitadel.key_file.clone()) + .await + .context("failed to configure zitadel_client_v1")?; + Ok(Self { zitadel_config: config.zitadel.clone(), feature_flags: config.feature_flags.clone(), zitadel_client, + zitadel_client_v1, }) } - /// Import a list of new users into Zitadel - pub(crate) async fn import_new_users(&self, users: Vec) -> Result<()> { - for user in users { - let zitadel_user = - user.to_zitadel_user(&self.feature_flags, &self.zitadel_config.idp_id); - let status = self.import_user(&zitadel_user).await; - - if let Err(error) = status { - tracing::error!( - "Failed to sync-import user `{}`: {:?}", - zitadel_user.log_name(), - error - ); - - if Self::is_invalid_phone_error(error) { - let zitadel_user = ZitadelUser { - user_data: User { phone: None, ..zitadel_user.user_data }, - ..zitadel_user - }; - - let retry_status = self.import_user(&zitadel_user).await; - - match retry_status { - Ok(_) => { - tracing::info!( - "Retry sync-import succeeded for user `{}`", - zitadel_user.log_name() - ); - } - Err(retry_error) => { - tracing::error!( - "Retry sync-import failed for user `{}`: {:?}", - zitadel_user.log_name(), - retry_error - ); - } - } - } - } - } - - Ok(()) - } - - /// Delete a list of Zitadel users given their IDs - pub(crate) async fn delete_users_by_id(&self, users: Vec) -> Result<()> { - for user_id in users { - match user_id { - UserId::Login(login) => { - let status = self.delete_user_by_email(&login).await; - if let Err(error) = status { - tracing::error!("Failed to delete user by email `{}`: {:?}", login, error); - } - } - UserId::Nick(nick) => { - let status = self.delete_user_by_nick(&nick).await; - if let Err(error) = status { - tracing::error!("Failed to delete user by nick `{}`: {:?}", nick, error); - } - } - UserId::ZitadelId(id) => { - let status = self.delete_user_by_id(&id).await; - if let Err(error) = status { - tracing::error!("Failed to delete user by id `{}`: {:?}", id, error); - } - } - } - } - - Ok(()) - } - - /// Update a list of old/new user maps - pub(crate) async fn update_users(&self, users: Vec) -> Result<()> { - let disabled: Vec = users - .iter() - .filter(|user| user.old.enabled && !user.new.enabled) - .map(|user| { - user.new.to_zitadel_user(&self.feature_flags, &self.zitadel_config.idp_id).clone() + /// Get a list of users by their email addresses + pub fn get_users_by_email( + &mut self, + emails: Vec, + ) -> Result> + Send> { + self.zitadel_client + .list_users( + ListUsersRequest::new(vec![ + SearchQuery::new().with_type_query(TypeQuery::new(Userv2Type::Human)), + SearchQuery::new().with_in_user_emails_query( + InUserEmailsQuery::new().with_user_emails(emails), + ), + ]) + .with_asc(true) + .with_sorting_column(UserFieldName::NickName), + ) + .map(|stream| { + stream.map(|user| { + let id = user.user_id().ok_or(anyhow!("Missing Zitadel user ID"))?.clone(); + let user = search_result_to_user(user)?; + Ok((user, id)) + }) }) - .collect(); + } - let enabled: Vec = users - .iter() - .filter(|user| !user.old.enabled && user.new.enabled) - .map(|user| { - user.new.to_zitadel_user(&self.feature_flags, &self.zitadel_config.idp_id).clone() - }) - .collect(); - - let changed: Vec<(ZitadelUser, ZitadelUser)> = users - .into_iter() - .filter(|user| user.new.enabled && user.old.enabled == user.new.enabled) - .map(|user| { - ( - user.old - .to_zitadel_user(&self.feature_flags, &self.zitadel_config.idp_id) - .clone(), - user.new - .to_zitadel_user(&self.feature_flags, &self.zitadel_config.idp_id) - .clone(), - ) + /// Return a stream of Zitadel users + pub fn list_users(&mut self) -> Result> + Send> { + self.zitadel_client + .list_users( + ListUsersRequest::new(vec![ + SearchQuery::new().with_type_query(TypeQuery::new(Userv2Type::Human)) + ]) + .with_asc(true) + .with_sorting_column(UserFieldName::NickName), + ) + .map(|stream| { + stream.map(|user| { + let id = user.user_id().ok_or(anyhow!("Missing Zitadel user ID"))?.clone(); + let user = search_result_to_user(user)?; + Ok((user, id)) + }) }) - .collect(); - - for user in disabled { - let status = self.delete_user(&user).await; - - if let Err(error) = status { - tracing::error!("Failed to delete user `{}`: {:?}`", user.log_name(), error); - } - } - - if !self.feature_flags.is_enabled(FeatureFlag::DeactivateOnly) { - for user in enabled { - let status = self.import_user(&user).await; + } - if let Err(error) = status { - tracing::error!("Failed to re-create user `{}`: {:?}", user.log_name(), error); - } - } + /// Delete a Zitadel user + pub async fn delete_user(&mut self, zitadel_id: &str) -> Result<()> { + tracing::info!("Deleting user with Zitadel ID: {}", zitadel_id); - for (old, new) in changed { - let status = self.update_user(&old, &new).await; - - if let Err(error) = status { - tracing::error!("Failed to sync-update user `{}`: {:?}", new.log_name(), error); - - if Self::is_invalid_phone_error(error) { - let new = - ZitadelUser { user_data: User { phone: None, ..new.user_data }, ..new }; - - let retry_status = self.update_user(&old, &new).await; - - match retry_status { - Ok(_) => { - tracing::info!( - "Retry sync-update succeeded for user `{}`", - new.log_name() - ); - } - Err(retry_error) => { - tracing::error!( - "Retry sync-update failed for user `{}`: {:?}", - new.log_name(), - retry_error - ); - } - } - } - } - } + if self.feature_flags.is_enabled(FeatureFlag::DryRun) { + tracing::warn!("Skipping deletion due to dry run"); + return Ok(()); } - Ok(()) + self.zitadel_client.delete_user(zitadel_id).await.map(|_o| ()) } - /// Update a Zitadel user - async fn update_user(&self, old: &ZitadelUser, new: &ZitadelUser) -> Result<()> { + /// Import a user into Zitadel + pub async fn import_user(&mut self, imported_user: &User) -> Result<()> { + tracing::info!("Importing user with external ID: {}", imported_user.external_user_id); + if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not updating user due to dry run: {:?} -> {:?}", old, new); + tracing::warn!("Skipping import due to dry run"); return Ok(()); } - let Some(user_id) = self.get_user_id(old).await? else { - bail!("could not find user `{}` to update", old.user_data.email); + let localpart = if self.feature_flags.contains(&FeatureFlag::PlainLocalpart) { + String::from_utf8(imported_user.get_external_id_bytes()?) + .context(format!("Unsupported binary external ID for user: {:?}", imported_user))? + } else { + imported_user.get_famedly_uuid()? }; - if old.user_data.email != new.user_data.email { - self.zitadel_client - .update_human_user_name( - &self.zitadel_config.organization_id, - user_id.clone(), - new.user_data.email.clone().to_string(), - ) - .await?; + let mut metadata = vec![SetMetadataEntry::new("localpart".to_owned(), localpart)]; + + if let Some(preferred_username) = imported_user.preferred_username.clone() { + metadata + .push(SetMetadataEntry::new("preferred_username".to_owned(), preferred_username)); + } - tracing::warn!( - "User email/login changed for {} -> {}", - old.user_data.email, - new.user_data.email + let mut user = AddHumanUserRequest::new( + SetHumanProfile::new(imported_user.first_name.clone(), imported_user.last_name.clone()) + .with_nick_name(imported_user.external_user_id.clone()) + .with_display_name(imported_user.get_display_name()), + SetHumanEmail::new(imported_user.email.clone()) + .with_is_verified(!self.feature_flags.is_enabled(FeatureFlag::VerifyEmail)), + ) + .with_organization( + Organization::new().with_org_id(self.zitadel_config.organization_id.clone()), + ) + .with_metadata(metadata); + + if let Some(phone) = imported_user.phone.clone() { + user.set_phone( + SetHumanPhone::new() + .with_phone(phone.clone()) + .with_is_verified(!self.feature_flags.is_enabled(FeatureFlag::VerifyPhone)), ); }; - if old.user_data.first_name != new.user_data.first_name - || old.user_data.last_name != new.user_data.last_name - { - self.zitadel_client - .update_human_user_profile( - &self.zitadel_config.organization_id, - user_id.clone(), - new.user_data.first_name.clone().to_string(), - new.user_data.last_name.clone().to_string(), - None, - Some(new.get_display_name()), - None, - None, + if self.feature_flags.is_enabled(FeatureFlag::SsoLogin) { + user.set_idp_links(vec![IdpLink::new() + .with_user_id( + get_zitadel_encoded_id(imported_user.get_external_id_bytes()?) + .context("Failed to set IDP user ID")?, ) - .await?; - }; + .with_idp_id(self.zitadel_config.idp_id.clone()) + // TODO: Figure out if this is the correct value; empty is not permitted + .with_user_name(imported_user.email.clone())]); + } - match (&old.user_data.phone, &new.user_data.phone) { - (Some(_), None) => { - self.zitadel_client - .remove_human_user_phone(&self.zitadel_config.organization_id, user_id.clone()) - .await?; - } - (_, Some(new_phone)) => { - self.zitadel_client - .update_human_user_phone( - &self.zitadel_config.organization_id, - user_id.clone(), - new_phone.clone().to_string(), - !self.feature_flags.is_enabled(FeatureFlag::VerifyPhone), + match self.zitadel_client.create_human_user(user.clone()).await { + Ok(res) => { + let id = res + .user_id() + .ok_or(anyhow!( + "Failed to create user ID for external user `{}`", + imported_user.external_user_id + ))? + .clone(); + + self.zitadel_client_v1 + .add_user_grant( + Some(self.zitadel_config.organization_id.clone()), + id, + self.zitadel_config.project_id.clone(), + None, + vec![FAMEDLY_USER_ROLE.to_owned()], ) .await?; } - (None, None) => {} - }; - - if old.user_data.email != new.user_data.email { - self.zitadel_client - .update_human_user_email( - &self.zitadel_config.organization_id, - user_id.clone(), - new.user_data.email.clone().to_string(), - !self.feature_flags.is_enabled(FeatureFlag::VerifyEmail), - ) - .await?; - }; - - if old.user_data.preferred_username != new.user_data.preferred_username { - self.zitadel_client - .set_user_metadata( - Some(&self.zitadel_config.organization_id), - user_id, - "preferred_username".to_owned(), - &new.user_data.preferred_username.clone().to_string(), - ) - .await?; - }; - tracing::info!("Successfully updated user {}", old.user_data.email); - - Ok(()) - } - - /// Delete a Zitadel user given only their LDAP id - async fn delete_user_by_id(&self, user_id: &str) -> Result<()> { - if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not deleting user `{}` due to dry run", user_id); - return Ok(()); - } - - let user = self - .zitadel_client - .get_user_by_nick_name( - Some(self.zitadel_config.organization_id.clone()), - user_id.to_owned(), - ) - .await?; - match user { - Some(user) => self.zitadel_client.remove_user(user.id).await?, - None => bail!("Could not find user with ldap uid '{user_id}' for deletion"), + Err(error) => { + // If the phone number is invalid + if error.to_string().contains("PHONE-so0wa") { + user.reset_phone(); + self.zitadel_client.create_human_user(user).await?; + } else { + anyhow::bail!(error) + } + } } - tracing::info!("Successfully deleted user {}", user_id); - Ok(()) } - /// Delete a Zitadel user given only their email address used as login name - async fn delete_user_by_email(&self, email: &str) -> Result<()> { - if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not deleting user `{}` due to dry run", email); - return Ok(()); - } - - let user = self.zitadel_client.get_user_by_login_name(email).await?; - match user { - Some(user) => self.zitadel_client.remove_user(user.id).await?, - None => tracing::info!("Could not find user with email '{email}' for deletion"), - } - - tracing::info!("Successfully deleted user {}", email); - - Ok(()) - } + /// Update a user + pub async fn update_user( + &mut self, + zitadel_id: &str, + old_user: &User, + updated_user: &User, + ) -> Result<()> { + tracing::info!( + "Updating user `{}` to `{}`", + old_user.external_user_id, + updated_user.external_user_id + ); - /// Delete a Zitadel user given only their nick name - async fn delete_user_by_nick(&self, nick: &str) -> Result<()> { if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not deleting user `{}` due to dry run", nick); + tracing::warn!("Skipping update due to dry run"); return Ok(()); } - let user = self - .zitadel_client - .get_user_by_nick_name( - Some(self.zitadel_config.organization_id.clone()), - nick.to_owned(), - ) - .await?; - match user { - Some(user) => self.zitadel_client.remove_user(user.id).await?, - None => tracing::info!("Could not find user with nick '{nick}' for deletion"), - } - - tracing::info!("Successfully deleted user {}", nick); + let mut request = UpdateHumanUserRequest::new(); - Ok(()) - } - - /// Retrieve the Zitadel user ID of a user, or None if the user - /// cannot be found - async fn get_user_id(&self, user: &ZitadelUser) -> Result> { - let status = self - .zitadel_client - .get_user_by_login_name(&user.user_data.email.clone().to_string()) - .await; - - if let Err(ZitadelError::TonicResponseError(ref error)) = status { - if error.code() == TonicErrorCode::NotFound { - return Ok(None); - } - } - - Ok(status.map(|user| user.map(|u| u.id))?) - } - - /// Delete a Zitadel user - async fn delete_user(&self, user: &ZitadelUser) -> Result<()> { - if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not deleting user due to dry run: {:?}", user); - return Ok(()); - } - - if let Some(user_id) = self.get_user_id(user).await? { - self.zitadel_client.remove_user(user_id).await?; - } else { - bail!("could not find user `{}` for deletion", user.user_data.email); + if old_user.email != updated_user.email { + request.set_username(updated_user.email.clone()); + request.set_email( + SetHumanEmail::new(updated_user.email.clone()) + .with_is_verified(!self.feature_flags.is_enabled(FeatureFlag::VerifyEmail)), + ); } - tracing::info!("Successfully deleted user {}", user.user_data.email); - - Ok(()) - } - - /// Import a user into Zitadel - async fn import_user(&self, user: &ZitadelUser) -> Result<()> { - if self.feature_flags.is_enabled(FeatureFlag::DryRun) { - tracing::info!("Not importing user due to dry run: {:?}", user); - return Ok(()); + if old_user.first_name != updated_user.first_name + || old_user.last_name != updated_user.last_name + { + request.set_profile( + SetHumanProfile::new( + updated_user.first_name.clone(), + updated_user.last_name.clone(), + ) + .with_display_name(updated_user.get_display_name()), + ); } - if !user.user_data.enabled { - tracing::info!("Not importing disabled user: {:?}", user); - return Ok(()); + if old_user.phone != updated_user.phone { + if let Some(phone) = updated_user.phone.clone() { + request.set_phone( + SetHumanPhone::new() + .with_phone(phone.clone()) + .with_is_verified(!self.feature_flags.is_enabled(FeatureFlag::VerifyPhone)), + ); + } else { + self.zitadel_client.remove_phone(zitadel_id).await?; + } } - let id = match &user.user_data.external_user_id { - StringOrBytes::String(value) => value.as_bytes(), - StringOrBytes::Bytes(value) => value, - }; - - let uuid; - let localpart = if self.feature_flags.contains(&FeatureFlag::PlainLocalpart) { - match &user.user_data.external_user_id { - StringOrBytes::String(value) => value, - StringOrBytes::Bytes(_) => { - bail!( - "Unsupported binary external ID for user using plain localparts: {:?}", - user - ); - } + if let Err(error) = self.zitadel_client.update_human_user(zitadel_id, request.clone()).await + { + // If the new phone number is invalid + if error.to_string().contains("PHONE-so0wa") { + request.reset_phone(); + self.zitadel_client.update_human_user(zitadel_id, request).await?; + + if let Err(error) = self.zitadel_client.remove_phone(zitadel_id).await { + // If the user didn't start out with a phone + if !error.to_string().contains("COMMAND-ieJ2e") { + anyhow::bail!(error); + } + }; + } else { + anyhow::bail!(error); } - } else { - uuid = Uuid::new_v5(&FAMEDLY_NAMESPACE, id).to_string(); - &uuid }; - let new_user_id = self - .zitadel_client - .create_human_user(&self.zitadel_config.organization_id, user.clone().into()) - .await?; - - self.zitadel_client - .set_user_metadata( - Some(&self.zitadel_config.organization_id), - new_user_id.clone(), - "preferred_username".to_owned(), - &user.user_data.preferred_username.clone().to_string(), - ) - .await?; - - self.zitadel_client - .set_user_metadata( - Some(&self.zitadel_config.organization_id), - new_user_id.clone(), - "localpart".to_owned(), - localpart, - ) - .await?; - - self.zitadel_client - .add_user_grant( - Some(self.zitadel_config.organization_id.clone()), - new_user_id, - self.zitadel_config.project_id.clone(), - None, - vec![FAMEDLY_USER_ROLE.to_owned()], - ) - .await?; - - tracing::info!("Successfully imported user {:?}", user); + if old_user.preferred_username != updated_user.preferred_username { + if let Some(preferred_username) = updated_user.preferred_username.clone() { + self.zitadel_client + .set_user_metadata( + zitadel_id, + "preferred_username", + &preferred_username.clone(), + ) + .await?; + } else { + self.zitadel_client.delete_user_metadata(zitadel_id, "preferred_username").await?; + } + } Ok(()) } +} - /// Check if an error is an invalid phone error - fn is_invalid_phone_error(error: anyhow::Error) -> bool { - /// Part of the error message returned by Zitadel - /// when a phone number is invalid for a new user - const INVALID_PHONE_IMPORT_ERROR: &str = "invalid ImportHumanUserRequest_Phone"; - - /// Part of the error message returned by Zitadel - /// when a phone number is invalid for an existing user being updated - const INVALID_PHONE_UPDATE_ERROR: &str = "invalid UpdateHumanPhoneRequest"; - - if let Ok(ZitadelError::TonicResponseError(ref error)) = error.downcast::() { - return error.code() == TonicErrorCode::InvalidArgument - && (error.message().contains(INVALID_PHONE_IMPORT_ERROR) - || error.message().contains(INVALID_PHONE_UPDATE_ERROR)); - } +/// Convert a Zitadel search result to a user +fn search_result_to_user(user: ZitadelUser) -> Result { + let human_user = user.human().ok_or(anyhow!("Machine user found in human user search"))?; + let nick_name = human_user + .profile() + .and_then(|p| p.nick_name()) + .ok_or(anyhow!("Missing external ID found for user"))?; + + // TODO: If async closures become a reality, we + // should capture the correct preferred_username + // here. + let user = User::try_from_zitadel_user(human_user.clone(), nick_name.clone())?; + Ok(user) +} - false - } +/// Get a base64-encoded external user ID, if the ID is raw bytes, +/// or a UTF-8 string if not. +/// +/// Note: This encoding scheme is inherently broken, because it is +/// impossible to tell apart base64 encoded strings from +/// non-base64 encoded strings. We can therefore never know if the +/// ID should be decoded or not when re-parsing it, and it may +/// create collisions (although this is unlikely). +/// +/// Only use this for Zitadel support. +pub fn get_zitadel_encoded_id(external_id_bytes: Vec) -> Result { + Ok(if let Ok(encoded_id) = String::from_utf8(external_id_bytes.clone()) { + encoded_id + } else { + BASE64_STANDARD.encode(external_id_bytes) + }) } /// Configuration related to Famedly Zitadel @@ -501,35 +342,3 @@ pub struct ZitadelConfig { /// IDP ID provided by Famedly Zitadel pub idp_id: String, } - -/// The different ways to identify a user in Zitadel -#[derive(Debug)] -pub enum UserId { - /// The login name is actually the email address - Login(String), - /// The nick name is actually the LDAP ID - Nick(String), - /// The Zitadel ID - #[allow(dead_code)] - ZitadelId(String), -} - -/// The difference between the source and Zitadel -#[derive(Debug)] -pub struct SourceDiff { - /// New users - pub new_users: Vec, - /// Changed users - pub changed_users: Vec, - /// Deleted user IDs - pub deleted_user_ids: Vec, -} - -/// A user that has changed returned from the LDAP poller -#[derive(Debug)] -pub struct ChangedUser { - /// The old state - pub old: User, - /// The new state - pub new: User, -} diff --git a/tests/e2e.rs b/tests/e2e.rs index c281b0b..45a1fba 100644 --- a/tests/e2e.rs +++ b/tests/e2e.rs @@ -1,17 +1,19 @@ -#![cfg(test)] +//! E2E integration tests +#![cfg(test)] +#![allow(clippy::expect_fun_call)] +/// E2E integration tests use std::{collections::HashSet, path::Path, time::Duration}; -use base64::prelude::{Engine, BASE64_STANDARD}; use famedly_sync::{ csv_test_helpers::temp_csv_file, + perform_sync, ukt_test_helpers::{ get_mock_server_url, prepare_endpoint_mock, prepare_oauth2_mock, ENDPOINT_PATH, OAUTH2_PATH, }, AttributeMapping, Config, FeatureFlag, }; use ldap3::{Ldap as LdapClient, LdapConnAsync, LdapConnSettings, Mod}; -use tempfile::TempDir; use test_log::test; use tokio::sync::OnceCell; use url::Url; @@ -22,8 +24,9 @@ use zitadel_rust_client::v1::{ Email, Gender, ImportHumanUserRequest, Phone, Profile, UserType, Zitadel, }; -static CONFIG: OnceCell = OnceCell::const_new(); -static TEMPDIR: OnceCell = OnceCell::const_new(); +static CONFIG_WITH_LDAP: OnceCell = OnceCell::const_new(); +static CONFIG_WITH_CSV: OnceCell = OnceCell::const_new(); +static CONFIG_WITH_UKT: OnceCell = OnceCell::const_new(); /// The Famedly UUID namespace to use to generate v5 UUIDs. const FAMEDLY_NAMESPACE: Uuid = uuid!("d9979cff-abee-4666-bc88-1ec45a843fb8"); @@ -31,6 +34,198 @@ const FAMEDLY_NAMESPACE: Uuid = uuid!("d9979cff-abee-4666-bc88-1ec45a843fb8"); /// The Zitadel project role to assign to users. const FAMEDLY_USER_ROLE: &str = "User"; +#[test(tokio::test)] +#[test_log(default_log_filter = "debug")] +async fn test_e2e_user_id_encoding() { + async fn verify_user_encoding( + ldap: &mut Ldap, + zitadel: &Zitadel, + config: &Config, + uid: &str, + email: &str, + ) -> Result<(), String> { + let login_name = email; + let expected_hex_id = hex::encode(uid.as_bytes()); + + ldap.create_user("Test", "User", "TU", login_name, None, uid, false).await; + + perform_sync(config).await.map_err(|e| format!("Sync failed: {}", e))?; + + let user = zitadel + .get_user_by_login_name(login_name) + .await + .map_err(|e| format!("Failed to get user: {}", e))? + .ok_or_else(|| "User not found".to_owned())?; + + match user.r#type { + Some(UserType::Human(user)) => { + let profile = user.profile.ok_or_else(|| "User lacks profile".to_owned())?; + + if profile.nick_name != expected_hex_id { + return Err(format!( + "ID mismatch for '{}': expected '{}', got '{}'", + uid, expected_hex_id, profile.nick_name + )); + } + Ok(()) + } + _ => Err("User lacks human details".to_owned()), + } + } + + /// Test cases for verifying correct user ID encoding + /// (uid, email) + const TEST_CASES: &[(&str, &str)] = &[ + // Basic cases + ("simple123", "simple123@example.com"), + ("MiXed123Case", "mixed123case@example.com"), + // Special characters + ("u.s-e_r", "user@example.com"), + ("123", "123@example.com"), + // Unicode + ("üsernamÉ", "username@example.com"), + ("ὈΔΥΣΣΕΎΣ", "odysseus@example.com"), + ("Потребител", "potrebitel@example.com"), + // Long string + ("ThisIsAVeryLongUsernameThatShouldStillWork123456789", "long@example.com"), + ]; + + // Run all test cases + let config = ldap_config().await; + let mut ldap = Ldap::new().await; + let zitadel = open_zitadel_connection().await; + + for (uid, email) in TEST_CASES { + if let Err(error) = verify_user_encoding(&mut ldap, &zitadel, config, uid, email).await { + panic!("Test failed for ID '{}': {}", uid, error); + } + } +} + +#[test(tokio::test)] +#[test_log(default_log_filter = "debug")] +async fn test_e2e_user_id_sync_ordering() { + struct TestUser<'a> { + uid: &'a str, + email: &'a str, + phone: &'a str, + } + + const TEST_USERS: &[TestUser] = &[ + TestUser { uid: "üser", email: "youser@example.com", phone: "+6666666666" }, + TestUser { uid: "aaa", email: "aaa@example.com", phone: "+1111111111" }, + TestUser { uid: "777", email: "777@example.com", phone: "+5555555555" }, + TestUser { uid: "bbb", email: "bbb@example.com", phone: "+3333333333" }, + TestUser { uid: "🦀", email: "crab@example.com", phone: "+1000000001" }, + TestUser { uid: "한글", email: "korean@example.com", phone: "+1000000002" }, + TestUser { uid: "عربي", email: "arabic@example.com", phone: "+1000000005" }, + ]; + + // Setup + let config = ldap_config().await; + let mut ldap = Ldap::new().await; + let zitadel = open_zitadel_connection().await; + + // Create all users in LDAP + for user in TEST_USERS { + ldap.create_user("Test", "User", "TU", user.email, Some(user.phone), user.uid, false).await; + } + + // Initial sync + perform_sync(config).await.expect("Initial sync failed"); + + // Verify all users exist with correct data + for user in TEST_USERS { + let expected_hex_id = hex::encode(user.uid.as_bytes()); + + let zitadel_user = zitadel + .get_user_by_login_name(user.email) + .await + .expect(&format!("Failed to get user {}", user.email)) + .expect(&format!("User {} not found", user.email)); + + match zitadel_user.r#type { + Some(UserType::Human(human)) => { + // Verify ID encoding + let profile = human.profile.expect(&format!("User {} lacks profile", user.email)); + assert_eq!( + profile.nick_name, + expected_hex_id, + "Wrong ID encoding for user {}, got '{:?}', expected '{:?}'", + user.email, + String::from_utf8_lossy(&hex::decode(profile.nick_name.clone()).unwrap()), + String::from_utf8_lossy(&hex::decode(expected_hex_id.clone()).unwrap()) + ); + + // Verify phone number to ensure complete sync + let phone = human.phone.expect(&format!("User {} lacks phone", user.email)); + assert_eq!(phone.phone, user.phone, "Wrong phone for user {}", user.email); + } + _ => panic!("User {} lacks human details", user.email), + } + } + + // Now update all users with new data + for user in TEST_USERS { + ldap.change_user( + user.uid, + // Just change the last_name (sn) attribute to the user's uid with SN prefix + vec![("sn", HashSet::from([format!("SN{}", user.uid).as_str()]))], + ) + .await; + } + + // Sync again + perform_sync(config).await.expect("Update sync failed"); + + // Verify updates were applied in correct order + for user in TEST_USERS { + let zitadel_user = zitadel + .get_user_by_login_name(user.email) + .await + .expect(&format!("Failed to get updated user {}", user.email)) + .expect(&format!("Updated user {} not found", user.email)); + + match zitadel_user.r#type { + Some(UserType::Human(human)) => { + let profile = + human.profile.expect(&format!("Updated user {} lacks profile", user.email)); + let last_name = profile.last_name; + assert_eq!( + last_name, + format!("SN{}", user.uid), + "Wrong updated last_name for user {}", + user.email + ); + } + _ => panic!("Updated user {} lacks human details", user.email), + } + } + + // Finally delete users in reverse order + for user in TEST_USERS.iter().rev() { + ldap.delete_user(user.uid).await; + } + + // Final sync + perform_sync(config).await.expect("Deletion sync failed"); + + // Verify all users were deleted in correct order + for user in TEST_USERS { + let result = zitadel.get_user_by_login_name(user.email).await; + + assert!( + matches!( + result, + Err(ZitadelError::TonicResponseError(status)) + if status.code() == TonicErrorCode::NotFound + ), + "User {} still exists after deletion", + user.email + ); + } +} + #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_simple_sync() { @@ -46,8 +241,8 @@ async fn test_e2e_simple_sync() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -119,8 +314,8 @@ async fn test_e2e_sync_disabled_user() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("disabled_user@famedly.de").await; @@ -144,7 +339,7 @@ async fn test_e2e_sync_disabled_user() { #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_sso() { - let mut config = config().await.clone(); + let mut config = ldap_config().await.clone(); config.feature_flags.push(FeatureFlag::SsoLogin); let mut ldap = Ldap::new().await; @@ -159,7 +354,7 @@ async fn test_e2e_sso() { ) .await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -188,12 +383,12 @@ async fn test_e2e_sync_change() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); ldap.change_user("change", vec![("telephoneNumber", HashSet::from(["+12015550123"]))]).await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -226,20 +421,20 @@ async fn test_e2e_sync_disable_and_reenable() { ) .await; - let config = config().await; + let config = ldap_config().await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("disable@famedly.de").await; assert!(user.is_ok_and(|u| u.is_some())); ldap.change_user("disable", vec![("shadowFlag", HashSet::from(["514"]))]).await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let user = zitadel.get_user_by_login_name("disable@famedly.de").await; assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); ldap.change_user("disable", vec![("shadowFlag", HashSet::from(["512"]))]).await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("disable@famedly.de").await; assert!(user.is_ok_and(|u| u.is_some())); @@ -260,13 +455,13 @@ async fn test_e2e_sync_email_change() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); ldap.change_user("email_change", vec![("mail", HashSet::from(["email_changed@famedly.de"]))]) .await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("email_changed@famedly.de").await; @@ -289,8 +484,8 @@ async fn test_e2e_sync_deletion() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = @@ -299,7 +494,7 @@ async fn test_e2e_sync_deletion() { ldap.delete_user("deleted").await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let user = zitadel.get_user_by_login_name("deleted@famedly.de").await; assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); @@ -308,7 +503,7 @@ async fn test_e2e_sync_deletion() { #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_ldaps() { - let mut config = config().await.clone(); + let mut config = ldap_config().await.clone(); config .sources .ldap @@ -330,7 +525,7 @@ async fn test_e2e_ldaps() { ) .await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -344,7 +539,7 @@ async fn test_e2e_ldaps() { #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_ldaps_starttls() { - let mut config = config().await.clone(); + let mut config = ldap_config().await.clone(); config .sources .ldap @@ -362,12 +557,12 @@ async fn test_e2e_ldaps_starttls() { "Bobby", "starttls@famedly.de", Some("+12015550123"), - "starttls", + "starttls2", false, ) .await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -385,8 +580,8 @@ async fn test_e2e_no_phone() { ldap.create_user("Bob", "Tables", "Bobby", "no_phone@famedly.de", None, "no_phone", false) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -432,8 +627,8 @@ async fn test_e2e_sync_invalid_phone() { ) .await; - let config = config().await; - config.perform_sync().await.expect("syncing failed"); + let config = ldap_config().await; + perform_sync(config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; @@ -468,7 +663,7 @@ async fn test_e2e_sync_invalid_phone() { ldap.change_user("good_gone_bad_phone", vec![("telephoneNumber", HashSet::from(["abc"]))]) .await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); let user = zitadel .get_user_by_login_name("good_gone_bad_phone@famedly.de") @@ -486,76 +681,124 @@ async fn test_e2e_sync_invalid_phone() { #[test(tokio::test)] #[test_log(default_log_filter = "debug")] -async fn test_e2e_binary_attr() { - let mut config = config().await.clone(); +async fn test_e2e_binary_uid() { + let mut config = ldap_config().await.clone(); + + // Attribute uid (user_id) is configured as binary - // OpenLDAP checks if types match, so we need to use an attribute - // that can actually be binary. config .sources .ldap .as_mut() .expect("ldap must be configured for this test") .attributes - .preferred_username = AttributeMapping::OptionalBinary { + .user_id = AttributeMapping::OptionalBinary { name: "userSMIMECertificate".to_owned(), is_binary: true, }; let mut ldap = Ldap::new().await; + + // Create test user with binary ID + let uid = "binary_user"; + let binary_uid = uid.as_bytes(); ldap.create_user( - "Bob", - "Tables", - "Bobby", - "binary@famedly.de", - Some("+12015550123"), - "binary", + "Binary", + "User", + "BinaryTest", + "binary_id@famedly.de", + Some("+12345678901"), + uid, // Regular uid for DN false, ) .await; + + // Set binary ID ldap.change_user( - "binary", - vec![( - "userSMIMECertificate".as_bytes(), - // It's important that this is invalid UTF-8 - HashSet::from([[0xA0, 0xA1].as_slice()]), - )], + uid, + vec![("userSMIMECertificate".as_bytes(), HashSet::from([uid.as_bytes()]))], ) .await; - let org_id = config.zitadel.organization_id.clone(); - - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel - .get_user_by_login_name("binary@famedly.de") + .get_user_by_login_name("binary_id@famedly.de") .await - .expect("could not query Zitadel users"); + .expect("could not query Zitadel users") + .expect("user not found"); - assert!(user.is_some()); + match user.r#type { + Some(UserType::Human(user)) => { + let profile = user.profile.expect("user lacks profile"); + // The ID should be hex encoded in Zitadel + assert_eq!(profile.nick_name, hex::encode(binary_uid)); + } + _ => panic!("user lacks human details"), + } - if let Some(user) = user { - let preferred_username = zitadel - .get_user_metadata(Some(org_id), &user.id, "preferred_username") - .await - .expect("could not get user metadata"); + // Test update to a different binary ID that is valid UTF-8 - assert_eq!( - preferred_username - .map(|u| BASE64_STANDARD.decode(u).expect("failed to decode binary attr")), - Some([0xA0, 0xA1].to_vec()) - ); + let new_binary_id = "updated_binary_user".as_bytes(); + ldap.change_user( + uid, + vec![("userSMIMECertificate".as_bytes(), HashSet::from([new_binary_id]))], + ) + .await; + + perform_sync(&config).await.expect("syncing failed"); + + let user = zitadel + .get_user_by_login_name("binary_id@famedly.de") + .await + .expect("could not query Zitadel users") + .expect("user not found after update"); + + match user.r#type { + Some(UserType::Human(user)) => { + let profile = user.profile.expect("user lacks profile"); + tracing::info!("profile: {:#?}", profile); + // Verify ID was updated + assert_eq!(profile.nick_name, hex::encode(new_binary_id)); + } + _ => panic!("user lost human details after update"), + } + + // Test update to binary ID that is NOT valid UTF-8 + + let invalid_binary_id = [0xA1, 0xA2]; + ldap.change_user( + uid, + vec![("userSMIMECertificate".as_bytes(), HashSet::from([invalid_binary_id.as_slice()]))], + ) + .await; + + perform_sync(&config).await.expect("syncing failed"); + + let user = zitadel + .get_user_by_login_name("binary_id@famedly.de") + .await + .expect("could not query Zitadel users") + .expect("user not found after update"); + + match user.r#type { + Some(UserType::Human(user)) => { + let profile = user.profile.expect("user lacks profile"); + // Verify ID was updated + assert_eq!(profile.nick_name, hex::encode(invalid_binary_id)); + } + _ => panic!("user lost human details after update"), } } #[test(tokio::test)] #[test_log(default_log_filter = "debug")] -async fn test_e2e_binary_attr_valid_utf8() { - let mut config = config().await.clone(); +async fn test_e2e_binary_preferred_username() { + let mut config = ldap_config().await.clone(); + + // Attribute preferred_username is configured as binary but shouldn't be - // OpenLDAP checks if types match, so we need to use an attribute - // that can actually be binary. config .sources .ldap @@ -569,52 +812,90 @@ async fn test_e2e_binary_attr_valid_utf8() { let mut ldap = Ldap::new().await; ldap.create_user( - "Bob", - "Tables", - "Bobby", - "binaryutf8@famedly.de", + "BobFail", + "TablesFail", + "BobbyFail", + "binary_fail@famedly.de", Some("+12015550123"), - "binaryutf8", + "binary_fail", false, ) .await; + + // Test with a valid UTF-8 binary attribute + ldap.change_user( - "binaryutf8", - vec![("userSMIMECertificate".as_bytes(), HashSet::from(["validutf8".as_bytes()]))], + "binary_fail", + vec![("userSMIMECertificate".as_bytes(), HashSet::from(["new_binary_fail".as_bytes()]))], ) .await; - let org_id = config.zitadel.organization_id.clone(); + let result = tokio::spawn({ + let config = config.clone(); + async move { perform_sync(&config).await } + }) + .await; - config.perform_sync().await.expect("syncing failed"); + match result { + Ok(sync_result) => { + assert!(sync_result.is_err()); + let error = sync_result.unwrap_err(); + assert!(error.to_string().contains("Failed to query users from LDAP")); + if let Some(cause) = error.source() { + assert!(cause.to_string().contains("Binary values are not accepted")); + assert!(cause.to_string().contains("attribute `userSMIMECertificate`")); + } else { + panic!("Expected error to have a cause"); + } + } + Err(join_error) if join_error.is_panic() => { + panic!("perform_sync panicked unexpectedly: {}", join_error); + } + Err(e) => { + panic!("unexpected error: {}", e); + } + } - let zitadel = open_zitadel_connection().await; - let user = zitadel - .get_user_by_login_name("binaryutf8@famedly.de") - .await - .expect("could not query Zitadel users"); + // Test with an invalid UTF-8 binary attribute - assert!(user.is_some()); + ldap.change_user( + "binary_fail", + vec![("userSMIMECertificate".as_bytes(), HashSet::from([[0xA0, 0xA1].as_slice()]))], + ) + .await; - if let Some(user) = user { - let preferred_username = zitadel - .get_user_metadata(Some(org_id), &user.id, "preferred_username") - .await - .expect("could not get user metadata"); + let result = tokio::spawn({ + let config = config.clone(); + async move { perform_sync(&config).await } + }) + .await; - assert_eq!( - preferred_username - .map(|u| BASE64_STANDARD.decode(u).expect("failed to decode binary attr")), - Some("validutf8".as_bytes().to_vec()) - ); + match result { + Ok(sync_result) => { + assert!(sync_result.is_err()); + let error = sync_result.unwrap_err(); + assert!(error.to_string().contains("Failed to query users from LDAP")); + if let Some(cause) = error.source() { + assert!(cause.to_string().contains("Binary values are not accepted")); + assert!(cause.to_string().contains("attribute `userSMIMECertificate`")); + } else { + panic!("Expected error to have a cause"); + } + } + Err(join_error) if join_error.is_panic() => { + panic!("perform_sync panicked unexpectedly: {}", join_error); + } + Err(e) => { + panic!("unexpected error: {}", e); + } } } #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_dry_run() { - let mut dry_run_config = config().await.clone(); - let config = config().await; + let mut dry_run_config = ldap_config().await.clone(); + let config = ldap_config().await; dry_run_config.feature_flags.push(FeatureFlag::DryRun); let mut ldap = Ldap::new().await; @@ -632,17 +913,17 @@ async fn test_e2e_dry_run() { let zitadel = open_zitadel_connection().await; // Assert the user does not sync, because this is a dry run - dry_run_config.perform_sync().await.expect("syncing failed"); + perform_sync(&dry_run_config).await.expect("syncing failed"); assert!(zitadel.get_user_by_login_name("dry_run@famedly.de").await.is_err_and( |error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound), )); // Actually sync the user so we can test other changes= - config.perform_sync().await.expect("syncing failed"); + perform_sync(config).await.expect("syncing failed"); // Assert that a change in phone number does not sync ldap.change_user("dry_run", vec![("telephoneNumber", HashSet::from(["+12015550124"]))]).await; - dry_run_config.perform_sync().await.expect("syncing failed"); + perform_sync(&dry_run_config).await.expect("syncing failed"); let user = zitadel .get_user_by_login_name("dry_run@famedly.de") .await @@ -655,7 +936,7 @@ async fn test_e2e_dry_run() { // Assert that disabling a user does not sync ldap.change_user("dry_run", vec![("shadowFlag", HashSet::from(["514"]))]).await; - dry_run_config.perform_sync().await.expect("syncing failed"); + perform_sync(&dry_run_config).await.expect("syncing failed"); assert!(zitadel .get_user_by_login_name("dry_run@famedly.de") .await @@ -663,7 +944,7 @@ async fn test_e2e_dry_run() { // Assert that a user deletion does not sync ldap.delete_user("dry_run").await; - dry_run_config.perform_sync().await.expect("syncing failed"); + perform_sync(&dry_run_config).await.expect("syncing failed"); assert!(zitadel .get_user_by_login_name("dry_run@famedly.de") .await @@ -720,8 +1001,8 @@ async fn test_e2e_sync_deactivated_only() { ldap.change_user("reenabled_disable_only", vec![("shadowFlag", HashSet::from(["514"]))]).await; - let mut config = config().await.clone(); - config.perform_sync().await.expect("syncing failed"); + let mut config = ldap_config().await.clone(); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("disable_disable_only@famedly.de").await; @@ -754,7 +1035,7 @@ async fn test_e2e_sync_deactivated_only() { .await; ldap.delete_user("deleted_disable_only").await; ldap.change_user("reenabled_disable_only", vec![("shadowFlag", HashSet::from(["512"]))]).await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let user = zitadel.get_user_by_login_name("disable_disable_only@famedly.de").await; assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); @@ -788,7 +1069,7 @@ async fn test_e2e_ukt_sync() { prepare_oauth2_mock(&mock_server).await; prepare_endpoint_mock(&mock_server, "delete_me@famedly.de").await; - let mut config = config().await.clone(); + let mut config = ukt_config().await.clone(); config .sources @@ -836,18 +1117,21 @@ async fn test_e2e_ukt_sync() { let user = user.expect("could not find user"); assert_eq!(user.user_name, "delete_me@famedly.de"); - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let user = zitadel.get_user_by_login_name("delete_me@famedly.de").await; assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); } +// Currently fails because CSV uses non-hex encoded IDs, need to think +// about how to fit this into the overall workflow +#[ignore] #[test(tokio::test)] #[test_log(default_log_filter = "debug")] async fn test_e2e_csv_sync() { - let mut config = config().await.clone(); + let mut config = csv_config().await.clone(); - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let zitadel = open_zitadel_connection().await; let user = zitadel @@ -903,13 +1187,13 @@ async fn test_e2e_csv_sync() { let grant = grants.result.first().expect("no user grants found"); assert!(grant.role_keys.clone().into_iter().any(|key| key == FAMEDLY_USER_ROLE)); - // Not possible to re-import an existing user (as checked by unique email) + // Re-import an existing user to update (as checked by unique email) let csv_content = indoc::indoc! {r#" email,first_name,last_name,phone john.doe@example.com,Changed_Name,Changed_Surname,+2222222222 "#}; let _file = temp_csv_file(&mut config, csv_content); - config.perform_sync().await.expect("syncing failed"); + perform_sync(&config).await.expect("syncing failed"); let user = zitadel .get_user_by_login_name("john.doe@example.com") @@ -923,10 +1207,10 @@ async fn test_e2e_csv_sync() { let phone = user.phone.expect("user lacks a phone number"); let email = user.email.expect("user lacks an email address"); - assert_eq!(profile.first_name, "John"); - assert_eq!(profile.last_name, "Doe"); - assert_eq!(profile.display_name, "Doe, John"); - assert_eq!(phone.phone, "+1111111111"); + assert_eq!(profile.first_name, "Changed_Name"); + assert_eq!(profile.last_name, "Changed_Surname"); + assert_eq!(profile.display_name, "Changed_Surname, Changed_Name"); + assert_eq!(phone.phone, "+2222222222"); assert!(phone.is_phone_verified); assert_eq!(email.email, "john.doe@example.com"); assert!(email.is_email_verified); @@ -937,23 +1221,12 @@ async fn test_e2e_csv_sync() { #[test(tokio::test)] #[test_log(default_log_filter = "debug")] -async fn test_e2e_full_sync() { +async fn test_e2e_ldap_with_ukt_sync() { let mock_server = MockServer::start().await; prepare_oauth2_mock(&mock_server).await; prepare_endpoint_mock(&mock_server, "not_to_be_there@famedly.de").await; - let mut config = config().await.clone(); - config - .sources - .ukt - .as_mut() - .map(|ukt| { - ukt.oauth2_url = get_mock_server_url(&mock_server, OAUTH2_PATH) - .expect("Failed to get mock server URL"); - ukt.endpoint_url = get_mock_server_url(&mock_server, ENDPOINT_PATH) - .expect("Failed to get mock server URL"); - }) - .expect("UKT configuration is missing"); + // LDAP SYNC let mut ldap = Ldap::new().await; ldap.create_user( @@ -1000,44 +1273,34 @@ async fn test_e2e_full_sync() { ) .await; - let csv_content = indoc::indoc! {r#" - email,first_name,last_name,phone - csv_sync@example.com,John,Doe,+1111111111 - "#}; - // Have to create a new temp file - // because we can't re-use users between tests - // and the ./tests/environment/files/test-users.csv was already imported - let _file = temp_csv_file(&mut config, csv_content); + let ldap_config = ldap_config().await.clone(); + perform_sync(&ldap_config).await.expect("syncing failed"); - config.perform_sync().await.expect("syncing failed"); + // UKT SYNC - let zitadel = open_zitadel_connection().await; + let mut ukt_config = ukt_config().await.clone(); + ukt_config + .sources + .ukt + .as_mut() + .map(|ukt| { + ukt.oauth2_url = get_mock_server_url(&mock_server, OAUTH2_PATH) + .expect("Failed to get mock server URL"); + ukt.endpoint_url = get_mock_server_url(&mock_server, ENDPOINT_PATH) + .expect("Failed to get mock server URL"); + }) + .expect("UKT configuration is missing"); - let user = zitadel - .get_user_by_login_name("csv_sync@example.com") - .await - .expect("could not query Zitadel users"); - assert!(user.is_some()); - let user = user.expect("could not find user"); - assert_eq!(user.user_name, "csv_sync@example.com"); - if let Some(UserType::Human(user)) = user.r#type { - let profile = user.profile.expect("user lacks a profile"); - let phone = user.phone.expect("user lacks a phone number"); - let email = user.email.expect("user lacks an email address"); + perform_sync(&ukt_config).await.expect("syncing failed"); - assert_eq!(profile.first_name, "John"); - assert_eq!(profile.last_name, "Doe"); - assert_eq!(profile.display_name, "Doe, John"); - assert_eq!(phone.phone, "+1111111111"); - assert!(phone.is_phone_verified); - assert_eq!(email.email, "csv_sync@example.com"); - assert!(email.is_email_verified); - } else { - panic!("user lacks details"); - } + // VERIFY RESULTS OF SYNC + + let zitadel = open_zitadel_connection().await; let user = zitadel.get_user_by_login_name("not_to_be_there@famedly.de").await; - assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); + assert!(user.is_err_and(|error| matches!(error, + ZitadelError::TonicResponseError(status) if status.code() == + TonicErrorCode::NotFound))); let user = zitadel .get_user_by_login_name("to_be_there@famedly.de") @@ -1064,11 +1327,15 @@ async fn test_e2e_full_sync() { _ => panic!("human user became a machine user?"), } + // UPDATES IN LDAP + ldap.change_user("to_be_changed", vec![("telephoneNumber", HashSet::from(["+12015550123"]))]) .await; ldap.delete_user("not_to_be_there_later").await; - config.perform_sync().await.expect("syncing failed"); + perform_sync(&ldap_config).await.expect("syncing failed"); + + // VERIFY SECOND LDAP SYNC let user = zitadel .get_user_by_login_name("to_be_changed@famedly.de") @@ -1082,18 +1349,60 @@ async fn test_e2e_full_sync() { } _ => panic!("human user became a machine user?"), } - let user = zitadel.get_user_by_login_name("not_to_be_there_later@famedly.de").await; assert!(user.is_err_and(|error| matches!(error, ZitadelError::TonicResponseError(status) if status.code() == TonicErrorCode::NotFound))); } +#[test(tokio::test)] +#[test_log(default_log_filter = "debug")] +async fn test_e2e_sso_linking() { + let mut config = ldap_config().await.clone(); + config.feature_flags.push(FeatureFlag::SsoLogin); + + let mut ldap = Ldap::new().await; + let test_email = "sso_link_test@famedly.de"; + let test_uid = "sso_link_test"; + ldap.create_user( + "SSO", + "LinkTest", + "SSO Link", + test_email, + Some("+12015550199"), + test_uid, + false, + ) + .await; + + perform_sync(&config).await.expect("syncing failed"); + + let zitadel = open_zitadel_connection().await; + let user = zitadel + .get_user_by_login_name(test_email) + .await + .expect("could not query Zitadel users") + .expect("could not find user"); + + let idps = zitadel.list_user_idps(user.id.clone()).await.expect("could not get user IDPs"); + + assert!(!idps.is_empty(), "User should have IDP links"); + + let idp = idps.first().expect("No IDP link found"); + assert_eq!(idp.idp_id, config.zitadel.idp_id, "IDP link should match configured IDP"); + assert_eq!(idp.provided_user_id, test_uid, "IDP provided_user_id should match plain LDAP uid"); + assert_eq!(idp.user_id, user.id, "IDP user_id should match Zitadel user id"); + assert_eq!( + idp.provided_user_name, test_email, + "IDP provided_user_name should match test_email" + ); +} + struct Ldap { client: LdapClient, } impl Ldap { async fn new() -> Self { - let config = config().await.clone(); + let config = ldap_config().await.clone(); let mut settings = LdapConnSettings::new(); if let Some(ref ldap_config) = config.sources.ldap { @@ -1148,7 +1457,7 @@ impl Ldap { attrs.push(("telephoneNumber", HashSet::from([phone]))); } - let base_dn = config() + let base_dn = ldap_config() .await .sources .ldap @@ -1177,7 +1486,7 @@ impl Ldap { .map(|(attribute, changes)| Mod::Replace(attribute, changes)) .collect(); - let base_dn = config() + let base_dn = ldap_config() .await .sources .ldap @@ -1195,7 +1504,7 @@ impl Ldap { } async fn delete_user(&mut self, uid: &str) { - let base_dn = config() + let base_dn = ldap_config() .await .sources .ldap @@ -1215,29 +1524,60 @@ impl Ldap { /// Open a connection to the configured Zitadel backend async fn open_zitadel_connection() -> Zitadel { - let zitadel_config = config().await.zitadel.clone(); + let zitadel_config = ldap_config().await.zitadel.clone(); Zitadel::new(zitadel_config.url, zitadel_config.key_file) .await .expect("failed to set up Zitadel client") } /// Get the module's test environment config -async fn config() -> &'static Config { - CONFIG +async fn ldap_config() -> &'static Config { + CONFIG_WITH_LDAP .get_or_init(|| async { let mut config = Config::new(Path::new("tests/environment/config.yaml")) .expect("failed to parse test env file"); - let tempdir = TEMPDIR - .get_or_init(|| async { TempDir::new().expect("failed to initialize cache dir") }) - .await; + config.sources.ldap = serde_yaml::from_slice( + &std::fs::read(Path::new("tests/environment/ldap-config.template.yaml")) + .expect("failed to read ldap config file"), + ) + .expect("failed to parse ldap config"); config - .sources - .ldap - .as_mut() - .expect("ldap must be configured for this test") - .cache_path = tempdir.path().join("cache.bin"); + }) + .await +} + +/// Get the module's test environment config +async fn ukt_config() -> &'static Config { + CONFIG_WITH_UKT + .get_or_init(|| async { + let mut config = Config::new(Path::new("tests/environment/config.yaml")) + .expect("failed to parse test env file"); + + config.sources.ukt = serde_yaml::from_slice( + &std::fs::read(Path::new("tests/environment/ukt-config.template.yaml")) + .expect("failed to read ukt config file"), + ) + .expect("failed to parse ukt config"); + + config + }) + .await +} + +/// Get the module's test environment config +async fn csv_config() -> &'static Config { + CONFIG_WITH_CSV + .get_or_init(|| async { + let mut config = Config::new(Path::new("tests/environment/config.yaml")) + .expect("failed to parse test env file"); + + config.sources.csv = serde_yaml::from_slice( + &std::fs::read(Path::new("tests/environment/csv-config.template.yaml")) + .expect("failed to read csv config file"), + ) + .expect("failed to parse csv config"); config }) diff --git a/tests/environment/config.template.yaml b/tests/environment/config.template.yaml index e3fe7b0..5f92a36 100644 --- a/tests/environment/config.template.yaml +++ b/tests/environment/config.template.yaml @@ -5,45 +5,8 @@ zitadel: project_id: @PROJECT_ID@ idp_id: @IDP_ID@ -sources: - ldap: - url: ldap://localhost:1389 - base_dn: ou=testorg,dc=example,dc=org - bind_dn: cn=admin,dc=example,dc=org - bind_password: adminpassword - user_filter: "(objectClass=shadowAccount)" - timeout: 5 - check_for_deleted_entries: true - use_attribute_filter: true - attributes: - first_name: "cn" # objectClass: person - last_name: "sn" # objectClass: person - preferred_username: "displayName" # objectClass: inetOrgPerson - email: "mail" # objectClass: inetOrgPerson - phone: "telephoneNumber" # objectClass: person - user_id: "uid" - status: - name: "shadowFlag" # objectClass: shadowAccount - is_binary: false - disable_bitmasks: [0x2, 0x10] - tls: - client_key: ./tests/environment/certs/client.key - client_certificate: ./tests/environment/certs/client.crt - server_certificate: ./tests/environment/certs/server.crt - danger_disable_tls_verify: false - danger_use_start_tls: false - cache_path: ./test - - ukt: - endpoint_url: https://list.example.invalid/usersync4chat/maillist - oauth2_url: https://list.example.invalid/token - client_id: mock_client_id - client_secret: mock_client_secret - scope: "openid read-maillist" - grant_type: client_credentials - - csv: - file_path: ./tests/environment/files/test-users.csv - feature_flags: - sso_login + +sources: + test: 1 diff --git a/tests/environment/csv-config.template.yaml b/tests/environment/csv-config.template.yaml new file mode 100644 index 0000000..5766673 --- /dev/null +++ b/tests/environment/csv-config.template.yaml @@ -0,0 +1,6 @@ +# Updates Zitadel to match the CSV file. +#! DANGER: This will delete all users that are not in the CSV file! + +# Expected structure of the CSV file is as follows: +# email,first_name,last_name,phone +file_path: ./tests/environment/files/test-users.csv diff --git a/tests/environment/docker-compose.yaml b/tests/environment/docker-compose.yaml index 159335c..179d42a 100644 --- a/tests/environment/docker-compose.yaml +++ b/tests/environment/docker-compose.yaml @@ -53,7 +53,7 @@ services: condition: 'service_healthy' zitadel: - image: ghcr.io/zitadel/zitadel:v2.58.3 + image: ghcr.io/zitadel/zitadel:v2.64.1 command: start-from-init --masterkey "MasterkeyNeedsToHave32Characters" --tlsMode disabled --config /zitadel-config/zitadel-config.yaml --steps /zitadel-config/zitadel-init.yaml ports: - 8080:8080 diff --git a/tests/environment/ldap-config.template.yaml b/tests/environment/ldap-config.template.yaml new file mode 100644 index 0000000..950d747 --- /dev/null +++ b/tests/environment/ldap-config.template.yaml @@ -0,0 +1,25 @@ +url: ldap://localhost:1389 +base_dn: ou=testorg,dc=example,dc=org +bind_dn: cn=admin,dc=example,dc=org +bind_password: adminpassword +user_filter: "(objectClass=shadowAccount)" +timeout: 5 +check_for_deleted_entries: true +use_attribute_filter: true +attributes: + first_name: "cn" # objectClass: person + last_name: "sn" # objectClass: person + preferred_username: "displayName" # objectClass: inetOrgPerson + email: "mail" # objectClass: inetOrgPerson + phone: "telephoneNumber" # objectClass: person + user_id: "uid" + status: + name: "shadowFlag" # objectClass: shadowAccount + is_binary: false + disable_bitmasks: [0x2, 0x10] +tls: + client_key: ./tests/environment/certs/client.key + client_certificate: ./tests/environment/certs/client.crt + server_certificate: ./tests/environment/certs/server.crt + danger_disable_tls_verify: false + danger_use_start_tls: false diff --git a/tests/environment/ukt-config.template.yaml b/tests/environment/ukt-config.template.yaml new file mode 100644 index 0000000..7157db4 --- /dev/null +++ b/tests/environment/ukt-config.template.yaml @@ -0,0 +1,6 @@ +endpoint_url: https://list.example.invalid/usersync4chat/maillist +oauth2_url: https://list.example.invalid/token +client_id: mock_client_id +client_secret: mock_client_secret +scope: "openid read-maillist" +grant_type: client_credentials