diff --git a/java/lance-jni/src/blocking_dataset.rs b/java/lance-jni/src/blocking_dataset.rs index b15132ad00b..1536a7a9119 100644 --- a/java/lance-jni/src/blocking_dataset.rs +++ b/java/lance-jni/src/blocking_dataset.rs @@ -148,6 +148,7 @@ impl BlockingDataset { operation: Operation, read_version: Option, storage_options: HashMap, + enable_v2_manifest_paths: Option, ) -> Result { let inner = RT.block_on(Dataset::commit( uri, @@ -159,7 +160,7 @@ impl BlockingDataset { }), None, Default::default(), - false, // TODO: support enable_v2_manifest_paths + enable_v2_manifest_paths.unwrap_or(false), ))?; Ok(Self { inner }) } @@ -290,10 +291,12 @@ impl BlockingDataset { &mut self, transaction: Transaction, store_params: ObjectStoreParams, + enable_v2_manifest_paths: bool ) -> Result { let new_dataset = RT.block_on( CommitBuilder::new(Arc::new(self.clone().inner)) .with_store_params(store_params) + .enable_v2_manifest_paths(enable_v2_manifest_paths) .execute(transaction), )?; Ok(BlockingDataset { inner: new_dataset }) @@ -337,6 +340,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( data_storage_version: JObject, // Optional storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -351,7 +355,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiSchema<'local>( enable_stable_row_ids, data_storage_version, storage_options_obj, - s3_credentials_refresh_offset_seconds_obj + s3_credentials_refresh_offset_seconds_obj, + enable_v2_manifest_paths, ) ) } @@ -369,6 +374,7 @@ fn inner_create_with_ffi_schema<'local>( data_storage_version: JObject, // Optional storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional ) -> Result> { let c_schema_ptr = arrow_schema_addr as *mut FFI_ArrowSchema; let c_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema_ptr) }; @@ -388,6 +394,7 @@ fn inner_create_with_ffi_schema<'local>( JObject::null(), // No provider for schema-only creation s3_credentials_refresh_offset_seconds_obj, reader, + enable_v2_manifest_paths, ) } @@ -419,6 +426,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( data_storage_version: JObject, // Optional storage_options_obj: JObject, // Map s3_credentials_refresh_offset_seconds_obj: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -434,7 +442,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStream<'local>( data_storage_version, storage_options_obj, JObject::null(), - s3_credentials_refresh_offset_seconds_obj + s3_credentials_refresh_offset_seconds_obj, + enable_v2_manifest_paths, ) ) } @@ -454,6 +463,7 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo storage_options_obj: JObject, // Map storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -469,7 +479,8 @@ pub extern "system" fn Java_org_lance_Dataset_createWithFfiStreamAndProvider<'lo data_storage_version, storage_options_obj, storage_options_provider_obj, - s3_credentials_refresh_offset_seconds_obj + s3_credentials_refresh_offset_seconds_obj, + enable_v2_manifest_paths, ) ) } @@ -488,6 +499,7 @@ fn inner_create_with_ffi_stream<'local>( storage_options_obj: JObject, // Map storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, // Optional + enable_v2_manifest_paths: JObject, // Optional ) -> Result> { let stream_ptr = arrow_array_stream_addr as *mut FFI_ArrowArrayStream; let reader = unsafe { ArrowArrayStreamReader::from_raw(stream_ptr) }?; @@ -504,6 +516,7 @@ fn inner_create_with_ffi_stream<'local>( storage_options_provider_obj, s3_credentials_refresh_offset_seconds_obj, reader, + enable_v2_manifest_paths, ) } @@ -521,6 +534,7 @@ fn create_dataset<'local>( storage_options_provider_obj: JObject, // Optional s3_credentials_refresh_offset_seconds_obj: JObject, reader: impl RecordBatchReader + Send + 'static, + enable_v2_manifest_paths: JObject, ) -> Result> { let path_str = path.extract(env)?; @@ -535,6 +549,7 @@ fn create_dataset<'local>( &storage_options_obj, &storage_options_provider_obj, &s3_credentials_refresh_offset_seconds_obj, + &enable_v2_manifest_paths, )?; let dataset = BlockingDataset::write(reader, &path_str, Some(write_params))?; @@ -613,6 +628,7 @@ pub extern "system" fn Java_org_lance_Dataset_commitAppend<'local>( read_version_obj: JObject, // Optional fragments_obj: JObject, // List storage_options_obj: JObject, // Map + enable_v2_manifest_paths_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -621,7 +637,8 @@ pub extern "system" fn Java_org_lance_Dataset_commitAppend<'local>( path, read_version_obj, fragments_obj, - storage_options_obj + storage_options_obj, + enable_v2_manifest_paths_obj, ) ) } @@ -632,6 +649,7 @@ pub fn inner_commit_append<'local>( read_version_obj: JObject, // Optional fragment_objs: JObject, // List storage_options_obj: JObject, // Map + enable_v2_manifest_paths_obj: JObject, // Optional ) -> Result> { let fragment_objs = import_vec(env, &fragment_objs)?; let mut fragments = Vec::with_capacity(fragment_objs.len()); @@ -642,7 +660,8 @@ pub fn inner_commit_append<'local>( let path_str = path.extract(env)?; let read_version = env.get_u64_opt(&read_version_obj)?; let storage_options = extract_storage_options(env, &storage_options_obj)?; - let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?; + let enable_v2_manifest_paths = env.get_boolean_opt(&enable_v2_manifest_paths_obj)?; + let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options, enable_v2_manifest_paths)?; dataset.into_java(env) } @@ -655,6 +674,7 @@ pub extern "system" fn Java_org_lance_Dataset_commitOverwrite<'local>( read_version_obj: JObject, // Optional fragments_obj: JObject, // List storage_options_obj: JObject, // Map + enable_v2_manifest_paths_obj: JObject, // Optional ) -> JObject<'local> { ok_or_throw!( env, @@ -664,7 +684,8 @@ pub extern "system" fn Java_org_lance_Dataset_commitOverwrite<'local>( arrow_schema_addr, read_version_obj, fragments_obj, - storage_options_obj + storage_options_obj, + enable_v2_manifest_paths_obj, ) ) } @@ -676,6 +697,7 @@ pub fn inner_commit_overwrite<'local>( read_version_obj: JObject, // Optional fragments_obj: JObject, // List storage_options_obj: JObject, // Map + enable_v2_manifest_paths_obj: JObject, // Optional ) -> Result> { let fragment_objs = import_vec(env, &fragments_obj)?; let mut fragments = Vec::with_capacity(fragment_objs.len()); @@ -697,7 +719,8 @@ pub fn inner_commit_overwrite<'local>( let read_version = env.get_u64_opt(&read_version_obj)?; let jmap = JMap::from_env(env, &storage_options_obj)?; let storage_options = to_rust_map(env, &jmap)?; - let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options)?; + let enable_v2_manifest_paths = env.get_boolean_opt(&enable_v2_manifest_paths_obj)?; + let dataset = BlockingDataset::commit(&path_str, op, read_version, storage_options, enable_v2_manifest_paths)?; dataset.into_java(env) } diff --git a/java/lance-jni/src/fragment.rs b/java/lance-jni/src/fragment.rs index 775ad0d906d..9dc2a0849e2 100644 --- a/java/lance-jni/src/fragment.rs +++ b/java/lance-jni/src/fragment.rs @@ -257,6 +257,7 @@ fn create_fragment<'a>( &storage_options_obj, &storage_options_provider_obj, &s3_credentials_refresh_offset_seconds_obj, + &JObject::null(), )?; let fragments = RT.block_on(FileFragment::create_fragments( diff --git a/java/lance-jni/src/transaction.rs b/java/lance-jni/src/transaction.rs index 32ffe3c99e0..108fdd8a677 100644 --- a/java/lance-jni/src/transaction.rs +++ b/java/lance-jni/src/transaction.rs @@ -676,6 +676,12 @@ fn inner_commit_transaction<'local>( let write_param_jmap = JMap::from_env(env, &write_param_jobj)?; let mut write_param = to_rust_map(env, &write_param_jmap)?; + // Extract enable_v2_manifest_paths from write_param + let enable_v2_manifest_paths = match write_param.get("enable_v2_manifest_paths") { + Some(value) => value.to_string().to_lowercase().eq("true"), + None => false, // Default to false if the key doesn't exist + }; + // Extract s3_credentials_refresh_offset_seconds from write_param let s3_credentials_refresh_offset = write_param .remove("s3_credentials_refresh_offset_seconds") @@ -702,7 +708,7 @@ fn inner_commit_transaction<'local>( let new_blocking_ds = { let mut dataset_guard = unsafe { env.get_rust_field::<_, _, BlockingDataset>(&java_dataset, NATIVE_DATASET) }?; - dataset_guard.commit_transaction(transaction, store_params)? + dataset_guard.commit_transaction(transaction, store_params, enable_v2_manifest_paths)? }; new_blocking_ds.into_java(env) } diff --git a/java/lance-jni/src/utils.rs b/java/lance-jni/src/utils.rs index dc6f1e6e60f..7868687ad81 100644 --- a/java/lance-jni/src/utils.rs +++ b/java/lance-jni/src/utils.rs @@ -49,6 +49,7 @@ pub fn extract_write_params( storage_options_obj: &JObject, storage_options_provider_obj: &JObject, // Optional s3_credentials_refresh_offset_seconds_obj: &JObject, // Optional + enable_v2_manifest_paths: &JObject, // Optional ) -> Result { let mut write_params = WriteParams::default(); @@ -98,6 +99,9 @@ pub fn extract_write_params( s3_credentials_refresh_offset, ..Default::default() }); + if let Some(enable_v2_manifest_paths_val) = env.get_boolean_opt(enable_v2_manifest_paths)? { + write_params.enable_v2_manifest_paths = enable_v2_manifest_paths_val; + } Ok(write_params) } diff --git a/java/src/main/java/org/lance/Dataset.java b/java/src/main/java/org/lance/Dataset.java index 21572214eda..16e2a87d7fe 100644 --- a/java/src/main/java/org/lance/Dataset.java +++ b/java/src/main/java/org/lance/Dataset.java @@ -140,7 +140,8 @@ public static Dataset create( params.getEnableStableRowIds(), params.getDataStorageVersion(), params.getStorageOptions(), - params.getS3CredentialsRefreshOffsetSeconds()); + params.getS3CredentialsRefreshOffsetSeconds(), + params.getEnableV2ManifestPaths()); dataset.allocator = allocator; return dataset; } @@ -199,7 +200,8 @@ static Dataset create( params.getDataStorageVersion(), params.getStorageOptions(), Optional.ofNullable(storageOptionsProvider), - params.getS3CredentialsRefreshOffsetSeconds()); + params.getS3CredentialsRefreshOffsetSeconds(), + params.getEnableV2ManifestPaths()); dataset.allocator = allocator; return dataset; } @@ -214,7 +216,8 @@ private static native Dataset createWithFfiSchema( Optional enableStableRowIds, Optional dataStorageVersion, Map storageOptions, - Optional s3CredentialsRefreshOffsetSeconds); + Optional s3CredentialsRefreshOffsetSeconds, + Optional enableV2ManifestPaths); private static native Dataset createWithFfiStream( long arrowStreamMemoryAddress, @@ -226,7 +229,8 @@ private static native Dataset createWithFfiStream( Optional enableStableRowIds, Optional dataStorageVersion, Map storageOptions, - Optional s3CredentialsRefreshOffsetSeconds); + Optional s3CredentialsRefreshOffsetSeconds, + Optional enableV2ManifestPaths); private static native Dataset createWithFfiStreamAndProvider( long arrowStreamMemoryAddress, @@ -239,7 +243,8 @@ private static native Dataset createWithFfiStreamAndProvider( Optional dataStorageVersion, Map storageOptions, Optional storageOptionsProvider, - Optional s3CredentialsRefreshOffsetSeconds); + Optional s3CredentialsRefreshOffsetSeconds, + Optional enableV2ManifestPaths); /** * Open a dataset from the specified path. diff --git a/java/src/main/java/org/lance/WriteDatasetBuilder.java b/java/src/main/java/org/lance/WriteDatasetBuilder.java index 74f8c298fe8..b319ee1fb8d 100644 --- a/java/src/main/java/org/lance/WriteDatasetBuilder.java +++ b/java/src/main/java/org/lance/WriteDatasetBuilder.java @@ -79,6 +79,7 @@ public class WriteDatasetBuilder { private Optional enableStableRowIds = Optional.empty(); private Optional dataStorageVersion = Optional.empty(); private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); + private Optional enableV2ManifestPaths = Optional.empty(); /** Creates a new builder instance. Package-private, use Dataset.write() instead. */ WriteDatasetBuilder() { @@ -287,6 +288,17 @@ public WriteDatasetBuilder s3CredentialsRefreshOffsetSeconds( return this; } + /** + * Sets enable V2 Manifest path + * + * @param enableV2ManifestPaths Whether to enable v2 Manifest path + * @return this builder instance + */ + public WriteDatasetBuilder enableV2ManifestPaths(boolean enableV2ManifestPaths) { + this.enableV2ManifestPaths = Optional.of(enableV2ManifestPaths); + return this; + } + /** * Executes the write operation and returns the created dataset. * @@ -397,6 +409,7 @@ private Dataset executeWithNamespace() { dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); s3CredentialsRefreshOffsetSeconds.ifPresent( paramsBuilder::withS3CredentialsRefreshOffsetSeconds); + enableV2ManifestPaths.ifPresent(paramsBuilder::withEnableV2ManifestPaths); WriteParams params = paramsBuilder.build(); @@ -421,6 +434,7 @@ private Dataset executeWithUri() { dataStorageVersion.ifPresent(paramsBuilder::withDataStorageVersion); s3CredentialsRefreshOffsetSeconds.ifPresent( paramsBuilder::withS3CredentialsRefreshOffsetSeconds); + enableV2ManifestPaths.ifPresent(paramsBuilder::withEnableV2ManifestPaths); WriteParams params = paramsBuilder.build(); diff --git a/java/src/main/java/org/lance/WriteParams.java b/java/src/main/java/org/lance/WriteParams.java index a0ce1c8c375..abc4acbabd0 100644 --- a/java/src/main/java/org/lance/WriteParams.java +++ b/java/src/main/java/org/lance/WriteParams.java @@ -57,6 +57,7 @@ public String getVersionString() { private final Optional dataStorageVersion; private Map storageOptions = new HashMap<>(); private final Optional s3CredentialsRefreshOffsetSeconds; + private final Optional enableV2ManifestPaths; private WriteParams( Optional maxRowsPerFile, @@ -66,7 +67,8 @@ private WriteParams( Optional enableStableRowIds, Optional dataStorageVersion, Map storageOptions, - Optional s3CredentialsRefreshOffsetSeconds) { + Optional s3CredentialsRefreshOffsetSeconds, + Optional enableV2ManifestPaths) { this.maxRowsPerFile = maxRowsPerFile; this.maxRowsPerGroup = maxRowsPerGroup; this.maxBytesPerFile = maxBytesPerFile; @@ -75,6 +77,7 @@ private WriteParams( this.dataStorageVersion = dataStorageVersion; this.storageOptions = storageOptions; this.s3CredentialsRefreshOffsetSeconds = s3CredentialsRefreshOffsetSeconds; + this.enableV2ManifestPaths = enableV2ManifestPaths; } public Optional getMaxRowsPerFile() { @@ -114,6 +117,10 @@ public Optional getS3CredentialsRefreshOffsetSeconds() { return s3CredentialsRefreshOffsetSeconds; } + public Optional getEnableV2ManifestPaths() { + return enableV2ManifestPaths; + } + @Override public String toString() { return MoreObjects.toStringHelper(this) @@ -135,6 +142,7 @@ public static class Builder { private Optional dataStorageVersion = Optional.empty(); private Map storageOptions = new HashMap<>(); private Optional s3CredentialsRefreshOffsetSeconds = Optional.empty(); + private Optional enableV2ManifestPaths = Optional.empty(); public Builder withMaxRowsPerFile(int maxRowsPerFile) { this.maxRowsPerFile = Optional.of(maxRowsPerFile); @@ -176,6 +184,11 @@ public Builder withS3CredentialsRefreshOffsetSeconds(long s3CredentialsRefreshOf return this; } + public Builder withEnableV2ManifestPaths(boolean enableV2ManifestPaths) { + this.enableV2ManifestPaths = Optional.of(enableV2ManifestPaths); + return this; + } + public WriteParams build() { return new WriteParams( maxRowsPerFile, @@ -185,7 +198,8 @@ public WriteParams build() { enableStableRowIds, dataStorageVersion, storageOptions, - s3CredentialsRefreshOffsetSeconds); + s3CredentialsRefreshOffsetSeconds, + enableV2ManifestPaths); } } }