[WIP] HBase connector #6037
Please add tests
@@ -0,0 +1,146 @@
<?xml version="1.0" encoding="UTF-8"?>
Can you make this module a submodule of presto, like the other connector modules? Also use <packaging>presto-plugin</packaging>. Take a look at presto-kafka for an example. Then all the comments below and the Plugin file wouldn't be needed.
Will address it in the next push.
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>${hbase.version}</version>
</dependency>
indentation
Will address it in the next push.
/**
 * Column Name
 */
Remove comments like this where it is obvious from the code what a particular thing does.
Copy/paste from Kafka. Will address it in the next push.
import static java.util.Objects.requireNonNull;

public class HBaseConnectorId
Why is this wrapper needed? Is it just for readability? I would just use a String explicitly.
Then you would have to use annotations to use that String as an @Inject target. I think this class is fine.
public static Set<HBaseInternalFieldDescription> getInternalFields()
{
return ImmutableSet.of(PARTITION_ID_FIELD, PARTITION_OFFSET_FIELD,
Put each entry on a separate line.
host.getPort(),
connectTimeoutMillis,
bufferSizeBytes,
format("presto-hbase-%s-%s", connectorId, nodeManager.getCurrentNode().getNodeIdentifier()));*/
Remove the commented-out code.
private static <T> T selectRandom(Iterable<T> iterable)
{
List<T> list = ImmutableList.copyOf(iterable);
Maybe store the nodes as a list, so you would not need to copy them each time you find splits.
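A minimal sketch of what this suggestion could look like, assuming the set of nodes is known when the owning object is constructed. The `NodeSelector` class and its names are hypothetical and not part of the PR; the point is only that the copy happens once in the constructor rather than on every call.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical holder class; names are illustrative, not taken from the PR.
final class NodeSelector<T>
{
    private final List<T> nodes;

    NodeSelector(Iterable<T> nodes)
    {
        // Copy the nodes once here instead of on every split lookup.
        List<T> copy = new ArrayList<>();
        nodes.forEach(copy::add);
        if (copy.isEmpty()) {
            throw new IllegalArgumentException("nodes is empty");
        }
        this.nodes = Collections.unmodifiableList(copy);
    }

    T selectRandom()
    {
        // Index directly into the stored list; no per-call copy.
        return nodes.get(ThreadLocalRandom.current().nextInt(nodes.size()));
    }
}
```

If the node set can change at runtime, the list would need to be refreshed when it does, which this sketch does not cover.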
for (HRegionLocation regionLocation : regionLocationList) {
HRegionInfo regionInfo = regionLocation.getRegionInfo();
Fix the formatting (indentation) of this file.
.add("fields", fields)
.toString();
}
}
no new line
try {
tableName = SchemaTableName.valueOf(definedTable);
}
catch (IllegalArgumentException iae) {
This looks like a hack. Can you replace the try-catch with an if?
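For illustration, here is one way the exception-driven parse could become an explicit check. This is a sketch under assumptions: `TableName` is a hypothetical stand-in rather than the Presto SPI class, and the fallback to a default schema for unqualified names is a guess at what the catch block was doing, since the diff does not show it.

```java
import java.util.Objects;

// Hypothetical stand-in for a schema-qualified table name; not the Presto SPI class.
final class TableName
{
    final String schema;
    final String table;

    TableName(String schema, String table)
    {
        this.schema = Objects.requireNonNull(schema, "schema is null");
        this.table = Objects.requireNonNull(table, "table is null");
    }

    // Assumption: a name without a dot is unqualified and falls back to defaultSchema.
    static TableName parse(String definedTable, String defaultSchema)
    {
        int dot = definedTable.indexOf('.');
        if (dot < 0) {
            // Decide with an explicit check instead of catching IllegalArgumentException.
            return new TableName(defaultSchema, definedTable);
        }
        return new TableName(definedTable.substring(0, dot), definedTable.substring(dot + 1));
    }
}
```

The branch makes the two accepted input shapes visible at the call site, which is harder to see when the decision is hidden in a catch block.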
Once you have applied my comments, please squash the commits.
@@ -0,0 +1,256 @@
package com.facebook.presto.hbase;
no copyrights
public void close() {
}

}
no new line
import io.airlift.slice.Slice;
import io.airlift.slice.Slices;

public class HBaseRecordCursor implements RecordCursor {
How was this extracted when there are no deleted lines?
</build>
<groupId>presto-hbase</groupId>
<artifactId>presto-hbase</artifactId>
<version>0.0.1-SNAPSHOT</version>
Do not declare version explicitly. Inherit from parent.
<version>3.1</version>
</dependency>

<dependency>
Use spaces for formatting.
For Java code, follow https://github.com/airlift/codestyle
</profile>
</profiles>

</project>
new line missing
/**
 * Mapping hint for the decoder. Can be null.
 */
private final String mapping;
Use Optional instead of making the field nullable.
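A rough sketch of the Optional-based shape, assuming a simplified description class. `FieldDescription`, its two fields, and `describe()` are hypothetical and only loosely modelled on the snippet above; the real class has more fields and JSON annotations that are omitted here.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical, simplified description class; not the class from the PR.
final class FieldDescription
{
    private final String name;
    // Absence is expressed in the type, so no "Can be null" comment is needed.
    private final Optional<String> mapping;

    FieldDescription(String name, Optional<String> mapping)
    {
        this.name = Objects.requireNonNull(name, "name is null");
        this.mapping = Objects.requireNonNull(mapping, "mapping is null");
    }

    Optional<String> getMapping()
    {
        return mapping;
    }

    String describe()
    {
        // Callers handle the missing case explicitly instead of null-checking.
        return mapping.map(value -> name + " -> " + value).orElse(name);
    }
}
```

With this shape, a caller that forgets about the missing-mapping case fails to compile rather than hitting a NullPointerException at runtime.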
/**
 * Data format to use (selects the decoder). Can be null.
 */
private final String dataFormat;
use Optional
// private Iterator<MessageAndOffset> messageAndOffsetIterator;
//private Result messageAndOffsetIterator;
private final AtomicBoolean reported = new AtomicBoolean();
Seems like most, if not all, of these fields should be declared final.
HBaseRecordCursor(HBaseSplit split, Set<FieldValueProvider> globalInternalFieldValueProviders, List<DecoderColumnHandle> columnHandles, ResultScanner scanner,
RowDecoder keyDecoder, RowDecoder messageDecoder, Map<DecoderColumnHandle, FieldDecoder<?>> keyFieldDecoders,
Map<DecoderColumnHandle, FieldDecoder<?>> messageFieldDecoders) {
this.split = split;
check for nulls
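As a sketch of the usual pattern, each constructor argument can be passed through `Objects.requireNonNull` with a message naming the argument, which also lets the fields be final. `CursorState` and its two String-based fields are hypothetical placeholders for the real split and column handle types.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

// Hypothetical, cut-down cursor state; the real constructor takes more arguments.
final class CursorState
{
    private final String split;
    private final List<String> columnHandles;

    CursorState(String split, List<String> columnHandles)
    {
        // Fail at construction with a message that names the offending argument.
        this.split = Objects.requireNonNull(split, "split is null");
        Objects.requireNonNull(columnHandles, "columnHandles is null");
        this.columnHandles = Collections.unmodifiableList(new ArrayList<>(columnHandles));
    }

    int columnCount()
    {
        return columnHandles.size();
    }
}
```

Failing in the constructor keeps the error close to the caller that passed the null, instead of surfacing later inside the cursor.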
@Override
public long getCompletedBytes() {
return totalBytes;
Rather return 0 and add a TODO to add stats support.
@Override
public Type getType(int field) {
checkArgument(field < columnHandles.size(), "Invalid field index");
Add index to error message
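A small sketch of what the richer message might look like, written with plain JDK calls so it stands alone. `FieldTypes` and its String-typed columns are hypothetical; in the PR itself the same effect is available through Guava's `checkArgument(boolean, String, Object...)` overload, which takes a `%s` template.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical holder for column types; Strings stand in for the real Type objects.
final class FieldTypes
{
    private final List<String> columnTypes;

    FieldTypes(List<String> columnTypes)
    {
        this.columnTypes = Collections.unmodifiableList(new ArrayList<>(columnTypes));
    }

    String getType(int field)
    {
        if (field < 0 || field >= columnTypes.size()) {
            // Include the offending index and the valid range in the message.
            throw new IllegalArgumentException(
                    String.format("Invalid field index: %s (column count: %s)", field, columnTypes.size()));
        }
        return columnTypes.get(field);
    }
}
```

The sketch also rejects negative indexes, which the original `field < columnHandles.size()` check lets through.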
private long totalMessages;

// private Iterator<MessageAndOffset> messageAndOffsetIterator;
//private Result messageAndOffsetIterator;
this file has bad indentation.
Enough for first pass. Let us know when you address the comments.
private Map<String, String> optionalConfig = ImmutableMap.of();

@Override
public synchronized void setOptionalConfig(Map<String, String> optionalConfig)
make it not synchronized
}

@Override
public synchronized Iterable<ConnectorFactory> getConnectorFactories(ConnectorFactoryContext context)
make it not synchronized
}

@VisibleForTesting
public synchronized void setTableDescriptionSupplier(Supplier<Map<SchemaTableName, HBaseTableDescription>> tableDescriptionSupplier)
Also, I do not believe synchronized is needed here.
private static final Logger log = Logger.get(HBaseRecordCursor.class);

//private static final int HBASE_READ_BUFFER_SIZE = 100_000;
remove
@Override
public boolean advanceNextPosition() {
try
{
This { should be on the previous line.
fieldValueProviders.addAll(globalInternalFieldValueProviders);
fieldValueProviders.add(HBaseInternalFieldDescription.SEGMENT_COUNT_FIELD.forLongValue(totalMessages));
fieldValueProviders.add(HBaseInternalFieldDescription.PARTITION_OFFSET_FIELD
.forLongValue(0 /* messageAndOffset.offset() */));
be consistent with either wrapping or not wrapping lines here.
fieldValueProviders
.add(HBaseInternalFieldDescription.KEY_LENGTH_FIELD.forLongValue(messageAndOffset.getRow().length));

fieldValueProviders.add(HBaseInternalFieldDescription.KEY_CORRUPT_FIELD.forBooleanValue(true));
always true?
.add(HBaseInternalFieldDescription.KEY_LENGTH_FIELD.forLongValue(messageAndOffset.getRow().length));

fieldValueProviders.add(HBaseInternalFieldDescription.KEY_CORRUPT_FIELD.forBooleanValue(true));
fieldValueProviders.add(HBaseInternalFieldDescription.MESSAGE_CORRUPT_FIELD.forBooleanValue(true));
ditto?
public class HBaseSplit
implements ConnectorSplit
{
private final String connectorId;
Use Optional for optional fields.
return new HBaseRecordCursor();
}

public class HBaseRecordCursor
This is also defined as a top-level class.
We were previously relying on that for injection, which makes it hard to track down where the list of optimizers comes from when debugging.
It was incorrectly deriving "false" if the source expression produces no rows. A GROUP BY () always produces one group.
Note that in AstBuilder we transform it to "DOUBLE". This serves two purposes: 1) Correctness, otherwise the system doesn't find the type "DOUBLE PRECISION" 2) Avoid a test error in cases of multiple spaces between the DOUBLE and PRECISION tokens.
This downgrades Jetty version to 9.3.9.M1
Don't specify data types in result files. For Java tests that must be different for the two JDBC drivers, check which driver is in use and act accordingly.
Add object name to the exception message for easier debugging.
Multiple tests instantiate and populate the database using the same keyspace name. This results in a race condition when they run in parallel. This fix allocates a different keyspace for each suite of tests that can run concurrently.
@IsNull annotation will help the scalar functions determine if the input arguments are null without using the boxed type.
Tested manually.
Use same logic as for HashBuilderOperator
The updateTaskStatus method performs a callback while holding a lock. The lock on this method is not needed at all.
This allows for running additional product tests checking if new versions of docker images behave as expected.
After the following commit a few variables became unused, delete them: "Use SubqueryPlanner in RelationPlanner"
Without this change, aggregationBuilder was being set to null without calling close on it. This change ensures that aggregationBuilder is always properly closed AND that it won't be closed during iteration on its outputIterator.
BinaryFileSpiller spills data to binary files without any compression.
4c3a50c to 25583a9
What is the progress of the HBase connector?
@linqerli I don't understand your question. I'm working on it, that's all.
@damiencarol are you still planning on publishing v2 of this?
@damiencarol - lots of great work you started... what happened?
@damiencarol still planning on supporting HBase?
Any news?
2018.5
@damiencarol I am also working hard. See:
We are exploring ways to use Presto to query an HBase table. Can I get an update on where we are w.r.t. the HBase connector for Presto, and any references for it?
@damiencarol pls don't leave us
@damiencarol I'm coming from June 2023.
This is a first version (naive mapping Cells<->JSON message).
Most of the stuff works (metadata, column definitions, scan, ...).
More commits will come.