Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a proof-of-concept for "Observer-like" batch loading #148

Conversation

AlexandreCarlton
Copy link
Collaborator

Note: This pull request, as is, is not (yet) intended for merge. It is created to provide a proof of concept and gauge interest as polishing/testing this requires a non-trivial amount of effort.

Motivation

The current DataLoader mechanism completes the corresponding CompletableFuture for a given key when the corresponding value is returned. Currently, DataLoader's BatchLoader assumes that the underlying batch function can only return all of its requested items at once (as an example, a SQL database query).

However, the batch function may be a service that can return items progressively using a subscription-like architecture. Some examples include:

Streaming results in this fashion offers several advantages:

  • Certain values may be returned earlier than others (for example, the batch function may have cached values it can return early).
  • Memory load is lessened on the batch function (which may be an external service), as it does not need to keep hold of the retrieved values before it can send them out at once.
  • We are able to save the need to stream individual error values by providing an onError function to terminate the stream early.

Proposal

We provide two new BatchLoaders and support for them in java-dataloader:

  • ObserverBatchLoader, with a load function that accepts:
    • a list of keys.
    • a BatchObserver intended as a delegate for publisher-like structures found in Project Reactor and Rx Java. This obviates the need to depend on external libraries.
  • MappedObserverBatchLoader, similar to ObserverBatchLoader but with an onNext that accepts a key and value (to allow for early termination of streams without needing to process nulls).
  • *WithContext variants for the above.

The key value-add is that the implementation of BatchObserver (provided to the load functions) will immediately complete the queued future for a given key when onNext is called with a value. This means that if we have a batch function that can deliver values progressively, we can continue evaluating the query as the values arrive. As an arbitrary example, let's have a batch function that serves both the reporter and project fields on a Jira issue:

query {
  issue {
    project {
      issueTypes { ... }
    }
    reporter { ... }
  }
}

If the batch function can return a project immediately but is delayed in when it can reporter, then our batch loader can return project and start evaluating the issueTypes immediately while we load the reporter in parallel. This would provide a more performant query evaluation.

As mentioned above, this is not in a state to be merged - this is intended to gauge whether this is something the maintainers would be interested in owning. Should this be the case, the author is willing to test/polish this pull request so that it may be merged.

**Note**: This commit, as-is, is not (yet) intended for merge. It is
created to provide a proof-of-concept and gauge interest as
polishing/testing this requires a non-trivial amount of effort.

Motivation
==========

The current DataLoader mechanism completes the corresponding
`CompletableFuture` for a given key when the corresponding value is
returned. However, DataLoader's `BatchLoader` assumes that the
underlying batch function can only return all of its requested items at
once (as an example, a SQL database query).

However, the batch function may be a service that can return items
progressively using a subscription-like architecture. Some examples
include:

 - Project Reactor's [Subscriber](https://www.reactive-streams.org/reactive-streams-1.0.4-javadoc/org/reactivestreams/Subscriber.html).
 - gRPC's [StreamObserver](https://grpc.github.io/grpc-java/javadoc/io/grpc/stub/StreamObserver.html).
 - RX Java's [Flowable](https://reactivex.io/RxJava/3.x/javadoc/io/reactivex/rxjava3/core/Flowable.html).

Streaming results in this fashion offers several advantages:

 - Certain values may be returned earlier than others (for example, the
   batch function may have cached values it can return early).
 - Memory load is lessened on the batch function (which may be an
   external service), as it does not need to keep hold of the retrieved
   values before it can send them out at once.
 - We are able to save the need to stream individual error values by
   providing an `onError` function to terminate the stream early.

Proposal
========

We provide two new `BatchLoader`s and support for them in
`java-dataloader`:

 - `ObserverBatchLoader`, with a load function that accepts:
   - a list of keys.
   - a `BatchObserver` intended as a delegate for publisher-like
     structures found in Project Reactor and Rx Java. This obviates the
     need to depend on external libraries.
 - `MappedObserverBatchLoader`, similar to `ObserverBatchLoader` but
   with an `onNext` that accepts a key _and_ value (to allow for early
   termination of streams without needing to process `null`s).
 - `*WithContext` variants for the above.

The key value-add is that the implementation of `BatchObserver`
(provided to the load functions) will immediately complete the queued
future for a given key when `onNext` is called with a value. This means
that if we have a batch function that can deliver values progressively,
we can continue evaluating the query as the values arrive. As an
arbitrary example, let's have a batch function that serves both the
reporter and project fields on a Jira issue:

```graphql
query {
  issue {
    project {
      issueTypes { ... }
    }
    reporter { ... }
  }
}
```

If the batch function can return a `project` immediately but is delayed
in when it can `reporter`, then our batch loader can return `project`
and start evaluating the `issueTypes` immediately while we load the
`reporter` in parallel. This would provide a more performant query
evaluation.

As mentioned above, this is not in a state to be merged - this is
intended to gauge whether this is something the maintainers would be
interested in owning. Should this be the case, the author is willing to
test/polish this pull request so that it may be merged.
* This {@link BatchObserver} should not have any method invoked after this is called.
*/
void onError(Throwable e);
}
Copy link
Member

@bbakerman bbakerman May 14, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost exactly java.util.concurrent.Flow.Subscriber but without the subscription part. I guess one can say that the data loader code IS the subscriber

But I think if this was backed under the covers but reactive code publishing "new items" then something has to subscribe to them - should we expose that and use a Java native class?

Maybe not - because we have other variations of this "observer" such as your MappedBatchedObserver which is the same but its onNext is K+V

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ps if we did use Flow we would have to move to Java 11 - right now data loader is still java 8 - not a big deal but still

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All the other graphql-java libs are on Java 11, is there anything blocking dataloader getting upgraded to Java 11 too?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No blocking reason to go to 11 - just that we havent

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also - maybe all these new classes should be in their own package - we have 6 new classes that are all variations of what is now called Observer should I think that should be in their observer package

Also.... I hate the name Observer as stated elsewhere

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's easier to do a Java 11 upgrade ahead of this PR I can help out

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #150

}

// A simple wrapper class intended as a proof external libraries can leverage this.
private static class Publisher<K, V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can put say project reactor as a test dependency into this code base.

That way you can test with real Publisher / Subscriber s

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - though I spent more time than I'd like to admit trying to figure out why my Subscriber wouldn't run (until I triggered subscriber.request(...)).

return (keys, observer) -> {
Publisher<K, K> publisher = new Publisher<>(observer);
Map<K, K> valueByKey = keys.stream().collect(toMap(identity(), identity()));
publisher.subscribe(valueByKey);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see this idea of subscription as a key aspect to be addressed.

Imagine some one had a reactive real Publisher of events. At what point is a subscription made?

Do people have to write this own wrappers so that they can move between a Reactor stream of items and this ObserverBatchLoader / MappedObserverBatchLoader say?

This could be tedious. In theory this library COULD become dependent on reactive streams like graphql-java is

    def reactiveStreamsVersion = '1.0.3'
    api 'org.reactivestreams:reactive-streams:' + reactiveStreamsVersion

This would allow you to have interfaces that can take a official Publisher of events and you could write the Subscription interfacing code that made your actual ObserverBatchLoader / MappedObserverBatchLoader

I think if we allowed this PR then I would have this extra dependency - not to a implementation like Reactor - but to the raw streams api

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Imagine adator code like

return (keys, observer) -> {
     Publisher reactiveImplPublisher = createReactivePublisherFromKeys(keys);
     ReactiveAdapater.subscribeTo(reactiveImplPublisher, observer);
}

This would allow you to call put to a real reactive system and then adapt the publisher back to this observer via this helper code we supply. Rather than writing a subscription youself that ends up fowarding everything to the observer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, these now are based on reactive-stream's Subscribers.

await().until(() -> future1.isDone() && future2.isDone());
assertThat(future1.get(), equalTo(1));
assertThat(future2.get(), equalTo(2));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we need tests to show how you may ask for N keys bit only some of them complete. We should be able to see that dl.load(k1) has completed but dl.load(k2) is not completed say

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting to indicate that I have not yet completed this but I have not forgotten this.

return batchLoad
.thenApply(values -> {
assertResultSize(keys, values);
if (isObserverLoader() || isMapObserverLoader()) {
// We have already completed the queued futures by the time the overall batchLoad future has completed.
return values;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So with you new batcher loaders they complete the CF as they go ??

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand now as I have seen all the PR

@@ -546,4 +615,188 @@ private CompletableFuture<List<V>> setToValueCache(List<V> assembledValues, List
private static <T> DispatchResult<T> emptyDispatchResult() {
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
}

private class BatchObserverImpl implements BatchObserver<V> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The class is already a MONSTER - this should be there own package level classes.

We should move them to their own class - package protected say

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

except you use inner methods so..... maybe they need to stay...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I initially did make them static but it turned out to be very convenient to keep them as inner classes. Happy to revisit this because, yes, this is a beast of a class.


@Override
public void onNext(V value) {
assert !onErrorCalled && !onCompletedCalled;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert in java is blurrg - use a real assert!

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - have fixed up with assertState.


private final List<K> clearCacheKeys = new ArrayList<>();
private final List<V> completedValues = new ArrayList<>();
private int idx = 0;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in a reactive world, the onNext callback can happen at any time on any thread. There is no happens before here for values inherently... So these fields need to be synchronised in some manner.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've lazily chucked a synchronized on this until we work out a better way to manage this.

public void onNext(V value) {
assert !onErrorCalled && !onCompletedCalled;

K key = keys.get(idx);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So the assumption here is that the Publisher of events does so in key order - which makes sense - the other BatchLoader requires it to be in key order - but this would need to be documented STRONGLY.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we manage this order with the knowledge #onNext can be invoked by multiple threads? Do we need to add extra guardrails against this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can know - its part of the contract - given the List<K> of keys each value returned will be matched with index in the order they arrive. I dont think we can have guard rails beyond strong documentation.

K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
if (value instanceof Throwable) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure this works in reactive land. The contract that you get a Throwable via onNext is not something to cover and I dont think this could should either

We should expect a value that can be cast to type V - or do we do this in other BartchLoader code ;)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this was me being (too) defensive and not knowing Reactive idioms. Removed.

Copy link
Member

@bbakerman bbakerman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am very supportive of these changes. I like the idea of a way to make a batch really be a Publisher stream of events

I would like to take this code further.

I think I have a few thoughts

Naming is harder :

BatchObserver ?? Observer as a name has been poisoned by Java Observer for me.

  • BatchPublisher
  • BatchFlow
  • BatchReactiveStream

I have ordered my suggested names

I think we should put reactive steams as a dependency and write "adapters" from a Publisher to BatchedObserverXXX

That way we can take a real stream of reactive data from day one - people can write their own adaptors but we provide one that can ready a Publisher out of the box

The README should be updated for real when this gets more real.

The tests should show more error cases - more half completed cases

We should use a real reactive impl in the tests - eg reactor say

We should test for "key order" for the BatchedObserverXXX test cases

/**
* To be called by the {@link MappedObserverBatchLoader} to process a new key/value pair.
*/
void onNext(K key, V value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the same class as BatchObserver<K, V> except onNext has a key and value

How about making it

public interface MappedBatchObserver<K, V> extends BatchObserver<K, V>  {

    void onNext(java.util.Map.Entry<K,V> nextItem);

?? (if that compiles)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This maybe reduce the amount of code you need to handle the two AND also maps on reactive streams Publisher better which is only ever 1 value

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I've elected to deprecate both in favour of:

  • Publisher<V>
  • Publisher<Map.Entry<K, V>>

@bbakerman bbakerman changed the base branch from master to reactive-streams-branch May 17, 2024 11:15
@bbakerman
Copy link
Member

I have created a new branch reactive-streams-branch that can hold these changes. We can build it up progessively on that branch - once its complete enough to be on master, we can merge it.

This will allow N people to work on this code idea

@dondonz dondonz added this to the Next release 3.4.0 milestone May 17, 2024
…er-proof-of-concept

* origin/master:
  Bump to Java 11
`reactive-streams` has become the de-facto standard for reactive
frameworks; we thus use this as a base to allow seamless interop (rather
than prompt an extra adapter layer).
This gives us more workable exceptions.
Passing an exception into `onNext` is not typically done in
reactive-land - we would instead call `onError(Throwable)`. We can thus
avoid handling this case.
This is keeping in line with the other methods found in
`DataLoaderFactory`.
Given the large number of existing tests, we copy across this existing
set for our publisher tests.

What this really indicates is that we should invest in parameterised
testing, but this is a bit painful in JUnit 4 - so we'll bump to JUnit 5
independently and parameterise when we have this available.

This is important because re-using the existing test suite reveals a
failure that we'll need to address.
This keeps in line with the original suggestion (because yours truly
couldn't read, apparently).

We also purge any remaining mention of 'observer', which was the first
swing at this code.
Multiple threads may call `onNext` - we thus (lazily) chuck a
`synchronized` to ensure correctness at the cost of speed.

In future, we should examine how we should manage this concurrency
better.
Copy link
Collaborator Author

@AlexandreCarlton AlexandreCarlton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've made various updates in response to the thorough review by @bbakerman. There are some points I've yet to address, which include:

  • synchronisation on #onNext.
  • test coverage.

Running the existing DataLoader tests on the *Publisher variants have revealed a flaw in their implementations. I would therefore like to parameterise the existing tests based on individual DataLoaders with different backends. However, doing this in JUnit 4 is rather painful, so I will raise a separate PR raising the existing suite to JUnit 5 to see if this is palatable.

* This {@link BatchObserver} should not have any method invoked after this is called.
*/
void onError(Throwable e);
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done: #150

@@ -546,4 +615,188 @@ private CompletableFuture<List<V>> setToValueCache(List<V> assembledValues, List
private static <T> DispatchResult<T> emptyDispatchResult() {
return (DispatchResult<T>) EMPTY_DISPATCH_RESULT;
}

private class BatchObserverImpl implements BatchObserver<V> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I initially did make them static but it turned out to be very convenient to keep them as inner classes. Happy to revisit this because, yes, this is a beast of a class.


private final List<K> clearCacheKeys = new ArrayList<>();
private final List<V> completedValues = new ArrayList<>();
private int idx = 0;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've lazily chucked a synchronized on this until we work out a better way to manage this.


@Override
public void onNext(V value) {
assert !onErrorCalled && !onCompletedCalled;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - have fixed up with assertState.

public void onNext(V value) {
assert !onErrorCalled && !onCompletedCalled;

K key = keys.get(idx);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do we manage this order with the knowledge #onNext can be invoked by multiple threads? Do we need to add extra guardrails against this?

K key = keys.get(idx);
Object callContext = callContexts.get(idx);
CompletableFuture<V> future = queuedFutures.get(idx);
if (value instanceof Throwable) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, this was me being (too) defensive and not knowing Reactive idioms. Removed.

/**
* To be called by the {@link MappedObserverBatchLoader} to process a new key/value pair.
*/
void onNext(K key, V value);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, I've elected to deprecate both in favour of:

  • Publisher<V>
  • Publisher<Map.Entry<K, V>>

}

// A simple wrapper class intended as a proof external libraries can leverage this.
private static class Publisher<K, V> {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done - though I spent more time than I'd like to admit trying to figure out why my Subscriber wouldn't run (until I triggered subscriber.request(...)).

return (keys, observer) -> {
Publisher<K, K> publisher = new Publisher<>(observer);
Map<K, K> valueByKey = keys.stream().collect(toMap(identity(), identity()));
publisher.subscribe(valueByKey);
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, these now are based on reactive-stream's Subscribers.

await().until(() -> future1.isDone() && future2.isDone());
assertThat(future1.get(), equalTo(1));
assertThat(future2.get(), equalTo(2));
}
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting to indicate that I have not yet completed this but I have not forgotten this.

AlexandreCarlton added a commit to AlexandreCarlton/java-dataloader that referenced this pull request May 18, 2024
This change is being made to faciliate parameterised tests which should
greatly aid in adding coverage for graphql-java#148.

This was largely powered by the [OpenRewrite recipe](https://docs.openrewrite.org/recipes/java/testing/junit5/junit4to5migration) for JUnit 4.x to
Jupiter migration, with a few extra tweaks:

 - imports optimised to be single-class (i.e. no `import foo.*;`).
 - removed `test_` prefix from legacy JUnit 3 methods.

Notably, this pulls in `org.hamcrest` for `MatcherAssert.assertThat`,
which is recommended by both the recipe (which handled this migration)
and IntelliJ.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is almost an exact copy of DataLoaderTest, just using the BatchPublisher as the batch function.

Rather than have this duplication, I would like to invest in parameterised testing (parameterising on the DataLoader), and as such I have raised #152 to ease this.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#152 is now merged

@Override
public void onSubscribe(Subscription subscription) {
subscription.request(keys.size());
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the most sensible thing - to ask for all keys - I had the thought that setup might want to control how many get asked at a time BUT we need to get key.size() amount and they will all be held in memory anyway so getting them all is the most sensible thing to do

}

completedValues.add(value);
idx++;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we might want another call to subscription.request() here just in case?? I mean I know we asked for key.size() earlier. You are allowed to ask for more than there are... Hmmm....

}
}

private class DataLoaderMapEntrySubscriber implements Subscriber<Map.Entry<K, V>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice!

@Override
public void onNext(Map.Entry<K, V> entry) {
assertState(!onErrorCalled, () -> "onError has already been called; onNext may not be invoked.");
assertState(!onCompleteCalled, () -> "onComplete has already been called; onNext may not be invoked.");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

its ok to just track "finished" whether that be via onError or onComplete - you cant give a an error message as good as these but it saves a boolean in memory terms and simplifies the class

private final List<K> clearCacheKeys = new ArrayList<>();
private final Map<K, V> completedValuesByKey = new HashMap<>();
private boolean onErrorCalled = false;
private boolean onCompleteCalled = false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

        private final CompletableFuture<List<V>> valuesFuture;
        private final List<K> keys;
        private final List<Object> callContexts;
        private final List<CompletableFuture<V>> queuedFutures;

        private final List<K> clearCacheKeys = new ArrayList<>();
        private final List<V> completedValues = new ArrayList<>();
        private int idx = 0;

        private boolean onErrorCalled = false;
        private boolean onCompleteCalled = false;

90% of the variables required at the same - can we not derive from a common class and hence get some code sharing?

return newMappedPublisherDataLoader((MappedBatchPublisher<K, V>) (keys, subscriber) -> {
loadCalls.add(new ArrayList<>(keys));
Flux.<Map.Entry<K, V>>error(new IllegalStateException("Error")).subscribe(subscriber);
}, options);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need a test for the case where say 1/2 the values are sent AND then it blows up. So that we error out ONLY those that have not completed.

ps. you may have this but I am wrting this comment as I think of the use case.

* (rather than when all values have been retrieved).
* <p>
* <b>NOTE:</b> It is <b>required </b> that {@link Subscriber#onNext(V)} is invoked on each value in the same order as
* the provided keys.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe some more explanatory text here as its so important

*/
public interface BatchPublisher<K, V> {
void load(List<K> keys, Subscriber<V> subscriber);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like the name now that we have the reactive streams Subscriber in play. The name makes more sense now - its a Publisher of batched keys

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And less interfaces needed

}

@Override
public void onComplete() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be thread protected since its change the same structures as onNext() and hence can have a thread indifferent views of this state

}

@Override
public void onError(Throwable ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be synchronised as well

}

@Override
public void onNext(Map.Entry<K, V> entry) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronised all the onXXXX methods

}

@Override
public void onComplete() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronised

}

@Override
public void onError(Throwable ex) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

synchronised

@bbakerman bbakerman merged commit 288be41 into graphql-java:reactive-streams-branch May 19, 2024
@bbakerman
Copy link
Member

This has now been merged in to the reactive streams branch... please target future tweaks to that branch and once is all ready to go - we can merge it as one in master

AlexandreCarlton added a commit to AlexandreCarlton/java-dataloader that referenced this pull request May 19, 2024
The existing `DataLoaderTest` covers a very large range of test cases
which will be very useful to validate what is being added in graphql-java#148.

To allow new `*Publisher` `DataLoader`s to leverage this without
copy-pasting, we make `DataLoaderTest` a parameterised test, using two
`DataLoader` variants:

 - the stock 'List' `DataLoader`.
 - the 'Mapped' `DataLoader`.

Most of the tests in `DataLoaderTest` have been retrofitted for this
parameterisation, resulting in:

 - deduplicated code (many of the `MappedDataLoader` tests have the same
   test cases, almost line-for-line).
 - increased coverage of the `MappedDataLoader`, bolstering confidence
   in subsequent changes (rather than relying on the author
   understanding how everything is put together internally).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants