Skip to content

Commit

Permalink
HBASE-28540 Cache Results in org.apache.hadoop.hbase.rest.client.Remo…
Browse files Browse the repository at this point in the history
…teHTable.Scanner (apache#5846)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
  • Loading branch information
stoty committed Jun 10, 2024
1 parent 092ce0d commit 6a6343d
Show file tree
Hide file tree
Showing 13 changed files with 44 additions and 45 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,14 @@
*/
package org.apache.hadoop.hbase.rest;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import org.apache.yetus.audience.InterfaceAudience;

import com.google.protobuf.CodedInputStream;

import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

Expand Down Expand Up @@ -67,11 +71,9 @@ default byte[] createProtobufOutput() {
* @param message the raw bytes of the protobuf message
* @return reference to self for convenience
*/
// TODO implement proper stream handling for unmarshalling.
// Using byte array here lets us use ProtobufUtil.mergeFrom in the implementations to
// avoid the CodedOutputStream size limitation, but is slow
// and memory intensive. We could use the ProtobufUtil.mergeFrom() variant that takes
// an inputStream and sets the size limit to maxInt.
// This would help both on the client side, and when processing large Puts on the server.
ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException;
// default ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
// return getObjectFromMessage(new ByteArrayInputStream(message));
// }

ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import javax.xml.bind.annotation.XmlAccessType;
import javax.xml.bind.annotation.XmlAccessorType;
Expand Down Expand Up @@ -218,9 +219,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
Cell.Builder builder = Cell.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
setColumn(builder.getColumn().toByteArray());
setValue(builder.getData().toByteArray());
if (builder.hasTimestamp()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.hadoop.hbase.rest.model.CellModel.MAGIC_LENGTH;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -139,9 +140,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
CellSet.Builder builder = CellSet.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
for (CellSet.Row row : builder.getRowsList()) {
RowModel rowModel = new RowModel(row.getKey().toByteArray());
for (Cell cell : row.getValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rest.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.List;
Expand All @@ -32,7 +33,7 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.NamespacePropertiesMessage.NamespaceProperties;

/**
Expand Down Expand Up @@ -157,9 +158,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
NamespaceProperties.Builder builder = NamespaceProperties.newBuilder();
builder.mergeFrom(message);
ProtobufUtil.mergeFrom(builder, is);
List<NamespaceProperties.Property> properties = builder.getPropsList();
for (NamespaceProperties.Property property : properties) {
addProperty(property.getKey(), property.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -32,7 +33,7 @@
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.rest.protobuf.generated.NamespacesMessage.Namespaces;

/**
Expand Down Expand Up @@ -104,9 +105,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
Namespaces.Builder builder = Namespaces.newBuilder();
builder.mergeFrom(message);
ProtobufUtil.mergeFrom(builder, is);
namespaces = builder.getNamespaceList();
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -187,7 +188,7 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
// there is no standalone row protobuf message
throw new UnsupportedOperationException("no protobuf equivalent to RowModel");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.annotation.JsonInclude;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -925,9 +926,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
Scanner.Builder builder = Scanner.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
if (builder.hasStartRow()) {
startRow = builder.getStartRow().toByteArray();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -713,9 +714,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
StorageClusterStatus.Builder builder = StorageClusterStatus.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
if (builder.hasRegions()) {
regions = builder.getRegions();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rest.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -140,9 +141,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
TableInfo.Builder builder = TableInfo.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
setName(builder.getName());
for (TableInfo.Region region : builder.getRegionsList()) {
add(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rest.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
Expand Down Expand Up @@ -101,9 +102,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
TableList.Builder builder = TableList.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
for (String table : builder.getNameList()) {
this.add(new TableModel(table));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.fasterxml.jackson.annotation.JsonAnySetter;
import com.fasterxml.jackson.annotation.JsonIgnore;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
Expand Down Expand Up @@ -287,9 +288,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
TableSchema.Builder builder = TableSchema.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
this.setName(builder.getName());
for (TableSchema.Attribute attr : builder.getAttrsList()) {
this.addAttribute(attr.getName(), attr.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.rest.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import javax.servlet.ServletContext;
import javax.xml.bind.annotation.XmlAttribute;
Expand Down Expand Up @@ -174,9 +175,9 @@ public Message messageFromObject() {
}

@Override
public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException {
public ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException {
Version.Builder builder = Version.newBuilder();
ProtobufUtil.mergeFrom(builder, message);
ProtobufUtil.mergeFrom(builder, is);
if (builder.hasRestVersion()) {
restVersion = builder.getRestVersion();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.rest.provider.consumer;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
Expand Down Expand Up @@ -59,23 +58,10 @@ public ProtobufMessageHandler readFrom(Class<ProtobufMessageHandler> type, Type
ProtobufMessageHandler obj = null;
try {
obj = type.getDeclaredConstructor().newInstance();
ByteArrayOutputStream baos = new ByteArrayOutputStream();
byte[] buffer = new byte[4096];
int read;
do {
read = inputStream.read(buffer, 0, buffer.length);
if (read > 0) {
baos.write(buffer, 0, read);
}
} while (read > 0);
if (LOG.isTraceEnabled()) {
LOG.trace(getClass() + ": read " + baos.size() + " bytes from " + inputStream);
}
obj = obj.getObjectFromMessage(baos.toByteArray());
return obj.getObjectFromMessage(inputStream);
} catch (InstantiationException | NoSuchMethodException | InvocationTargetException
| IllegalAccessException e) {
throw new WebApplicationException(e);
}
return obj;
}
}

0 comments on commit 6a6343d

Please sign in to comment.