Skip to content

Commit

Permalink
Merge pull request googleapis#29 from garrettjonesgoogle/master
Browse files Browse the repository at this point in the history
Adding a blocking call count threshold
  • Loading branch information
garrettjonesgoogle committed Mar 21, 2016
2 parents 6672684 + fc5b21a commit bf25ddc
Show file tree
Hide file tree
Showing 20 changed files with 700 additions and 85 deletions.
12 changes: 9 additions & 3 deletions src/main/java/com/google/api/gax/bundling/BundlingThreshold.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,28 @@
/**
* The interface representing a threshold to be used in ThresholdBundler.
* Thresholds do not need to be thread-safe if they are only used inside
* of ThresholdBundler.
* ThresholdBundler.
*/
public interface BundlingThreshold<E> {

/**
* Presents the element to the threshold for the attribute of interest to be accumulated.
*
* Any calls into this function from ThresholdBundler will be under a lock.
*/
void accumulate(E e);

/**
* Any calls into this function from ThresholdBundler will be under a lock.
*
* @return whether the threshold has been reached.
*/
boolean isThresholdReached();

/**
* Reset the accumulated value back to zero.
* Make a copy of this threshold but with the accumulated value zeroed.
*
* Any calls into this function from ThresholdBundler will be under a lock.
*/
void reset();
BundlingThreshold<E> copyWithZeroedValue();
}
41 changes: 41 additions & 0 deletions src/main/java/com/google/api/gax/bundling/ExternalThreshold.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package com.google.api.gax.bundling;

/**
* The interface representing an external threshold to be used in ThresholdBundler.
*
* An external threshold is a threshold which depends on events external to the
* ThresholdBundler.
*
* Thresholds do not need to be thread-safe if they are only used inside
* ThresholdBundler.
*/
public interface ExternalThreshold<E> {

/**
* Called from ThresholdBundler when the first item in a bundle has been added.
*
* Any calls into this function from ThresholdBundler will be under a lock.
*/
void startBundle();

/**
* Called from ThresholdBundler.BundleHandle when externalThresholdEvent is called.
*
* Any calls into this function from ThresholdBundler will be under a lock.
*
* @param bundleHandle if the threshold has been reached, the external threshold
* should call BundleHandle.flushIfNotFlushedYet().
* @param event the event for the external threshold to handle. If not recognized,
* this external threshold should ignore it.
*/
void handleEvent(ThresholdBundleHandle bundleHandle, Object event);


/**
* Make a copy of this threshold but with the accumulated value zeroed.
*
* Any calls into this function from ThresholdBundler will be under a lock.
*/
ExternalThreshold<E> copyWithZeroedValue();

}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public boolean isThresholdReached() {
}

@Override
public void reset() {
sum = 0;
public BundlingThreshold<E> copyWithZeroedValue() {
return new NumericThreshold<E>(threshold, extractor);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.google.api.gax.bundling;

/**
* A handle to a bundle in a ThresholdBundler. Using this handle, code external
* to a ThresholdBundler can safely ensure that a particular bundle has been
* flushed without accidentally flushing a following bundle.
*/
public interface ThresholdBundleHandle {
/**
* Notifies the ThresholdBundler of an event for this threshold bundle, which
* is targeted at a particular ExternalThreshold.
*/
void externalThresholdEvent(Object event);

/**
* Flush this bundle if it hasn't been flushed yet.
*/
void flush();
}
167 changes: 153 additions & 14 deletions src/main/java/com/google/api/gax/bundling/ThresholdBundler.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,11 @@

package com.google.api.gax.bundling;

import com.google.api.client.util.Lists;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;

import org.joda.time.Duration;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -45,40 +44,113 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

import org.joda.time.Duration;

/**
* Queues up elements until either a duration of time has passed or any threshold in a given set of
* thresholds is breached, and then delivers the elements in a bundle to the consumer.
*/
public class ThresholdBundler<E> {

private final ImmutableList<BundlingThreshold<E>> thresholds;
private ImmutableList<BundlingThreshold<E>> thresholds;
private ImmutableList<ExternalThreshold<E>> externalThresholds;
private final Duration maxDelay;

private final Lock lock = new ReentrantLock();
private final Condition bundleCondition = lock.newCondition();
private boolean bundleReady = false;
private BundleHandle currentBundleHandle;

private Stopwatch bundleStopwatch;
private final List<E> data = new ArrayList<>();

public ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds) {
this(null, thresholds);
private ThresholdBundler(ImmutableList<BundlingThreshold<E>> thresholds,
ImmutableList<ExternalThreshold<E>> externalThresholds,
Duration maxDelay) {
this.thresholds = copyResetThresholds(Preconditions.checkNotNull(thresholds));
this.externalThresholds = copyResetExternalThresholds(
Preconditions.checkNotNull(externalThresholds));
this.maxDelay = maxDelay;
this.currentBundleHandle = new BundleHandle(externalThresholds);
}

public ThresholdBundler(Duration maxDelay) {
this(maxDelay, ImmutableList.<BundlingThreshold<E>>of());
/**
* Builder for a ThresholdBundler.
*/
public static class Builder<E> {
private List<BundlingThreshold<E>> thresholds;
private List<ExternalThreshold<E>> externalThresholds;
private Duration maxDelay;

private Builder() {
thresholds = Lists.newArrayList();
externalThresholds = Lists.newArrayList();
}

/**
* Set the max delay for a bundle. This is counted from the first item
* added to a bundle.
*/
public Builder<E> setMaxDelay(Duration maxDelay) {
this.maxDelay = maxDelay;
return this;
}

/**
* Set the thresholds for the ThresholdBundler.
*/
public Builder<E> setThresholds(List<BundlingThreshold<E>> thresholds) {
this.thresholds = thresholds;
return this;
}

/**
* Add a threshold to the ThresholdBundler.
*/
public Builder<E> addThreshold(BundlingThreshold<E> threshold) {
this.thresholds.add(threshold);
return this;
}

/**
* Set the external thresholds for the ThresholdBundler.
*/
public Builder<E> setExternalThresholds(List<ExternalThreshold<E>> externalThresholds) {
this.externalThresholds = externalThresholds;
return this;
}

/**
* Add an external threshold to the ThresholdBundler.
*/
public Builder<E> addExternalThreshold(ExternalThreshold<E> externalThreshold) {
this.externalThresholds.add(externalThreshold);
return this;
}

/**
* Build the ThresholdBundler.
*/
public ThresholdBundler<E> build() {
return new ThresholdBundler<E>(
ImmutableList.copyOf(thresholds),
ImmutableList.copyOf(externalThresholds),
maxDelay);
}
}

public ThresholdBundler(Duration maxDelay, ImmutableList<BundlingThreshold<E>> thresholds) {
this.thresholds = Preconditions.checkNotNull(thresholds);
this.maxDelay = maxDelay;
/**
* Get a new builder for a ThresholdBundler.
*/
public static <T> Builder<T> newBuilder() {
return new Builder<T>();
}

/**
* Adds an element to the bundler. If the element causes the collection to go past any of the
* thresholds, the bundle will be made available to consumers.
*/
public void add(E e) {
public ThresholdBundleHandle add(E e) {
final Lock lock = this.lock;
lock.lock();
try {
Expand All @@ -89,6 +161,9 @@ public void add(E e) {
// we want to trigger the signal so that we switch the await from an unbounded
// await to a time-bounded await.
signal = true;
for (ExternalThreshold<E> threshold : externalThresholds) {
threshold.startBundle();
}
}
data.add(e);
if (!bundleReady) {
Expand All @@ -104,6 +179,7 @@ public void add(E e) {
if (signal) {
bundleCondition.signalAll();
}
return currentBundleHandle;
} finally {
lock.unlock();
}
Expand Down Expand Up @@ -138,10 +214,10 @@ public int drainTo(Collection<? super E> bundle) {

bundle.addAll(data);
data.clear();
currentBundleHandle = new BundleHandle(externalThresholds);

for (BundlingThreshold<E> threshold : thresholds) {
threshold.reset();
}
thresholds = copyResetThresholds(thresholds);
externalThresholds = copyResetExternalThresholds(externalThresholds);

bundleStopwatch = null;
bundleReady = false;
Expand Down Expand Up @@ -221,4 +297,67 @@ private boolean shouldWait() {
private Duration getDelayLeft() {
return Duration.millis(maxDelay.getMillis() - bundleStopwatch.elapsed(TimeUnit.MILLISECONDS));
}

private static <E> ImmutableList<BundlingThreshold<E>> copyResetThresholds(
ImmutableList<BundlingThreshold<E>> thresholds) {
ImmutableList.Builder<BundlingThreshold<E>> resetThresholds =
ImmutableList.<BundlingThreshold<E>>builder();
for (BundlingThreshold<E> threshold : thresholds) {
resetThresholds.add(threshold.copyWithZeroedValue());
}
return resetThresholds.build();
}

private static <E> ImmutableList<ExternalThreshold<E>> copyResetExternalThresholds(
ImmutableList<ExternalThreshold<E>> thresholds) {
ImmutableList.Builder<ExternalThreshold<E>> resetThresholds =
ImmutableList.<ExternalThreshold<E>>builder();
for (ExternalThreshold<E> threshold : thresholds) {
resetThresholds.add(threshold.copyWithZeroedValue());
}
return resetThresholds.build();
}

/**
* This class represents a handle to a bundle that is being built up inside
* a ThresholdBundler. It can be used to perform certain operations on
* a ThresholdBundler, but only if the bundle referenced is still the active
* one.
*/
private class BundleHandle implements ThresholdBundleHandle {
private final ImmutableList<ExternalThreshold<E>> externalThresholds;

private BundleHandle(ImmutableList<ExternalThreshold<E>> externalThresholds) {
this.externalThresholds = externalThresholds;
}

@Override
public void externalThresholdEvent(Object event) {
final Lock lock = ThresholdBundler.this.lock;
lock.lock();

try {
for (ExternalThreshold<E> threshold : externalThresholds) {
threshold.handleEvent(this, event);
}
} finally {
lock.unlock();
}
}

@Override
public void flush() {
final Lock lock = ThresholdBundler.this.lock;
lock.lock();

try {
if (ThresholdBundler.this.currentBundleHandle != this) {
return;
}
ThresholdBundler.this.flush();
} finally {
lock.unlock();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ public void start() {
* on the inherent characteristics of the item), and then hands it off to
* the bundler.
*/
public void addToNextBundle(T item) {
public ThresholdBundleHandle addToNextBundle(T item) {
bundleReceiver.validateItem(item);
bundler.add(item);
return bundler.add(item);
}

@Override
Expand Down
Loading

0 comments on commit bf25ddc

Please sign in to comment.