-
Notifications
You must be signed in to change notification settings - Fork 119
Bundling restructure #214
Bundling restructure #214
Conversation
Codecov Report
@@ Coverage Diff @@
## master #214 +/- ##
============================================
+ Coverage 70.25% 70.48% +0.23%
- Complexity 483 490 +7
============================================
Files 67 68 +1
Lines 2538 2558 +20
Branches 266 264 -2
============================================
+ Hits 1783 1803 +20
+ Misses 657 656 -1
- Partials 98 99 +1
Continue to review full report at Codecov.
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have a few comments but looks good in general.
I earlier sent a WIP PR to change ThresholdBundlingForwarder
from pulling to pushing logic. With this change, that PR becomes much less important. I still think it's valuable because it lets us avoid manual thread-synchronization. If we want the change, I think it should land after this change but before more work on logging flush. WDYT?
@@ -1,5 +1,5 @@ | |||
/* | |||
* Copyright 2016, Google Inc. All rights reserved. | |||
* Copyright 2017, Google Inc. All rights reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
package com.google.api.gax.bundling; | ||
|
||
public interface BundleFactory<T> { | ||
T createBundle(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
private final Lock lock = new ReentrantLock(); | ||
private final Condition bundleCondition = lock.newCondition(); | ||
private Bundle currentOpenBundle; | ||
private List<Bundle> closedBundles = new ArrayList<>(); | ||
private TrackedBundle currentOpenTrackedBundle; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
final Lock lock = this.lock; | ||
lock.lock(); | ||
try { | ||
Bundle outBundle = null; | ||
E outBundle = null; | ||
if (closedBundles.size() > 0) { | ||
outBundle = closedBundles.remove(0); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that we still want the change to a pushing bundler, and that it makes sense to do it before any more work on logging flush.
@@ -1,5 +1,5 @@ | |||
/* | |||
* Copyright 2016, Google Inc. All rights reserved. | |||
* Copyright 2017, Google Inc. All rights reserved. |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
package com.google.api.gax.bundling; | ||
|
||
public interface BundleFactory<T> { | ||
T createBundle(); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
final Lock lock = this.lock; | ||
lock.lock(); | ||
try { | ||
Bundle outBundle = null; | ||
E outBundle = null; | ||
if (closedBundles.size() > 0) { | ||
outBundle = closedBundles.remove(0); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
*/ | ||
void setResponse(ResponseT response); | ||
public interface Bundle<T> { | ||
void merge(T t); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
*/ | ||
void processBundle(List<T> bundle); | ||
/** Process the given bundle. */ | ||
void processBundle(T bundle); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* referenced is still the active one. | ||
*/ | ||
private class Bundle { | ||
private class TrackedBundle { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
private final Lock lock = new ReentrantLock(); | ||
private final Condition bundleCondition = lock.newCondition(); | ||
private Bundle currentOpenBundle; | ||
private List<Bundle> closedBundles = new ArrayList<>(); | ||
private TrackedBundle currentOpenTrackedBundle; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
private ImmutableList<BundlingThreshold<E>> thresholdPrototypes; | ||
private final Duration maxDelay; | ||
private final BundlingFlowController<E> flowController; | ||
private final BundleSupplier<E> bundleFactory; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -182,56 +189,48 @@ public void flush() { | |||
* | |||
* @return the number of items added to 'bundle'. | |||
*/ | |||
public int drainNextBundleTo(Collection<? super E> outputCollection) { | |||
public E takeBundle() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class BundleImpl<RequestT, ResponseT> implements Bundle<BundleImpl<RequestT, ResponseT>> { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Addressed PR feedback. I have pushed here to help the review process, but I will be closing this PR and reopening against https://github.com/googleapis/gax-java/tree/bundling-flowcontrol-branch
*/ | ||
void setResponse(ResponseT response); | ||
public interface Bundle<T> { | ||
void merge(T t); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
*/ | ||
void processBundle(List<T> bundle); | ||
/** Process the given bundle. */ | ||
void processBundle(T bundle); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
private ImmutableList<BundlingThreshold<E>> thresholdPrototypes; | ||
private final Duration maxDelay; | ||
private final BundlingFlowController<E> flowController; | ||
private final BundleSupplier<E> bundleFactory; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
||
private final Lock lock = new ReentrantLock(); | ||
private final Condition bundleCondition = lock.newCondition(); | ||
private Bundle currentOpenBundle; | ||
private List<Bundle> closedBundles = new ArrayList<>(); | ||
private TrackedBundle currentOpenTrackedBundle; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
@@ -182,56 +189,48 @@ public void flush() { | |||
* | |||
* @return the number of items added to 'bundle'. | |||
*/ | |||
public int drainNextBundleTo(Collection<? super E> outputCollection) { | |||
public E takeBundle() { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
* referenced is still the active one. | ||
*/ | ||
private class Bundle { | ||
private class TrackedBundle { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
import java.util.ArrayList; | ||
import java.util.List; | ||
|
||
public class BundleImpl<RequestT, ResponseT> implements Bundle<BundleImpl<RequestT, ResponseT>> { |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
Replacement PR: #226 |
Updates bundling to accumulate requests in a request builder, rather than a list of requests. The major change is to introduce the Bundle interface and the BundleImpl class, which replaces a list of BundlingContext objects.