
Allow concurrency in concatMap #5519

Open
JosephSilber opened this issue Jun 22, 2020 · 13 comments

Comments

@JosephSilber

Feature Request

Is your feature request related to a problem? Please describe.

We have mergeMap which can map values concurrently, but the new values it produces are not in the original order. If you want to maintain order, you must use concatMap, but concatMap doesn't run concurrently (it's equivalent to calling mergeMap with concurrent set to 1).

Describe the solution you'd like

It would be great if concatMap could take a concurrent parameter, just like mergeMap already does:

concatMap(projection, concurrent)

Describe alternatives you've considered

I don't really have a full-on alternative. In my particular use case the inner observables only produce a single value (they're Promises), so I was able to build a convoluted workaround. See the next section.

Additional context

Here's my use-case:

I'm shipping a bunch of orders, and then printing their shipping label.

Here are those functions:

function shipOrder(orderId: number) : Promise<ShippingLabel> {
    return api.orders.ship(orderId);
}

function printLabel(label: ShippingLabel) : Promise<void> {
    return printer.print(label);
}

Now, I want to ship multiple orders at once:

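from(orderNumbers)
    .pipe(
        mergeMap(shipOrder, 5), // ship up to 5 orders at a time
        concatMap(printLabel)   // print one label at a time, in arrival order
    )
    .subscribe();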

...but I want the shipping labels to be printed in their original sort order.

This is where concatMap with concurrency would help:

from(orderNumbers)
    .pipe(
        concatMap(shipOrder, 5), // proposed: up to 5 concurrent, results kept in source order
        concatMap(printLabel)
    )
    .subscribe();

The orders would still be shipped in parallel, but the labels will all print in the correct order.
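For Promise-returning functions like shipOrder, the behavior described above can be sketched without RxJS. The async generator below is a hypothetical helper (orderedConcurrentMap is not an RxJS API): it keeps at most `concurrent` calls in flight and yields results in input order, each as soon as it and every earlier result have resolved. One simplifying assumption: a new call starts only once the oldest pending call has resolved, so a slow head call holds back new calls, but at most `concurrent` results are ever waiting.

```typescript
// Hypothetical helper, not part of RxJS. Assumes concurrent >= 1.
async function* orderedConcurrentMap<I, O>(
  inputs: Iterable<I>,
  project: (input: I, index: number) => Promise<O>,
  concurrent: number
): AsyncGenerator<O> {
  const iterator = inputs[Symbol.iterator]();
  const inFlight: Promise<O>[] = []; // started calls, oldest first (input order)
  let index = 0;

  // Start calls until the window is full or the inputs run out.
  const fill = () => {
    while (inFlight.length < concurrent) {
      const next = iterator.next();
      if (next.done) return;
      const call = project(next.value, index++);
      // Avoid an "unhandled rejection" report while this call waits behind the head;
      // the error still surfaces when the call itself is awaited below.
      call.catch(() => {});
      inFlight.push(call);
    }
  };

  fill();
  while (inFlight.length > 0) {
    const result = await inFlight.shift()!; // wait for the oldest call only
    fill();                                 // its slot can now be reused
    yield result;
  }
}
```

Under these assumptions, the shipping example would become `for await (const label of orderedConcurrentMap(orderNumbers, shipOrder, 5)) await printLabel(label);`.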


Here's a StackOverflow question and answer with way more details:

https://stackoverflow.com/questions/57045892/rxjs-mergemap-with-original-order

@josepot
Contributor

josepot commented Jun 23, 2020

Hi @JosephSilber (and sorry if I misunderstood your problem), but would the following code solve your issue?

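from(orderNumbers.map((order, idx) => [order, idx] as const))
  .pipe(
    // ship up to 5 orders concurrently, tagging each label with its original index
    mergeMap(([order, idx]) => shipOrder(order).then(label => [label, idx] as const), 5),
    // wait until every order has shipped
    toArray(),
    // sort by original index and emit the labels one at a time
    mergeMap(labelsByIdx =>
      labelsByIdx.sort((a, b) => a[1] - b[1]).map(([label]) => label)
    ),
    concatMap(printLabel)
  )
  .subscribe();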

@JosephSilber
Author

JosephSilber commented Jun 23, 2020

@josepot that would wait for all orders to have shipped, before printing the very first label. That could take quite a while.

I want the labels to be printed as soon as possible, while maintaining their original sort order.


Here's a simulation of what I'm after:

https://codepen.io/JosephSilber/pen/rNxmpWp?editors=1010
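To make the difference concrete, here is a small, hypothetical timing model in plain TypeScript (no RxJS; all names are illustrative). It assumes every order is available at t = 0, task i takes durations[i] time units, and at most `limit` tasks run at once, starting in source order as soon as a slot frees up (roughly how mergeMap(project, limit) schedules buffered source values). It compares mergeMap's emission order, order-preserving emission as early as possible, and the toArray-then-sort approach.

```typescript
// Hypothetical timing model; not an RxJS API. Assumes limit >= 1.
function finishTimes(durations: number[], limit: number): number[] {
  const slots: number[] = new Array(Math.min(limit, durations.length)).fill(0);
  return durations.map((duration) => {
    // Start the next task in the slot that frees up earliest.
    let best = 0;
    for (let s = 1; s < slots.length; s++) {
      if (slots[s] < slots[best]) best = s;
    }
    slots[best] += duration; // the slot is now busy until this task finishes
    return slots[best];
  });
}

// mergeMap: results are emitted as they finish (ties broken by index here).
function mergeEmitOrder(finishes: number[]): number[] {
  return finishes
    .map((finish, idx) => [finish, idx])
    .sort((a, b) => a[0] - b[0] || a[1] - b[1])
    .map(([, idx]) => idx);
}

// Order-preserving: result i goes out once it and all earlier results are done.
function orderedEmitTimes(finishes: number[]): number[] {
  let ready = 0;
  return finishes.map((finish) => (ready = Math.max(ready, finish)));
}

// toArray() + sort: nothing goes out until the slowest task is done.
function waitAllEmitTime(finishes: number[]): number {
  return Math.max(0, ...finishes);
}

const durations = [20, 5, 5, 40];
const finishes = finishTimes(durations, 2);               // [20, 5, 10, 50]
console.log(mergeEmitOrder(finishes));                    // [1, 2, 0, 3]
console.log(orderedEmitTimes(finishes));                  // [20, 20, 20, 50]
console.log(waitAllEmitTime(finishes));                   // 50
console.log(orderedEmitTimes(finishTimes(durations, 1))); // [20, 25, 30, 70], i.e. plain concatMap
```

In this model, with two slots the first three labels could be printed at t = 20, whereas the toArray approach prints nothing before t = 50, and a limit of 1 reproduces plain concatMap.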

@josepot
Contributor

josepot commented Jun 25, 2020

@JosephSilber I've been thinking about this today and these are my thoughts:

(FTR: I'm not a maintainer of this library and I don't pretend to be an authority on these topics. I'm just an RxJS aficionado sharing my thoughts 🙂 )

If the concatMap operator accepted the concurrent parameter that you are suggesting, and that made the operator behave in the way that you are describing, then concatAll and concatMapTo would also have to change their signature and their current behavior.

After thinking quite a bit about this I've gone from: "nah, this is too confusing" to "this actually makes sense".

What changed my mind was realizing that I had been thinking of concatMap as sugar for a very common use case of mergeMap (I was thinking about it in terms of its current implementation).

However, if we consider the "concatenation" behavior without thinking about how it's implemented, then I think what you are suggesting makes a lot of sense.

Perhaps the best way to convince the maintainers of RxJS that these changes are worth it would be to first implement these operators in user-land and show that there is a need for them (people are using them). Once we have a good implementation and a good set of tests in user-land, we could open a PR against RxJS.

In that sense, perhaps it would make sense to send a PR to rxjs-etc (a library maintained by @cartant) with these operators, and try to prove with examples and tests that it's worth bringing those changes to core.

I just quickly implemented these operators in a personal library of mine. Whenever I have some time I will add some tests and I will send a PR to rxjs-etc. By all means, feel free to do that yourself, because I don't know when I will have the time to add the tests and send the PR.

@cartant
Collaborator

cartant commented Jun 25, 2020

I see that as a mergeMap - with a constrained concurrency - followed by a buffered 'sort' that ensures latter mappings are not emitted before earlier mappings. You could use scan to do this. I don't see this as concatenation. IMO, in RxJS concatenation implies the sequential subscription to numerous sources. I'd suggest implementing a user-land operator that is an order-preserving mergeMap with a concurrency. It should be reasonably straightforward to use mergeMap followed by a scan operator to accumulate the buffer and emit values in the correct order. Actually, you'd need another mergeMap after the scan because scan will need to emit its buffer of value/index pairs.
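The buffered "sort" step described here can be sketched as a pure accumulator function, the kind of function one might hand to scan. This is a minimal sketch under one stated assumption: every source index produces exactly one value (as with Promise-returning projections), so receiving index i also means index i is complete. CollateState, initialState and collateStep are hypothetical names, not part of RxJS or rxjs-etc.

```typescript
// Hypothetical sketch; assumes each source index yields exactly one value.
interface CollateState<T> {
  nextIdx: number;        // index that must be emitted next
  buffer: Map<number, T>; // results that arrived before their turn
  toEmit: T[];            // values released by the most recent step
}

function initialState<T>(): CollateState<T> {
  return { nextIdx: 0, buffer: new Map<number, T>(), toEmit: [] };
}

// Pure accumulator: takes one [value, originalIndex] pair, returns the new state.
function collateStep<T>(
  state: CollateState<T>,
  [value, idx]: readonly [T, number]
): CollateState<T> {
  const buffer = new Map(state.buffer);
  buffer.set(idx, value);
  let nextIdx = state.nextIdx;
  const toEmit: T[] = [];
  // Release the contiguous run of results starting at nextIdx.
  while (buffer.has(nextIdx)) {
    toEmit.push(buffer.get(nextIdx) as T);
    buffer.delete(nextIdx);
    nextIdx++;
  }
  return { nextIdx, buffer, toEmit };
}

// Results arriving out of order, as [value, originalIndex] pairs.
const arrivals: Array<readonly [string, number]> = [["b", 1], ["c", 2], ["a", 0], ["d", 3]];
let state = initialState<string>();
const emitted: string[][] = [];
for (const arrival of arrivals) {
  state = collateStep(state, arrival);
  emitted.push(state.toEmit);
}
console.log(emitted); // [[], [], ["a", "b", "c"], ["d"]]
```

In the pipeline outlined above, this step would sit between a mergeMap that tags each result with its source index and a final mergeMap(state => state.toEmit) that flattens the released values.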

@jakovljevic-mladen
Member

Hi. I'm not sure that I have understood everything (I wasn't reading carefully though), but, if I'm right, you may want to have something like RxJava's concatEager. If this is something you need, and if you implement it, please post your solution here as that's something I may need. If I'm wrong, I'm sorry, just ignore this.

@josepot
Contributor

josepot commented Jun 25, 2020

I'd suggest implementing a user-land operator that is an order-preserving mergeMap with a concurrency

💯

It should be reasonably straightforward to use mergeMap followed by a scan operator to accumulate the buffer and emit values in the correct order.

🤔 I guess... I tried, and it seems to work, but I think that I would rather have a custom operator using new Observable, instead. I mean, I'm sure that there is room for improvement, but still...

This is what I came up with using @cartant 's suggestion (please don't judge me, I did that while being in a rush 🙈 ):

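import { concat, Observable, ObservableInput, of } from "rxjs";
import { map, mergeMap, scan } from "rxjs/operators";

// Marker appended to every inner stream so the scan step can tell when an index has completed
const DONE = Symbol("DONE");
const DONE$ = of(DONE);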
const sortedMergeMap = <I, O>(
  mapper: (i: I) => ObservableInput<O>,
  concurrent = 1
) => (source$: Observable<I>) =>
  source$.pipe(
    mergeMap(
      (value, idx) =>
        concat(mapper(value), DONE$).pipe(map(x => [x, idx] as const)),
      concurrent
    ),
    scan(
      (acc, [value, idx]) => {
        if (idx === acc.currentIdx) {
          if (value === DONE) {
            let currentIdx = idx;
            const valuesToEmit = [];
            do {
              currentIdx++;
              const nextValues = acc.buffer.get(currentIdx);
              if (!nextValues) {
                break;
              }
              valuesToEmit.push(...nextValues);
              acc.buffer.delete(currentIdx);
            } while (valuesToEmit[valuesToEmit.length - 1] === DONE);
            return {
              ...acc,
              currentIdx,
              valuesToEmit: valuesToEmit.filter(x => x !== DONE) as O[]
            };
          } else {
            return {
              ...acc,
              valuesToEmit: [value]
            };
          }
        } else {
          if (!acc.buffer.has(idx)) {
            acc.buffer.set(idx, []);
          }
          acc.buffer.get(idx)!.push(value);
          if (acc.valuesToEmit.length > 0) {
            acc.valuesToEmit = [];
          }
          return acc;
        }
      },
      {
        currentIdx: 0,
        valuesToEmit: [] as O[],
        buffer: new Map<number, (O | typeof DONE)[]>([[0, []]])
      }
    ),
    mergeMap(scannedValues => scannedValues.valuesToEmit)
  );

@josepot
Contributor

josepot commented Jun 26, 2020

@jakovljevic-mladen yes, that's almost exactly what we are talking about.

The only difference is that concatEager does not take as a parameter the maximum number of input Observables to subscribe to concurrently, while the operator we are describing does.

Last night I took some time to carefully implement a version of this operator for JS. Today I've added a few tests and I've opened this PR against rxjs-etc. That operator uses Infinity as the default number of concurrently opened subscriptions, which means that by default it behaves like the Java concatEager operator. However, if we pass 1 as the argument, it behaves exactly like concatMap. In other words, the operator enables the behavior that @JosephSilber is looking for.
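The heart of a concatEager-style operator is its reorder buffer: values from the inner stream that is currently first in line pass straight through, while values from later inner streams are held until every earlier one has completed. Below is a dependency-free sketch of that buffer for inner streams that may emit several values. The names (InnerEvent, EagerConcatBuffer) are hypothetical; this plays the same role as the DONE-marker scan above but is not the rxjs-etc implementation, and limiting concurrency is left to whatever subscribes to the inner streams.

```typescript
// Hypothetical sketch of a concatEager-style reorder buffer; names are illustrative.
type InnerEvent<T> =
  | { idx: number; kind: "next"; value: T }
  | { idx: number; kind: "complete" };

class EagerConcatBuffer<T> {
  private current = 0; // index of the inner stream allowed to emit right now
  private readonly pending = new Map<number, { values: T[]; done: boolean }>();

  // Feed one inner-stream event; returns the values that may be emitted now.
  push(event: InnerEvent<T>): T[] {
    if (event.idx === this.current) {
      return event.kind === "next" ? [event.value] : this.advance();
    }
    const entry = this.entryFor(event.idx);
    if (event.kind === "next") entry.values.push(event.value);
    else entry.done = true;
    return [];
  }

  private entryFor(idx: number): { values: T[]; done: boolean } {
    let entry = this.pending.get(idx);
    if (!entry) {
      entry = { values: [], done: false };
      this.pending.set(idx, entry);
    }
    return entry;
  }

  // The current inner stream completed: flush buffered values of the following
  // indices, skipping past every one that has already completed.
  private advance(): T[] {
    const out: T[] = [];
    for (;;) {
      this.current++;
      const entry = this.pending.get(this.current);
      if (!entry) break;
      out.push(...entry.values);
      this.pending.delete(this.current);
      if (!entry.done) break; // this index is now current and still running
    }
    return out;
  }
}

const buffer = new EagerConcatBuffer<string>();
const events: InnerEvent<string>[] = [
  { idx: 1, kind: "next", value: "b1" },
  { idx: 0, kind: "next", value: "a1" },
  { idx: 1, kind: "complete" },
  { idx: 2, kind: "next", value: "c1" },
  { idx: 0, kind: "next", value: "a2" },
  { idx: 0, kind: "complete" },
  { idx: 2, kind: "next", value: "c2" },
  { idx: 2, kind: "complete" },
];
const output: string[] = [];
for (const event of events) output.push(...buffer.push(event));
console.log(output); // ["a1", "a2", "b1", "c1", "c2"]
```

Even though "b1" and "c1" arrive early, the output matches what sequential concatenation of the three inner streams would produce.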

@cartant
Copy link
Collaborator

cartant commented Jun 28, 2020

Closing this because there is now a user-land equivalent of RxJava's concatMapEager in rxjs-etc. Developers wanting this functionality can either use rxjs-etc as a dependency or can copy the operator therein - and the MIT license - and paste it into their own projects (the operator is relatively small).

ATM, inclusion of an operator like concatMapEager into the core is not under consideration.

cartant closed this as completed Jun 28, 2020
benlesh reopened this Aug 12, 2020
@benlesh
Member

benlesh commented Aug 12, 2020

Core Team Meeting: This has been asked for quite a few times. Although collateMap might be a better name?

@zakhenry
Contributor

zakhenry commented Aug 21, 2020

Should this be a new operator, though, or a variant of concatMap/concatAll? It seems to have the same semantics as the concurrent?: number argument of mergeMap.

I guess there might be some confusion as the concatMap/concatAll would need to have a default concurrency value of 1, in order to be a non-breaking change (and a sensible default for many use cases). This would be in contrast with the Infinity default of mergeMap/mergeAll.

I cannot find it unfortunately, but there was discussion last year about restructuring the implementation of these operators to a single operator with concurrency, switching and ordering capabilities, and have the concatMap/concatAll/mergeMap/mergeAll/switchMap/switchAll derive from that one core concept. Hopefully someone else in this discussion recalls it and can provide a link as it seemed like a sound strategy.

@cartant
Collaborator

cartant commented Aug 21, 2020

@zakhenry @benlesh

Should this be a new operator though?

Yeah, I've changed my mind on this. IMO, the OP's initial suggestion is the best solution. concatMap et al. should accept a concurrency. An implementation of collate is going to need a concurrency parameter to manage the unbounded buffer of values, so it might as well be added to concat et al.
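One way to read the point about the buffer: if a slot is freed as soon as an inner finishes (as with mergeMap followed by a reorder buffer), results that finish early keep piling up behind a slow one; if a slot is held until its result has actually been emitted, then in this model at most `limit - 1` results wait in the buffer. The sketch below is a hypothetical timing model (every source value available at t = 0, each inner emits once, inners start in source order) that measures the peak buffer size under both policies. The policy names and functions are illustrative, not RxJS APIs.

```typescript
// Hypothetical timing model; names are illustrative. Assumes limit >= 1.
type SlotPolicy = "release-on-finish" | "release-on-emit";

interface Timeline {
  finish: number[]; // when each inner produces its result
  emit: number[];   // when that result can leave, in source order
}

function simulate(durations: number[], limit: number, policy: SlotPolicy): Timeline {
  const finish: number[] = [];
  const emit: number[] = [];
  // Only used by "release-on-finish": the time at which each slot frees up.
  const slots: number[] = new Array(Math.min(limit, durations.length)).fill(0);
  durations.forEach((duration, i) => {
    let start: number;
    if (policy === "release-on-emit") {
      // Results leave in order, so the slot held by task i - limit frees up first.
      start = i < limit ? 0 : emit[i - limit];
    } else {
      // mergeMap-like: reuse whichever slot's inner finishes first.
      let best = 0;
      for (let s = 1; s < slots.length; s++) if (slots[s] < slots[best]) best = s;
      start = slots[best];
      slots[best] = start + duration;
    }
    finish.push(start + duration);
    emit.push(Math.max(i > 0 ? emit[i - 1] : 0, start + duration));
  });
  return { finish, emit };
}

// Largest number of results that have finished but are still waiting to be emitted.
function peakBuffered({ finish, emit }: Timeline): number {
  let peak = 0;
  for (const t of finish) {
    const waiting = finish.filter((f, j) => f <= t && t < emit[j]).length;
    peak = Math.max(peak, waiting);
  }
  return peak;
}

const durations = [100, 1, 1, 1, 1, 1];
console.log(peakBuffered(simulate(durations, 2, "release-on-finish"))); // 5
console.log(peakBuffered(simulate(durations, 2, "release-on-emit")));   // 1
```

In this model the bounded policy finishes slightly later (the last result leaves at t = 102 instead of t = 100), which is the price of keeping the buffer small.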

@zakhenry
Contributor

Yeah, agreed: whatever it ends up as, it should support restricting concurrency. My actual use case is with web workers: I want the inner subscriptions all to run in parallel up to a maximum number (the thread count), but have the results of those workers combined in the original order in which the data was passed to them.

With the current implementation of concatAll, only one worker is used at a time, which doesn't help with performance :)

@zakhenry
Contributor

zakhenry commented Mar 23, 2021

@benlesh with mergeInternal now in v7 beta, is it in a good state to be extended so that concatMap & concatAll can take a concurrency parameter?

I'd be happy to have a crack at it and raise a PR if there is appetite to extend the API.

Labels
None yet
Projects
None yet
Development

No branches or pull requests

6 participants