diff --git a/connect/common/src/main/protobuf/spark/connect/commands.proto b/connect/common/src/main/protobuf/spark/connect/commands.proto
index b861598ad39a..71189a3c43a1 100644
--- a/connect/common/src/main/protobuf/spark/connect/commands.proto
+++ b/connect/common/src/main/protobuf/spark/connect/commands.proto
@@ -240,6 +240,9 @@ message WriteStreamOperationStart {
   StreamingForeachFunction foreach_writer = 13;
   StreamingForeachFunction foreach_batch = 14;
+
+  // (Optional) Columns used for clustering the table.
+  repeated string clustering_column_names = 15;
 }
 
 message StreamingForeachFunction {
diff --git a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index e790a25ec97f..2cc396a62ee7 100644
--- a/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -3216,6 +3216,10 @@ class SparkConnectPlanner(
       writer.partitionBy(writeOp.getPartitioningColumnNamesList.asScala.toList: _*)
     }
 
+    if (writeOp.getClusteringColumnNamesCount > 0) {
+      writer.clusterBy(writeOp.getClusteringColumnNamesList.asScala.toList: _*)
+    }
+
     writeOp.getTriggerCase match {
       case TriggerCase.PROCESSING_TIME_INTERVAL =>
         writer.trigger(Trigger.ProcessingTime(writeOp.getProcessingTimeInterval))
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index fe68f3cb0b57..c8c714047788 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -159,6 +159,23 @@ final class DataStreamWriter[T] private[sql] (ds: Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * Clusters the output by the given columns. If specified, the output is laid out such that
+   * records with similar values on the clustering column are grouped together in the same file.
+   *
+   * Clustering improves query efficiency by allowing queries with predicates on the clustering
+   * columns to skip unnecessary data. Unlike partitioning, clustering can be used on very high
+   * cardinality columns.
+   *
+   * @since 4.0.0
+   */
+  @scala.annotation.varargs
+  def clusterBy(colNames: String*): DataStreamWriter[T] = {
+    sinkBuilder.clearClusteringColumnNames()
+    sinkBuilder.addAllClusteringColumnNames(colNames.asJava)
+    this
+  }
+
   /**
    * Adds an output option for the underlying data source.
    *
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index fbb2b4af1b12..d79b767e0ec2 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -268,6 +268,42 @@ class ClientStreamingQuerySuite extends QueryTest with RemoteSparkSession with L
     }
   }
 
+  test("clusterBy") {
+    withSQLConf(
+      "spark.sql.shuffle.partitions" -> "1" // Avoid too many reducers.
+ ) { + spark.sql("DROP TABLE IF EXISTS my_table").collect() + + withTempPath { ckpt => + val q1 = spark.readStream + .format("rate") + .load() + .writeStream + .clusterBy("value") + .option("checkpointLocation", ckpt.getCanonicalPath) + .toTable("my_table") + + try { + q1.processAllAvailable() + eventually(timeout(30.seconds)) { + checkAnswer( + spark.sql("DESCRIBE my_table"), + Seq( + Row("timestamp", "timestamp", null), + Row("value", "bigint", null), + Row("# Clustering Information", "", ""), + Row("# col_name", "data_type", "comment"), + Row("value", "bigint", null))) + assert(spark.table("my_sink").count() > 0) + } + } finally { + q1.stop() + spark.sql("DROP TABLE my_table") + } + } + } + } + test("throw exception in streaming") { try { val session = spark diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index c126d12b1473..a7227d84ce0e 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -102,7 +102,9 @@ object MimaExcludes { ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.SQLImplicits.session"), ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.SparkSession#implicits._sqlContext"), // SPARK-48761: Add clusterBy() to CreateTableWriter. - ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.CreateTableWriter.clusterBy") + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.CreateTableWriter.clusterBy"), + // SPARK-48901: Add clusterBy() to DataStreamWriter. + ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.DataStreamWriter.clusterBy") ) // Default exclude rules diff --git a/python/pyspark/sql/connect/proto/commands_pb2.py b/python/pyspark/sql/connect/proto/commands_pb2.py index c24840908f34..43390ffa36d3 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.py +++ b/python/pyspark/sql/connect/proto/commands_pb2.py @@ -35,7 +35,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\r\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12m\n\x17register_table_function\x18\n \x01(\x0b\x32\x33.spark.connect.CommonInlineUserDefinedTableFunctionH\x00R\x15registerTableFunction\x12\x81\x01\n$streaming_query_listener_bus_command\x18\x0b 
\x01(\x0b\x32/.spark.connect.StreamingQueryListenerBusCommandH\x00R streamingQueryListenerBusCommand\x12\x64\n\x14register_data_source\x18\x0c \x01(\x0b\x32\x30.spark.connect.CommonInlineUserDefinedDataSourceH\x00R\x12registerDataSource\x12t\n\x1f\x63reate_resource_profile_command\x18\r \x01(\x0b\x32+.spark.connect.CreateResourceProfileCommandH\x00R\x1c\x63reateResourceProfileCommand\x12Q\n\x12\x63heckpoint_command\x18\x0e \x01(\x0b\x32 .spark.connect.CheckpointCommandH\x00R\x11\x63heckpointCommand\x12\x84\x01\n%remove_cached_remote_relation_command\x18\x0f \x01(\x0b\x32\x30.spark.connect.RemoveCachedRemoteRelationCommandH\x00R!removeCachedRemoteRelationCommand\x12_\n\x18merge_into_table_command\x18\x10 \x01(\x0b\x32$.spark.connect.MergeIntoTableCommandH\x00R\x15mergeIntoTableCommand\x12\x35\n\textension\x18\xe7\x07 \x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xaa\x04\n\nSqlCommand\x12\x14\n\x03sql\x18\x01 \x01(\tB\x02\x18\x01R\x03sql\x12;\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryB\x02\x18\x01R\x04\x61rgs\x12@\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralB\x02\x18\x01R\x07posArgs\x12Z\n\x0fnamed_arguments\x18\x04 \x03(\x0b\x32-.spark.connect.SqlCommand.NamedArgumentsEntryB\x02\x18\x01R\x0enamedArguments\x12\x42\n\rpos_arguments\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionB\x02\x18\x01R\x0cposArguments\x12-\n\x05input\x18\x06 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01\x1a\\\n\x13NamedArgumentsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12/\n\x05value\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\xca\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x12-\n\x12\x63lustering_columns\x18\n \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 
\x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xdc\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 \x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x12-\n\x12\x63lustering_columns\x18\t \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xa0\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"\xd4\x01\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12<\n\x18query_started_event_json\x18\x03 \x01(\tH\x00R\x15queryStartedEventJson\x88\x01\x01\x42\x1b\n\x19_query_started_event_json"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 
\x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t \x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xbd\x06\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12n\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x12t\n\x0fremove_listener\x18\x06 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0eremoveListener\x12\'\n\x0elist_listeners\x18\x07 
\x01(\x08H\x00R\rlistListeners\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_ms\x1a\xcd\x01\n\x1dStreamingQueryListenerCommand\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x12U\n\x17python_listener_payload\x18\x02 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x15pythonListenerPayload\x88\x01\x01\x12\x0e\n\x02id\x18\x03 \x01(\tR\x02idB\x1a\n\x18_python_listener_payloadB\t\n\x07\x63ommand"\xb4\x08\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 \x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12#\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x08H\x00R\x0b\x61\x64\x64Listener\x12)\n\x0fremove_listener\x18\x06 \x01(\x08H\x00R\x0eremoveListener\x12{\n\x0elist_listeners\x18\x07 \x01(\x0b\x32R.spark.connect.StreamingQueryManagerCommandResult.ListStreamingQueryListenerResultH\x00R\rlistListeners\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminated\x1aK\n\x1eStreamingQueryListenerInstance\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x1a\x45\n ListStreamingQueryListenerResult\x12!\n\x0clistener_ids\x18\x01 \x03(\tR\x0blistenerIdsB\r\n\x0bresult_type"\xad\x01\n StreamingQueryListenerBusCommand\x12;\n\x19\x61\x64\x64_listener_bus_listener\x18\x01 \x01(\x08H\x00R\x16\x61\x64\x64ListenerBusListener\x12\x41\n\x1cremove_listener_bus_listener\x18\x02 \x01(\x08H\x00R\x19removeListenerBusListenerB\t\n\x07\x63ommand"\x83\x01\n\x1bStreamingQueryListenerEvent\x12\x1d\n\nevent_json\x18\x01 \x01(\tR\teventJson\x12\x45\n\nevent_type\x18\x02 \x01(\x0e\x32&.spark.connect.StreamingQueryEventTypeR\teventType"\xcc\x01\n"StreamingQueryListenerEventsResult\x12\x42\n\x06\x65vents\x18\x01 \x03(\x0b\x32*.spark.connect.StreamingQueryListenerEventR\x06\x65vents\x12\x42\n\x1blistener_bus_listener_added\x18\x02 \x01(\x08H\x00R\x18listenerBusListenerAdded\x88\x01\x01\x42\x1e\n\x1c_listener_bus_listener_added"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01"X\n\x1c\x43reateResourceProfileCommand\x12\x38\n\x07profile\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ResourceProfileR\x07profile"C\n"CreateResourceProfileCommandResult\x12\x1d\n\nprofile_id\x18\x01 \x01(\x05R\tprofileId"d\n!RemoveCachedRemoteRelationCommand\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation"t\n\x11\x43heckpointCommand\x12\x33\n\x08relation\x18\x01 
\x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x14\n\x05local\x18\x02 \x01(\x08R\x05local\x12\x14\n\x05\x65\x61ger\x18\x03 \x01(\x08R\x05\x65\x61ger"\xe8\x03\n\x15MergeIntoTableCommand\x12*\n\x11target_table_name\x18\x01 \x01(\tR\x0ftargetTableName\x12\x43\n\x11source_table_plan\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x0fsourceTablePlan\x12\x42\n\x0fmerge_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x0emergeCondition\x12>\n\rmatch_actions\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0cmatchActions\x12I\n\x13not_matched_actions\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11notMatchedActions\x12[\n\x1dnot_matched_by_source_actions\x18\x06 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x19notMatchedBySourceActions\x12\x32\n\x15with_schema_evolution\x18\x07 \x01(\x08R\x13withSchemaEvolution*\x85\x01\n\x17StreamingQueryEventType\x12\x1e\n\x1aQUERY_PROGRESS_UNSPECIFIED\x10\x00\x12\x18\n\x14QUERY_PROGRESS_EVENT\x10\x01\x12\x1a\n\x16QUERY_TERMINATED_EVENT\x10\x02\x12\x14\n\x10QUERY_IDLE_EVENT\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' + b'\n\x1cspark/connect/commands.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1aspark/connect/common.proto\x1a\x1fspark/connect/expressions.proto\x1a\x1dspark/connect/relations.proto"\x90\r\n\x07\x43ommand\x12]\n\x11register_function\x18\x01 \x01(\x0b\x32..spark.connect.CommonInlineUserDefinedFunctionH\x00R\x10registerFunction\x12H\n\x0fwrite_operation\x18\x02 \x01(\x0b\x32\x1d.spark.connect.WriteOperationH\x00R\x0ewriteOperation\x12_\n\x15\x63reate_dataframe_view\x18\x03 \x01(\x0b\x32).spark.connect.CreateDataFrameViewCommandH\x00R\x13\x63reateDataframeView\x12O\n\x12write_operation_v2\x18\x04 \x01(\x0b\x32\x1f.spark.connect.WriteOperationV2H\x00R\x10writeOperationV2\x12<\n\x0bsql_command\x18\x05 \x01(\x0b\x32\x19.spark.connect.SqlCommandH\x00R\nsqlCommand\x12k\n\x1cwrite_stream_operation_start\x18\x06 \x01(\x0b\x32(.spark.connect.WriteStreamOperationStartH\x00R\x19writeStreamOperationStart\x12^\n\x17streaming_query_command\x18\x07 \x01(\x0b\x32$.spark.connect.StreamingQueryCommandH\x00R\x15streamingQueryCommand\x12X\n\x15get_resources_command\x18\x08 \x01(\x0b\x32".spark.connect.GetResourcesCommandH\x00R\x13getResourcesCommand\x12t\n\x1fstreaming_query_manager_command\x18\t \x01(\x0b\x32+.spark.connect.StreamingQueryManagerCommandH\x00R\x1cstreamingQueryManagerCommand\x12m\n\x17register_table_function\x18\n \x01(\x0b\x32\x33.spark.connect.CommonInlineUserDefinedTableFunctionH\x00R\x15registerTableFunction\x12\x81\x01\n$streaming_query_listener_bus_command\x18\x0b \x01(\x0b\x32/.spark.connect.StreamingQueryListenerBusCommandH\x00R streamingQueryListenerBusCommand\x12\x64\n\x14register_data_source\x18\x0c \x01(\x0b\x32\x30.spark.connect.CommonInlineUserDefinedDataSourceH\x00R\x12registerDataSource\x12t\n\x1f\x63reate_resource_profile_command\x18\r \x01(\x0b\x32+.spark.connect.CreateResourceProfileCommandH\x00R\x1c\x63reateResourceProfileCommand\x12Q\n\x12\x63heckpoint_command\x18\x0e \x01(\x0b\x32 .spark.connect.CheckpointCommandH\x00R\x11\x63heckpointCommand\x12\x84\x01\n%remove_cached_remote_relation_command\x18\x0f \x01(\x0b\x32\x30.spark.connect.RemoveCachedRemoteRelationCommandH\x00R!removeCachedRemoteRelationCommand\x12_\n\x18merge_into_table_command\x18\x10 \x01(\x0b\x32$.spark.connect.MergeIntoTableCommandH\x00R\x15mergeIntoTableCommand\x12\x35\n\textension\x18\xe7\x07 
\x01(\x0b\x32\x14.google.protobuf.AnyH\x00R\textensionB\x0e\n\x0c\x63ommand_type"\xaa\x04\n\nSqlCommand\x12\x14\n\x03sql\x18\x01 \x01(\tB\x02\x18\x01R\x03sql\x12;\n\x04\x61rgs\x18\x02 \x03(\x0b\x32#.spark.connect.SqlCommand.ArgsEntryB\x02\x18\x01R\x04\x61rgs\x12@\n\x08pos_args\x18\x03 \x03(\x0b\x32!.spark.connect.Expression.LiteralB\x02\x18\x01R\x07posArgs\x12Z\n\x0fnamed_arguments\x18\x04 \x03(\x0b\x32-.spark.connect.SqlCommand.NamedArgumentsEntryB\x02\x18\x01R\x0enamedArguments\x12\x42\n\rpos_arguments\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionB\x02\x18\x01R\x0cposArguments\x12-\n\x05input\x18\x06 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x1aZ\n\tArgsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x37\n\x05value\x18\x02 \x01(\x0b\x32!.spark.connect.Expression.LiteralR\x05value:\x02\x38\x01\x1a\\\n\x13NamedArgumentsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12/\n\x05value\x18\x02 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x05value:\x02\x38\x01"\x96\x01\n\x1a\x43reateDataFrameViewCommand\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12\x1b\n\tis_global\x18\x03 \x01(\x08R\x08isGlobal\x12\x18\n\x07replace\x18\x04 \x01(\x08R\x07replace"\xca\x08\n\x0eWriteOperation\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1b\n\x06source\x18\x02 \x01(\tH\x01R\x06source\x88\x01\x01\x12\x14\n\x04path\x18\x03 \x01(\tH\x00R\x04path\x12?\n\x05table\x18\x04 \x01(\x0b\x32\'.spark.connect.WriteOperation.SaveTableH\x00R\x05table\x12:\n\x04mode\x18\x05 \x01(\x0e\x32&.spark.connect.WriteOperation.SaveModeR\x04mode\x12*\n\x11sort_column_names\x18\x06 \x03(\tR\x0fsortColumnNames\x12\x31\n\x14partitioning_columns\x18\x07 \x03(\tR\x13partitioningColumns\x12\x43\n\tbucket_by\x18\x08 \x01(\x0b\x32&.spark.connect.WriteOperation.BucketByR\x08\x62ucketBy\x12\x44\n\x07options\x18\t \x03(\x0b\x32*.spark.connect.WriteOperation.OptionsEntryR\x07options\x12-\n\x12\x63lustering_columns\x18\n \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x82\x02\n\tSaveTable\x12\x1d\n\ntable_name\x18\x01 \x01(\tR\ttableName\x12X\n\x0bsave_method\x18\x02 \x01(\x0e\x32\x37.spark.connect.WriteOperation.SaveTable.TableSaveMethodR\nsaveMethod"|\n\x0fTableSaveMethod\x12!\n\x1dTABLE_SAVE_METHOD_UNSPECIFIED\x10\x00\x12#\n\x1fTABLE_SAVE_METHOD_SAVE_AS_TABLE\x10\x01\x12!\n\x1dTABLE_SAVE_METHOD_INSERT_INTO\x10\x02\x1a[\n\x08\x42ucketBy\x12.\n\x13\x62ucket_column_names\x18\x01 \x03(\tR\x11\x62ucketColumnNames\x12\x1f\n\x0bnum_buckets\x18\x02 \x01(\x05R\nnumBuckets"\x89\x01\n\x08SaveMode\x12\x19\n\x15SAVE_MODE_UNSPECIFIED\x10\x00\x12\x14\n\x10SAVE_MODE_APPEND\x10\x01\x12\x17\n\x13SAVE_MODE_OVERWRITE\x10\x02\x12\x1d\n\x19SAVE_MODE_ERROR_IF_EXISTS\x10\x03\x12\x14\n\x10SAVE_MODE_IGNORE\x10\x04\x42\x0b\n\tsave_typeB\t\n\x07_source"\xdc\x06\n\x10WriteOperationV2\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x1d\n\ntable_name\x18\x02 \x01(\tR\ttableName\x12\x1f\n\x08provider\x18\x03 \x01(\tH\x00R\x08provider\x88\x01\x01\x12L\n\x14partitioning_columns\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x13partitioningColumns\x12\x46\n\x07options\x18\x05 \x03(\x0b\x32,.spark.connect.WriteOperationV2.OptionsEntryR\x07options\x12_\n\x10table_properties\x18\x06 \x03(\x0b\x32\x34.spark.connect.WriteOperationV2.TablePropertiesEntryR\x0ftableProperties\x12\x38\n\x04mode\x18\x07 
\x01(\x0e\x32$.spark.connect.WriteOperationV2.ModeR\x04mode\x12J\n\x13overwrite_condition\x18\x08 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x12overwriteCondition\x12-\n\x12\x63lustering_columns\x18\t \x03(\tR\x11\x63lusteringColumns\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x1a\x42\n\x14TablePropertiesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01"\x9f\x01\n\x04Mode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\x0f\n\x0bMODE_CREATE\x10\x01\x12\x12\n\x0eMODE_OVERWRITE\x10\x02\x12\x1d\n\x19MODE_OVERWRITE_PARTITIONS\x10\x03\x12\x0f\n\x0bMODE_APPEND\x10\x04\x12\x10\n\x0cMODE_REPLACE\x10\x05\x12\x1a\n\x16MODE_CREATE_OR_REPLACE\x10\x06\x42\x0b\n\t_provider"\xd8\x06\n\x19WriteStreamOperationStart\x12-\n\x05input\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x05input\x12\x16\n\x06\x66ormat\x18\x02 \x01(\tR\x06\x66ormat\x12O\n\x07options\x18\x03 \x03(\x0b\x32\x35.spark.connect.WriteStreamOperationStart.OptionsEntryR\x07options\x12:\n\x19partitioning_column_names\x18\x04 \x03(\tR\x17partitioningColumnNames\x12:\n\x18processing_time_interval\x18\x05 \x01(\tH\x00R\x16processingTimeInterval\x12%\n\ravailable_now\x18\x06 \x01(\x08H\x00R\x0c\x61vailableNow\x12\x14\n\x04once\x18\x07 \x01(\x08H\x00R\x04once\x12\x46\n\x1e\x63ontinuous_checkpoint_interval\x18\x08 \x01(\tH\x00R\x1c\x63ontinuousCheckpointInterval\x12\x1f\n\x0boutput_mode\x18\t \x01(\tR\noutputMode\x12\x1d\n\nquery_name\x18\n \x01(\tR\tqueryName\x12\x14\n\x04path\x18\x0b \x01(\tH\x01R\x04path\x12\x1f\n\ntable_name\x18\x0c \x01(\tH\x01R\ttableName\x12N\n\x0e\x66oreach_writer\x18\r \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\rforeachWriter\x12L\n\rforeach_batch\x18\x0e \x01(\x0b\x32\'.spark.connect.StreamingForeachFunctionR\x0c\x66oreachBatch\x12\x36\n\x17\x63lustering_column_names\x18\x0f \x03(\tR\x15\x63lusteringColumnNames\x1a:\n\x0cOptionsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x14\n\x05value\x18\x02 \x01(\tR\x05value:\x02\x38\x01\x42\t\n\x07triggerB\x12\n\x10sink_destination"\xb3\x01\n\x18StreamingForeachFunction\x12\x43\n\x0fpython_function\x18\x01 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x0epythonFunction\x12\x46\n\x0escala_function\x18\x02 \x01(\x0b\x32\x1d.spark.connect.ScalarScalaUDFH\x00R\rscalaFunctionB\n\n\x08\x66unction"\xd4\x01\n\x1fWriteStreamOperationStartResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x12\n\x04name\x18\x02 \x01(\tR\x04name\x12<\n\x18query_started_event_json\x18\x03 \x01(\tH\x00R\x15queryStartedEventJson\x88\x01\x01\x42\x1b\n\x19_query_started_event_json"A\n\x18StreamingQueryInstanceId\x12\x0e\n\x02id\x18\x01 \x01(\tR\x02id\x12\x15\n\x06run_id\x18\x02 \x01(\tR\x05runId"\xf8\x04\n\x15StreamingQueryCommand\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12\x18\n\x06status\x18\x02 \x01(\x08H\x00R\x06status\x12%\n\rlast_progress\x18\x03 \x01(\x08H\x00R\x0clastProgress\x12)\n\x0frecent_progress\x18\x04 \x01(\x08H\x00R\x0erecentProgress\x12\x14\n\x04stop\x18\x05 \x01(\x08H\x00R\x04stop\x12\x34\n\x15process_all_available\x18\x06 \x01(\x08H\x00R\x13processAllAvailable\x12O\n\x07\x65xplain\x18\x07 \x01(\x0b\x32\x33.spark.connect.StreamingQueryCommand.ExplainCommandH\x00R\x07\x65xplain\x12\x1e\n\texception\x18\x08 \x01(\x08H\x00R\texception\x12k\n\x11\x61wait_termination\x18\t 
\x01(\x0b\x32<.spark.connect.StreamingQueryCommand.AwaitTerminationCommandH\x00R\x10\x61waitTermination\x1a,\n\x0e\x45xplainCommand\x12\x1a\n\x08\x65xtended\x18\x01 \x01(\x08R\x08\x65xtended\x1aL\n\x17\x41waitTerminationCommand\x12"\n\ntimeout_ms\x18\x02 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_msB\t\n\x07\x63ommand"\xf5\x08\n\x1bStreamingQueryCommandResult\x12\x42\n\x08query_id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x07queryId\x12Q\n\x06status\x18\x02 \x01(\x0b\x32\x37.spark.connect.StreamingQueryCommandResult.StatusResultH\x00R\x06status\x12j\n\x0frecent_progress\x18\x03 \x01(\x0b\x32?.spark.connect.StreamingQueryCommandResult.RecentProgressResultH\x00R\x0erecentProgress\x12T\n\x07\x65xplain\x18\x04 \x01(\x0b\x32\x38.spark.connect.StreamingQueryCommandResult.ExplainResultH\x00R\x07\x65xplain\x12Z\n\texception\x18\x05 \x01(\x0b\x32:.spark.connect.StreamingQueryCommandResult.ExceptionResultH\x00R\texception\x12p\n\x11\x61wait_termination\x18\x06 \x01(\x0b\x32\x41.spark.connect.StreamingQueryCommandResult.AwaitTerminationResultH\x00R\x10\x61waitTermination\x1a\xaa\x01\n\x0cStatusResult\x12%\n\x0estatus_message\x18\x01 \x01(\tR\rstatusMessage\x12*\n\x11is_data_available\x18\x02 \x01(\x08R\x0fisDataAvailable\x12*\n\x11is_trigger_active\x18\x03 \x01(\x08R\x0fisTriggerActive\x12\x1b\n\tis_active\x18\x04 \x01(\x08R\x08isActive\x1aH\n\x14RecentProgressResult\x12\x30\n\x14recent_progress_json\x18\x05 \x03(\tR\x12recentProgressJson\x1a\'\n\rExplainResult\x12\x16\n\x06result\x18\x01 \x01(\tR\x06result\x1a\xc5\x01\n\x0f\x45xceptionResult\x12\x30\n\x11\x65xception_message\x18\x01 \x01(\tH\x00R\x10\x65xceptionMessage\x88\x01\x01\x12$\n\x0b\x65rror_class\x18\x02 \x01(\tH\x01R\nerrorClass\x88\x01\x01\x12$\n\x0bstack_trace\x18\x03 \x01(\tH\x02R\nstackTrace\x88\x01\x01\x42\x14\n\x12_exception_messageB\x0e\n\x0c_error_classB\x0e\n\x0c_stack_trace\x1a\x38\n\x16\x41waitTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminatedB\r\n\x0bresult_type"\xbd\x06\n\x1cStreamingQueryManagerCommand\x12\x18\n\x06\x61\x63tive\x18\x01 \x01(\x08H\x00R\x06\x61\x63tive\x12\x1d\n\tget_query\x18\x02 \x01(\tH\x00R\x08getQuery\x12|\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32\x46.spark.connect.StreamingQueryManagerCommand.AwaitAnyTerminationCommandH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12n\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0b\x61\x64\x64Listener\x12t\n\x0fremove_listener\x18\x06 \x01(\x0b\x32I.spark.connect.StreamingQueryManagerCommand.StreamingQueryListenerCommandH\x00R\x0eremoveListener\x12\'\n\x0elist_listeners\x18\x07 \x01(\x08H\x00R\rlistListeners\x1aO\n\x1a\x41waitAnyTerminationCommand\x12"\n\ntimeout_ms\x18\x01 \x01(\x03H\x00R\ttimeoutMs\x88\x01\x01\x42\r\n\x0b_timeout_ms\x1a\xcd\x01\n\x1dStreamingQueryListenerCommand\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x12U\n\x17python_listener_payload\x18\x02 \x01(\x0b\x32\x18.spark.connect.PythonUDFH\x00R\x15pythonListenerPayload\x88\x01\x01\x12\x0e\n\x02id\x18\x03 \x01(\tR\x02idB\x1a\n\x18_python_listener_payloadB\t\n\x07\x63ommand"\xb4\x08\n"StreamingQueryManagerCommandResult\x12X\n\x06\x61\x63tive\x18\x01 \x01(\x0b\x32>.spark.connect.StreamingQueryManagerCommandResult.ActiveResultH\x00R\x06\x61\x63tive\x12`\n\x05query\x18\x02 
\x01(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceH\x00R\x05query\x12\x81\x01\n\x15\x61wait_any_termination\x18\x03 \x01(\x0b\x32K.spark.connect.StreamingQueryManagerCommandResult.AwaitAnyTerminationResultH\x00R\x13\x61waitAnyTermination\x12+\n\x10reset_terminated\x18\x04 \x01(\x08H\x00R\x0fresetTerminated\x12#\n\x0c\x61\x64\x64_listener\x18\x05 \x01(\x08H\x00R\x0b\x61\x64\x64Listener\x12)\n\x0fremove_listener\x18\x06 \x01(\x08H\x00R\x0eremoveListener\x12{\n\x0elist_listeners\x18\x07 \x01(\x0b\x32R.spark.connect.StreamingQueryManagerCommandResult.ListStreamingQueryListenerResultH\x00R\rlistListeners\x1a\x7f\n\x0c\x41\x63tiveResult\x12o\n\x0e\x61\x63tive_queries\x18\x01 \x03(\x0b\x32H.spark.connect.StreamingQueryManagerCommandResult.StreamingQueryInstanceR\ractiveQueries\x1as\n\x16StreamingQueryInstance\x12\x37\n\x02id\x18\x01 \x01(\x0b\x32\'.spark.connect.StreamingQueryInstanceIdR\x02id\x12\x17\n\x04name\x18\x02 \x01(\tH\x00R\x04name\x88\x01\x01\x42\x07\n\x05_name\x1a;\n\x19\x41waitAnyTerminationResult\x12\x1e\n\nterminated\x18\x01 \x01(\x08R\nterminated\x1aK\n\x1eStreamingQueryListenerInstance\x12)\n\x10listener_payload\x18\x01 \x01(\x0cR\x0flistenerPayload\x1a\x45\n ListStreamingQueryListenerResult\x12!\n\x0clistener_ids\x18\x01 \x03(\tR\x0blistenerIdsB\r\n\x0bresult_type"\xad\x01\n StreamingQueryListenerBusCommand\x12;\n\x19\x61\x64\x64_listener_bus_listener\x18\x01 \x01(\x08H\x00R\x16\x61\x64\x64ListenerBusListener\x12\x41\n\x1cremove_listener_bus_listener\x18\x02 \x01(\x08H\x00R\x19removeListenerBusListenerB\t\n\x07\x63ommand"\x83\x01\n\x1bStreamingQueryListenerEvent\x12\x1d\n\nevent_json\x18\x01 \x01(\tR\teventJson\x12\x45\n\nevent_type\x18\x02 \x01(\x0e\x32&.spark.connect.StreamingQueryEventTypeR\teventType"\xcc\x01\n"StreamingQueryListenerEventsResult\x12\x42\n\x06\x65vents\x18\x01 \x03(\x0b\x32*.spark.connect.StreamingQueryListenerEventR\x06\x65vents\x12\x42\n\x1blistener_bus_listener_added\x18\x02 \x01(\x08H\x00R\x18listenerBusListenerAdded\x88\x01\x01\x42\x1e\n\x1c_listener_bus_listener_added"\x15\n\x13GetResourcesCommand"\xd4\x01\n\x19GetResourcesCommandResult\x12U\n\tresources\x18\x01 \x03(\x0b\x32\x37.spark.connect.GetResourcesCommandResult.ResourcesEntryR\tresources\x1a`\n\x0eResourcesEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12\x38\n\x05value\x18\x02 \x01(\x0b\x32".spark.connect.ResourceInformationR\x05value:\x02\x38\x01"X\n\x1c\x43reateResourceProfileCommand\x12\x38\n\x07profile\x18\x01 \x01(\x0b\x32\x1e.spark.connect.ResourceProfileR\x07profile"C\n"CreateResourceProfileCommandResult\x12\x1d\n\nprofile_id\x18\x01 \x01(\x05R\tprofileId"d\n!RemoveCachedRemoteRelationCommand\x12?\n\x08relation\x18\x01 \x01(\x0b\x32#.spark.connect.CachedRemoteRelationR\x08relation"t\n\x11\x43heckpointCommand\x12\x33\n\x08relation\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationR\x08relation\x12\x14\n\x05local\x18\x02 \x01(\x08R\x05local\x12\x14\n\x05\x65\x61ger\x18\x03 \x01(\x08R\x05\x65\x61ger"\xe8\x03\n\x15MergeIntoTableCommand\x12*\n\x11target_table_name\x18\x01 \x01(\tR\x0ftargetTableName\x12\x43\n\x11source_table_plan\x18\x02 \x01(\x0b\x32\x17.spark.connect.RelationR\x0fsourceTablePlan\x12\x42\n\x0fmerge_condition\x18\x03 \x01(\x0b\x32\x19.spark.connect.ExpressionR\x0emergeCondition\x12>\n\rmatch_actions\x18\x04 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x0cmatchActions\x12I\n\x13not_matched_actions\x18\x05 \x03(\x0b\x32\x19.spark.connect.ExpressionR\x11notMatchedActions\x12[\n\x1dnot_matched_by_source_actions\x18\x06 
\x03(\x0b\x32\x19.spark.connect.ExpressionR\x19notMatchedBySourceActions\x12\x32\n\x15with_schema_evolution\x18\x07 \x01(\x08R\x13withSchemaEvolution*\x85\x01\n\x17StreamingQueryEventType\x12\x1e\n\x1aQUERY_PROGRESS_UNSPECIFIED\x10\x00\x12\x18\n\x14QUERY_PROGRESS_EVENT\x10\x01\x12\x1a\n\x16QUERY_TERMINATED_EVENT\x10\x02\x12\x14\n\x10QUERY_IDLE_EVENT\x10\x03\x42\x36\n\x1eorg.apache.spark.connect.protoP\x01Z\x12internal/generatedb\x06proto3' ) _builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, globals()) @@ -71,8 +71,8 @@ _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_options = b"8\001" _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._options = None _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_options = b"8\001" - _STREAMINGQUERYEVENTTYPE._serialized_start = 11106 - _STREAMINGQUERYEVENTTYPE._serialized_end = 11239 + _STREAMINGQUERYEVENTTYPE._serialized_start = 11162 + _STREAMINGQUERYEVENTTYPE._serialized_end = 11295 _COMMAND._serialized_start = 167 _COMMAND._serialized_end = 1847 _SQLCOMMAND._serialized_start = 1850 @@ -104,71 +104,71 @@ _WRITEOPERATIONV2_MODE._serialized_start = 4349 _WRITEOPERATIONV2_MODE._serialized_end = 4508 _WRITESTREAMOPERATIONSTART._serialized_start = 4524 - _WRITESTREAMOPERATIONSTART._serialized_end = 5324 + _WRITESTREAMOPERATIONSTART._serialized_end = 5380 _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_start = 3082 _WRITESTREAMOPERATIONSTART_OPTIONSENTRY._serialized_end = 3140 - _STREAMINGFOREACHFUNCTION._serialized_start = 5327 - _STREAMINGFOREACHFUNCTION._serialized_end = 5506 - _WRITESTREAMOPERATIONSTARTRESULT._serialized_start = 5509 - _WRITESTREAMOPERATIONSTARTRESULT._serialized_end = 5721 - _STREAMINGQUERYINSTANCEID._serialized_start = 5723 - _STREAMINGQUERYINSTANCEID._serialized_end = 5788 - _STREAMINGQUERYCOMMAND._serialized_start = 5791 - _STREAMINGQUERYCOMMAND._serialized_end = 6423 - _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_start = 6290 - _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_end = 6334 - _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 6336 - _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 6412 - _STREAMINGQUERYCOMMANDRESULT._serialized_start = 6426 - _STREAMINGQUERYCOMMANDRESULT._serialized_end = 7567 - _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 7009 - _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 7179 - _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 7181 - _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 7253 - _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 7255 - _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 7294 - _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 7297 - _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 7494 - _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 7496 - _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 7552 - _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 7570 - _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 8399 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 8101 - _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 8180 - _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_start = 8183 - _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_end = 8388 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 8402 - _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 9478 - 
_STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 9010 - _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 9137 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 9139 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 9254 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 9256 - _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 9315 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_start = 9317 - _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_end = 9392 - _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_start = 9394 - _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_end = 9463 - _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_start = 9481 - _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_end = 9654 - _STREAMINGQUERYLISTENEREVENT._serialized_start = 9657 - _STREAMINGQUERYLISTENEREVENT._serialized_end = 9788 - _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_start = 9791 - _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_end = 9995 - _GETRESOURCESCOMMAND._serialized_start = 9997 - _GETRESOURCESCOMMAND._serialized_end = 10018 - _GETRESOURCESCOMMANDRESULT._serialized_start = 10021 - _GETRESOURCESCOMMANDRESULT._serialized_end = 10233 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 10137 - _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 10233 - _CREATERESOURCEPROFILECOMMAND._serialized_start = 10235 - _CREATERESOURCEPROFILECOMMAND._serialized_end = 10323 - _CREATERESOURCEPROFILECOMMANDRESULT._serialized_start = 10325 - _CREATERESOURCEPROFILECOMMANDRESULT._serialized_end = 10392 - _REMOVECACHEDREMOTERELATIONCOMMAND._serialized_start = 10394 - _REMOVECACHEDREMOTERELATIONCOMMAND._serialized_end = 10494 - _CHECKPOINTCOMMAND._serialized_start = 10496 - _CHECKPOINTCOMMAND._serialized_end = 10612 - _MERGEINTOTABLECOMMAND._serialized_start = 10615 - _MERGEINTOTABLECOMMAND._serialized_end = 11103 + _STREAMINGFOREACHFUNCTION._serialized_start = 5383 + _STREAMINGFOREACHFUNCTION._serialized_end = 5562 + _WRITESTREAMOPERATIONSTARTRESULT._serialized_start = 5565 + _WRITESTREAMOPERATIONSTARTRESULT._serialized_end = 5777 + _STREAMINGQUERYINSTANCEID._serialized_start = 5779 + _STREAMINGQUERYINSTANCEID._serialized_end = 5844 + _STREAMINGQUERYCOMMAND._serialized_start = 5847 + _STREAMINGQUERYCOMMAND._serialized_end = 6479 + _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_start = 6346 + _STREAMINGQUERYCOMMAND_EXPLAINCOMMAND._serialized_end = 6390 + _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_start = 6392 + _STREAMINGQUERYCOMMAND_AWAITTERMINATIONCOMMAND._serialized_end = 6468 + _STREAMINGQUERYCOMMANDRESULT._serialized_start = 6482 + _STREAMINGQUERYCOMMANDRESULT._serialized_end = 7623 + _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_start = 7065 + _STREAMINGQUERYCOMMANDRESULT_STATUSRESULT._serialized_end = 7235 + _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_start = 7237 + _STREAMINGQUERYCOMMANDRESULT_RECENTPROGRESSRESULT._serialized_end = 7309 + _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_start = 7311 + _STREAMINGQUERYCOMMANDRESULT_EXPLAINRESULT._serialized_end = 7350 + _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_start = 7353 + _STREAMINGQUERYCOMMANDRESULT_EXCEPTIONRESULT._serialized_end = 7550 + 
_STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_start = 7552 + _STREAMINGQUERYCOMMANDRESULT_AWAITTERMINATIONRESULT._serialized_end = 7608 + _STREAMINGQUERYMANAGERCOMMAND._serialized_start = 7626 + _STREAMINGQUERYMANAGERCOMMAND._serialized_end = 8455 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_start = 8157 + _STREAMINGQUERYMANAGERCOMMAND_AWAITANYTERMINATIONCOMMAND._serialized_end = 8236 + _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_start = 8239 + _STREAMINGQUERYMANAGERCOMMAND_STREAMINGQUERYLISTENERCOMMAND._serialized_end = 8444 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_start = 8458 + _STREAMINGQUERYMANAGERCOMMANDRESULT._serialized_end = 9534 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_start = 9066 + _STREAMINGQUERYMANAGERCOMMANDRESULT_ACTIVERESULT._serialized_end = 9193 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_start = 9195 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYINSTANCE._serialized_end = 9310 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_start = 9312 + _STREAMINGQUERYMANAGERCOMMANDRESULT_AWAITANYTERMINATIONRESULT._serialized_end = 9371 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_start = 9373 + _STREAMINGQUERYMANAGERCOMMANDRESULT_STREAMINGQUERYLISTENERINSTANCE._serialized_end = 9448 + _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_start = 9450 + _STREAMINGQUERYMANAGERCOMMANDRESULT_LISTSTREAMINGQUERYLISTENERRESULT._serialized_end = 9519 + _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_start = 9537 + _STREAMINGQUERYLISTENERBUSCOMMAND._serialized_end = 9710 + _STREAMINGQUERYLISTENEREVENT._serialized_start = 9713 + _STREAMINGQUERYLISTENEREVENT._serialized_end = 9844 + _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_start = 9847 + _STREAMINGQUERYLISTENEREVENTSRESULT._serialized_end = 10051 + _GETRESOURCESCOMMAND._serialized_start = 10053 + _GETRESOURCESCOMMAND._serialized_end = 10074 + _GETRESOURCESCOMMANDRESULT._serialized_start = 10077 + _GETRESOURCESCOMMANDRESULT._serialized_end = 10289 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_start = 10193 + _GETRESOURCESCOMMANDRESULT_RESOURCESENTRY._serialized_end = 10289 + _CREATERESOURCEPROFILECOMMAND._serialized_start = 10291 + _CREATERESOURCEPROFILECOMMAND._serialized_end = 10379 + _CREATERESOURCEPROFILECOMMANDRESULT._serialized_start = 10381 + _CREATERESOURCEPROFILECOMMANDRESULT._serialized_end = 10448 + _REMOVECACHEDREMOTERELATIONCOMMAND._serialized_start = 10450 + _REMOVECACHEDREMOTERELATIONCOMMAND._serialized_end = 10550 + _CHECKPOINTCOMMAND._serialized_start = 10552 + _CHECKPOINTCOMMAND._serialized_end = 10668 + _MERGEINTOTABLECOMMAND._serialized_start = 10671 + _MERGEINTOTABLECOMMAND._serialized_end = 11159 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/commands_pb2.pyi b/python/pyspark/sql/connect/proto/commands_pb2.pyi index 03a31ecdfedf..2dedcdfc8e3e 100644 --- a/python/pyspark/sql/connect/proto/commands_pb2.pyi +++ b/python/pyspark/sql/connect/proto/commands_pb2.pyi @@ -905,6 +905,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message): TABLE_NAME_FIELD_NUMBER: builtins.int FOREACH_WRITER_FIELD_NUMBER: builtins.int FOREACH_BATCH_FIELD_NUMBER: builtins.int + CLUSTERING_COLUMN_NAMES_FIELD_NUMBER: builtins.int @property def input(self) -> pyspark.sql.connect.proto.relations_pb2.Relation: """(Required) The output of the `input` streaming 
relation will be written.""" @@ -932,6 +933,11 @@ class WriteStreamOperationStart(google.protobuf.message.Message): def foreach_writer(self) -> global___StreamingForeachFunction: ... @property def foreach_batch(self) -> global___StreamingForeachFunction: ... + @property + def clustering_column_names( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) Columns used for clustering the table.""" def __init__( self, *, @@ -949,6 +955,7 @@ class WriteStreamOperationStart(google.protobuf.message.Message): table_name: builtins.str = ..., foreach_writer: global___StreamingForeachFunction | None = ..., foreach_batch: global___StreamingForeachFunction | None = ..., + clustering_column_names: collections.abc.Iterable[builtins.str] | None = ..., ) -> None: ... def HasField( self, @@ -982,6 +989,8 @@ class WriteStreamOperationStart(google.protobuf.message.Message): field_name: typing_extensions.Literal[ "available_now", b"available_now", + "clustering_column_names", + b"clustering_column_names", "continuous_checkpoint_interval", b"continuous_checkpoint_interval", "foreach_batch", diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py index 22dd22fe02cb..afae9481c354 100644 --- a/python/pyspark/sql/connect/streaming/readwriter.py +++ b/python/pyspark/sql/connect/streaming/readwriter.py @@ -445,6 +445,25 @@ def partitionBy(self, *cols: str) -> "DataStreamWriter": # type: ignore[misc] partitionBy.__doc__ = PySparkDataStreamWriter.partitionBy.__doc__ + @overload + def clusterBy(self, *cols: str) -> "DataStreamWriter": + ... + + @overload + def clusterBy(self, __cols: List[str]) -> "DataStreamWriter": + ... + + def clusterBy(self, *cols: str) -> "DataStreamWriter": # type: ignore[misc] + if len(cols) == 1 and isinstance(cols[0], (list, tuple)): + cols = cols[0] + # Clear any existing columns (if any). + while len(self._write_proto.clustering_column_names) > 0: + self._write_proto.clustering_column_names.pop() + self._write_proto.clustering_column_names.extend(cast(List[str], cols)) + return self + + clusterBy.__doc__ = PySparkDataStreamWriter.clusterBy.__doc__ + def queryName(self, queryName: str) -> "DataStreamWriter": if not queryName or type(queryName) != str or len(queryName.strip()) == 0: raise PySparkValueError( diff --git a/python/pyspark/sql/streaming/readwriter.py b/python/pyspark/sql/streaming/readwriter.py index d10a4b92b2f6..f72016a82a8c 100644 --- a/python/pyspark/sql/streaming/readwriter.py +++ b/python/pyspark/sql/streaming/readwriter.py @@ -1123,6 +1123,65 @@ def partitionBy(self, *cols: str) -> "DataStreamWriter": # type: ignore[misc] self._jwrite = self._jwrite.partitionBy(_to_seq(self._spark._sc, cols)) return self + @overload + def clusterBy(self, *cols: str) -> "DataStreamWriter": + ... + + @overload + def clusterBy(self, __cols: List[str]) -> "DataStreamWriter": + ... + + def clusterBy(self, *cols: str) -> "DataStreamWriter": # type: ignore[misc] + """Clusters the output by the given columns. + + If specified, the output is laid out such that records with similar values on the clustering + column(s) are grouped together in the same file. + + Clustering improves query efficiency by allowing queries with predicates on the clustering + columns to skip unnecessary data. Unlike partitioning, clustering can be used on very high + cardinality columns. + + .. 
versionadded:: 4.0.0 + + Parameters + ---------- + cols : str or list + name of columns + + Notes + ----- + This API is evolving. + + Examples + -------- + >>> df = spark.readStream.format("rate").load() + >>> df.writeStream.clusterBy("value") + <...streaming.readwriter.DataStreamWriter object ...> + + Cluster-by timestamp column from Rate source. + + >>> import tempfile + >>> import time + >>> with tempfile.TemporaryDirectory(prefix="partitionBy1") as d: + ... with tempfile.TemporaryDirectory(prefix="partitionBy2") as cp: + ... df = spark.readStream.format("rate").option("rowsPerSecond", 10).load() + ... q = df.writeStream.clusterBy( + ... "timestamp").format("parquet").option("checkpointLocation", cp).start(d) + ... time.sleep(5) + ... q.stop() + ... spark.read.schema(df.schema).parquet(d).show() + +...---------+-----+ + |...timestamp|value| + +...---------+-----+ + ... + """ + from pyspark.sql.classic.column import _to_seq + + if len(cols) == 1 and isinstance(cols[0], (list, tuple)): + cols = cols[0] + self._jwrite = self._jwrite.clusterBy(_to_seq(self._spark._sc, cols)) + return self + def queryName(self, queryName: str) -> "DataStreamWriter": """Specifies the name of the :class:`StreamingQuery` that can be started with :func:`start`. This name must be unique among all the currently active queries diff --git a/python/pyspark/sql/tests/streaming/test_streaming.py b/python/pyspark/sql/tests/streaming/test_streaming.py index 00d1fbf53885..a0e85c73aedf 100644 --- a/python/pyspark/sql/tests/streaming/test_streaming.py +++ b/python/pyspark/sql/tests/streaming/test_streaming.py @@ -429,6 +429,31 @@ def test_streaming_write_to_table(self): result = self.spark.sql("SELECT value FROM output_table").collect() self.assertTrue(len(result) > 0) + def test_streaming_write_to_table_cluster_by(self): + with self.table("output_table"), tempfile.TemporaryDirectory(prefix="to_table") as tmpdir: + df = self.spark.readStream.format("rate").option("rowsPerSecond", 10).load() + q = df.writeStream.clusterBy("value").toTable( + "output_table", format="parquet", checkpointLocation=tmpdir + ) + self.assertTrue(q.isActive) + time.sleep(10) + q.stop() + result = self.spark.sql("DESCRIBE output_table").collect() + self.assertEqual( + set( + [ + Row(col_name="timestamp", data_type="timestamp", comment=None), + Row(col_name="value", data_type="bigint", comment=None), + Row(col_name="# Clustering Information", data_type="", comment=""), + Row(col_name="# col_name", data_type="data_type", comment="comment"), + Row(col_name="value", data_type="bigint", comment=None), + ] + ), + set(result), + ) + result = self.spark.sql("SELECT value FROM output_table").collect() + self.assertTrue(len(result) > 0) + def test_streaming_with_temporary_view(self): """ This verifies createOrReplaceTempView() works with a streaming dataframe. 
An SQL diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index 1db03c5d816f..ab4d350c1e68 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -36,9 +36,10 @@ import org.apache.spark.sql.catalyst.types.DataTypeUtils import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap import org.apache.spark.sql.connector.catalog.{Identifier, SupportsWrite, Table, TableCatalog, TableProvider, V1Table, V2TableWithV1Fallback} import org.apache.spark.sql.connector.catalog.TableCapability._ +import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference} import org.apache.spark.sql.errors.QueryCompilationErrors import org.apache.spark.sql.execution.command.DDLUtils -import org.apache.spark.sql.execution.datasources.DataSource +import org.apache.spark.sql.execution.datasources.{DataSource, DataSourceUtils} import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Utils, FileDataSourceV2} import org.apache.spark.sql.execution.datasources.v2.python.PythonDataSourceV2 import org.apache.spark.sql.execution.streaming._ @@ -166,6 +167,24 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { @scala.annotation.varargs def partitionBy(colNames: String*): DataStreamWriter[T] = { this.partitioningColumns = Option(colNames) + validatePartitioningAndClustering() + this + } + + /** + * Clusters the output by the given columns. If specified, the output is laid out such that + * records with similar values on the clustering column are grouped together in the same file. + * + * Clustering improves query efficiency by allowing queries with predicates on the clustering + * columns to skip unnecessary data. Unlike partitioning, clustering can be used on very high + * cardinality columns. + * + * @since 4.0.0 + */ + @scala.annotation.varargs + def clusterBy(colNames: String*): DataStreamWriter[T] = { + this.clusteringColumns = Option(colNames) + validatePartitioningAndClustering() this } @@ -288,12 +307,21 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) { if (!catalog.asTableCatalog.tableExists(identifier)) { import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ + + val properties = normalizedClusteringCols.map { cols => + Map( + DataSourceUtils.CLUSTERING_COLUMNS_KEY -> DataSourceUtils.encodePartitioningColumns(cols)) + }.getOrElse(Map.empty) + val partitioningOrClusteringTransform = normalizedClusteringCols.map { colNames => + Array(ClusterByTransform(colNames.map(col => FieldReference(col)))).toImmutableArraySeq + }.getOrElse(partitioningColumns.getOrElse(Nil).asTransforms.toImmutableArraySeq) + /** * Note, currently the new table creation by this API doesn't fully cover the V2 table. 
         * TODO (SPARK-33638): Full support of v2 table creation
         */
        val tableSpec = UnresolvedTableSpec(
-         Map.empty[String, String],
+         properties,
          Some(source),
          OptionList(Seq.empty),
          extraOptions.get("path"),
@@ -303,7 +331,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
        val cmd = CreateTable(
          UnresolvedIdentifier(originalMultipartIdentifier),
          df.schema.asNullable.map(ColumnDefinition.fromV1Column(_, parser)),
-         partitioningColumns.getOrElse(Nil).asTransforms.toImmutableArraySeq,
+         partitioningOrClusteringTransform,
          tableSpec,
          ignoreIfExists = false)
        Dataset.ofRows(df.sparkSession, cmd)
@@ -439,10 +467,22 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   }
 
   private def createV1Sink(optionsWithPath: CaseInsensitiveMap[String]): Sink = {
+    // Do not allow the user to specify clustering columns in the options. Ignoring this option is
+    // consistent with the behavior of DataFrameWriter on non-path-based tables and with the
+    // behavior of DataStreamWriter on partitioning columns specified in options.
+    val optionsWithoutClusteringKey =
+      optionsWithPath.originalMap - DataSourceUtils.CLUSTERING_COLUMNS_KEY
+
+    val optionsWithClusteringColumns = normalizedClusteringCols match {
+      case Some(cols) => optionsWithoutClusteringKey + (
+        DataSourceUtils.CLUSTERING_COLUMNS_KEY ->
+          DataSourceUtils.encodePartitioningColumns(cols))
+      case None => optionsWithoutClusteringKey
+    }
+
     val ds = DataSource(
       df.sparkSession,
       className = source,
-      options = optionsWithPath.originalMap,
+      options = optionsWithClusteringColumns,
       partitionColumns = normalizedParCols.getOrElse(Nil))
     ds.createSink(outputMode)
   }
@@ -514,6 +554,10 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     cols.map(normalize(_, "Partition"))
   }
 
+  private def normalizedClusteringCols: Option[Seq[String]] = clusteringColumns.map { cols =>
+    cols.map(normalize(_, "Clustering"))
+  }
+
   /**
    * The given column name may not be equal to any of the existing column names if we were in
    * case-insensitive context. Normalize the given column name to the real one so that we don't
@@ -532,6 +576,13 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
     }
   }
 
+  // Validate that partitionBy isn't used with clusterBy.
+  private def validatePartitioningAndClustering(): Unit = {
+    if (clusteringColumns.nonEmpty && partitioningColumns.nonEmpty) {
+      throw QueryCompilationErrors.clusterByWithPartitionedBy()
+    }
+  }
+
   ///////////////////////////////////////////////////////////////////////////////////////
   // Builder pattern config options
   ///////////////////////////////////////////////////////////////////////////////////////
@@ -554,6 +605,8 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
   private var foreachBatchWriter: (Dataset[T], Long) => Unit = null
 
   private var partitioningColumns: Option[Seq[String]] = None
+
+  private var clusteringColumns: Option[Seq[String]] = None
 }
 
 object DataStreamWriter {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index c2416cca8874..c4ec0af80b72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -30,6 +30,7 @@ import org.mockito.Mockito._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.execution.datasources.DataSourceUtils
 import org.apache.spark.sql.execution.streaming._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
@@ -47,6 +48,7 @@ object LastOptions {
   var sinkParameters: Map[String, String] = null
   var schema: Option[StructType] = null
   var partitionColumns: Seq[String] = Nil
+  var clusteringColumns: Seq[String] = Nil
 
   def clear(): Unit = {
     parameters = null
@@ -104,6 +106,8 @@ class DefaultSource extends StreamSourceProvider with StreamSinkProvider {
       outputMode: OutputMode): Sink = {
     LastOptions.sinkParameters = parameters
     LastOptions.partitionColumns = partitionColumns
+    LastOptions.clusteringColumns = parameters.get(DataSourceUtils.CLUSTERING_COLUMNS_KEY)
+      .map(DataSourceUtils.decodePartitioningColumns).getOrElse(Nil)
     LastOptions.mockStreamSinkProvider.createSink(spark, parameters, partitionColumns, outputMode)
     (_: Long, _: DataFrame) => {}
   }
@@ -258,6 +262,56 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("clustering") {
+    val df = spark.readStream
+      .format("org.apache.spark.sql.streaming.test")
+      .load()
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .start()
+      .stop()
+    assert(LastOptions.partitionColumns == Nil)
+
+    df.writeStream
+      .format("org.apache.spark.sql.streaming.test")
+      .option("checkpointLocation", newMetadataDir)
+      .clusterBy("a")
+      .start()
+      .stop()
+    assert(LastOptions.clusteringColumns == Seq("a"))
+
+    withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .clusterBy("A")
+        .start()
+        .stop()
+      assert(LastOptions.clusteringColumns == Seq("a"))
+    }
+
+    intercept[AnalysisException] {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .clusterBy("b")
+        .start()
+        .stop()
+    }
+
+    intercept[AnalysisException] {
+      df.writeStream
+        .format("org.apache.spark.sql.streaming.test")
+        .option("checkpointLocation", newMetadataDir)
+        .clusterBy("a")
+        .partitionBy("a")
+        .start()
+        .stop()
+    }
+  }
+
   test("stream paths") {
     val df = spark.readStream
       .format("org.apache.spark.sql.streaming.test")
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
index af07aceaed14..5ae7b3eec37e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamTableAPISuite.scala
@@ -29,7 +29,7 @@ import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable
 import org.apache.spark.sql.catalyst.streaming.StreamingRelationV2
 import org.apache.spark.sql.connector.{FakeV2Provider, FakeV2ProviderWithCustomSchema, InMemoryTableSessionCatalog}
 import org.apache.spark.sql.connector.catalog.{Column, Identifier, InMemoryTableCatalog, MetadataColumn, SupportsMetadataColumns, SupportsRead, Table, TableCapability, V2TableWithV1Fallback}
-import org.apache.spark.sql.connector.expressions.Transform
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, Transform}
 import org.apache.spark.sql.connector.read.ScanBuilder
 import org.apache.spark.sql.execution.streaming.{MemoryStream, MemoryStreamScanBuilder, StreamingQueryWrapper}
 import org.apache.spark.sql.functions.lit
@@ -334,6 +334,31 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("write: write to new table with clusterBy") {
+    val tableIdentifier = "testcat.cluster_test"
+
+    withTable(tableIdentifier) {
+      runStreamAppendWithClusterBy(tableIdentifier)
+
+      val table = spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
+        .loadTable(Identifier.of(Array(), "cluster_test"))
+      assert(table.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id")))))
+    }
+  }
+
+  test("write: write to existing table with matching clustering column using clusterBy") {
+    val tableIdentifier = "testcat.cluster_test"
+
+    withTable(tableIdentifier) {
+      sql(s"CREATE TABLE $tableIdentifier (id BIGINT, data STRING) CLUSTER BY (id)")
+      runStreamAppendWithClusterBy(tableIdentifier)
+
+      val table = spark.sessionState.catalogManager.catalog("testcat").asTableCatalog
+        .loadTable(Identifier.of(Array(), "cluster_test"))
+      assert(table.partitioning === Seq(ClusterByTransform(Seq(FieldReference("id")))))
+    }
+  }
+
   test("explain with table on DSv1 data source") {
     val tblSourceName = "tbl_src"
     val tblTargetName = "tbl_target"
@@ -591,6 +616,24 @@ class DataStreamTableAPISuite extends StreamTest with BeforeAndAfter {
       expectedOutputs.map { case (id, data) => Row(id, data) }
     )
   }
+
+  private def runStreamAppendWithClusterBy(tableIdentifier: String): Unit = {
+    withTempDir { ckptDir =>
+      val inputData = MemoryStream[(Long, String)]
+      val inputDF = inputData.toDF().toDF("id", "data")
+
+      val query = inputDF
+        .writeStream
+        .clusterBy("id")
+        .option("checkpointLocation", ckptDir.getAbsolutePath)
+        .toTable(tableIdentifier)
+
+      inputData.addData(Seq((1L, "a"), (2L, "b"), (3L, "c")))
+
+      query.processAllAvailable()
+      query.stop()
+    }
+  }
 }
 
 object DataStreamTableAPISuite {
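Editor's note: taken together, the changes above let a streaming write declare clustering columns the same way a batch write does, and create the target table with a ClusterByTransform when it does not exist yet. The sketch below is not part of the patch; it is a minimal illustration of how the new API is intended to be called, assuming a Spark build that includes this change. The application name, table name, and checkpoint path are made up for the example.

// A minimal usage sketch, assuming this patch is applied.
// The app name, table name, and checkpoint path are illustrative only.
import org.apache.spark.sql.SparkSession

object ClusterByStreamingSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ClusterByStreamingSketch").getOrCreate()

    // The rate source emits (timestamp, value) rows, as in the test suites above.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()

    // clusterBy("value") asks the sink to group rows with similar values into the
    // same files; unlike partitionBy(), it can be used on high-cardinality columns.
    val query = df.writeStream
      .clusterBy("value")
      .option("checkpointLocation", "/tmp/clusterby_ckpt") // illustrative path
      .toTable("events_clustered") // illustrative table name

    query.awaitTermination()
  }
}

The Python API mirrors this call chain (df.writeStream.clusterBy("value").toTable(...)), as in the docstring added to readwriter.py above. Note that combining clusterBy() with partitionBy() on the same writer raises an AnalysisException via validatePartitioningAndClustering().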