Skip to content

Commit

Permalink
HBASE-28646 Use Streams to unmarshall protobuf REST data (#5974)
Browse files Browse the repository at this point in the history
Signed-off-by: Duo Zhang <zhangduo@apache.org>
(cherry picked from commit 91b3512)
  • Loading branch information
stoty committed Jun 11, 2024
1 parent cb6cdbd commit 7f60bd4
Show file tree
Hide file tree
Showing 14 changed files with 84 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
package org.apache.hadoop.hbase.rest;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

Expand All @@ -47,7 +49,7 @@ default void writeProtobufOutput(OutputStream os) throws IOException {
}

/**
* Returns the protobuf represention of the model in a byte array Use
* Returns the protobuf represention of the model in a byte array. Use
* {@link org.apache.hadoop.hbase.rest.ProtobufMessageHandler#writeProtobufOutput(OutputStream)}
* for better performance
* @return the protobuf encoded object in a byte array
Expand All @@ -63,15 +65,28 @@ default byte[] createProtobufOutput() {
Message messageFromObject();

/**
* Initialize the model from a protobuf representation.
* Initialize the model from a protobuf representation. Use
* {@link org.apache.hadoop.hbase.rest.ProtobufMessageHandler#getObjectFromMessage(InputStream)}
* for better performance
* @param message the raw bytes of the protobuf message
* @return reference to self for convenience
*/
// TODO implement proper stream handling for unmarshalling.
// Using byte array here lets us use ProtobufUtil.mergeFrom in the implementations to
// avoid the CodedOutputStream size limitation, but is slow
// and memory intensive. We could use the ProtobufUtil.mergeFrom() variant that takes
// an inputStream and sets the size limit to maxInt.
// This would help both on the client side, and when processing large Puts on the server.
ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException;
default ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
final CodedInputStream codedInput = CodedInputStream.newInstance(message);
codedInput.setSizeLimit(message.length);
return getObjectFromMessage(codedInput);
}

/**
* Initialize the model from a protobuf representation.
* @param is InputStream providing the protobuf message
* @return reference to self for convenience
*/
default ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
final CodedInputStream codedInput = CodedInputStream.newInstance(is);
codedInput.setSizeLimit(Integer.MAX_VALUE);
return getObjectFromMessage(codedInput);
}

ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,16 @@
*/
package org.apache.hadoop.hbase.rest;

import java.io.IOException;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.rest.model.CellModel;
import org.apache.hadoop.hbase.rest.model.RowModel;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

@InterfaceAudience.Private
public final class RestUtil {

Expand All @@ -45,4 +49,17 @@ public static RowModel createRowModelFromResult(Result r) {
}
return rowModel;
}

/**
* Merges the object from codedInput, then calls checkLastTagWas. This is based on
* ProtobufUtil.mergeFrom, but we have already taken care of setSizeLimit() before calling, so
* only the checkLastTagWas() call is retained.
* @param builder protobuf object builder
* @param codedInput encoded object data
*/
public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput)
throws IOException {
builder.mergeFrom(codedInput);
codedInput.checkLastTagWas(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,13 @@
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.CellMessage.Cell;

/**
Expand Down Expand Up @@ -218,9 +219,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
Cell.Builder builder = Cell.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
setColumn(builder.getColumn().toByteArray());
setValue(builder.getData().toByteArray());
if (builder.hasTimestamp()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.CellMessage.Cell;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.CellSetMessage.CellSet;

Expand Down Expand Up @@ -139,9 +140,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
CellSet.Builder builder = CellSet.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
for (CellSet.Row row : builder.getRowsList()) {
RowModel rowModel = new RowModel(row.getKey().toByteArray());
for (Cell cell : row.getValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.NamespacePropertiesMessage.NamespaceProperties;
Expand Down Expand Up @@ -157,9 +159,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
NamespaceProperties.Builder builder = NamespaceProperties.newBuilder();
builder.mergeFrom(message);
RestUtil.mergeFrom(builder, cis);
List<NamespaceProperties.Property> properties = builder.getPropsList();
for (NamespaceProperties.Property property : properties) {
addProperty(property.getKey(), property.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,10 @@
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.NamespacesMessage.Namespaces;
Expand Down Expand Up @@ -104,9 +106,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
Namespaces.Builder builder = Namespaces.newBuilder();
builder.mergeFrom(message);
RestUtil.mergeFrom(builder, cis);
namespaces = builder.getNamespaceList();
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

/**
Expand Down Expand Up @@ -187,7 +188,7 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream is) throws IOException {
// there is no standalone row protobuf message
throw new UnsupportedOperationException("no protobuf equivalent to RowModel");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,17 +67,18 @@
import org.apache.hadoop.hbase.filter.ValueFilter;
import org.apache.hadoop.hbase.filter.WhileMatchFilter;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.hadoop.hbase.security.visibility.Authorizations;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.fasterxml.jackson.jaxrs.json.JacksonJaxbJsonProvider;
import org.apache.hbase.thirdparty.com.google.protobuf.ByteString;
import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.javax.ws.rs.core.MediaType;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.ScannerMessage.Scanner;

/**
Expand Down Expand Up @@ -925,9 +926,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
Scanner.Builder builder = Scanner.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
if (builder.hasStartRow()) {
startRow = builder.getStartRow().toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@
import javax.xml.bind.annotation.XmlElementWrapper;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.StorageClusterStatusMessage.StorageClusterStatus;

/**
Expand Down Expand Up @@ -713,9 +714,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
StorageClusterStatus.Builder builder = StorageClusterStatus.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
if (builder.hasRegions()) {
regions = builder.getRegions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@
import javax.xml.bind.annotation.XmlElement;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.TableInfoMessage.TableInfo;

/**
Expand Down Expand Up @@ -140,9 +141,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
TableInfo.Builder builder = TableInfo.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
setName(builder.getName());
for (TableInfo.Region region : builder.getRegionsList()) {
add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,12 @@
import javax.xml.bind.annotation.XmlElementRef;
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.TableListMessage.TableList;

/**
Expand Down Expand Up @@ -101,9 +102,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
TableList.Builder builder = TableList.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
for (String table : builder.getNameList()) {
this.add(new TableModel(table));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,13 @@
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.ColumnSchemaMessage.ColumnSchema;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.TableSchemaMessage.TableSchema;

Expand Down Expand Up @@ -287,9 +288,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
TableSchema.Builder builder = TableSchema.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
this.setName(builder.getName());
for (TableSchema.Attribute attr : builder.getAttrsList()) {
this.addAttribute(attr.getName(), attr.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,13 @@
import javax.xml.bind.annotation.XmlRootElement;
import org.apache.hadoop.hbase.rest.ProtobufMessageHandler;
import org.apache.hadoop.hbase.rest.RESTServlet;
import org.apache.hadoop.hbase.rest.RestUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedInputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;
import org.apache.hbase.thirdparty.org.glassfish.jersey.servlet.ServletContainer;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.VersionMessage.Version;

/**
Expand Down Expand Up @@ -174,9 +175,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException {
Version.Builder builder = Version.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
RestUtil.mergeFrom(builder, cis);
if (builder.hasRestVersion()) {
restVersion = builder.getRestVersion();
}
Expand Down
Loading

0 comments on commit 7f60bd4

Please sign in to comment.