Improve scaling behavior of Mono<Void> and(Publisher<?> other) #3920

Open
PMacho opened this issue Nov 6, 2024 · 6 comments
Labels
for/user-attention This issue needs user attention (feedback, rework, etc...)
status/has-workaround This has a known workaround described
status/need-user-input This needs user input to proceed

Comments

PMacho commented Nov 6, 2024

Motivation

At the moment, Mono's and(...) method scales as O(N^2): it slows down considerably when applied many times in a row, e.g.:

    @Test
    void combine() {
        int numberOfIterations = 200_000;

        Mono<Void> result = Mono.empty();

        for (int i = 0; i != numberOfIterations; ++i) {
            result = result.and(Mono.empty());
        }

        result.block();
    }

The reason lies in MonoWhen's whenAdditionalSource(...):

	@Nullable
	Mono<Void> whenAdditionalSource(Publisher<?> source) {
		Publisher[] oldSources = sources;
		if (oldSources != null) {
			int oldLen = oldSources.length;
			Publisher<?>[] newSources = new Publisher[oldLen + 1];
			System.arraycopy(oldSources, 0, newSources, 0, oldLen);
			newSources[oldLen] = source;

			return new MonoWhen(delayError, newSources);
		}
		return null;
	}
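In other words, each call copies the entire existing array, so building a chain of N sources copies 0 + 1 + ... + (N - 1) = N(N - 1)/2 elements in total; for N = 200,000 that is roughly 2 * 10^10 element copies, which is where the quadratic slowdown comes from.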

Desired solution

I suggest using an ArrayList for the sources field in MonoWhen, which would allow new sources to be added much more efficiently.

Simple comparison:

    @Test
    void addSpeedComparison() {
        test(new ArrayList<>(), 10_000_000);
        test(new LinkedList<>(), 10_000_000);
        testArray(200_000);
    }

    void test(List<Integer> list, int numberOfIterations) {
        long start = System.currentTimeMillis();

        for (int i = 0; i != numberOfIterations; i++) {
            list.add(i);
        }

        System.out.println(System.currentTimeMillis() - start);
    }

    void testArray(int count) {
        long start = System.currentTimeMillis();

        Integer[] sources = new Integer[]{};
        for (int i = 0; i != count; i++) {
            int oldLen = sources.length;
            Integer[] newSources = new Integer[oldLen + 1];
            System.arraycopy(sources, 0, newSources, 0, oldLen);
            newSources[oldLen] = i;
            sources = newSources;
        }

        System.out.println(System.currentTimeMillis() - start);
    }

which prints (ArrayList, LinkedList, plain array, respectively; times in milliseconds):

164
1184
16092

Considered alternatives

At the moment, I use Mono.fromFuture(CompletableFuture.allOf(mono1.toFuture(), mono2.toFuture())), which is certainly not the intended approach, I suppose.

Additional context

PMacho added the type/enhancement label on Nov 6, 2024
chemicL (Member) commented Nov 18, 2024

Hey, thanks for your interest in this subject. Can you please provide a use case where this would become a bottleneck? I wonder how practical it is to use 200K .and() operations chained together. The proposed solution with the ArrayList might actually perform better for such artificial scenarios, but it would over-allocate memory in more realistic scenarios. Currently the amount of memory in use is precisely the size needed, at the cost of re-allocating a one-element-larger array and copying it on each addition. With the proposed approach we would gain some time at the cost of extra memory allocation, which might be impractical for most applications.

chemicL added the status/need-user-input label and removed the type/enhancement label on Nov 18, 2024
PMacho (Author) commented Nov 19, 2024

@chemicL

Hey, thanks for your response. I definitely understand your concern about the memory footprint. However, I can certainly provide some real-world use cases to support my suggestion.

I am the owner of keldysh - MessageFlow, a reactive messaging framework built on Project Reactor's core concepts to provide secure and scalable integration patterns, constructed as a native reactive pipeline.
The Message abstraction is:

public interface Message<PAYLOAD> {

    PAYLOAD payload();

    Mono<Void> acknowledge();

}

This choice allows us to provide an API for the central MessageFlow extends Publisher<Message<T>>, adapted from Reactor's Flux, while acknowledgements are handled transparently in the background and, of course, do not block.

However, any kind of aggregation like .reduce(...) requires combining the original Messages' acknowledge() Monos via .and(...) into a resulting Message that holds the aggregated result alongside the combined acknowledgement, along the lines of the sketch below.
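For illustration, a minimal, hypothetical sketch of such an aggregation (the integer-summing reduction and the method name are made up for the example and are not MessageFlow's actual code; it uses the Message interface above together with reactor.core.publisher.Flux and Mono):

    // Hypothetical sketch: reduce a stream of messages while combining their
    // acknowledgements with .and(...).
    static Mono<Message<Integer>> sum(Flux<Message<Integer>> messages) {
        return messages.reduce((m1, m2) -> new Message<Integer>() {
            @Override
            public Integer payload() {
                return m1.payload() + m2.payload(); // example aggregation
            }

            @Override
            public Mono<Void> acknowledge() {
                // each reduction step chains one more .and(...), so N messages
                // end up behind a chain of N - 1 .and operators
                return m1.acknowledge().and(m2.acknowledge());
            }
        });
    }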

Similarly, some messaging protocols require acknowledgements to be processed in the order the messages were received. Since message processing is reactive, and hence does not provide any ordering guarantees whatsoever, in such cases we maintain acknowledgements in a store that reconstructs the ordering on demand, again combining acknowledgements with the .and(...) method.

Both examples are rather typical scenarios and easily pile up millions of messages. From our perspective, the increased memory footprint of an ArrayList as compared to a simple array would therefore be rather negligible.

chemicL (Member) commented Nov 20, 2024

Thanks for the additional context.

I played a bit with your benchmark. Please note that this is not the right way to do benchmarks in Java; consider using JMH for more reliable tests in the future. Nevertheless, just to carry on with the brief feedback, I altered your example to depict the expected, common case of a chain of around 3 .and operators:

	public static void main(String[] args) {
		measure(() -> test(new ArrayList<>(), 3), 1_000_000);
		measure(() -> test(new LinkedList<>(), 3), 1_000_000);
		measure(() -> testArray(3), 1_000_000);
	}

	static void measure(Runnable r, int iterations) {
		long start = System.currentTimeMillis();
		for (int i = 0; i < iterations; i++) {
			r.run();
		}
		System.out.println(System.currentTimeMillis() - start);
	}

	static void test(List<Integer> list, int numberOfIterations) {
		for (int i = 0; i != numberOfIterations; i++) {
			list.add(i);
		}
	}

	static void testArray(int count) {
		Integer[] sources = new Integer[]{};
		for (int i = 0; i != count; i++) {
			int oldLen = sources.length;
			Integer[] newSources = new Integer[oldLen + 1];
			System.arraycopy(sources, 0, newSources, 0, oldLen);
			newSources[oldLen] = i;
			sources = newSources;
		}
	}

Output:

16
19
7

Not only does the array copy look quicker, using the result will also be much faster than going through the ArrayList's extra level of indirection on access, and that part is not even included in the above pseudo-benchmark.

When operators are chained at such great lengths, the stack depth grows enormously. Hence the operator performs macro fusion and replaces the current object with an aggregated one that has the added source.

However, imagine what happens when your original example is altered:

    for (int i = 0; i != numberOfIterations; ++i) {
        result = result.and(Mono.empty()).log();
    }

The optimization won't even have the chance to be applied.

In such a case, since numberOfIterations is known, directly using Mono.when(iterableOfPublishers) will yield the best result.
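For example (a sketch that assumes the publishers can be collected up front; the variable names are illustrative):

	// Collect the publishers first, then combine them in one go with
	// Mono.when(Iterable) instead of chaining .and per element.
	List<Publisher<?>> sources = new ArrayList<>(numberOfIterations);
	for (int i = 0; i != numberOfIterations; ++i) {
		sources.add(Mono.empty()); // stand-in for the individual acknowledgements
	}
	Mono<Void> result = Mono.when(sources);
	result.block();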

Is it a possibility for your library to allow users to provide/work with an Iterable of these acknowledgements instead of having to chain them via .and?

chemicL added the status/has-workaround label on Nov 20, 2024
PMacho (Author) commented Nov 20, 2024

I mean, I called the suggestion "Improve scaling behavior ...". With n = 3, scaling behavior is not even being tested.

Finally, it is a decision on your end whether you want to squeeze out every bit of performance for simple cases, at the cost of potentially stalled applications in real-world scenarios, or whether you are willing to compromise on a couple of milliseconds at 1 million times three operations to guarantee a library that performs well under any circumstances.
For me, this decision would be a no-brainer, given that Project Reactor is used in millions of enterprise application servers where it is not always easy to trace issues down to the point we just did.

To answer the question: users do not work with the acknowledgements at all. That's actually one of the central points, to guarantee consistent processing.
Providing an iterableOfPublishers, however, essentially means I am implementing the .and(...) method on my side, circumventing the one provided by Mono. Of course I could find a way to do that, but I wonder whether this is really the goal of the Mono API.

chemicL (Member) commented Nov 20, 2024

if you want to squeeze out every bit of performance for simple cases

Yes. We do. Especially since those simple cases are more common, e.g. in request-response scenarios where a bit of manipulation is performed here and there; if these add up, the latency increases, reducing the scalability of the system. What's especially important to monitor is not just the assembly time but the actual runtime: with simple arrays, the access patterns are straightforward, whereas introducing an ArrayList can be a burden as each access needs to go through a level of indirection. More on that later.

at the cost of potentially stalled applications in real-world scenarios

You are free to use Mono.when() for your use case and avoid the issue altogether. It seems more like a special case than the norm to have both:

  1. The assumption that .and is chained directly after another .and, with no other operator allowed in between.
  2. Thousands of these .and operators chained together.

willing to compromise on a couple of milliseconds at 1 million times three operations

That's a broader subject, to be honest: hot paths, Amdahl's law, GC, etc. It's not only about milliseconds, yet they still do count.

For me, this decision would be a no-brainer

With an established and mature library, minuscule changes optimizing for specific use cases can harm the majority of usages, so we need to be careful.

I am by no means opposing the change per se. I actually wonder whether the JIT compiler would optimize away some of the concerns I seem to have. During the subscription phase, a bare array can still be obtained and used.

To push the research forward we'd need:

  1. A JMH benchmark comparing the existing implementation with the proposed one, including allocation and GC stats (a minimal skeleton is sketched after this list).
  2. Have the benchmark execute against a representative number of chainings, e.g. 1, 2, 5, 1000, 100_000.
  3. Measure the assembly time and resource utilization.
  4. Exercise the particular chain with an actual subscription to see the access patterns and measure that.
  5. Run with JDK8 and JDK21 for comparison.
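As a starting point for item 1, here is a minimal JMH skeleton; it is only an assumption-laden sketch (class, method, and parameter names are illustrative, not an agreed benchmark design), and the allocation/GC stats can be captured with JMH's gc profiler (-prof gc):

	import java.util.ArrayList;
	import java.util.List;
	import java.util.concurrent.TimeUnit;

	import org.openjdk.jmh.annotations.*;

	@State(Scope.Thread)
	@BenchmarkMode(Mode.AverageTime)
	@OutputTimeUnit(TimeUnit.MICROSECONDS)
	public class AndChainBenchmark {

		@Param({"1", "2", "5", "1000", "100000"})
		int chainLength;

		@Benchmark
		public Object[] growArrayByOne() {
			// mimics the current approach: allocate a +1 array and copy on every addition
			Object[] sources = new Object[0];
			for (int i = 0; i < chainLength; i++) {
				Object[] newSources = new Object[sources.length + 1];
				System.arraycopy(sources, 0, newSources, 0, sources.length);
				newSources[sources.length] = Integer.valueOf(i);
				sources = newSources;
			}
			return sources;
		}

		@Benchmark
		public List<Object> addToArrayList() {
			// proposed alternative: amortized O(1) additions
			List<Object> sources = new ArrayList<>(1); // small initial capacity to limit over-allocation
			for (int i = 0; i < chainLength; i++) {
				sources.add(Integer.valueOf(i));
			}
			return sources;
		}
	}

Running both variants across the parameterized chain lengths on JDK 8 and JDK 21 would cover points 2 and 5.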

If we see that the proposed change does not significantly impact the current case, we can consider it (an order-of-magnitude regression would be unacceptable; a few % here and there should be OK).

Important things to note:

  • new ArrayList<>() allocates 10 items by default upon first addition - we don't want that overallocation.
  • Flux#zipWith uses the same type of optimization
  • Mono#then(Mono) as well

Having the above, there's some initial work to be done before we can begin considering the change. We can see if there's more community demand for improvements here and, based on that, spend some time on it, or you can drive the discussion with the research. @PMacho Are you interested in pushing this forward?

chemicL added the for/user-attention label on Nov 20, 2024
PMacho (Author) commented Nov 23, 2024

The effect of the JIT can be anticipated simply with:

    static void measure(Runnable r, int iterations) {
        // warm-up round, giving the JIT a chance to compile the hot path
        for (int i = 0; i != iterations; i++) {
            r.run();
        }
        // measured round
        long start = System.currentTimeMillis();
        for (int i = 0; i != iterations; i++) {
            r.run();
        }
        System.out.println(System.currentTimeMillis() - start);
    }

Above all, this optimizes LinkedList's behavior. A little tweaking can be done with ArrayList's initial capacity. However, memory allocation does not scale linearly.

From what we saw already, the difference between an ArrayList and a plain array for small chains like three is a factor of two rather than an order of magnitude, and both are still in the nanoseconds range. For long chains, however, it becomes multiple orders of magnitude, easily scaling up to practically infinite blocking. The latter is a simple mathematical fact, obviously resulting from the quadratic scaling behavior. I honestly don't see a need for a benchmark here; mathematical laws cannot be optimized away by the JIT.

However, yes, I would like to push the issue, since I think a general-purpose library like Project Reactor should perform well at every scale, rather than being optimized for simple cases while breaking entirely at scale. Yet, no, at the moment I am not willing to implement benchmarks proving obvious mathematical facts.

Don't get me wrong, I am a fan of optimizing, even nano-optimizations like using arrays over Lists or != over < in for loops. However, optimization at the cost of scaling behavior is, in my opinion, pretty much premature. Unless, of course, the scale is known to be bounded at all times, which it is not in this particular case.
