HBASE-28001: Add request attribute support to BufferedMutator (apache#6076)

Co-authored-by: Evie Boland <eboland@hubspot.com>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
2 people authored and ndimiduk committed Sep 9, 2024
1 parent c033996 commit c17fef3
Showing 16 changed files with 581 additions and 334 deletions.
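
A minimal usage sketch of the synchronous API this change introduces; the table name, column family, row, and attribute values are illustrative placeholders, and only BufferedMutatorParams.setRequestAttribute and BufferedMutator.getRequestAttributes are new here:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class RequestAttributeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    // Attach an rpc request attribute to the params before creating the mutator.
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("demo_table"))
      .setRequestAttribute("requestInfo", Bytes.toBytes("nightly-load"));
    try (Connection conn = ConnectionFactory.createConnection(conf);
      BufferedMutator mutator = conn.getBufferedMutator(params)) {
      // The attributes supplied on the params are visible on the mutator itself.
      Map<String, byte[]> attrs = mutator.getRequestAttributes();
      System.out.println("request attributes: " + attrs.keySet());
      mutator.mutate(new Put(Bytes.toBytes("row-1"))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
      mutator.flush();
    }
  }
}
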
@@ -20,6 +20,7 @@
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
@@ -93,4 +94,11 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
default long getPeriodicalFlushTimeout(TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}
}
@@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

@@ -38,6 +39,16 @@ public interface AsyncBufferedMutatorBuilder {
*/
AsyncBufferedMutatorBuilder setRpcTimeout(long timeout, TimeUnit unit);

/**
* Set a rpc request attribute.
*/
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);

/**
* Set multiple rpc request attributes.
*/
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);

/**
* Set the base pause time for retrying. We use an exponential policy to generate sleep time when
* retrying.
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

@@ -78,6 +79,20 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
tableBuilder.setRequestAttribute(key, value);
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
}
return this;
}

@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
@@ -24,6 +24,7 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
@@ -130,7 +131,7 @@ Stream.<CompletableFuture<Void>> generate(CompletableFuture::new).limit(mutation
periodicFlushTask = periodicalFlushTimer.newTimeout(timeout -> {
synchronized (AsyncBufferedMutatorImpl.this) {
// confirm that we are still valid, if there is already an internalFlush call before us,
- // then we should not execute any more. And in internalFlush we will set periodicFlush
+ // then we should not execute anymore. And in internalFlush we will set periodicFlush
// to null, and since we may schedule a new one, so here we check whether the references
// are equal.
if (timeout == periodicFlushTask) {
@@ -170,4 +171,9 @@ public long getWriteBufferSize() {
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
}
}
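
The asynchronous path mirrors the synchronous one: attributes set on the AsyncBufferedMutatorBuilder are forwarded to the underlying AsyncTableBuilder, and the mutator above simply reports the table's attributes. A minimal sketch, again with placeholder table, family, and attribute values:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AsyncBufferedMutator;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class AsyncRequestAttributeSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    Map<String, byte[]> attrs = new HashMap<>();
    attrs.put("clientId", Bytes.toBytes("ingest-7")); // placeholder attribute
    try (AsyncConnection conn = ConnectionFactory.createAsyncConnection(conf).get();
      AsyncBufferedMutator mutator = conn.getBufferedMutatorBuilder(TableName.valueOf("demo_table"))
        .setRequestAttributes(attrs) // bulk form added by this change
        .setRequestAttribute("requestInfo", Bytes.toBytes("batch-1")) // single-attribute form
        .build()) {
      // Delegates to the underlying AsyncTable, so both attributes are reported here.
      System.out.println(mutator.getRequestAttributes().keySet());
      CompletableFuture<Void> done = mutator.mutate(new Put(Bytes.toBytes("row-1"))
        .addColumn(Bytes.toBytes("cf"), Bytes.toBytes("q"), Bytes.toBytes("v")));
      mutator.flush();
      done.join();
    }
  }
}
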
@@ -21,12 +21,12 @@
import static org.apache.hadoop.hbase.client.ConnectionUtils.toCheckExistenceOnly;
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

+ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
- import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
@@ -117,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* @return a map of request attributes supplied by the client
*/
default Map<String, byte[]> getRequestAttributes() {
- throw new NotImplementedException("Add an implementation!");
+ return Collections.emptyMap();
}

/**
@@ -129,10 +129,10 @@ public AsyncTableBuilderBase<C> setStartLogErrorsCnt(int startLogErrorsCnt) {

@Override
public AsyncTableBuilder<C> setRequestAttribute(String key, byte[] value) {
- if (this.requestAttributes.isEmpty()) {
- this.requestAttributes = new HashMap<>();
+ if (requestAttributes.isEmpty()) {
+ requestAttributes = new HashMap<>();
}
- this.requestAttributes.put(key, value);
+ requestAttributes.put(key, value);
return this;
}
}
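
AsyncTableBuilderBase above, and BufferedMutatorParams later in this diff, share the same lazy copy-on-first-write idiom: the attribute map starts as the shared Collections.emptyMap() and is replaced with a mutable HashMap only when the first attribute arrives. A standalone illustration of that idiom (AttributeHolder is not an HBase class):

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative only: the same lazy-initialization pattern used by the builders in this diff.
class AttributeHolder {
  private Map<String, byte[]> requestAttributes = Collections.emptyMap();

  AttributeHolder setRequestAttribute(String key, byte[] value) {
    if (requestAttributes.isEmpty()) {
      // Swap the shared immutable empty map for a mutable one only when an attribute is added,
      // so callers that never set attributes pay no extra allocation.
      requestAttributes = new HashMap<>();
    }
    requestAttributes.put(key, value);
    return this;
  }

  Map<String, byte[]> getRequestAttributes() {
    return requestAttributes;
  }
}
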
@@ -19,7 +19,9 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -204,6 +206,13 @@ default void setOperationTimeout(int timeout) {
"The BufferedMutator::setOperationTimeout has not been implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}

/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
@@ -23,6 +23,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
@@ -186,4 +187,9 @@ public void setRpcTimeout(int timeout) {
public void setOperationTimeout(int timeout) {
// no effect
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return mutator.getRequestAttributes();
}
}
@@ -17,6 +17,9 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
@@ -38,6 +41,7 @@ public class BufferedMutatorParams implements Cloneable {
private String implementationClassName = null;
private int rpcTimeout = UNSET;
private int operationTimeout = UNSET;
protected Map<String, byte[]> requestAttributes = Collections.emptyMap();
private BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
@Override
public void onException(RetriesExhaustedWithDetailsException exception,
@@ -85,6 +89,18 @@ public int getOperationTimeout() {
return operationTimeout;
}

public BufferedMutatorParams setRequestAttribute(String key, byte[] value) {
if (requestAttributes.isEmpty()) {
requestAttributes = new HashMap<>();
}
requestAttributes.put(key, value);
return this;
}

public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

/**
* Override the write buffer size specified by the provided {@link Connection}'s
* {@link org.apache.hadoop.conf.Configuration} instance, via the configuration key
@@ -107,6 +107,10 @@ public BufferedMutator getBufferedMutator(BufferedMutatorParams params) throws I
if (params.getMaxKeyValueSize() != BufferedMutatorParams.UNSET) {
builder.setMaxKeyValueSize(params.getMaxKeyValueSize());
}
if (!params.getRequestAttributes().isEmpty()) {
builder.setRequestAttributes(params.getRequestAttributes());
}
return new BufferedMutatorOverAsyncBufferedMutator(builder.build(), params.getListener());
}

@@ -214,8 +214,8 @@ private <T> SingleRequestCallerBuilder<T> newCaller(byte[] row, int priority, lo
.operationTimeout(operationTimeoutNs, TimeUnit.NANOSECONDS)
.pause(pauseNs, TimeUnit.NANOSECONDS)
.pauseForServerOverloaded(pauseNsForServerOverloaded, TimeUnit.NANOSECONDS)
- .maxAttempts(maxAttempts).setRequestAttributes(requestAttributes)
- .startLogErrorsCnt(startLogErrorsCnt).setRequestAttributes(requestAttributes);
+ .maxAttempts(maxAttempts).startLogErrorsCnt(startLogErrorsCnt)
+ .setRequestAttributes(requestAttributes);
}

private <T, R extends OperationWithAttributes & Row> SingleRequestCallerBuilder<T>
@@ -757,6 +757,6 @@ default long getOperationTimeout(TimeUnit unit) {
* @return map of request attributes
*/
default Map<String, byte[]> getRequestAttributes() {
- throw new NotImplementedException("Add an implementation!");
+ return Collections.emptyMap();
}
}
@@ -19,15 +19,19 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.AuthUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.BufferedMutatorParams;
@@ -67,12 +71,19 @@ public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator
}
}
};
- BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener);
+ BufferedMutatorParams params = new BufferedMutatorParams(TABLE).listener(listener)
+ .setRequestAttribute("requestInfo", Bytes.toBytes("bar"));

//
// step 1: create a single Connection and a BufferedMutator, shared by all worker threads.
//
- try (final Connection conn = ConnectionFactory.createConnection(getConf());
+ Map<String, byte[]> connectionAttributes = new HashMap<>();
+ connectionAttributes.put("clientId", Bytes.toBytes("foo"));
+ Configuration conf = getConf();
+ try (
+ final Connection conn = ConnectionFactory.createConnection(conf, null,
+ AuthUtil.loginClient(conf), connectionAttributes);
final BufferedMutator mutator = conn.getBufferedMutator(params)) {

/** worker pool that operates on BufferedTable instances */
@@ -104,6 +115,7 @@ public Void call() throws Exception {
f.get(5, TimeUnit.MINUTES);
}
workerPool.shutdown();
mutator.flush();
} catch (IOException e) {
// exception while creating/destroying Connection or BufferedMutator
LOG.info("exception while creating/destroying Connection or BufferedMutator", e);