@@ -59,6 +59,7 @@ object DataSourceReadOptions {
.defaultValue(QUERY_TYPE_SNAPSHOT_OPT_VAL)
.withAlternatives("hoodie.datasource.view.type")
.withValidValues(QUERY_TYPE_SNAPSHOT_OPT_VAL, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL, QUERY_TYPE_INCREMENTAL_OPT_VAL)
.sinceVersion("0.9.0")
.withDocumentation("Whether data needs to be read, in `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "` mode (new data since an instantTime) " +
"(or) `" + QUERY_TYPE_READ_OPTIMIZED_OPT_VAL + "` mode (obtain latest view, based on base files) (or) `" + QUERY_TYPE_SNAPSHOT_OPT_VAL + "` mode " +
"(obtain latest view, by merging base and (if any) log files)")
@@ -84,6 +85,7 @@ object DataSourceReadOptions {
.key("hoodie.datasource.read.paths")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Comma separated list of file paths to read within a Hudi table.")

@Deprecated
@@ -109,6 +111,7 @@ object DataSourceReadOptions {
val START_COMMIT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.begin.instanttime")
.noDefaultValue()
.sinceVersion("0.9.0")
.withDocumentation("Required when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL + "`. "
+ "Represents the completion time to start incrementally pulling data from. The completion time here need not necessarily "
+ "correspond to an instant on the timeline. New data written with completion_time >= START_COMMIT are fetched out. "
@@ -117,6 +120,7 @@
val END_COMMIT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.end.instanttime")
.noDefaultValue()
.sinceVersion("0.9.0")
.withDocumentation("Used when `" + QUERY_TYPE.key() + "` is set to `" + QUERY_TYPE_INCREMENTAL_OPT_VAL
+ "`. Represents the completion time to limit incrementally fetched data to. When not specified latest commit "
+ "completion time from timeline is assumed by default. When specified, new data written with "
@@ -126,23 +130,27 @@ object DataSourceReadOptions {
val STREAMING_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.streaming.table.version")
.noDefaultValue()
.sinceVersion("1.0.0")
.withDocumentation("The table version assumed for streaming read")

val INCREMENTAL_READ_TABLE_VERSION: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.table.version")
.noDefaultValue()
.sinceVersion("1.0.0")
.withDocumentation("The table version assumed for incremental read")

val INCREMENTAL_READ_SCHEMA_USE_END_INSTANTTIME: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.schema.use.end.instanttime")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Uses end instant schema when incrementally fetched data to. Default: users latest instant schema.")

val PUSH_DOWN_INCR_FILTERS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.read.incr.filters")
.defaultValue("")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("For use-cases like DeltaStreamer which reads from Hoodie Incremental table and applies "
+ "opaque map functions, filters appearing late in the sequence of transformations cannot be automatically "
+ "pushed down. This option allows setting filters directly on Hoodie Source.")
@@ -151,20 +159,23 @@ object DataSourceReadOptions {
.key("hoodie.datasource.read.incr.path.glob")
.defaultValue("")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("For the use-cases like users only want to incremental pull from certain partitions "
+ "instead of the full table. This option allows using glob pattern to directly filter on path.")

val INCREMENTAL_READ_SKIP_COMPACT: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.read.incr.skip_compact")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.0.1")
.withDocumentation("Whether to skip compaction instants and avoid reading compacted base files for streaming "
+ "read to improve read performance.")

val INCREMENTAL_READ_SKIP_CLUSTER: ConfigProperty[Boolean] = ConfigProperty
.key("hoodie.datasource.read.incr.skip_cluster")
.defaultValue(false)
.markAdvanced()
.sinceVersion("1.0.1")
.withDocumentation("Whether to skip clustering instants to avoid reading base files of clustering operations "
+ "for streaming read to improve read performance.")

@@ -346,6 +357,7 @@ object DataSourceWriteOptions {
WriteOperationType.COMPACT.value,
WriteOperationType.ALTER_SCHEMA.value
)
.sinceVersion("0.9.0")
.withDocumentation("Whether to do upsert, insert or bulk_insert for the write operation. " +
"Use bulk_insert to load new data into a table, and there on use upsert/insert. " +
"bulk insert uses a disk based write path to scale to load large inputs without need to cache it.")
@@ -356,6 +368,7 @@ object DataSourceWriteOptions {
val TABLE_TYPE: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.table.type")
.defaultValue(COW_TABLE_TYPE_OPT_VAL)
.sinceVersion("0.9.0")
.withValidValues(COW_TABLE_TYPE_OPT_VAL, MOR_TABLE_TYPE_OPT_VAL)
.withAlternatives("hoodie.datasource.write.storage.type")
.withDocumentation("The table type for the underlying data, for this write. This can’t change between writes.")
@@ -411,6 +424,7 @@ object DataSourceWriteOptions {
.key(HoodieTableConfig.HOODIE_WRITE_TABLE_NAME_KEY)
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Table name for the datasource write. Also used to register the table into meta stores.")

/**
@@ -482,6 +496,7 @@ object DataSourceWriteOptions {
.defaultValue(classOf[SimpleKeyGenerator].getName)
.withInferFunction(keyGeneratorInferFunc)
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Key generator class, that implements `org.apache.hudi.keygen.KeyGenerator`")

val KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED: ConfigProperty[String] = KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
@@ -502,6 +517,7 @@ object DataSourceWriteOptions {
})
)
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("When set to true, will perform write operations directly using the spark native " +
"`Row` representation, avoiding any additional conversion costs.")

@@ -533,6 +549,7 @@ object DataSourceWriteOptions {
.key("hoodie.datasource.write.commitmeta.key.prefix")
.defaultValue("_")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Option keys beginning with this prefix, are automatically added to the commit/deltacommit metadata. " +
"This is useful to store checkpointing information, in a consistent way with the hudi timeline")

@@ -541,6 +558,7 @@ object DataSourceWriteOptions {
.key("hoodie.datasource.write.insert.drop.duplicates")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("If set to true, records from the incoming dataframe will not overwrite existing records with the same key during the write operation. " +
"<br /> **Note** Just for Insert operation in Spark SQL writing since 0.14.0, users can switch to the config `hoodie.datasource.insert.dup.policy` instead " +
"for a simplified duplicate handling experience. The new config will be incorporated into all other writing flows and this config will be fully deprecated " +
@@ -550,18 +568,21 @@ object DataSourceWriteOptions {
.key("hoodie.datasource.write.partitions.to.delete")
.noDefaultValue()
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Comma separated list of partitions to delete. Allows use of wildcard *")

val STREAMING_RETRY_CNT: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.retry.count")
.defaultValue("3")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Config to indicate how many times streaming job should retry for a failed micro batch.")

val STREAMING_RETRY_INTERVAL_MS: ConfigProperty[String] = ConfigProperty
.key("hoodie.datasource.write.streaming.retry.interval.ms")
.defaultValue("2000")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation(" Config to indicate how long (by millisecond) before a retry should issued for failed microbatch")

/**
@@ -571,6 +592,7 @@ object DataSourceWriteOptions {
.key("hoodie.datasource.write.streaming.ignore.failed.batch")
.defaultValue("false")
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Config to indicate whether to ignore any non exception error (e.g. writestatus error)"
+ " within a streaming microbatch. Turning this on, could hide the write status errors while the spark checkpoint moves ahead." +
"So, would recommend users to use this with caution.")
@@ -600,6 +622,7 @@ object DataSourceWriteOptions {
.key("hoodie.meta.sync.client.tool.class")
.defaultValue(classOf[HiveSyncTool].getName)
.markAdvanced()
.sinceVersion("0.9.0")
.withDocumentation("Sync tool class name used to sync to metastore. Defaults to Hive.")

@Deprecated