
Conversation

@grundprinzip (Contributor)

What changes were proposed in this pull request?

To modify incoming requests to the Spark Connect GRPC service, for example to translate metadata from the HTTP/2 request into values in the proto message, the GRPC service needs to be configured with an interceptor.

This patch adds two ways to configure interceptors for the GRPC service. First, we can now configure interceptors in the `SparkConnectInterceptorRegistry` by adding a value to the `interceptorChain` like in the example below:

```scala
object SparkConnectInterceptorRegistry {

  // Contains the list of configured interceptors.
  private lazy val interceptorChain: Seq[InterceptorBuilder] = Seq(
    interceptor[LoggingInterceptor](classOf[LoggingInterceptor])
  )
  // ...
}
```
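
For context, an entry in this chain is a plain gRPC `ServerInterceptor`. Below is a minimal sketch of what such an interceptor could look like; the class name and logging behavior are illustrative assumptions, not the implementation shipped with this patch.

```scala
import io.grpc.{Metadata, ServerCall, ServerCallHandler, ServerInterceptor}

// Minimal sketch of an interceptor that could be registered in the chain.
// Class name and behavior are illustrative assumptions.
class LoggingInterceptor extends ServerInterceptor {
  override def interceptCall[ReqT, RespT](
      call: ServerCall[ReqT, RespT],
      headers: Metadata,
      next: ServerCallHandler[ReqT, RespT]): ServerCall.Listener[ReqT] = {
    // Inspect or translate HTTP/2 metadata here before the call reaches the service.
    println(s"Intercepted ${call.getMethodDescriptor.getFullMethodName}")
    next.startCall(call, headers)
  }
}
```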

The second way is to configure interceptors using Spark configuration values at startup. For this, a new config key has been added: `spark.connect.grpc.interceptor.classes`. Its value is a comma-separated list of class names that are added as interceptors to the system.

```
./bin/pyspark --conf spark.connect.grpc.interceptor.classes=com.my.important.LoggingInterceptor
```
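
The description doesn't show how the configured class names become instances; a plausible reflection-based sketch (the helper name and error handling are assumptions, not the registry's actual code) would be:

```scala
import io.grpc.ServerInterceptor
import org.apache.spark.SparkConf

// Hypothetical sketch: turn the comma-separated config value into interceptor
// instances via reflection, assuming each class has a zero-argument constructor.
def createConfiguredInterceptors(conf: SparkConf): Seq[ServerInterceptor] = {
  conf.get("spark.connect.grpc.interceptor.classes", "")
    .split(",")
    .map(_.trim)
    .filter(_.nonEmpty)
    .map { name =>
      Class.forName(name)
        .getDeclaredConstructor()
        .newInstance()
        .asInstanceOf[ServerInterceptor]
    }
    .toSeq
}
```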

During startup, all interceptors are added in order to the `NettyServerBuilder`.

```scala
// Add all registered interceptors to the server builder.
SparkConnectInterceptorRegistry.chainInterceptors(sb)
```
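
A minimal sketch of what the chaining step could look like, assuming `sb` is the `NettyServerBuilder` being configured and `allInterceptors` stands in for the combined list of statically registered and config-driven interceptors:

```scala
import io.grpc.ServerInterceptor
import io.grpc.netty.NettyServerBuilder

// Hypothetical sketch: register every interceptor with the builder in list order.
def chainInterceptors(sb: NettyServerBuilder, allInterceptors: Seq[ServerInterceptor]): Unit = {
  allInterceptors.foreach(interceptor => sb.intercept(interceptor))
}
```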

Why are the changes needed?

Provide a configurable and extensible way to register interceptors for the Spark Connect GRPC service.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Unit Tests

@AmplabJenkins

Can one of the admins verify this patch?

@amaliujia (Contributor)

LGTM

@cloud-fan (Contributor)

thanks, merging to master!

@cloud-fan cloud-fan closed this in 4ba7ce2 Oct 24, 2022
```scala
* added to the GRPC server in order of their position in the list. Once the statically compiled
* interceptors are added, dynamically configured interceptors are added.
*/
object SparkConnectInterceptorRegistry {
```
Member

Is this an API? We should mark it @Unstable and add a version.

@HyukjinKwon (Member) Oct 25, 2022

Or it has to be `private[service]`. Or we should at least mention what is supposed to be an API at src/main/scala/org/apache/spark/sql/connect/package.scala.

Contributor

I thought everything under org.apache.spark.sql.connect.service is private, no?

Member

It is not, unless we document it as such. We should probably either document it like https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/package.scala or explicitly make it private/public.

Contributor

let's document it, just like what catalyst does. cc @amaliujia
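
For reference, the catalyst approach mentioned above is a package object whose Scaladoc declares everything in the package internal. A sketch of what the equivalent note for connect could look like (the wording is illustrative, not the actual follow-up change):

```scala
package org.apache.spark.sql

/**
 * All classes in this package are considered an internal API to Spark and are
 * subject to change between minor releases.
 */
package object connect {}
```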

Contributor

Makes sense. Let me check the places that are intentionally private or internal APIs but are documented. I can follow up on it.

```scala
.intConf
.createWithDefault(15002)

val CONNECT_GRPC_INTERCEPTOR_CLASSES =
```
Member

Just realized that this is under apache.spark.sql. We should either move this module out of sql or, ideally, use StaticSQLConf.

Contributor

This is in the connect module.

Member

But it's under apache.spark.sql.connect. Should we move it to apache.spark.connect?

Contributor

Right now, everything in Spark Connect is under apache.spark.sql.connect. Are you proposing an overall package renaming?

Member

Yeah, I am proposing it before it's too late.

Either: if we target covering other components too, we should probably rename the packages before it's too late. For PySpark too, we should probably move pyspark.sql.connect.DataFrame -> pyspark.connect.sql.DataFrame.

Or: use StaticSQLConf since we're in the SQL package. We're doing this in the Hive thrift server, Hive modules, etc.

Member

Then we'd better use StaticSQLConf or SQLConf instead of SparkConf.

@HyukjinKwon (Member) Oct 25, 2022

My point is that it's mixed: it's in the SQL package but the configuration being used is SparkConf, and the configuration name doesn't follow it either.

Contributor

StaticSQLConf and SQLConf are in the SQL module; it's weird to add Spark Connect configs there...

Member

That's what the Hive thriftserver (separate module) does, and what Avro (separate module) and Kafka (separate module for Structured Streaming) do. Pandas API on Spark also leverages runtime configurations via SparkSession under the hood instead of SparkConf.

It's weird that this is a SQL thing that uses the SQL package namespace but doesn't use SQLConf.

Contributor

I'm not sure what the benefit of doing so is. The config definition can be anywhere, and we can still use SQLConf to access it, e.g. `SQLConf.get.getConf(CONNECT_GRPC_INTERCEPTOR_CLASSES)`.
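
For reference, a config entry like the one in the diff above can be declared in the connect module with Spark's internal `ConfigBuilder`; the doc string and version below are assumptions for illustration, not the merged definition:

```scala
import org.apache.spark.internal.config.ConfigBuilder

// Illustrative sketch of a ConfigBuilder-based declaration in the connect module.
// Doc text and version are assumptions, not the merged entry.
val CONNECT_GRPC_INTERCEPTOR_CLASSES =
  ConfigBuilder("spark.connect.grpc.interceptor.classes")
    .doc("Comma-separated list of class names that implement the gRPC ServerInterceptor interface.")
    .version("3.4.0")
    .stringConf
    .createOptional
```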

@HyukjinKwon (Member) left a comment

LGTM. A couple of comments.

SandishKumarHN pushed a commit to SandishKumarHN/spark that referenced this pull request Dec 12, 2022
Closes apache#38320 from grundprinzip/SPARK-40857.

Lead-authored-by: Martin Grund <martin.grund@databricks.com>
Co-authored-by: Martin Grund <grundprinzip@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>