Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: track bulk get operation completions explicitly #161

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,13 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicReferenceArray;

import com.netflix.evcache.EVCacheGetOperationListener;
import com.netflix.evcache.util.Pair;
import net.spy.memcached.internal.BulkGetCompletionListener;
import net.spy.memcached.internal.CheckedOperationTimeoutException;
import net.spy.memcached.ops.GetOperation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -51,6 +54,7 @@ public class EVCacheBulkGetFuture<T> extends BulkGetFuture<T> {
private final CountDownLatch latch;
private final long start;
private final EVCacheClient client;
private AtomicReferenceArray<SingleOperationState> operationStates;

public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getOps, CountDownLatch l, ExecutorService service, EVCacheClient client) {
super(m, getOps, l, service);
Expand All @@ -59,19 +63,21 @@ public EVCacheBulkGetFuture(Map<String, Future<T>> m, Collection<Operation> getO
latch = l;
this.start = System.currentTimeMillis();
this.client = client;
this.operationStates = null;
}

public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, boolean hasZF)
throws InterruptedException, ExecutionException {
boolean status = latch.await(to, unit);
assert operationStates != null;

boolean allCompleted = latch.await(to, unit);
if(log.isDebugEnabled()) log.debug("Took " + (System.currentTimeMillis() - start)+ " to fetch " + rvMap.size() + " keys from " + client);
long pauseDuration = -1;
List<Tag> tagList = null;
Collection<Operation> timedoutOps = null;
String statusString = EVCacheMetricsFactory.SUCCESS;

try {
if (!status) {
if (!allCompleted) {
boolean gcPause = false;
tagList = new ArrayList<Tag>(7);
tagList.addAll(client.getTagList());
Expand All @@ -98,10 +104,10 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
}
// redo the same op once more since there was a chance of gc pause
if (gcPause) {
status = latch.await(to, unit);
allCompleted = latch.await(to, unit);
tagList.add(new BasicTag(EVCacheMetricsFactory.PAUSE_REASON, EVCacheMetricsFactory.GC));
if (log.isDebugEnabled()) log.debug("Retry status : " + status);
if (status) {
if (log.isDebugEnabled()) log.debug("Retry status : " + allCompleted);
if (allCompleted) {
tagList.add(new BasicTag(EVCacheMetricsFactory.FETCH_AFTER_PAUSE, EVCacheMetricsFactory.YES));
} else {
tagList.add(new BasicTag(EVCacheMetricsFactory.FETCH_AFTER_PAUSE, EVCacheMetricsFactory.NO));
Expand All @@ -113,29 +119,28 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
if (log.isDebugEnabled()) log.debug("Total duration due to gc event = " + (System.currentTimeMillis() - start) + " msec.");
}

for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
if (!status) {
MemcachedConnection.opTimedOut(op);
if(timedoutOps == null) timedoutOps = new HashSet<Operation>();
timedoutOps.add(op);
} else {
MemcachedConnection.opSucceeded(op);
}
boolean hadTimedoutOp = false;
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (!state.completed && !allCompleted) {
MemcachedConnection.opTimedOut(state.op);
hadTimedoutOp = true;
} else {
MemcachedConnection.opSucceeded(op);
MemcachedConnection.opSucceeded(state.op);
}
}

if (!status && !hasZF && (timedoutOps != null && timedoutOps.size() > 0)) statusString = EVCacheMetricsFactory.TIMEOUT;
if (!allCompleted && !hasZF && hadTimedoutOp) statusString = EVCacheMetricsFactory.TIMEOUT;

for (Operation op : ops) {
if(op.isCancelled()) {
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (state.cancelled) {
if (hasZF) statusString = EVCacheMetricsFactory.CANCELLED;
if (throwException) throw new ExecutionException(new CancellationException("Cancelled"));
}
if (op.hasErrored() && throwException) throw new ExecutionException(op.getException());
if (state.errored && throwException) throw new ExecutionException(state.op.getException());
}

Map<String, T> m = new HashMap<String, T>();
for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
m.put(me.getKey(), me.getValue().get());
Expand All @@ -151,6 +156,28 @@ public Map<String, T> getSome(long to, TimeUnit unit, boolean throwException, bo
}
}

/**
 * Sizes the per-operation completion-state array before the bulk get fans out.
 * Must be invoked exactly once, before any per-operation completion callbacks
 * can fire, since {@code signalSingleOpComplete} writes into this array.
 *
 * @param size number of per-node chunk operations this bulk get was split into
 * @throws IllegalStateException if the expected count was already set; the
 *         original code only had a bare {@code assert}, which is a no-op unless
 *         the JVM runs with {@code -ea} and would let a double initialization
 *         silently drop previously recorded operation states
 */
public void setExpectedCount(int size) {
    if (operationStates != null) {
        throw new IllegalStateException("expected operation count may only be set once");
    }
    operationStates = new AtomicReferenceArray<>(size);
}

/**
 * Immutable snapshot of a single memcached operation's terminal flags,
 * captured once at completion time so later readers (e.g. {@code getSome},
 * {@code handleBulkException}) can inspect the outcome without re-querying
 * the operation — and without taking the lock those accessors may involve.
 */
static class SingleOperationState {
    final Operation op;
    final boolean completed;
    final boolean cancelled;
    final boolean errored;
    final boolean timedOut;

    public SingleOperationState(Operation op) {
        final OperationState snapshot = op.getState();
        this.op = op;
        this.completed = snapshot == OperationState.COMPLETE;
        this.cancelled = op.isCancelled();
        this.errored = op.hasErrored();
        this.timedOut = op.isTimedOut();
    }
}

public CompletableFuture<Map<String, T>> getSomeCompletableFuture(long to, TimeUnit unit, boolean throwException, boolean hasZF) {
CompletableFuture<Map<String, T>> completableFuture = new CompletableFuture<>();
try {
Expand Down Expand Up @@ -192,25 +219,26 @@ public CompletableFuture<Map<String, T>> getAsyncSome(long timeout, TimeUnit uni

/**
 * Walks the recorded per-operation states and raises an exception describing
 * why the bulk get did not complete: cancellation and operation errors are
 * rethrown immediately; otherwise each incomplete op is timed out and a
 * checked-timeout {@link ExecutionException} is thrown (wrapped in a
 * RuntimeException) after the loop. Completed ops are reported as successes
 * to the connection's reconnect-threshold accounting.
 *
 * NOTE(review): slots in operationStates are only populated by
 * signalSingleOpComplete (the per-op complete() callback); if this method can
 * run while some op has not yet completed, operationStates.get(i) may be null
 * and state.completed would NPE — confirm the call site guarantees all slots
 * are set, or add a null guard.
 *
 * NOTE(review): if every state is completed, t stays null and the final
 * statement throws new RuntimeException((Throwable) null) — confirm this
 * method is only invoked when at least one op is known to have failed.
 */
public void handleBulkException() {
    ExecutionException t = null;
    for (int i = 0; i < operationStates.length(); i++) {
        SingleOperationState state = operationStates.get(i);
        if (!state.completed) {
            if (state.cancelled) {
                // any cancelled op aborts the whole bulk get immediately
                throw new RuntimeException(new ExecutionException(new CancellationException("Cancelled")));
            } else if (state.errored) {
                // propagate the operation's own exception as the cause
                throw new RuntimeException(new ExecutionException(state.op.getException()));
            } else {
                // neither cancelled nor errored: treat as a timeout; keep
                // scanning so the remaining ops still get opTimedOut /
                // opSucceeded bookkeeping before we throw below
                state.op.timeOut();
                MemcachedConnection.opTimedOut(state.op);
                t = new ExecutionException(new CheckedOperationTimeoutException("Checked Operation timed out.", state.op));
            }
        } else {
            MemcachedConnection.opSucceeded(state.op);
        }
    }

    throw new RuntimeException(t);
}

public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
this.addListener(future -> {
try {
Expand All @@ -229,26 +257,26 @@ public void doAsyncGetSome(CompletableFuture<Map<String, T>> promise) {
public Single<Map<String, T>> getSome(long to, TimeUnit units, boolean throwException, boolean hasZF, Scheduler scheduler) {
return observe().timeout(to, units, Single.create(subscriber -> {
try {
final Collection<Operation> timedoutOps = new HashSet<Operation>();
for (Operation op : ops) {
if (op.getState() != OperationState.COMPLETE) {
MemcachedConnection.opTimedOut(op);
timedoutOps.add(op);
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (!state.completed) {
MemcachedConnection.opTimedOut(state.op);
} else {
MemcachedConnection.opSucceeded(op);
MemcachedConnection.opSucceeded(state.op);
}
}

//if (!hasZF && timedoutOps.size() > 0) EVCacheMetricsFactory.getInstance().increment(client.getAppName() + "-getSome-CheckedOperationTimeout", client.getTagList());

for (Operation op : ops) {
if (op.isCancelled() && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
if (op.hasErrored() && throwException) throw new ExecutionException(op.getException());
for (int i = 0; i < operationStates.length(); i++) {
SingleOperationState state = operationStates.get(i);
if (state.cancelled && throwException) throw new ExecutionException(new CancellationException("Cancelled"));
if (state.errored && throwException) throw new ExecutionException(state.op.getException());
}

Map<String, T> m = new HashMap<String, T>();
for (Map.Entry<String, Future<T>> me : rvMap.entrySet()) {
m.put(me.getKey(), me.getValue().get());
}

subscriber.onSuccess(m);
} catch (Throwable e) {
subscriber.onError(e);
Expand Down Expand Up @@ -276,6 +304,10 @@ public void signalComplete() {
super.signalComplete();
}

/**
 * Records a completion snapshot for one chunk operation of this bulk get.
 * Invoked from the per-operation complete() callback; the snapshot is stored
 * lock-free in the AtomicReferenceArray slot reserved for that operation.
 *
 * @param sequenceNo index assigned to this operation when the bulk get was
 *                   split into per-node chunks (0-based, < expected count)
 * @param op         the finished get operation whose flags are snapshotted
 */
public void signalSingleOpComplete(int sequenceNo, GetOperation op) {
    this.operationStates.set(sequenceNo, new SingleOperationState(op));
}

public boolean cancel(boolean ign) {
if(log.isDebugEnabled()) log.debug("Operation cancelled", new Exception());
return super.cancel(ign);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,27 @@ public <T> EVCacheBulkGetFuture<T> asyncGetBulk(Collection<String> keys,
final CountDownLatch latch = new CountDownLatch(initialLatchCount);
final Collection<Operation> ops = new ArrayList<Operation>(chunks.size());
final EVCacheBulkGetFuture<T> rv = new EVCacheBulkGetFuture<T>(m, ops, latch, executorService, client);
GetOperation.Callback cb = new GetOperation.Callback() {
rv.setExpectedCount(chunks.size());

final DistributionSummary dataSizeDS = getDataSizeDistributionSummary(
EVCacheMetricsFactory.BULK_OPERATION,
EVCacheMetricsFactory.READ,
EVCacheMetricsFactory.IPC_SIZE_INBOUND);

class EVCacheBulkGetSingleFutureCallback implements GetOperation.Callback {
final int thisOpId;
GetOperation op = null;
public EVCacheBulkGetSingleFutureCallback(int thisOpId) {
this.thisOpId = thisOpId;
}

void bindOp(GetOperation op) {
assert this.op == null;
assert op != null;

this.op = op;
}

@Override
public void receivedStatus(OperationStatus status) {
if (log.isDebugEnabled()) log.debug("GetBulk Keys : " + keys + "; Status : " + status.getStatusCode().name() + "; Message : " + status.getMessage() + "; Elapsed Time - " + (System.currentTimeMillis() - rv.getStartTime()));
Expand All @@ -223,13 +243,16 @@ public void receivedStatus(OperationStatus status) {
@Override
public void gotData(String k, int flags, byte[] data) {
if (data != null) {
getDataSizeDistributionSummary(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, EVCacheMetricsFactory.IPC_SIZE_INBOUND).record(data.length);
dataSizeDS.record(data.length);
}
m.put(k, tcService.decode(tc, new CachedData(flags, data, tc.getMaxSize())));
}

@Override
public void complete() {
assert op != null;

rv.signalSingleOpComplete(thisOpId, op);
if (pendingChunks.decrementAndGet() <= 0) {
latch.countDown();
getTimer(EVCacheMetricsFactory.BULK_OPERATION, EVCacheMetricsFactory.READ, rv.getStatus(), (m.size() == keys.size() ? EVCacheMetricsFactory.YES : EVCacheMetricsFactory.NO), null, getReadMetricMaxValue()).record((System.currentTimeMillis() - rv.getStartTime()), TimeUnit.MILLISECONDS);
Expand All @@ -241,10 +264,14 @@ public void complete() {
// Now that we know how many servers it breaks down into, and the latch
// is all set up, convert all of these strings collections to operations
final Map<MemcachedNode, Operation> mops = new HashMap<MemcachedNode, Operation>();
int thisOpId = 0;
for (Map.Entry<MemcachedNode, Collection<String>> me : chunks.entrySet()) {
Operation op = opFact.get(me.getValue(), cb);
EVCacheBulkGetSingleFutureCallback cb = new EVCacheBulkGetSingleFutureCallback(thisOpId);
GetOperation op = opFact.get(me.getValue(), cb);
cb.bindOp(op);
mops.put(me.getKey(), op);
ops.add(op);
thisOpId++;
}
assert mops.size() == chunks.size();
mconn.checkState();
Expand Down
Loading