Skip to content

Commit

Permalink
Allow list of resopnseFormat (#426)
Browse files Browse the repository at this point in the history
* Allow list of resopnseFormat

* clean up

* add comment

* update protocol

* update comment
  • Loading branch information
linzhou-db authored Oct 22, 2023
1 parent 2b8f6c3 commit d9e9417
Show file tree
Hide file tree
Showing 9 changed files with 285 additions and 186 deletions.
7 changes: 5 additions & 2 deletions PROTOCOL.md
Original file line number Diff line number Diff line change
Expand Up @@ -2442,9 +2442,12 @@ This header can be used in the request for [Query Table Metadata](#query-table-m
</td>
<td>The header is processed properly by the server.

The server may choose to respond in parquet format if the table does not have any advanced features.
If there's only one responseFormat specified, the server must respect and return in the requested format.

The server must respond in delta format if the table has advanced features which are not compatible with the parquet format.</td>
If there's a list of responseFormat specified, such as `responseFormat=delta,parquet`. The server
may choose to respond in parquet format if the table does not have any advanced features. The server
must respond in delta format if the table has advanced features which are not compatible with the parquet format.
</td>
</tr>
</table>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@ class DeltaSharingRestClient(

@volatile private var created = false

// Convert the responseFormat to a Seq to be used later.
private val responseFormatSet = responseFormat.split(",").toSet

private lazy val client = {
val clientBuilder: HttpClientBuilder = if (sslTrustAll) {
val sslBuilder = new SSLContextBuilder()
Expand Down Expand Up @@ -224,24 +227,17 @@ class DeltaSharingRestClient(
}

/**
* Compare requestedFormat and respondedFormat, only error out when requested parquet but got
* delta in response. The client allows backward compatibility: requested delta but got parquet
* in response.
* Compare responseFormatSet and respondedFormat, error out when responseFormatSet doesn't contain
* respondedFormat. The client allows backward compatibility by specifying
* responseFormat=parquet,delta in the request header.
*/
private def checkRespondedFormat(
requestedFormat: String, respondedFormat: String, rpc: String, table: String): Unit = {
if (requestedFormat == respondedFormat) {
return
}
if (responseFormat == RESPONSE_FORMAT_PARQUET && respondedFormat == RESPONSE_FORMAT_DELTA) {
private def checkRespondedFormat(respondedFormat: String, rpc: String, table: String): Unit = {
if (!responseFormatSet.contains(respondedFormat)) {
logError(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
throw new IllegalArgumentException("The responseFormat returned from the delta sharing " +
s"server doesn't match the requested responseFormat: respondedFormat($respondedFormat)" +
s" != requestedFormat($responseFormat).")
} else {
logWarning(s"RespondedFormat($respondedFormat) is different from requested " +
s"responseFormat($responseFormat) for $rpc for table $table.")
}
}

Expand All @@ -260,7 +256,6 @@ class DeltaSharingRestClient(
val (version, respondedFormat, lines) = getNDJson(target)

checkRespondedFormat(
responseFormat,
respondedFormat,
rpc = "getMetadata",
table = s"${table.share}.${table.schema}.${table.name}"
Expand Down Expand Up @@ -336,7 +331,6 @@ class DeltaSharingRestClient(
}

checkRespondedFormat(
responseFormat,
respondedFormat,
rpc = s"getFiles(versionAsOf-$versionAsOf, timestampAsOf-$timestampAsOf)",
table = s"${table.share}.${table.schema}.${table.name}"
Expand Down Expand Up @@ -408,7 +402,6 @@ class DeltaSharingRestClient(
}

checkRespondedFormat(
responseFormat,
respondedFormat,
rpc = s"getFiles(startingVersion:$startingVersion, endingVersion:$endingVersion)",
table = s"${table.share}.${table.schema}.${table.name}"
Expand Down Expand Up @@ -532,7 +525,6 @@ class DeltaSharingRestClient(
}

checkRespondedFormat(
responseFormat,
respondedFormat,
rpc = s"getCDFFiles(cdfOptions:$cdfOptions)",
table = s"${table.share}.${table.schema}.${table.name}."
Expand Down Expand Up @@ -894,7 +886,7 @@ class DeltaSharingRestClient(
// Example: "capability1=value1;capability2=value3,value4,value5"
private def getDeltaSharingCapabilities(): String = {
var capabilities = Seq[String](s"${RESPONSE_FORMAT}=$responseFormat")
if (responseFormat == RESPONSE_FORMAT_DELTA && readerFeatures.nonEmpty) {
if (responseFormatSet.contains(RESPONSE_FORMAT_DELTA) && readerFeatures.nonEmpty) {
capabilities = capabilities :+ s"$READER_FEATURES=$readerFeatures"
}
capabilities.mkString(DELTA_SHARING_CAPABILITIES_DELIMITER)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,15 @@ trait DeltaSharingReadOptions extends DeltaSharingOptionParser {

val timestampAsOf = options.get(TIME_TRAVEL_TIMESTAMP).map(getFormattedTimestamp(_))

val responseFormat = options.get(RESPONSE_FORMAT).getOrElse(RESPONSE_FORMAT_PARQUET)
val responseFormat = options.get(RESPONSE_FORMAT).map { str =>
if (!(str == RESPONSE_FORMAT_PARQUET || str == RESPONSE_FORMAT_DELTA)) {
throw DeltaSharingErrors.illegalDeltaSharingOptionException(
RESPONSE_FORMAT, str,
s"The user input must be one of:{$RESPONSE_FORMAT_PARQUET, $RESPONSE_FORMAT_DELTA}."
)
}
str
}.getOrElse(RESPONSE_FORMAT_PARQUET)

def isTimeTravel: Boolean = versionAsOf.isDefined || timestampAsOf.isDefined

Expand Down
Loading

0 comments on commit d9e9417

Please sign in to comment.