Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Branch 2.6 reverts #6344

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.Closeable;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
Expand Down Expand Up @@ -94,11 +93,4 @@ default CompletableFuture<Void> mutate(Mutation mutation) {
default long getPeriodicalFlushTimeout(TimeUnit unit) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

import static org.apache.hadoop.hbase.client.ConnectionUtils.retries2Attempts;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -104,16 +103,6 @@ default AsyncBufferedMutatorBuilder setMaxRetries(int maxRetries) {
*/
AsyncBufferedMutatorBuilder setMaxKeyValueSize(int maxKeyValueSize);

/**
* Set a rpc request attribute.
*/
AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value);

/**
* Set multiple rpc request attributes.
*/
AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes);

/**
* Create the {@link AsyncBufferedMutator} instance.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.yetus.audience.InterfaceAudience;

Expand Down Expand Up @@ -79,20 +78,6 @@ public AsyncBufferedMutatorBuilder setStartLogErrorsCnt(int startLogErrorsCnt) {
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttribute(String key, byte[] value) {
tableBuilder.setRequestAttribute(key, value);
return this;
}

@Override
public AsyncBufferedMutatorBuilder setRequestAttributes(Map<String, byte[]> requestAttributes) {
for (Map.Entry<String, byte[]> requestAttribute : requestAttributes.entrySet()) {
tableBuilder.setRequestAttribute(requestAttribute.getKey(), requestAttribute.getValue());
}
return this;
}

@Override
public AsyncBufferedMutatorBuilder setWriteBufferSize(long writeBufferSize) {
Preconditions.checkArgument(writeBufferSize > 0, "writeBufferSize %d must be > 0",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -171,9 +170,4 @@ public long getWriteBufferSize() {
public long getPeriodicalFlushTimeout(TimeUnit unit) {
return unit.convert(periodicFlushTimeoutNs, TimeUnit.NANOSECONDS);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return table.getRequestAttributes();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,12 @@
import static org.apache.hadoop.hbase.util.FutureUtils.allOf;

import com.google.protobuf.RpcChannel;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.apache.commons.lang3.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CompareOperator;
import org.apache.hadoop.hbase.TableName;
Expand Down Expand Up @@ -118,7 +117,7 @@ public interface AsyncTable<C extends ScanResultConsumerBase> {
* @return a map of request attributes supplied by the client
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
throw new NotImplementedException("Add an implementation!");
}

/**
Expand Down Expand Up @@ -677,47 +676,6 @@ interface CoprocessorCallback<R> {
void onError(Throwable error);
}

/**
* Some coprocessors may support the idea of "partial results." If for some reason a coprocessor
* cannot return all results for a given region in a single response, the client side can be
* designed to recognize this and continuing requesting more results until they are completely
* accumulated in the client.
* <p>
* It is up to a particular coprocessor implementation and its corresponding clients to agree on
* what it means for results to be incomplete, how this state is communicated, and how multiple
* incomplete results are accumulated together.
* <p>
* Use this callback when you want to execute a coprocessor call on a range of regions, and that
* coprocessor may return incomplete results for a given region. See also the docs for
* {@link CoprocessorCallback}, which all apply here to its child interface too.
*/
@InterfaceAudience.Public
interface PartialResultCoprocessorCallback<S, R> extends CoprocessorCallback<R> {
/**
* Subclasses should implement this to tell AsyncTable whether the given response is "final" or
* whether the AsyncTable should send another request to the coprocessor to fetch more results
* from the given region. This method of fetching more results can be used many times until
* there are no more results to fetch from the region.
* @param response The response received from the coprocessor
* @param region The region the response came from
* @return A ServiceCaller object if the response was not final and therefore another request is
* required to continuing fetching results. null if no more requests need to be sent to
* the region.
*/
ServiceCaller<S, R> getNextCallable(R response, RegionInfo region);

/**
* Subclasses should implement this such that, when the above method returns non-null, this
* method returns the duration that AsyncTable should wait before sending the next request to
* the given region. You can use this to create a back-off behavior to reduce load on the
* RegionServer. If that's not desired, you can always return {@link Duration.ZERO}.
* @param response The response received from the coprocessor
* @param region The region the response came from
* @return The duration to wait.
*/
Duration getWaitInterval(R response, RegionInfo region);
}

/**
* Helper class for sending coprocessorService request that executes a coprocessor call on regions
* which are covered by a range.
Expand Down Expand Up @@ -786,12 +744,4 @@ default CoprocessorServiceBuilder<S, R> toRow(byte[] endKey) {
*/
<S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, CoprocessorCallback<R> callback);

/**
* Similar to above. Use when your coprocessor client+endpoint supports partial results. If the
* server does not offer partial results, it is still safe to use this, assuming you implement
* your {@link PartialResultCoprocessorCallback#getNextCallable(Object, RegionInfo)} correctly.
*/
<S, R> CoprocessorServiceBuilder<S, R> coprocessorService(Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable, PartialResultCoprocessorCallback<S, R> callback);
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import java.io.IOException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -297,69 +296,49 @@ public <S, R> CompletableFuture<R> coprocessorService(Function<RpcChannel, S> st
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
CoprocessorCallback<R> callback) {
return coprocessorService(stubMaker, callable,
new NoopPartialResultCoprocessorCallback<>(callback));
}

@Override
public <S, R> CoprocessorServiceBuilder<S, R> coprocessorService(
Function<RpcChannel, S> stubMaker, ServiceCaller<S, R> callable,
PartialResultCoprocessorCallback<S, R> callback) {
final Context context = Context.current();
PartialResultCoprocessorCallback<S, R> wrappedCallback =
new PartialResultCoprocessorCallback<S, R>() {

private final Phaser regionCompletesInProgress = new Phaser(1);

@Override
public void onRegionComplete(RegionInfo region, R resp) {
regionCompletesInProgress.register();
pool.execute(context.wrap(() -> {
try {
callback.onRegionComplete(region, resp);
} finally {
regionCompletesInProgress.arriveAndDeregister();
}
}));
}
CoprocessorCallback<R> wrappedCallback = new CoprocessorCallback<R>() {

@Override
public void onRegionError(RegionInfo region, Throwable error) {
regionCompletesInProgress.register();
pool.execute(context.wrap(() -> {
try {
callback.onRegionError(region, error);
} finally {
regionCompletesInProgress.arriveAndDeregister();
}
}));
}

@Override
public void onComplete() {
pool.execute(context.wrap(() -> {
// Guarantee that onComplete() is called after all onRegionComplete()'s are called
regionCompletesInProgress.arriveAndAwaitAdvance();
callback.onComplete();
}));
}
private final Phaser regionCompletesInProgress = new Phaser(1);

@Override
public void onError(Throwable error) {
pool.execute(context.wrap(() -> callback.onError(error)));
}
@Override
public void onRegionComplete(RegionInfo region, R resp) {
regionCompletesInProgress.register();
pool.execute(context.wrap(() -> {
try {
callback.onRegionComplete(region, resp);
} finally {
regionCompletesInProgress.arriveAndDeregister();
}
}));
}

@Override
public ServiceCaller<S, R> getNextCallable(R response, RegionInfo region) {
return callback.getNextCallable(response, region);
}
@Override
public void onRegionError(RegionInfo region, Throwable error) {
regionCompletesInProgress.register();
pool.execute(context.wrap(() -> {
try {
callback.onRegionError(region, error);
} finally {
regionCompletesInProgress.arriveAndDeregister();
}
}));
}

@Override
public Duration getWaitInterval(R response, RegionInfo region) {
return callback.getWaitInterval(response, region);
}
@Override
public void onComplete() {
pool.execute(context.wrap(() -> {
// Guarantee that onComplete() is called after all onRegionComplete()'s are called
regionCompletesInProgress.arriveAndAwaitAdvance();
callback.onComplete();
}));
}

};
@Override
public void onError(Throwable error) {
pool.execute(context.wrap(() -> callback.onError(error)));
}
};
CoprocessorServiceBuilder<S, R> builder =
rawTable.coprocessorService(stubMaker, callable, wrappedCallback);
return new CoprocessorServiceBuilder<S, R>() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,7 @@

import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
Expand Down Expand Up @@ -196,13 +194,6 @@ default void setOperationTimeout(int timeout) {
"The BufferedMutator::setOperationTimeout has not been implemented");
}

/**
* Returns the rpc request attributes.
*/
default Map<String, byte[]> getRequestAttributes() {
return Collections.emptyMap();
}

/**
* Listens for asynchronous exceptions on a {@link BufferedMutator}.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Timer;
import java.util.TimerTask;
Expand Down Expand Up @@ -90,7 +89,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
private final ExecutorService pool;
private final AtomicInteger rpcTimeout;
private final AtomicInteger operationTimeout;
private final Map<String, byte[]> requestAttributes;
private final boolean cleanupPoolOnClose;
private volatile boolean closed = false;
private final AsyncProcess ap;
Expand Down Expand Up @@ -137,9 +135,6 @@ public class BufferedMutatorImpl implements BufferedMutator {
this.operationTimeout = new AtomicInteger(params.getOperationTimeout() != UNSET
? params.getOperationTimeout()
: conn.getConnectionConfiguration().getOperationTimeout());

this.requestAttributes = params.getRequestAttributes();

this.ap = ap;
}

Expand Down Expand Up @@ -257,8 +252,7 @@ public synchronized void close() throws IOException {

private AsyncProcessTask createTask(QueueRowAccess access) {
return new AsyncProcessTask(AsyncProcessTask.newBuilder().setPool(pool).setTableName(tableName)
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE)
.setRequestAttributes(requestAttributes).build()) {
.setRowAccess(access).setSubmittedRows(AsyncProcessTask.SubmittedRows.AT_LEAST_ONE).build()) {
@Override
public int getRpcTimeout() {
return rpcTimeout.get();
Expand Down Expand Up @@ -397,11 +391,6 @@ public void setOperationTimeout(int operationTimeout) {
this.operationTimeout.set(operationTimeout);
}

@Override
public Map<String, byte[]> getRequestAttributes() {
return requestAttributes;
}

long getCurrentWriteBufferSize() {
return currentWriteBufferSize.get();
}
Expand Down
Loading