Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions java/lance-jni/src/blocking_dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ impl BlockingDataset {
operation: Operation,
read_version: Option<u64>,
storage_options: HashMap<String, String>,
enable_v2_manifest_paths: Option<bool>,
) -> Result<Self> {
let inner = RT.block_on(Dataset::commit(
uri,
Expand All @@ -159,7 +160,7 @@ impl BlockingDataset {
}),
None,
Default::default(),
false, // TODO: support enable_v2_manifest_paths
enable_v2_manifest_paths.unwrap_or(false),
))?;
Ok(Self { inner })
}
Expand Down Expand Up @@ -290,10 +291,12 @@ impl BlockingDataset {
&mut self,
transaction: Transaction,
store_params: ObjectStoreParams,
enable_v2_manifest_paths: bool
) -> Result<Self> {
let new_dataset = RT.block_on(
CommitBuilder::new(Arc::new(self.clone().inner))
.with_store_params(store_params)
.enable_v2_manifest_paths(enable_v2_manifest_paths)
.execute(transaction),
)?;
Ok(BlockingDataset { inner: new_dataset })
Expand Down Expand Up @@ -337,6 +340,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -351,7 +355,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>(
enable_stable_row_ids,
data_storage_version,
storage_options_obj,
s3_credentials_refresh_offset_seconds_obj
s3_credentials_refresh_offset_seconds_obj,
enable_v2_manifest_paths,
)
)
}
Expand All @@ -369,6 +374,7 @@ fn inner_create_with_ffi_schema<'local>(
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema;
let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) };
Expand All @@ -388,6 +394,7 @@ fn inner_create_with_ffi_schema<'local>(
JObject::null(), // No provider for schema-only creation
s3_credentials_refresh_offset_seconds_obj,
reader,
enable_v2_manifest_paths,
)
}

Expand Down Expand Up @@ -419,6 +426,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
data_storage_version: JObject, // Optional<String>
storage_options_obj: JObject, // Map<String, String>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -434,7 +442,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>(
data_storage_version,
storage_options_obj,
JObject::null(),
s3_credentials_refresh_offset_seconds_obj
s3_credentials_refresh_offset_seconds_obj,
enable_v2_manifest_paths,
)
)
}
Expand All @@ -454,6 +463,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
storage_options_obj: JObject, // Map<String, String>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -469,7 +479,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo
data_storage_version,
storage_options_obj,
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj
s3_credentials_refresh_offset_seconds_obj,
enable_v2_manifest_paths,
)
)
}
Expand All @@ -488,6 +499,7 @@ fn inner_create_with_ffi_stream<'local>(
storage_options_obj: JObject, // Map<String, String>
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject, // Optional<Long>
enable_v2_manifest_paths: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream;
let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?;
Expand All @@ -504,6 +516,7 @@ fn inner_create_with_ffi_stream<'local>(
storage_options_provider_obj,
s3_credentials_refresh_offset_seconds_obj,
reader,
enable_v2_manifest_paths,
)
}

Expand All @@ -521,6 +534,7 @@ fn create_dataset<'local>(
storage_options_provider_obj: JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: JObject,
reader: impl RecordBatchReader + Send + 'static,
enable_v2_manifest_paths: JObject,
) -> Result<JObject<'local>> {
let path_str = path.extract(env)?;

Expand All @@ -535,6 +549,7 @@ fn create_dataset<'local>(
&storage_options_obj,
&storage_options_provider_obj,
&s3_credentials_refresh_offset_seconds_obj,
&enable_v2_manifest_paths,
)?;

let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?;
Expand Down Expand Up @@ -613,6 +628,7 @@ pub extern "system" fn Java_org_lance_Dataset_commitAppend<'local>(
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
enable_v2_manifest_paths_obj: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -621,7 +637,8 @@ pub extern "system" fn Java_org_lance_Dataset_commitAppend<'local>(
path,
read_version_obj,
fragments_obj,
storage_options_obj
storage_options_obj,
enable_v2_manifest_paths_obj,
)
)
}
Expand All @@ -632,6 +649,7 @@ pub fn inner_commit_append<'local>(
read_version_obj: JObject, // Optional<Long>
fragment_objs: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
enable_v2_manifest_paths_obj: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let fragment_objs = import_vec(env, &fragment_objs)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
Expand All @@ -642,7 +660,8 @@ pub fn inner_commit_append<'local>(
let path_str = path.extract(env)?;
let read_version = env.get_u64_opt(&read_version_obj)?;
let storage_options = extract_storage_options(env, &storage_options_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
let enable_v2_manifest_paths = env.get_boolean_opt(&enable_v2_manifest_paths_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options, enable_v2_manifest_paths)?;
dataset.into_java(env)
}

Expand All @@ -655,6 +674,7 @@ pub extern "system" fn Java_org_lance_Dataset_commitOverwrite<'local>(
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
enable_v2_manifest_paths_obj: JObject, // Optional<Boolean>
) -> JObject<'local> {
ok_or_throw!(
env,
Expand All @@ -664,7 +684,8 @@ pub extern "system" fn Java_org_lance_Dataset_commitOverwrite<'local>(
arrow_schema_addr,
read_version_obj,
fragments_obj,
storage_options_obj
storage_options_obj,
enable_v2_manifest_paths_obj,
)
)
}
Expand All @@ -676,6 +697,7 @@ pub fn inner_commit_overwrite<'local>(
read_version_obj: JObject, // Optional<Long>
fragments_obj: JObject, // List<FragmentMetadata>
storage_options_obj: JObject, // Map<String, String>
enable_v2_manifest_paths_obj: JObject, // Optional<Boolean>
) -> Result<JObject<'local>> {
let fragment_objs = import_vec(env, &fragments_obj)?;
let mut fragments = Vec::with_capacity(fragment_objs.len());
Expand All @@ -697,7 +719,8 @@ pub fn inner_commit_overwrite<'local>(
let read_version = env.get_u64_opt(&read_version_obj)?;
let jmap = JMap::from_env(env, &storage_options_obj)?;
let storage_options = to_rust_map(env, &jmap)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?;
let enable_v2_manifest_paths = env.get_boolean_opt(&enable_v2_manifest_paths_obj)?;
let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options, enable_v2_manifest_paths)?;
dataset.into_java(env)
}

Expand Down
1 change: 1 addition & 0 deletions java/lance-jni/src/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@ fn create_fragment<'a>(
&storage_options_obj,
&storage_options_provider_obj,
&s3_credentials_refresh_offset_seconds_obj,
&JObject::null(),
)?;

let fragments = RT.block_on(FileFragment::create_fragments(
Expand Down
8 changes: 7 additions & 1 deletion java/lance-jni/src/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -676,6 +676,12 @@ fn inner_commit_transaction<'local>(
let write_param_jmap = JMap::from_env(env, &write_param_jobj)?;
let mut write_param = to_rust_map(env, &write_param_jmap)?;

// Extract enable_v2_manifest_paths from write_param
let enable_v2_manifest_paths = match write_param.get("enable_v2_manifest_paths") {
Some(value) => value.to_string().to_lowercase().eq("true"),
None => false, // Default to false if the key doesn't exist
};

// Extract s3_credentials_refresh_offset_seconds from write_param
let s3_credentials_refresh_offset = write_param
.remove("s3_credentials_refresh_offset_seconds")
Expand All @@ -702,7 +708,7 @@ fn inner_commit_transaction<'local>(
let new_blocking_ds = {
let mut dataset_guard =
unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?;
dataset_guard.commit_transaction(transaction, store_params)?
dataset_guard.commit_transaction(transaction, store_params, enable_v2_manifest_paths)?
};
new_blocking_ds.into_java(env)
}
Expand Down
4 changes: 4 additions & 0 deletions java/lance-jni/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ pub fn extract_write_params(
storage_options_obj: &JObject,
storage_options_provider_obj: &JObject, // Optional<StorageOptionsProvider>
s3_credentials_refresh_offset_seconds_obj: &JObject, // Optional<Long>
enable_v2_manifest_paths: &JObject, // Optional<Boolean>
) -> Result<WriteParams> {
let mut write_params = WriteParams::default();

Expand Down Expand Up @@ -98,6 +99,9 @@ pub fn extract_write_params(
s3_credentials_refresh_offset,
..Default::default()
});
if let Some(enable_v2_manifest_paths_val) = env.get_boolean_opt(enable_v2_manifest_paths)? {
write_params.enable_v2_manifest_paths = enable_v2_manifest_paths_val;
}
Ok(write_params)
}

Expand Down
15 changes: 10 additions & 5 deletions java/src/main/java/org/lance/Dataset.java
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ public static Dataset create(
params.getEnableStableRowIds(),
params.getDataStorageVersion(),
params.getStorageOptions(),
params.getS3CredentialsRefreshOffsetSeconds());
params.getS3CredentialsRefreshOffsetSeconds(),
params.getEnableV2ManifestPaths());
dataset.allocator = allocator;
return dataset;
}
Expand Down Expand Up @@ -199,7 +200,8 @@ static Dataset create(
params.getDataStorageVersion(),
params.getStorageOptions(),
Optional.ofNullable(storageOptionsProvider),
params.getS3CredentialsRefreshOffsetSeconds());
params.getS3CredentialsRefreshOffsetSeconds(),
params.getEnableV2ManifestPaths());
dataset.allocator = allocator;
return dataset;
}
Expand All @@ -214,7 +216,8 @@ private static native Dataset createWithFfiSchema(
Optional<Boolean> enableStableRowIds,
Optional<String> dataStorageVersion,
Map<String, String> storageOptions,
Optional<Long> s3CredentialsRefreshOffsetSeconds);
Optional<Long> s3CredentialsRefreshOffsetSeconds,
Optional<Boolean> enableV2ManifestPaths);

private static native Dataset createWithFfiStream(
long arrowStreamMemoryAddress,
Expand All @@ -226,7 +229,8 @@ private static native Dataset createWithFfiStream(
Optional<Boolean> enableStableRowIds,
Optional<String> dataStorageVersion,
Map<String, String> storageOptions,
Optional<Long> s3CredentialsRefreshOffsetSeconds);
Optional<Long> s3CredentialsRefreshOffsetSeconds,
Optional<Boolean> enableV2ManifestPaths);

private static native Dataset createWithFfiStreamAndProvider(
long arrowStreamMemoryAddress,
Expand All @@ -239,7 +243,8 @@ private static native Dataset createWithFfiStreamAndProvider(
Optional<String> dataStorageVersion,
Map<String, String> storageOptions,
Optional<StorageOptionsProvider> storageOptionsProvider,
Optional<Long> s3CredentialsRefreshOffsetSeconds);
Optional<Long> s3CredentialsRefreshOffsetSeconds,
Optional<Boolean> enableV2ManifestPaths);

/**
* Open a dataset from the specified path.
Expand Down
14 changes: 14 additions & 0 deletions java/src/main/java/org/lance/WriteDatasetBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public class WriteDatasetBuilder {
private Optional<Boolean> enableStableRowIds = Optional.empty();
private Optional<WriteParams.LanceFileVersion> dataStorageVersion = Optional.empty();
private Optional<Long> s3CredentialsRefreshOffsetSeconds = Optional.empty();
private Optional<Boolean> enableV2ManifestPaths = Optional.empty();

/** Creates a new builder instance. Package-private, use Dataset.write() instead. */
WriteDatasetBuilder() {
Expand Down Expand Up @@ -287,6 +288,17 @@ public WriteDatasetBuilder s3CredentialsRefreshOffsetSeconds(
return this;
}

/**
* Sets whether V2 manifest paths are enabled for the dataset being written.
*
* <p>If this method is never called, the option stays unset ({@code Optional.empty()}) and is
* not forwarded to the write parameters when the write is executed.
*
* @param enableV2ManifestPaths {@code true} to enable V2 manifest paths, {@code false} to
* disable them
* @return this builder instance
*/
public WriteDatasetBuilder enableV2ManifestPaths(boolean enableV2ManifestPaths) {
this.enableV2ManifestPaths = Optional.of(enableV2ManifestPaths);
return this;
}

/**
* Executes the write operation and returns the created dataset.
*
Expand Down Expand Up @@ -397,6 +409,7 @@ private Dataset executeWithNamespace() {
dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion);
s3CredentialsRefreshOffsetSeconds.ifPresent(
paramsBuilder::withS3CredentialsRefreshOffsetSeconds);
enableV2ManifestPaths.ifPresent(paramsBuilder::withEnableV2ManifestPaths);

WriteParams params = paramsBuilder.build();

Expand All @@ -421,6 +434,7 @@ private Dataset executeWithUri() {
dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion);
s3CredentialsRefreshOffsetSeconds.ifPresent(
paramsBuilder::withS3CredentialsRefreshOffsetSeconds);
enableV2ManifestPaths.ifPresent(paramsBuilder::withEnableV2ManifestPaths);

WriteParams params = paramsBuilder.build();

Expand Down
Loading
Loading