Skip to content

Commit

Permalink
HBASE-28001: Add request attribute support to BufferedMutator (#6076)
Browse files Browse the repository at this point in the history
Co-authored-by: Evie Boland <eboland@hubspot.com>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
  • Loading branch information
eab148 and Evie Boland authored Sep 17, 2024
1 parent d355c95 commit 192f640
Show file tree
Hide file tree
Showing 12 changed files with 587 additions and 349 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -93,4 +94,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
default long getPeriodicalFlushTimeout(TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -103,6 +104,16 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
*/
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);

/**
* Set a rpc request attribute.
*/
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);

/**
* Set multiple rpc request attributes.
*/
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);

/**
* Create the {@link AsyncBufferedMutator} instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
tableBuilder.setRequestAttribute(key, value);
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
}
return this;
}

@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -170,4 +171,9 @@ public long getWriteBufferSize() {
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

import com.google.protobuf.RpcChannel;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* @return a map of request attributes supplied by the client
*/
default Map<String, byte[]> getRequestAttributes() {
throw new NotImplementedException("Add an implementation!");
return Collections.emptyMap();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -194,6 +196,13 @@ default void setOperationTimeout(int timeout) {
"The BufferedMutator::setOperationTimeout has not been implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}

/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -89,6 +90,7 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final ExecutorService pool;
private final AtomicInteger rpcTimeout;
private final AtomicInteger operationTimeout;
private final Map<String, byte[]> requestAttributes;
private final boolean cleanupPoolOnClose;
private volatile boolean closed = false;
private final AsyncProcess ap;
Expand Down Expand Up @@ -135,6 +137,9 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET
? params.getOperationTimeout()
: conn.getConnectionConfiguration().getOperationTimeout());

this.requestAttributes = params.getRequestAttributes();

this.ap = ap;
}

Expand Down Expand Up @@ -252,7 +257,8 @@ public synchronized void close() throws IOException {

private AsyncProcessTask createTask(QueueRowAccess access) {
return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) {
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
.setRequestAttributes(requestAttributes).build()) {
@Override
public int getRpcTimeout() {
return rpcTimeout.get();
Expand Down Expand Up @@ -391,6 +397,11 @@ public void setOperationTimeout(int operationTimeout) {
this.operationTimeout.set(operationTimeout);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

long getCurrentWriteBufferSize() {
return currentWriteBufferSize.get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand All @@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
Expand Down Expand Up @@ -85,6 +89,18 @@ public int getOperationTimeout() {
return operationTimeout;
}

public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
}
requestAttributes.put(key, value);
return this;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,19 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
Expand Down Expand Up @@ -67,12 +71,18 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator
}
}
};
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);
BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener)
.setRequestAttribute("requestInfo", Bytes.toBytes("bar"));

//
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
//
try (final Connection conn = ConnectionFactory.createConnection(getConf());
Map<String, byte[]> connectionAttributes = new HashMap<>();
connectionAttributes.put("clientId", Bytes.toBytes("foo"));
Configuration conf = getConf();
try (
final Connection conn = ConnectionFactory.createConnection(conf, null,
AuthUtil.loginClient(conf), connectionAttributes);
final BufferedMutator mutator = conn.getBufferedMutator(params)) {

/** worker pool that operates on BufferedTable instances */
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparator;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.RegionObserver;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({ ClientTests.class, MediumTests.class })
public class TestConnectionAttributes {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestConnectionAttributes.class);

private static final Map<String, byte[]> CONNECTION_ATTRIBUTES = new HashMap<>();
static {
CONNECTION_ATTRIBUTES.put("clientId", Bytes.toBytes("foo"));
}
private static final byte[] FAMILY = Bytes.toBytes("0");
private static final TableName TABLE_NAME = TableName.valueOf("testConnectionAttributes");

private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private static MiniHBaseCluster cluster;

@BeforeClass
public static void setUp() throws Exception {
cluster = TEST_UTIL.startMiniCluster(1);
Table table = TEST_UTIL.createTable(TABLE_NAME, new byte[][] { FAMILY }, 1,
HConstants.DEFAULT_BLOCKSIZE, TestConnectionAttributes.AttributesCoprocessor.class.getName());
table.close();
}

@AfterClass
public static void afterClass() throws Exception {
cluster.close();
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testConnectionHeaderOverwrittenAttributesRemain() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
try (Connection conn = ConnectionFactory.createConnection(conf, null,
AuthUtil.loginClient(conf), CONNECTION_ATTRIBUTES); Table table = conn.getTable(TABLE_NAME)) {

// submit a 300 byte rowkey here to encourage netty's allocator to overwrite the connection
// header
byte[] bytes = new byte[300];
new Random().nextBytes(bytes);
Result result = table.get(new Get(bytes));

assertEquals(CONNECTION_ATTRIBUTES.size(), result.size());
for (Map.Entry<String, byte[]> attr : CONNECTION_ATTRIBUTES.entrySet()) {
byte[] val = result.getValue(FAMILY, Bytes.toBytes(attr.getKey()));
assertEquals(Bytes.toStringBinary(attr.getValue()), Bytes.toStringBinary(val));
}
}
}

public static class AttributesCoprocessor implements RegionObserver, RegionCoprocessor {

@Override
public Optional<RegionObserver> getRegionObserver() {
return Optional.of(this);
}

@Override
public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> c, Get get,
List<Cell> result) throws IOException {
RpcCall rpcCall = RpcServer.getCurrentCall().get();
for (Map.Entry<String, byte[]> attr : rpcCall.getConnectionAttributes().entrySet()) {
result.add(c.getEnvironment().getCellBuilder().clear().setRow(get.getRow())
.setFamily(FAMILY).setQualifier(Bytes.toBytes(attr.getKey())).setValue(attr.getValue())
.setType(Cell.Type.Put).setTimestamp(1).build());
}
result.sort(CellComparator.getInstance());
c.bypass();
}
}
}
Loading

0 comments on commit 192f640

Please sign in to comment.