Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(python): add capability to read unity catalog (uc://) uris #3113

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 69 additions & 2 deletions crates/catalog-unity/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,11 @@ pub enum UnityCatalogConfigKey {
/// - `azure_use_azure_cli`
/// - `use_azure_cli`
UseAzureCli,

/// Allow http url (e.g. http://localhost:8080/api/2.1/...)
/// Supported keys:
/// - `unity_allow_http_url`
AllowHttpUrl,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This allows users to work with a local (non-https) Unity Catalog REST API with delta-rs.

}

impl FromStr for UnityCatalogConfigKey {
Expand Down Expand Up @@ -224,6 +229,7 @@ impl FromStr for UnityCatalogConfigKey {
"workspace_url" | "unity_workspace_url" | "databricks_workspace_url" => {
Ok(UnityCatalogConfigKey::WorkspaceUrl)
}
"allow_http_url" | "unity_allow_http_url" => Ok(UnityCatalogConfigKey::AllowHttpUrl),
_ => Err(DataCatalogError::UnknownConfigKey {
catalog: "unity",
key: s.to_string(),
Expand All @@ -237,6 +243,7 @@ impl AsRef<str> for UnityCatalogConfigKey {
fn as_ref(&self) -> &str {
match self {
UnityCatalogConfigKey::AccessToken => "unity_access_token",
UnityCatalogConfigKey::AllowHttpUrl => "unity_allow_http_url",
UnityCatalogConfigKey::AuthorityHost => "unity_authority_host",
UnityCatalogConfigKey::AuthorityId => "unity_authority_id",
UnityCatalogConfigKey::ClientId => "unity_client_id",
Expand Down Expand Up @@ -289,6 +296,9 @@ pub struct UnityCatalogBuilder {
/// When set to true, azure cli has to be used for acquiring access token
use_azure_cli: bool,

/// When set to true, http will be allowed in the catalog url
allow_http_url: bool,

/// Retry config
retry_config: RetryConfig,

Expand All @@ -311,6 +321,9 @@ impl UnityCatalogBuilder {
) -> DataCatalogResult<Self> {
match UnityCatalogConfigKey::from_str(key.as_ref())? {
UnityCatalogConfigKey::AccessToken => self.bearer_token = Some(value.into()),
UnityCatalogConfigKey::AllowHttpUrl => {
self.allow_http_url = str_is_truthy(&value.into())
}
UnityCatalogConfigKey::ClientId => self.client_id = Some(value.into()),
UnityCatalogConfigKey::ClientSecret => self.client_secret = Some(value.into()),
UnityCatalogConfigKey::AuthorityId => self.authority_id = Some(value.into()),
Expand Down Expand Up @@ -407,6 +420,44 @@ impl UnityCatalogBuilder {
self
}

/// Returns true if table uri is a valid Unity Catalog URI, false otherwise.
pub fn is_unity_catalog_uri(table_uri: &str) -> bool {
table_uri.starts_with("uc://")
}

/// Returns the storage location and temporary token to be used with the
/// Unity Catalog table.
pub async fn get_uc_location_and_token(
table_uri: &str,
) -> Result<(String, String), UnityCatalogError> {
let uri_parts: Vec<&str> = table_uri[5..].split('.').collect();
if uri_parts.len() != 3 {
panic!("Invalid Unity Catalog URI: {}", table_uri);
}

let catalog_id = uri_parts[0];
let database_name = uri_parts[1];
let table_name = uri_parts[2];

let unity_catalog = match UnityCatalogBuilder::from_env().build() {
Ok(uc) => uc,
Err(_e) => panic!("Unable to build Unity Catalog."),
};
let storage_location = match unity_catalog
.get_table_storage_location(Some(catalog_id.to_string()), database_name, table_name)
.await
{
Ok(s) => s,
Err(_e) => panic!("Unable to find the table's storage location."),
};
let token = unity_catalog.get_credential().await?;
let credential = match token.to_str() {
Ok(header_str) => header_str.to_string(),
Err(_e) => panic!("Unable to get string value from Unity Catalog token."),
};
Ok((storage_location, credential))
}

fn get_credential_provider(&self) -> Option<CredentialProvider> {
if let Some(token) = self.bearer_token.as_ref() {
return Some(CredentialProvider::BearerToken(token.clone()));
Expand Down Expand Up @@ -451,7 +502,12 @@ impl UnityCatalogBuilder {
.trim_end_matches('/')
.to_string();

let client = self.client_options.client()?;
let client_options = if self.allow_http_url {
self.client_options.with_allow_http(true)
} else {
self.client_options
};
let client = client_options.client()?;

Ok(UnityCatalog {
client,
Expand Down Expand Up @@ -612,7 +668,7 @@ impl UnityCatalog {
self.catalog_url(),
catalog_id.as_ref(),
database_name.as_ref(),
table_name.as_ref()
table_name.as_ref(),
))
.header(AUTHORIZATION, token)
.send()
Expand Down Expand Up @@ -661,6 +717,7 @@ mod tests {
use crate::models::tests::{GET_SCHEMA_RESPONSE, GET_TABLE_RESPONSE, LIST_SCHEMAS_RESPONSE};
use crate::models::*;
use crate::UnityCatalogBuilder;
use deltalake_core::DataCatalog;
use httpmock::prelude::*;

#[tokio::test]
Expand Down Expand Up @@ -716,5 +773,15 @@ mod tests {
.await
.unwrap();
assert!(matches!(get_table_response, GetTableResponse::Success(_)));

let storage_location = client
.get_table_storage_location(
Some("catalog_name".to_string()),
"schema_name",
"table_name",
)
.await
.unwrap();
assert!(storage_location.eq_ignore_ascii_case("string"));
}
}
6 changes: 4 additions & 2 deletions crates/catalog-unity/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,14 +266,14 @@ pub struct TableSummary {
pub struct Table {
/// Username of table creator.
#[serde(default)]
pub created_by: String,
pub created_by: Option<String>,
Copy link
Contributor Author

@omkar-foss omkar-foss Jan 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are instances I encountered where created_by and updated_by are null in UC Get Tables response, so we need to keep this as an Option<String> to handle it.


/// Name of table, relative to parent schema.
pub name: String,

/// Username of user who last modified the table.
#[serde(default)]
pub updated_by: String,
pub updated_by: Option<String>,

/// List of schemes whose objects can be referenced without qualification.
#[serde(default)]
Expand All @@ -283,6 +283,7 @@ pub struct Table {
pub data_source_format: DataSourceFormat,

/// Full name of table, in form of catalog_name.schema_name.table_name
#[serde(default)]
pub full_name: String,

/// Name of parent schema relative to its parent catalog.
Expand All @@ -292,6 +293,7 @@ pub struct Table {
pub storage_location: String,

/// Unique identifier of parent metastore.
#[serde(default)]
pub metastore_id: String,
}

Expand Down
3 changes: 3 additions & 0 deletions python/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ doc = false
[dependencies]
delta_kernel.workspace = true

# deltalake_catalog_unity - local crate
deltalake-catalog-unity = { path = "../crates/catalog-unity" }

# arrow
arrow-schema = { workspace = true, features = ["serde"] }

Expand Down
44 changes: 35 additions & 9 deletions python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ use crate::merge::PyMergeBuilder;
use crate::query::PyQueryBuilder;
use crate::schema::{schema_to_pyobject, Field};
use crate::utils::rt;
use deltalake_catalog_unity::UnityCatalogBuilder;

#[derive(FromPyObject)]
enum PartitionFilterValue {
Expand Down Expand Up @@ -171,12 +172,24 @@ impl RawDeltaTable {
log_buffer_size: Option<usize>,
) -> PyResult<Self> {
py.allow_threads(|| {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri)
.with_io_runtime(IORuntime::default());
let options = storage_options.clone().unwrap_or_default();
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) {
match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) {
Ok(tup) => tup,
Err(err) => return Err(PyRuntimeError::new_err(err.to_string())),
}
} else {
(table_uri.to_string(), "".to_string())
};

let mut options = storage_options.clone().unwrap_or_default();
if !uc_token.is_empty() {
options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Have added this key UNITY_CATALOG_TEMPORARY_TOKEN for Polars or similar readers to be able reuse the credentials. cc: @ion-elgreco

Comment on lines +175 to +186
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this would be a better fit for code inside an object store factory

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Als you might have to wait before @hntd187's code gets merged (#3078)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Als you might have to wait before @hntd187's code gets merged (#3078)

Yep will have to wait for #3078 to get merged before this one, the temp credentials part is in there :)

I think this would be a better fit for code inside an object store factory

Yes, this code is currently duplicated in new() and is_deltatable(). Shall I move it to a private function? Or object store factory works too, let me know what you have in mind about that.

}

let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path)
.with_io_runtime(IORuntime::default());
builder = builder.with_storage_options(options.clone());

if let Some(version) = version {
builder = builder.with_version(version)
}
Expand All @@ -193,7 +206,7 @@ impl RawDeltaTable {
Ok(RawDeltaTable {
_table: Arc::new(Mutex::new(table)),
_config: FsConfig {
root_url: table_uri.into(),
root_url: table_path,
options,
},
})
Expand All @@ -206,10 +219,23 @@ impl RawDeltaTable {
table_uri: &str,
storage_options: Option<HashMap<String, String>>,
) -> PyResult<bool> {
let mut builder = deltalake::DeltaTableBuilder::from_uri(table_uri);
if let Some(storage_options) = storage_options {
builder = builder.with_storage_options(storage_options)
let (table_path, uc_token) = if UnityCatalogBuilder::is_unity_catalog_uri(table_uri) {
match rt().block_on(UnityCatalogBuilder::get_uc_location_and_token(table_uri)) {
Ok(tup) => tup,
Err(err) => return Err(PyRuntimeError::new_err(err.to_string())),
}
} else {
(table_uri.to_string(), "".to_string())
};

let mut options = storage_options.clone().unwrap_or_default();
if !uc_token.is_empty() {
options.insert("UNITY_CATALOG_TEMPORARY_TOKEN".to_string(), uc_token);
}

let mut builder = deltalake::DeltaTableBuilder::from_uri(&table_path)
.with_io_runtime(IORuntime::default());
builder = builder.with_storage_options(options.clone());
Ok(rt()
.block_on(async {
match builder.build() {
Expand Down
Loading