Skip to content

Commit

Permalink
ensure connection attributes are not corrupted by released byte buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
Ray Mattingly committed Aug 22, 2023
1 parent dae078e commit a99d6b9
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.ipc;

import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -27,7 +28,6 @@
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
import org.apache.hbase.thirdparty.com.google.protobuf.Message;

import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;

/**
Expand Down Expand Up @@ -83,7 +83,7 @@ public interface RpcCall extends RpcCallContext {
/** Returns The request header of this call. */
RequestHeader getHeader();

ConnectionHeader getConnectionHeader();
Map<String, byte[]> getConnectionAttributes();

/** Returns Port of remote address in this call */
int getRemotePort();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.CellScanner;
Expand All @@ -49,7 +50,6 @@

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.CellBlockMeta;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ExceptionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.RequestHeader;
Expand Down Expand Up @@ -209,8 +209,8 @@ public RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
return this.connection.connectionHeader;
public Map<String, byte[]> getConnectionAttributes() {
return this.connection.connectionAttributes;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.crypto.cipher.CryptoCipherFactory;
import org.apache.commons.crypto.random.CryptoRandom;
import org.apache.commons.crypto.random.CryptoRandomFactory;
Expand Down Expand Up @@ -75,6 +77,7 @@
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.VersionInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.RPCProtos.ConnectionHeader;
Expand Down Expand Up @@ -103,6 +106,7 @@ abstract class ServerRpcConnection implements Closeable {
protected int remotePort;
protected InetAddress addr;
protected ConnectionHeader connectionHeader;
protected Map<String, byte[]> connectionAttributes;

/**
* Codec the client asked use.
Expand Down Expand Up @@ -405,6 +409,9 @@ private CodedInputStream createCis(ByteBuff buf) {
// Reads the connection header following version
private void processConnectionHeader(ByteBuff buf) throws IOException {
this.connectionHeader = ConnectionHeader.parseFrom(createCis(buf));
this.connectionAttributes = connectionHeader.getAttributeList().stream()
.collect(Collectors.toMap(HBaseProtos.NameBytesPair::getName,
nameBytesPair -> nameBytesPair.getValue().toByteArray()));
String serviceName = connectionHeader.getServiceName();
if (serviceName == null) {
throw new EmptyServiceNameException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.io.ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
Expand All @@ -27,6 +28,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand Down Expand Up @@ -84,6 +86,8 @@ public class TestRequestAndConnectionAttributes {
@BeforeClass
public static void setUp() throws Exception {
TEST_UTIL = new HBaseTestingUtil();
Configuration conf = TEST_UTIL.getConfiguration();
conf.setBoolean(ALLOCATOR_POOL_ENABLED_KEY, true);
TEST_UTIL.startMiniCluster(1);
TEST_UTIL.createTable(REQUEST_ATTRIBUTES_TEST_TABLE,
new byte[][] { REQUEST_ATTRIBUTES_TEST_TABLE_CF }, 1, HConstants.DEFAULT_BLOCKSIZE,
Expand All @@ -101,15 +105,19 @@ public void setup() {
}

@Test
public void testConnectionAttributes() throws IOException {
public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
TableName tableName = TableName.valueOf("testConnectionAttributes");
TEST_UTIL.createTable(tableName, new byte[][] { Bytes.toBytes("0") }, 1,
HConstants.DEFAULT_BLOCKSIZE, AttributesCoprocessor.class.getName());
byte[] cf = Bytes.toBytes("0");
TEST_UTIL.createTable(tableName, new byte[][] { cf }, 1, HConstants.DEFAULT_BLOCKSIZE,
AttributesCoprocessor.class.getName());

Configuration conf = TEST_UTIL.getConfiguration();
try (Connection conn = ConnectionFactory.createConnection(conf, null,
AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(tableName)) {

ensureBuffersAreOverwritten(table, cf);
Result result = table.get(new Get(Bytes.toBytes(0)));

assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
byte[] val = result.getValue(Bytes.toBytes("c"), Bytes.toBytes(attr.getKey()));
Expand All @@ -118,6 +126,15 @@ public void testConnectionAttributes() throws IOException {
}
}

private void ensureBuffersAreOverwritten(Table table, byte[] cf) throws IOException {
// this will cause unread connection attributes on the
Put put = new Put(Bytes.toBytes(UUID.randomUUID().toString()));
byte[] bytes = new byte[100];
new Random().nextBytes(bytes);
put.addColumn(cf, bytes, bytes);
table.put(put);
}

@Test
public void testRequestAttributesGet() throws IOException {
addRandomRequestAttributes();
Expand Down Expand Up @@ -275,10 +292,10 @@ public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
.setFamily(Bytes.toBytes("r")).setQualifier(Bytes.toBytes(attr.getName()))
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
}
for (HBaseProtos.NameBytesPair attr : rpcCall.getConnectionHeader().getAttributeList()) {
for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
.setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getName()))
.setValue(attr.getValue().toByteArray()).setType(Cell.Type.Put).setTimestamp(1).build());
.setFamily(Bytes.toBytes("c")).setQualifier(Bytes.toBytes(attr.getKey()))
.setValue(attr.getValue()).setType(Cell.Type.Put).setTimestamp(1).build());
}
result.sort(CellComparator.getInstance());
c.bypass();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.security.PrivilegedExceptionAction;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -695,7 +696,7 @@ public RPCProtos.RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
public Map<String, byte[]> getConnectionAttributes() {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.io.IOException;
import java.net.InetAddress;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.hadoop.hbase.CellScanner;
Expand Down Expand Up @@ -222,7 +223,7 @@ public RPCProtos.RequestHeader getHeader() {
}

@Override
public RPCProtos.ConnectionHeader getConnectionHeader() {
public Map<String, byte[]> getConnectionAttributes() {
return null;
}

Expand Down

0 comments on commit a99d6b9

Please sign in to comment.