Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions java/lance-jni/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

97 changes: 80 additions & 17 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use lance_index::scalar::lance_format::LanceIndexStore;
use lance_index::DatasetIndexExt;
use lance_index::{IndexParams, IndexType};
use lance_io::object_store::ObjectStoreRegistry;
use lance_io::object_store::StorageOptionsAccessor;
use lance_io::object_store::StorageOptionsProvider;
use std::collections::HashMap;
use std::future::IntoFuture;
Expand Down Expand Up @@ -75,16 +76,18 @@ pub struct BlockingDataset {
}

impl BlockingDataset {
/// Get the storage options provider that was used when opening this dataset
pub fn get_storage_options_provider(&self) -> Option<Arc<dyn StorageOptionsProvider>> {
self.inner.storage_options_provider()
/// Get the storage options accessor that was used when opening this dataset
pub fn get_storage_options_accessor(&self) -> Option<Arc<StorageOptionsAccessor>> {
self.inner.storage_options_accessor()
}

pub fn drop(uri: &str, storage_options: HashMap<String, String>) -> Result<()> {
RT.block_on(async move {
let registry = Arc::new(ObjectStoreRegistry::default());
let object_store_params = ObjectStoreParams {
storage_options: Some(storage_options.clone()),
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options.clone(),
))),
..Default::default()
};
let (object_store, path) =
Expand Down Expand Up @@ -118,18 +121,31 @@ impl BlockingDataset {
storage_options_provider: Option<Arc<dyn StorageOptionsProvider>>,
s3_credentials_refresh_offset_seconds: Option<u64>,
) -> Result<Self> {
let mut store_params = ObjectStoreParams {
let s3_credentials_refresh_offset = s3_credentials_refresh_offset_seconds
.map(std::time::Duration::from_secs)
.unwrap_or(std::time::Duration::from_secs(60));

// Create StorageOptionsAccessor with options and optional provider
let storage_options_accessor = match storage_options_provider {
Some(provider) => Some(Arc::new(
StorageOptionsAccessor::new_with_options_and_provider(
storage_options.clone(),
provider,
s3_credentials_refresh_offset,
),
)),
None => Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options.clone(),
))),
};

let store_params = ObjectStoreParams {
block_size: block_size.map(|size| size as usize),
storage_options: Some(storage_options.clone()),
storage_options_accessor: storage_options_accessor.clone(),
s3_credentials_refresh_offset,
..Default::default()
};
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
store_params.s3_credentials_refresh_offset =
std::time::Duration::from_secs(offset_seconds);
}
if let Some(provider) = storage_options_provider.clone() {
store_params.storage_options_provider = Some(provider);
}

let params = ReadParams {
index_cache_size_bytes: index_cache_size_bytes as usize,
metadata_cache_size_bytes: metadata_cache_size_bytes as usize,
Expand All @@ -143,8 +159,8 @@ impl BlockingDataset {
builder = builder.with_version(ver as u64);
}
builder = builder.with_storage_options(storage_options);
if let Some(provider) = storage_options_provider.clone() {
builder = builder.with_storage_options_provider(provider)
if let Some(accessor) = storage_options_accessor {
builder = builder.with_storage_options_accessor(accessor)
}
if let Some(offset_seconds) = s3_credentials_refresh_offset_seconds {
builder = builder
Expand All @@ -170,7 +186,9 @@ impl BlockingDataset {
operation,
read_version,
Some(ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options,
))),
..Default::default()
}),
None,
Expand Down Expand Up @@ -1394,7 +1412,9 @@ fn inner_shallow_clone<'local>(
storage_options
.map(|options| {
Some(ObjectStoreParams {
storage_options: Some(options),
storage_options_accessor: Some(Arc::new(
StorageOptionsAccessor::new_with_options(options),
)),
..Default::default()
})
})
Expand Down Expand Up @@ -1546,6 +1566,49 @@ fn inner_get_config<'local>(
Ok(java_hashmap)
}

/// JNI export backing the Java-side `org.lance.Dataset.nativeGetStorageOptions()` native method.
///
/// Delegates all work to `inner_get_storage_options` and hands its `Result` to
/// `ok_or_throw!` together with the JNI environment.
///
/// NOTE(review): `ok_or_throw!` is defined outside this view; presumably it raises a
/// Java exception from the `Err` value and yields a placeholder object — confirm.
#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeGetStorageOptions<'local>(
mut env: JNIEnv<'local>,
java_dataset: JObject,
) -> JObject<'local> {
ok_or_throw!(env, inner_get_storage_options(&mut env, java_dataset))
}

/// Builds a `java.util.HashMap` holding the dataset's current storage options.
///
/// The `BlockingDataset` stored in the `NATIVE_DATASET` field of `java_dataset` is
/// looked up, its `storage_options()` future is driven to completion on `RT`, and
/// every key/value pair is copied into a freshly created Java map.
///
/// # Errors
///
/// Returns an error when the native field cannot be read, when resolving the storage
/// options fails (re-wrapped via `Error::io_error` with the original message), or when
/// any JNI call (`new_object`, `new_string`, `HashMap.put`) fails. JNI failures are
/// propagated with `?` instead of panicking, so the JNI entry point's `ok_or_throw!`
/// gets a chance to handle them.
fn inner_get_storage_options<'local>(
    env: &mut JNIEnv<'local>,
    java_dataset: JObject,
) -> Result<JObject<'local>> {
    let options = {
        // The guard lives only inside this block, so it is dropped before the map is built.
        // NOTE(review): soundness relies on the NATIVE_DATASET field holding a
        // `BlockingDataset` installed by this crate — confirm against the setter.
        let dataset_guard =
            unsafe { env.get_rust_field::<_, _, BlockingDataset>(java_dataset, NATIVE_DATASET) }?;
        RT.block_on(dataset_guard.inner.storage_options())
            .map_err(|e| Error::io_error(e.to_string()))?
    };

    let java_hashmap = env.new_object("java/util/HashMap", "()V", &[])?;

    for (k, v) in options {
        let java_key = env.new_string(&k)?;
        let java_value = env.new_string(&v)?;

        // `put` returns the previous mapping; it is not needed here.
        env.call_method(
            &java_hashmap,
            "put",
            "(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;",
            &[JValue::Object(&java_key), JValue::Object(&java_value)],
        )?;
    }

    Ok(java_hashmap)
}

#[no_mangle]
pub extern "system" fn Java_org_lance_Dataset_nativeTake(
mut env: JNIEnv,
Expand Down
6 changes: 4 additions & 2 deletions java/lance-jni/src/file_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use lance_core::cache::LanceCache;
use lance_core::datatypes::Schema;
use lance_encoding::decoder::{DecoderPlugins, FilterExpression};
use lance_file::reader::{FileReader, FileReaderOptions, ReaderProjection};
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry};
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor};
use lance_io::{
scheduler::{ScanScheduler, SchedulerConfig},
utils::CachedFileSize,
Expand Down Expand Up @@ -112,7 +112,9 @@ fn inner_open<'local>(
let storage_options = to_rust_map(env, &jmap)?;
let reader = RT.block_on(async move {
let object_params = ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options,
))),
..Default::default()
};
let (obj_store, path) = ObjectStore::from_uri_and_params(
Expand Down
6 changes: 4 additions & 2 deletions java/lance-jni/src/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use lance_file::{
version::LanceFileVersion,
writer::{FileWriter, FileWriterOptions},
};
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry};
use lance_io::object_store::{ObjectStoreParams, ObjectStoreRegistry, StorageOptionsAccessor};

pub const NATIVE_WRITER: &str = "nativeFileWriterHandle";

Expand Down Expand Up @@ -94,7 +94,9 @@ fn inner_open<'local>(

let writer = RT.block_on(async move {
let object_params = ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_accessor: Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options,
))),
..Default::default()
};
let (obj_store, path) = ObjectStore::from_uri_and_params(
Expand Down
17 changes: 11 additions & 6 deletions java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use lance::dataset::transaction::{
UpdateMap, UpdateMapEntry, UpdateMode,
};
use lance::io::ObjectStoreParams;
use lance_io::object_store::StorageOptionsAccessor;
use lance::table::format::{Fragment, IndexMetadata};
use lance_core::datatypes::Schema as LanceSchema;
use prost::Message;
Expand Down Expand Up @@ -752,17 +753,21 @@ fn inner_commit_transaction<'local>(
.map(std::time::Duration::from_secs)
.unwrap_or_else(|| std::time::Duration::from_secs(10));

// Get the Dataset's storage_options_provider
let storage_options_provider = {
// Get the Dataset's storage_options_accessor
let existing_accessor = {
let dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
dataset_guard.get_storage_options_provider()
dataset_guard.get_storage_options_accessor()
};

// Build ObjectStoreParams using write_param for storage_options and provider from Dataset
// Build ObjectStoreParams using write_param for storage_options and accessor from Dataset
let storage_options_accessor = Some(Arc::new(existing_accessor.map_or_else(
|| StorageOptionsAccessor::new_with_options(write_param.clone()),
|a| a.with_updated_initial_options(write_param.clone(), s3_credentials_refresh_offset),
)));

let store_params = ObjectStoreParams {
storage_options: Some(write_param),
storage_options_provider,
storage_options_accessor,
s3_credentials_refresh_offset,
..Default::default()
};
Expand Down
23 changes: 17 additions & 6 deletions java/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use crate::storage_options::JavaStorageOptionsProvider;

use crate::traits::FromJObjectWithEnv;
use lance_index::vector::Query;
use lance_io::object_store::StorageOptionsProvider;
use lance_io::object_store::{StorageOptionsAccessor, StorageOptionsProvider};
use std::collections::HashMap;
use std::str::FromStr;

Expand Down Expand Up @@ -84,15 +84,27 @@ pub fn extract_write_params(
JavaStorageOptionsProvider::new(env, provider_obj)
})?;

let storage_options_provider_arc: Option<Arc<dyn StorageOptionsProvider>> =
storage_options_provider.map(|v| Arc::new(v) as Arc<dyn StorageOptionsProvider>);

// Extract s3_credentials_refresh_offset_seconds if present
let s3_credentials_refresh_offset = env
.get_long_opt(s3_credentials_refresh_offset_seconds_obj)?
.map(|v| std::time::Duration::from_secs(v as u64))
.unwrap_or_else(|| std::time::Duration::from_secs(10));

// Create StorageOptionsAccessor with options and optional provider
let storage_options_accessor = match storage_options_provider {
Some(provider) => {
let provider_arc = Arc::new(provider) as Arc<dyn StorageOptionsProvider>;
Some(Arc::new(StorageOptionsAccessor::new_with_options_and_provider(
storage_options.clone(),
provider_arc,
s3_credentials_refresh_offset,
)))
}
None => Some(Arc::new(StorageOptionsAccessor::new_with_options(
storage_options.clone(),
))),
};

if let Some(initial_bases) =
env.get_list_opt(initial_bases, |env, elem| elem.extract_object(env))?
{
Expand All @@ -104,8 +116,7 @@ pub fn extract_write_params(
}

write_params.store_params = Some(ObjectStoreParams {
storage_options: Some(storage_options),
storage_options_provider: storage_options_provider_arc,
storage_options_accessor,
s3_credentials_refresh_offset,
..Default::default()
});
Expand Down
18 changes: 18 additions & 0 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,24 @@ public Map<String, String> getConfig() {

private native Map<String, String> nativeGetConfig();

/**
 * Returns the storage options currently in effect for this dataset.
 *
 * <p>The result merges the initial storage_options with whatever the
 * storage_options_provider overrides. Provider-supplied options stay cached until they expire,
 * as indicated by expires_at_millis.
 *
 * <p>The lookup runs under the dataset read lock and fails the "Dataset is closed"
 * precondition when the native handle is zero.
 *
 * @return the current storage options
 */
public Map<String, String> getStorageOptions() {
  try (LockManager.ReadLock guard = lockManager.acquireReadLock()) {
    final boolean handleIsLive = nativeDatasetHandle != 0;
    Preconditions.checkArgument(handleIsLive, "Dataset is closed");
    final Map<String, String> currentOptions = nativeGetStorageOptions();
    return currentOptions;
  }
}

private native Map<String, String> nativeGetStorageOptions();

/**
* Compact the dataset to improve performance.
*
Expand Down
48 changes: 41 additions & 7 deletions java/src/main/java/org/lance/OpenDatasetBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,22 @@
* Builder for opening a Dataset.
*
* <p>This builder provides a fluent API for opening datasets either directly from a URI or from a
* LanceNamespace. When using a namespace, the table location and storage options are automatically
* fetched.
* LanceNamespace.
*
* <p>Example usage with URI:
* <h3>Namespace Behavior</h3>
*
* <p>When {@code namespace} and {@code tableId} are provided (instead of {@code uri}), the builder
* automatically:
*
* <ol>
* <li>Calls {@code describeTable()} on the namespace to get table metadata
* <li>Uses the returned location as the dataset URI
* <li>Merges namespace storage options with any user-provided storage options (namespace options
* take precedence)
* <li>Creates a {@link LanceNamespaceStorageOptionsProvider} for automatic credential refresh
* </ol>
*
* <h3>Example: URI only</h3>
*
* <pre>{@code
* Dataset dataset = Dataset.open()
Expand All @@ -42,14 +54,30 @@
* .build();
* }</pre>
*
* <p>Example usage with namespace:
* <h3>Example: Namespace</h3>
*
* <pre>{@code
* Dataset dataset = Dataset.open()
* .namespace(myNamespace)
* .tableId(Arrays.asList("my_table"))
* .build();
* }</pre>
*
* <h3>Example: URI with custom storage options provider</h3>
*
* <p>If you need to use a specific URI with namespace-based credential refresh, create the provider
* manually:
*
* <pre>{@code
* LanceNamespaceStorageOptionsProvider provider =
* new LanceNamespaceStorageOptionsProvider(myNamespace, Arrays.asList("my_table"));
* Dataset dataset = Dataset.open()
* .uri("s3://bucket/table.lance")
* .readOptions(new ReadOptions.Builder()
* .setStorageOptionsProvider(provider)
* .build())
* .build();
* }</pre>
*/
public class OpenDatasetBuilder {
private BufferAllocator allocator;
Expand Down Expand Up @@ -213,9 +241,15 @@ private Dataset buildFromNamespace() {
.setMetadataCacheSizeBytes(options.getMetadataCacheSizeBytes());

if (namespaceStorageOptions != null && !namespaceStorageOptions.isEmpty()) {
LanceNamespaceStorageOptionsProvider storageOptionsProvider =
new LanceNamespaceStorageOptionsProvider(namespace, tableId);
optionsBuilder.setStorageOptionsProvider(storageOptionsProvider);
// Only create namespace provider if user didn't provide one explicitly via ReadOptions
if (!options.getStorageOptionsProvider().isPresent()) {
LanceNamespaceStorageOptionsProvider storageOptionsProvider =
new LanceNamespaceStorageOptionsProvider(namespace, tableId);
optionsBuilder.setStorageOptionsProvider(storageOptionsProvider);
} else {
// User provided their own provider - use it
optionsBuilder.setStorageOptionsProvider(options.getStorageOptionsProvider().get());
}
}

options.getVersion().ifPresent(optionsBuilder::setVersion);
Expand Down
Loading