Support Iceberg ingestion from REST based catalogs #17124
Conversation
Looks good overall, thanks! A few suggestions.
@@ -31,7 +31,7 @@ Iceberg refers to these metastores as catalogs. The Iceberg extension lets you c
* Hive metastore catalog
* Local catalog
Suggested change:
* Local catalog
* REST-based catalog
    @JacksonInject @HiveConf Configuration configuration
)
{
  this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
consider using InvalidInput.exception()
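A minimal sketch of that suggestion, assuming Druid's `org.apache.druid.error.InvalidInput` helper; the class name is hypothetical and mirrors the constructor under review:

import org.apache.druid.error.InvalidInput;

public class RestIcebergCatalog
{
  private final String catalogUri;

  public RestIcebergCatalog(String catalogUri)
  {
    if (catalogUri == null) {
      // Surfaces a user-facing DruidException instead of the bare
      // NullPointerException thrown by Preconditions.checkNotNull.
      throw InvalidInput.exception("catalogUri cannot be null");
    }
    this.catalogUri = catalogUri;
  }
}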
docs/ingestion/input-sources.md (Outdated)
### Iceberg filter object

This input source provides the following filters: `and`, `equals`, `interval`, and `or`. You can use these filters to filter out data files from a snapshot, reducing the number of files Druid has to ingest.
If the filter column is not an Iceberg partition column, it is highly recommended to define an additional filter in the [`transformSpec`](./ingestion-spec.md#transformspec). This is because for non-partition columns, Iceberg filters may return rows that do not match the expression.
This filtering behavior applies to Delta Lake as well. I think typically, in the Lakehouse world, filtering is performed on table partition columns. Filtering on non-partitioned columns is best-effort.
I'm not sure if `transformSpec` would fully guarantee additional filtering in all scenarios. Perhaps for these docs, we can:
- Highly recommend filtering on Iceberg partitioned columns
- If filtering on non-partitioned columns, call out that it's best-effort and recommend using additional filters by defining them in a `transformSpec` if applicable, as in the sketch below
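For illustration, a minimal sketch of pairing the two filters; the column names `event_time` and `__time` and the interval values are hypothetical, and the `icebergFilter` property name follows the existing input source docs:

{
  "ioConfig": {
    "inputSource": {
      "type": "iceberg",
      "icebergFilter": {
        "type": "interval",
        "filterColumn": "event_time",
        "intervals": ["2023-01-01T00:00:00Z/2023-02-01T00:00:00Z"]
      }
    }
  },
  "dataSchema": {
    "transformSpec": {
      "filter": {
        "type": "interval",
        "dimension": "__time",
        "intervals": ["2023-01-01/2023-02-01"]
      }
    }
  }
}

If `event_time` is not an Iceberg partition column, the `icebergFilter` prunes data files best-effort, and the `transformSpec` filter then drops any leftover non-matching rows at ingestion time.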
Thanks for the comments.
Curious, have you encountered any cases where the `transformSpec` doesn't fully guarantee additional filtering on top of the Delta Lake filters? Transform spec filters have worked out well for us in this case, so I'm keen to understand if there are any gotchas.
Ah, good to know. I was just imagining a scenario where the `transformSpec` filters in Druid don't map 1:1 to a native filter that Delta Lake supports. I did a quick search, and it seems they can be mapped to the ones we already support at least, so I think we should be good.
docs/ingestion/input-sources.md (Outdated)
@@ -1063,7 +1063,7 @@ The following is a sample spec for a S3 warehouse source:

### Catalog Object

-The catalog object supports `local` and `hive` catalog types.
+The catalog object supports `local`, `hive`, and `rest` catalog types.
Nit: maybe reorder this and the catalog sections by relevance/usage — rest, hive, and local?
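For context, a sketch of what the new `rest` catalog object could look like in a spec; the `catalogUri` field matches the constructor in this PR, while the endpoint URL is a placeholder:

"icebergCatalog": {
  "type": "rest",
  "catalogUri": "http://localhost:8181"
}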
HttpServer server = null;
ServerSocket serverSocket = null;
try {
  // Probe for a free ephemeral port, then bind the stub catalog server to it.
  serverSocket = new ServerSocket(0);
  int port = serverSocket.getLocalPort();
  serverSocket.close();
  server = HttpServer.create(new InetSocketAddress("localhost", port), 0);
  server.createContext(
      "/v1/config", // API for catalog fetchConfig which is invoked on catalog initialization
      (httpExchange) -> {
        String payload = "{}";
        byte[] outputBytes = payload.getBytes(StandardCharsets.UTF_8);
        // Headers must be set before sendResponseHeaders(), which writes them out;
        // sendResponseHeaders() also sets Content-Length from the response length.
        httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, "application/octet-stream");
        httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_RANGE, "bytes 0");
        httpExchange.sendResponseHeaders(200, outputBytes.length);
        OutputStream os = httpExchange.getResponseBody();
        os.write(outputBytes);
        os.close();
      }
  );
  server.start();
This should probably be in test setup()?
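For instance, a minimal sketch of that refactor, assuming JUnit 4's @Before/@After lifecycle; binding HttpServer directly to port 0 also avoids the ServerSocket port-probing race:

private HttpServer server;

@Before
public void setUp() throws IOException
{
  // Bind straight to an ephemeral port; no need to probe with ServerSocket first.
  server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
  server.createContext("/v1/config", httpExchange -> {
    byte[] outputBytes = "{}".getBytes(StandardCharsets.UTF_8);
    httpExchange.getResponseHeaders().set(HttpHeaders.CONTENT_TYPE, "application/json");
    httpExchange.sendResponseHeaders(200, outputBytes.length);
    try (OutputStream os = httpExchange.getResponseBody()) {
      os.write(outputBytes);
    }
  });
  server.start();
}

@After
public void tearDown()
{
  if (server != null) {
    server.stop(0);
  }
}

Tests can then read the bound port from server.getAddress().getPort().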
Adds support to the `iceberg` input source to read from Iceberg REST Catalogs.

Description

Adds support to the `iceberg` input source to read from Iceberg REST catalogs.
1.6.1

Release note

Adds support to the `iceberg` input source to read from Iceberg REST Catalogs.

This PR has: