diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 5f9a4411ecdcc..522bc1113a945 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -183,6 +183,85 @@ message ExecutePlanResponse { } } +// Request to update or fetch the configurations. +message ConfigRequest { + // (Required) + // + // The client_id is set by the client to be able to collate streaming responses from + // different queries. + string client_id = 1; + + // (Required) User context + UserContext user_context = 2; + + Operation operation = 3; + + // (Optional) + // + // Identify which keys will be updated or fetched. + // Required when the 'operation' is 'SET', 'GET', 'GET_OPTION', 'UNSET', + // 'CONTAINS', 'IS_MODIFIABLE'. + repeated string keys = 4; + + // (Optional) + // + // Corresponding values to the keys. + // Required when the 'operation' is 'SET'. + // Optional when the 'operation' is 'GET', the values here will be used + // as the default values. + repeated string values = 5; + + // (Optional) + // + // Only used when the 'operation' is 'GET_ALL'. + // If prefix is given, only parameters that start with the prefix will be returned. + optional string prefix = 6; + + enum Operation { + OPERATION_UNSPECIFIED = 0; + OPERATION_SET = 1; + OPERATION_GET = 2; + OPERATION_GET_OPTION = 3; + OPERATION_GET_ALL = 4; + OPERATION_UNSET = 5; + OPERATION_CONTAINS = 6; + OPERATION_IS_MODIFIABLE = 7; + } +} + +message ConfigResponse { + string client_id = 1; + + // (Optional) + // + // Available when the operation is 'GET_ALL'. + repeated string keys = 2; + + // (Optional) + // + // Available when the operation is 'GET', 'GET_ALL'. + repeated string values = 3; + + // (Optional) + // + // Available when the operation is 'GET_OPTION'. + repeated OptionalValue optional_values = 4; + + // (Optional) + // + // Available when the operation is 'CONTAINS', 'IS_MODIFIABLE'. + repeated bool bools = 5; + + // (Optional) + // + // Warning messages for deprecated or unsupported configurations. + repeated string warnings = 6; + + message OptionalValue { + optional string value = 1; + } +} + // Main interface for the SparkConnect service. service SparkConnectService { @@ -193,5 +272,8 @@ service SparkConnectService { // Analyzes a query and returns a [[AnalyzeResponse]] containing metadata about the query. rpc AnalyzePlan(AnalyzePlanRequest) returns (AnalyzePlanResponse) {} + + // Update or fetch the configurations and returns a [[ConfigResponse]] containing the result. + rpc Config(ConfigRequest) returns (ConfigResponse) {} } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala new file mode 100644 index 0000000000000..80f80d76e6539 --- /dev/null +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectConfigHandler.scala @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import scala.collection.JavaConverters._ + +import io.grpc.stub.StreamObserver + +import org.apache.spark.connect.proto +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf + +class SparkConnectConfigHandler(responseObserver: StreamObserver[proto.ConfigResponse]) + extends Logging { + + def handle(request: proto.ConfigRequest): Unit = { + val session = + SparkConnectService + .getOrCreateIsolatedSession(request.getUserContext.getUserId, request.getClientId) + .session + + val builder = handleSQLConf(request, session) + builder.setClientId(request.getClientId) + responseObserver.onNext(builder.build()) + responseObserver.onCompleted() + } + + private def handleSQLConf(request: proto.ConfigRequest, session: SparkSession) = { + val conf = session.conf + val builder = proto.ConfigResponse.newBuilder() + + request.getOperation match { + case proto.ConfigRequest.Operation.OPERATION_SET => + if (request.getKeysCount != request.getValuesCount) { + throw new UnsupportedOperationException("Keys and values should have the same length!") + } + request.getKeysList.asScala.iterator + .zip(request.getValuesList.asScala.iterator) + .foreach { case (key, value) => + conf.set(key, value) + getWarning(key).foreach(builder.addWarnings) + } + + case proto.ConfigRequest.Operation.OPERATION_GET => + if (request.getValuesCount == 0) { + request.getKeysList.asScala.iterator.foreach { key => + builder.addValues(conf.get(key)) + getWarning(key).foreach(builder.addWarnings) + } + } else { + if (request.getKeysCount != request.getValuesCount) { + throw new UnsupportedOperationException( + "Keys and values should have the same length!") + } + request.getKeysList.asScala.iterator + .zip(request.getValuesList.asScala.iterator) + .foreach { case (key, value) => + builder.addValues(conf.get(key, value)) + getWarning(key).foreach(builder.addWarnings) + } + } + + case proto.ConfigRequest.Operation.OPERATION_GET_OPTION => + request.getKeysList.asScala.iterator.foreach { key => + conf.getOption(key) match { + case Some(value) => + builder.addOptionalValues( + proto.ConfigResponse.OptionalValue.newBuilder().setValue(value).build()) + case _ => + builder.addOptionalValues(proto.ConfigResponse.OptionalValue.newBuilder().build()) + } + getWarning(key).foreach(builder.addWarnings) + } + + case proto.ConfigRequest.Operation.OPERATION_GET_ALL => + val results = if (request.hasPrefix) { + val prefix = request.getPrefix + conf.getAll.iterator + .filter { case (key, value) => key.startsWith(prefix) } + .map { case (key, value) => (key.substring(prefix.length), value) } + } else { + conf.getAll.iterator + } + results.foreach { case (key, value) => + builder.addKeys(key).addValues(value) + getWarning(key).foreach(builder.addWarnings) + } + + case proto.ConfigRequest.Operation.OPERATION_UNSET => + request.getKeysList.asScala.iterator.foreach { key => + conf.unset(key) + getWarning(key).foreach(builder.addWarnings) + } + + case proto.ConfigRequest.Operation.OPERATION_CONTAINS => + 
request.getKeysList.asScala.iterator.foreach { key => + builder.addBools(conf.contains(key)) + getWarning(key).foreach(builder.addWarnings) + } + + case proto.ConfigRequest.Operation.OPERATION_IS_MODIFIABLE => + request.getKeysList.asScala.iterator.foreach { key => + builder.addBools(conf.isModifiable(key)) + getWarning(key).foreach(builder.addWarnings) + } + + case other => + throw new UnsupportedOperationException(s"Unsupported operation $other in SQLConf.") + } + + builder + } + + private def getWarning(key: String): Option[String] = { + if (SparkConnectConfigHandler.unsupportedConfigurations.contains(key)) { + Some(s"The SQL config '$key' is NOT supported in Spark Connect") + } else { + SQLConf.deprecatedSQLConfigs.get(key).map(_.toDeprecationString) + } + } +} + +object SparkConnectConfigHandler { + + private[connect] val unsupportedConfigurations = Set("spark.sql.execution.arrow.enabled") +} diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala index 0a5b4197b7839..ed71513f98260 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectService.scala @@ -212,6 +212,21 @@ class SparkConnectService(debug: Boolean) response.setIsStreaming(ds.isStreaming) response.addAllInputFiles(ds.inputFiles.toSeq.asJava) } + + /** + * This is the main entry method for Spark Connect and all calls to update or fetch + * configuration.. + * + * @param request + * @param responseObserver + */ + override def config( + request: proto.ConfigRequest, + responseObserver: StreamObserver[proto.ConfigResponse]): Unit = { + try { + new SparkConnectConfigHandler(responseObserver).handle(request) + } catch handleError("config", observer = responseObserver) + } } /** diff --git a/python/pyspark/sql/connect/proto/base_pb2.py b/python/pyspark/sql/connect/proto/base_pb2.py index 0d86ce8cd687b..4ee8a5b84243e 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.py +++ b/python/pyspark/sql/connect/proto/base_pb2.py @@ -36,7 +36,7 @@ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 
\x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x02\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString\x12\x1f\n\x0btree_string\x18\x04 \x01(\tR\ntreeString\x12\x19\n\x08is_local\x18\x05 \x01(\x08R\x07isLocal\x12!\n\x0cis_streaming\x18\x06 \x01(\x08R\x0bisStreaming\x12\x1f\n\x0binput_files\x18\x07 \x03(\tR\ninputFiles"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType2\xc7\x01\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' + b'\n\x18spark/connect/base.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x1cspark/connect/commands.proto\x1a\x1dspark/connect/relations.proto\x1a\x19spark/connect/types.proto"t\n\x04Plan\x12-\n\x04root\x18\x01 \x01(\x0b\x32\x17.spark.connect.RelationH\x00R\x04root\x12\x32\n\x07\x63ommand\x18\x02 \x01(\x0b\x32\x16.spark.connect.CommandH\x00R\x07\x63ommandB\t\n\x07op_type"\xb5\x01\n\x07\x45xplain\x12\x45\n\x0c\x65xplain_mode\x18\x01 \x01(\x0e\x32".spark.connect.Explain.ExplainModeR\x0b\x65xplainMode"c\n\x0b\x45xplainMode\x12\x14\n\x10MODE_UNSPECIFIED\x10\x00\x12\n\n\x06SIMPLE\x10\x01\x12\x0c\n\x08\x45XTENDED\x10\x02\x12\x0b\n\x07\x43ODEGEN\x10\x03\x12\x08\n\x04\x43OST\x10\x04\x12\r\n\tFORMATTED\x10\x05"z\n\x0bUserContext\x12\x17\n\x07user_id\x18\x01 \x01(\tR\x06userId\x12\x1b\n\tuser_name\x18\x02 \x01(\tR\x08userName\x12\x35\n\nextensions\x18\xe7\x07 \x03(\x0b\x32\x14.google.protobuf.AnyR\nextensions"\x81\x02\n\x12\x41nalyzePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 
\x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x12\x30\n\x07\x65xplain\x18\x05 \x01(\x0b\x32\x16.spark.connect.ExplainR\x07\x65xplainB\x0e\n\x0c_client_type"\x8a\x02\n\x13\x41nalyzePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12/\n\x06schema\x18\x02 \x01(\x0b\x32\x17.spark.connect.DataTypeR\x06schema\x12%\n\x0e\x65xplain_string\x18\x03 \x01(\tR\rexplainString\x12\x1f\n\x0btree_string\x18\x04 \x01(\tR\ntreeString\x12\x19\n\x08is_local\x18\x05 \x01(\x08R\x07isLocal\x12!\n\x0cis_streaming\x18\x06 \x01(\x08R\x0bisStreaming\x12\x1f\n\x0binput_files\x18\x07 \x03(\tR\ninputFiles"\xcf\x01\n\x12\x45xecutePlanRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\'\n\x04plan\x18\x03 \x01(\x0b\x32\x13.spark.connect.PlanR\x04plan\x12$\n\x0b\x63lient_type\x18\x04 \x01(\tH\x00R\nclientType\x88\x01\x01\x42\x0e\n\x0c_client_type"\x8f\x06\n\x13\x45xecutePlanResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12N\n\x0b\x61rrow_batch\x18\x02 \x01(\x0b\x32-.spark.connect.ExecutePlanResponse.ArrowBatchR\narrowBatch\x12\x44\n\x07metrics\x18\x04 \x01(\x0b\x32*.spark.connect.ExecutePlanResponse.MetricsR\x07metrics\x1a=\n\nArrowBatch\x12\x1b\n\trow_count\x18\x01 \x01(\x03R\x08rowCount\x12\x12\n\x04\x64\x61ta\x18\x02 \x01(\x0cR\x04\x64\x61ta\x1a\x85\x04\n\x07Metrics\x12Q\n\x07metrics\x18\x01 \x03(\x0b\x32\x37.spark.connect.ExecutePlanResponse.Metrics.MetricObjectR\x07metrics\x1a\xcc\x02\n\x0cMetricObject\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x17\n\x07plan_id\x18\x02 \x01(\x03R\x06planId\x12\x16\n\x06parent\x18\x03 \x01(\x03R\x06parent\x12z\n\x11\x65xecution_metrics\x18\x04 \x03(\x0b\x32M.spark.connect.ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntryR\x10\x65xecutionMetrics\x1a{\n\x15\x45xecutionMetricsEntry\x12\x10\n\x03key\x18\x01 \x01(\tR\x03key\x12L\n\x05value\x18\x02 \x01(\x0b\x32\x36.spark.connect.ExecutePlanResponse.Metrics.MetricValueR\x05value:\x02\x38\x01\x1aX\n\x0bMetricValue\x12\x12\n\x04name\x18\x01 \x01(\tR\x04name\x12\x14\n\x05value\x18\x02 \x01(\x03R\x05value\x12\x1f\n\x0bmetric_type\x18\x03 \x01(\tR\nmetricType"\xcf\x03\n\rConfigRequest\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12=\n\x0cuser_context\x18\x02 \x01(\x0b\x32\x1a.spark.connect.UserContextR\x0buserContext\x12\x44\n\toperation\x18\x03 \x01(\x0e\x32&.spark.connect.ConfigRequest.OperationR\toperation\x12\x12\n\x04keys\x18\x04 \x03(\tR\x04keys\x12\x16\n\x06values\x18\x05 \x03(\tR\x06values\x12\x1b\n\x06prefix\x18\x06 \x01(\tH\x00R\x06prefix\x88\x01\x01"\xc7\x01\n\tOperation\x12\x19\n\x15OPERATION_UNSPECIFIED\x10\x00\x12\x11\n\rOPERATION_SET\x10\x01\x12\x11\n\rOPERATION_GET\x10\x02\x12\x18\n\x14OPERATION_GET_OPTION\x10\x03\x12\x15\n\x11OPERATION_GET_ALL\x10\x04\x12\x13\n\x0fOPERATION_UNSET\x10\x05\x12\x16\n\x12OPERATION_CONTAINS\x10\x06\x12\x1b\n\x17OPERATION_IS_MODIFIABLE\x10\x07\x42\t\n\x07_prefix"\x97\x02\n\x0e\x43onfigResponse\x12\x1b\n\tclient_id\x18\x01 \x01(\tR\x08\x63lientId\x12\x12\n\x04keys\x18\x02 \x03(\tR\x04keys\x12\x16\n\x06values\x18\x03 \x03(\tR\x06values\x12T\n\x0foptional_values\x18\x04 \x03(\x0b\x32+.spark.connect.ConfigResponse.OptionalValueR\x0eoptionalValues\x12\x14\n\x05\x62ools\x18\x05 \x03(\x08R\x05\x62ools\x12\x1a\n\x08warnings\x18\x06 
\x03(\tR\x08warnings\x1a\x34\n\rOptionalValue\x12\x19\n\x05value\x18\x01 \x01(\tH\x00R\x05value\x88\x01\x01\x42\x08\n\x06_value2\x90\x02\n\x13SparkConnectService\x12X\n\x0b\x45xecutePlan\x12!.spark.connect.ExecutePlanRequest\x1a".spark.connect.ExecutePlanResponse"\x00\x30\x01\x12V\n\x0b\x41nalyzePlan\x12!.spark.connect.AnalyzePlanRequest\x1a".spark.connect.AnalyzePlanResponse"\x00\x12G\n\x06\x43onfig\x12\x1c.spark.connect.ConfigRequest\x1a\x1d.spark.connect.ConfigResponse"\x00\x42"\n\x1eorg.apache.spark.connect.protoP\x01\x62\x06proto3' ) @@ -58,7 +58,11 @@ _EXECUTEPLANRESPONSE_METRICS_METRICVALUE = _EXECUTEPLANRESPONSE_METRICS.nested_types_by_name[ "MetricValue" ] +_CONFIGREQUEST = DESCRIPTOR.message_types_by_name["ConfigRequest"] +_CONFIGRESPONSE = DESCRIPTOR.message_types_by_name["ConfigResponse"] +_CONFIGRESPONSE_OPTIONALVALUE = _CONFIGRESPONSE.nested_types_by_name["OptionalValue"] _EXPLAIN_EXPLAINMODE = _EXPLAIN.enum_types_by_name["ExplainMode"] +_CONFIGREQUEST_OPERATION = _CONFIGREQUEST.enum_types_by_name["Operation"] Plan = _reflection.GeneratedProtocolMessageType( "Plan", (_message.Message,), @@ -186,6 +190,38 @@ _sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricObject.ExecutionMetricsEntry) _sym_db.RegisterMessage(ExecutePlanResponse.Metrics.MetricValue) +ConfigRequest = _reflection.GeneratedProtocolMessageType( + "ConfigRequest", + (_message.Message,), + { + "DESCRIPTOR": _CONFIGREQUEST, + "__module__": "spark.connect.base_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.ConfigRequest) + }, +) +_sym_db.RegisterMessage(ConfigRequest) + +ConfigResponse = _reflection.GeneratedProtocolMessageType( + "ConfigResponse", + (_message.Message,), + { + "OptionalValue": _reflection.GeneratedProtocolMessageType( + "OptionalValue", + (_message.Message,), + { + "DESCRIPTOR": _CONFIGRESPONSE_OPTIONALVALUE, + "__module__": "spark.connect.base_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.ConfigResponse.OptionalValue) + }, + ), + "DESCRIPTOR": _CONFIGRESPONSE, + "__module__": "spark.connect.base_pb2" + # @@protoc_insertion_point(class_scope:spark.connect.ConfigResponse) + }, +) +_sym_db.RegisterMessage(ConfigResponse) +_sym_db.RegisterMessage(ConfigResponse.OptionalValue) + _SPARKCONNECTSERVICE = DESCRIPTOR.services_by_name["SparkConnectService"] if _descriptor._USE_C_DESCRIPTORS == False: @@ -219,6 +255,14 @@ _EXECUTEPLANRESPONSE_METRICS_METRICOBJECT_EXECUTIONMETRICSENTRY._serialized_end = 2017 _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_start = 2019 _EXECUTEPLANRESPONSE_METRICS_METRICVALUE._serialized_end = 2107 - _SPARKCONNECTSERVICE._serialized_start = 2110 - _SPARKCONNECTSERVICE._serialized_end = 2309 + _CONFIGREQUEST._serialized_start = 2110 + _CONFIGREQUEST._serialized_end = 2573 + _CONFIGREQUEST_OPERATION._serialized_start = 2363 + _CONFIGREQUEST_OPERATION._serialized_end = 2562 + _CONFIGRESPONSE._serialized_start = 2576 + _CONFIGRESPONSE._serialized_end = 2855 + _CONFIGRESPONSE_OPTIONALVALUE._serialized_start = 2803 + _CONFIGRESPONSE_OPTIONALVALUE._serialized_end = 2855 + _SPARKCONNECTSERVICE._serialized_start = 2858 + _SPARKCONNECTSERVICE._serialized_end = 3130 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index ea82aaf21e252..21feba0e88f48 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -570,3 +570,225 @@ class ExecutePlanResponse(google.protobuf.message.Message): ) -> None: ... 
global___ExecutePlanResponse = ExecutePlanResponse + +class ConfigRequest(google.protobuf.message.Message): + """Request to update or fetch the configurations.""" + + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class _Operation: + ValueType = typing.NewType("ValueType", builtins.int) + V: typing_extensions.TypeAlias = ValueType + + class _OperationEnumTypeWrapper( + google.protobuf.internal.enum_type_wrapper._EnumTypeWrapper[ + ConfigRequest._Operation.ValueType + ], + builtins.type, + ): # noqa: F821 + DESCRIPTOR: google.protobuf.descriptor.EnumDescriptor + OPERATION_UNSPECIFIED: ConfigRequest._Operation.ValueType # 0 + OPERATION_SET: ConfigRequest._Operation.ValueType # 1 + OPERATION_GET: ConfigRequest._Operation.ValueType # 2 + OPERATION_GET_OPTION: ConfigRequest._Operation.ValueType # 3 + OPERATION_GET_ALL: ConfigRequest._Operation.ValueType # 4 + OPERATION_UNSET: ConfigRequest._Operation.ValueType # 5 + OPERATION_CONTAINS: ConfigRequest._Operation.ValueType # 6 + OPERATION_IS_MODIFIABLE: ConfigRequest._Operation.ValueType # 7 + + class Operation(_Operation, metaclass=_OperationEnumTypeWrapper): ... + OPERATION_UNSPECIFIED: ConfigRequest.Operation.ValueType # 0 + OPERATION_SET: ConfigRequest.Operation.ValueType # 1 + OPERATION_GET: ConfigRequest.Operation.ValueType # 2 + OPERATION_GET_OPTION: ConfigRequest.Operation.ValueType # 3 + OPERATION_GET_ALL: ConfigRequest.Operation.ValueType # 4 + OPERATION_UNSET: ConfigRequest.Operation.ValueType # 5 + OPERATION_CONTAINS: ConfigRequest.Operation.ValueType # 6 + OPERATION_IS_MODIFIABLE: ConfigRequest.Operation.ValueType # 7 + + CLIENT_ID_FIELD_NUMBER: builtins.int + USER_CONTEXT_FIELD_NUMBER: builtins.int + OPERATION_FIELD_NUMBER: builtins.int + KEYS_FIELD_NUMBER: builtins.int + VALUES_FIELD_NUMBER: builtins.int + PREFIX_FIELD_NUMBER: builtins.int + client_id: builtins.str + """(Required) + + The client_id is set by the client to be able to collate streaming responses from + different queries. + """ + @property + def user_context(self) -> global___UserContext: + """(Required) User context""" + operation: global___ConfigRequest.Operation.ValueType + @property + def keys( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) + + Identify which keys will be updated or fetched. + Required when the 'operation' is 'SET', 'GET', 'GET_OPTION', 'UNSET', + 'CONTAINS', 'IS_MODIFIABLE'. + """ + @property + def values( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) + + Corresponding values to the keys. + Required when the 'operation' is 'SET'. + Optional when the 'operation' is 'GET', the values here will be used + as the default values. + """ + prefix: builtins.str + """(Optional) + + Only used when the 'operation' is 'GET_ALL'. + If prefix is given, only parameters that start with the prefix will be returned. + """ + def __init__( + self, + *, + client_id: builtins.str = ..., + user_context: global___UserContext | None = ..., + operation: global___ConfigRequest.Operation.ValueType = ..., + keys: collections.abc.Iterable[builtins.str] | None = ..., + values: collections.abc.Iterable[builtins.str] | None = ..., + prefix: builtins.str | None = ..., + ) -> None: ... + def HasField( + self, + field_name: typing_extensions.Literal[ + "_prefix", b"_prefix", "prefix", b"prefix", "user_context", b"user_context" + ], + ) -> builtins.bool: ... 
+ def ClearField( + self, + field_name: typing_extensions.Literal[ + "_prefix", + b"_prefix", + "client_id", + b"client_id", + "keys", + b"keys", + "operation", + b"operation", + "prefix", + b"prefix", + "user_context", + b"user_context", + "values", + b"values", + ], + ) -> None: ... + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_prefix", b"_prefix"] + ) -> typing_extensions.Literal["prefix"] | None: ... + +global___ConfigRequest = ConfigRequest + +class ConfigResponse(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + class OptionalValue(google.protobuf.message.Message): + DESCRIPTOR: google.protobuf.descriptor.Descriptor + + VALUE_FIELD_NUMBER: builtins.int + value: builtins.str + def __init__( + self, + *, + value: builtins.str | None = ..., + ) -> None: ... + def HasField( + self, field_name: typing_extensions.Literal["_value", b"_value", "value", b"value"] + ) -> builtins.bool: ... + def ClearField( + self, field_name: typing_extensions.Literal["_value", b"_value", "value", b"value"] + ) -> None: ... + def WhichOneof( + self, oneof_group: typing_extensions.Literal["_value", b"_value"] + ) -> typing_extensions.Literal["value"] | None: ... + + CLIENT_ID_FIELD_NUMBER: builtins.int + KEYS_FIELD_NUMBER: builtins.int + VALUES_FIELD_NUMBER: builtins.int + OPTIONAL_VALUES_FIELD_NUMBER: builtins.int + BOOLS_FIELD_NUMBER: builtins.int + WARNINGS_FIELD_NUMBER: builtins.int + client_id: builtins.str + @property + def keys( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) + + Available when the operation is 'GET_ALL'. + """ + @property + def values( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) + + Available when the operation is 'GET', 'GET_ALL'. + """ + @property + def optional_values( + self, + ) -> google.protobuf.internal.containers.RepeatedCompositeFieldContainer[ + global___ConfigResponse.OptionalValue + ]: + """(Optional) + + Available when the operation is 'GET_OPTION'. + """ + @property + def bools( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.bool]: + """(Optional) + + Available when the operation is 'CONTAINS', 'IS_MODIFIABLE'. + """ + @property + def warnings( + self, + ) -> google.protobuf.internal.containers.RepeatedScalarFieldContainer[builtins.str]: + """(Optional) + + Warning messages for deprecated or unsupported configurations. + """ + def __init__( + self, + *, + client_id: builtins.str = ..., + keys: collections.abc.Iterable[builtins.str] | None = ..., + values: collections.abc.Iterable[builtins.str] | None = ..., + optional_values: collections.abc.Iterable[global___ConfigResponse.OptionalValue] + | None = ..., + bools: collections.abc.Iterable[builtins.bool] | None = ..., + warnings: collections.abc.Iterable[builtins.str] | None = ..., + ) -> None: ... + def ClearField( + self, + field_name: typing_extensions.Literal[ + "bools", + b"bools", + "client_id", + b"client_id", + "keys", + b"keys", + "optional_values", + b"optional_values", + "values", + b"values", + "warnings", + b"warnings", + ], + ) -> None: ... 
+ +global___ConfigResponse = ConfigResponse diff --git a/python/pyspark/sql/connect/proto/base_pb2_grpc.py b/python/pyspark/sql/connect/proto/base_pb2_grpc.py index aff5897f520f8..007e31fd0ea3b 100644 --- a/python/pyspark/sql/connect/proto/base_pb2_grpc.py +++ b/python/pyspark/sql/connect/proto/base_pb2_grpc.py @@ -40,6 +40,11 @@ def __init__(self, channel): request_serializer=spark_dot_connect_dot_base__pb2.AnalyzePlanRequest.SerializeToString, response_deserializer=spark_dot_connect_dot_base__pb2.AnalyzePlanResponse.FromString, ) + self.Config = channel.unary_unary( + "/spark.connect.SparkConnectService/Config", + request_serializer=spark_dot_connect_dot_base__pb2.ConfigRequest.SerializeToString, + response_deserializer=spark_dot_connect_dot_base__pb2.ConfigResponse.FromString, + ) class SparkConnectServiceServicer(object): @@ -60,6 +65,12 @@ def AnalyzePlan(self, request, context): context.set_details("Method not implemented!") raise NotImplementedError("Method not implemented!") + def Config(self, request, context): + """Update or fetch the configurations and returns a [[ConfigResponse]] containing the result.""" + context.set_code(grpc.StatusCode.UNIMPLEMENTED) + context.set_details("Method not implemented!") + raise NotImplementedError("Method not implemented!") + def add_SparkConnectServiceServicer_to_server(servicer, server): rpc_method_handlers = { @@ -73,6 +84,11 @@ def add_SparkConnectServiceServicer_to_server(servicer, server): request_deserializer=spark_dot_connect_dot_base__pb2.AnalyzePlanRequest.FromString, response_serializer=spark_dot_connect_dot_base__pb2.AnalyzePlanResponse.SerializeToString, ), + "Config": grpc.unary_unary_rpc_method_handler( + servicer.Config, + request_deserializer=spark_dot_connect_dot_base__pb2.ConfigRequest.FromString, + response_serializer=spark_dot_connect_dot_base__pb2.ConfigResponse.SerializeToString, + ), } generic_handler = grpc.method_handlers_generic_handler( "spark.connect.SparkConnectService", rpc_method_handlers @@ -141,3 +157,32 @@ def AnalyzePlan( timeout, metadata, ) + + @staticmethod + def Config( + request, + target, + options=(), + channel_credentials=None, + call_credentials=None, + insecure=False, + compression=None, + wait_for_ready=None, + timeout=None, + metadata=None, + ): + return grpc.experimental.unary_unary( + request, + target, + "/spark.connect.SparkConnectService/Config", + spark_dot_connect_dot_base__pb2.ConfigRequest.SerializeToString, + spark_dot_connect_dot_base__pb2.ConfigResponse.FromString, + options, + channel_credentials, + insecure, + call_credentials, + compression, + wait_for_ready, + timeout, + metadata, + ) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index e764e0510d96c..8699ec895b6ce 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -4158,7 +4158,12 @@ object SQLConf { * @param comment Additional info regarding to the removed config. For example, * reasons of config deprecation, what users should use instead of it. */ - case class DeprecatedConfig(key: String, version: String, comment: String) + case class DeprecatedConfig(key: String, version: String, comment: String) { + def toDeprecationString: String = { + s"The SQL config '$key' has been deprecated in Spark v$version " + + s"and may be removed in the future. 
$comment" + } + } /** * Maps deprecated SQL config keys to information about the deprecation. @@ -5127,11 +5132,8 @@ class SQLConf extends Serializable with Logging { * Logs a warning message if the given config key is deprecated. */ private def logDeprecationWarning(key: String): Unit = { - SQLConf.deprecatedSQLConfigs.get(key).foreach { - case DeprecatedConfig(configName, version, comment) => - logWarning( - s"The SQL config '$configName' has been deprecated in Spark v$version " + - s"and may be removed in the future. $comment") + SQLConf.deprecatedSQLConfigs.get(key).foreach { config => + logWarning(config.toDeprecationString) } }