Skip to content

Commit

Permalink
Multi defaultIfEmpty (#3592)
Browse files Browse the repository at this point in the history
Signed-off-by: Daniel Kec <daniel.kec@oracle.com>
  • Loading branch information
danielkec authored Nov 3, 2021
1 parent 3b95350 commit dd06bad
Show file tree
Hide file tree
Showing 15 changed files with 808 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,17 @@ default Multi<T> defaultIfEmpty(T defaultItem) {
return new MultiDefaultIfEmpty<>(this, defaultItem);
}

/**
* Signals the default item supplied by specified supplier if the upstream is empty.
* @param supplier of the default value
* @return Multi
* @throws NullPointerException if {@code supplier} is {@code null}
*/
default Multi<T> defaultIfEmpty(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return new MultiDeferredDefaultIfEmpty<>(this, supplier);
}

/**
* Filter out all duplicates.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Signal an item if the source is empty.
* @param <T> the element type
*/
final class MultiDeferredDefaultIfEmpty<T> implements Multi<T> {

private final Multi<T> source;

private final Supplier<? extends T> defaultItemSupplier;

MultiDeferredDefaultIfEmpty(Multi<T> source, Supplier<? extends T> defaultItem) {
this.source = source;
this.defaultItemSupplier = defaultItem;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new DefaultIfEmptySubscriber<>(subscriber, defaultItemSupplier));
}

static final class DefaultIfEmptySubscriber<T>
implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Supplier<? extends T> defaultItemSupplier;

private final AtomicLong requested;

private final AtomicReference<Flow.Subscription> fallback;

private Flow.Subscription upstream;

private boolean nonEmpty;

DefaultIfEmptySubscriber(Flow.Subscriber<? super T> downstream, Supplier<? extends T> defaultItem) {
this.downstream = downstream;
this.defaultItemSupplier = defaultItem;
this.requested = new AtomicLong();
this.fallback = new AtomicReference<>();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
nonEmpty = true;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
if (nonEmpty) {
downstream.onComplete();
} else {
SubscriptionHelper.deferredSetOnce(fallback, requested,
new SingleDeferredSubscription<>(defaultItemSupplier, downstream));
}
}

@Override
public void request(long n) {
upstream.request(n);
SubscriptionHelper.deferredRequest(fallback, requested, n);
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ static <T> Single<T> just(T item) {
return new SingleJust<>(item);
}

/**
* Create a {@link Single} instance that publishes result of the given supplier to its subscriber(s).
*
* @param <T> item type
* @param supplier item supplier to publish
* @return Single
* @throws NullPointerException if item is {@code null}
*/
static <T> Single<T> create(Supplier<? extends T> supplier) {
return new SingleDeferredJust<>(supplier);
}

/**
* Get a {@link Single} instance that never completes.
*
Expand Down Expand Up @@ -296,6 +308,17 @@ default Single<T> defaultIfEmpty(T defaultItem) {
return new SingleDefaultIfEmpty<>(this, defaultItem);
}

/**
* Signals the default item supplied by specified supplier if the upstream is empty.
* @param supplier of the default value
* @return Multi
* @throws NullPointerException if {@code supplier} is {@code null}
*/
default Single<T> defaultIfEmpty(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return new SingleDeferredDefaultIfEmpty<>(this, supplier);
}

/**
* Map this {@link Single} instance to a publisher using the given {@link Mapper}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Signal an item if the source is empty.
* @param <T> the element type
*/
final class SingleDeferredDefaultIfEmpty<T> extends CompletionSingle<T> {

private final Single<T> source;

private final Supplier<? extends T> defaultItem;

SingleDeferredDefaultIfEmpty(Single<T> source, Supplier<? extends T> defaultItem) {
this.source = source;
this.defaultItem = defaultItem;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new DefaultIfEmptySubscriber<>(subscriber, defaultItem));
}

static final class DefaultIfEmptySubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Supplier<? extends T> defaultItemSupplier;

private final AtomicLong requested;

private final AtomicReference<Flow.Subscription> fallback;

private Flow.Subscription upstream;

DefaultIfEmptySubscriber(Flow.Subscriber<? super T> downstream, Supplier<? extends T> defaultItem) {
this.downstream = downstream;
this.defaultItemSupplier = defaultItem;
this.requested = new AtomicLong();
this.fallback = new AtomicReference<>();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
upstream = SubscriptionHelper.CANCELED;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
if (upstream == SubscriptionHelper.CANCELED) {
downstream.onComplete();
} else {
SubscriptionHelper.deferredSetOnce(fallback, requested,
new SingleDeferredSubscription<>(defaultItemSupplier, downstream));
}
}

@Override
public void request(long n) {
upstream.request(n);
SubscriptionHelper.deferredRequest(fallback, requested, n);
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallback);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow.Subscriber;
import java.util.function.Supplier;

/**
* Implementation of {@link Single} that represents a non {@code null} value.
*
* @param <T> item type
*/
final class SingleDeferredJust<T> extends CompletionSingle<T> {

private final Supplier<? extends T> supplier;

SingleDeferredJust(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier, "supplier cannot be null!");
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new SingleDeferredSubscription<>(supplier, subscriber));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* Single item subscription.
*
* {@code this} represents the current state changed atomically upon interacting with the Subscription interface.
*/
class SingleDeferredSubscription<T> extends AtomicInteger implements Subscription {

private final Supplier<? extends T> value;
private final Subscriber<? super T> subscriber;

static final int FRESH = 0;
static final int REQUESTED = 1;
static final int COMPLETED = 2;
static final int CANCELED = 3;
static final int ERRORED = 4;

SingleDeferredSubscription(Supplier<? extends T> valueSupplier, Subscriber<? super T> subscriber) {
super(FRESH);
this.value = valueSupplier;
this.subscriber = subscriber;
}

@Override
public void request(long n) {
if (n <= 0L) {
cancel();
subscriber.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden."));
} else {
if (compareAndSet(FRESH, REQUESTED)) {
try {
subscriber.onNext(value.get());
} catch (Throwable t) {
set(ERRORED);
subscriber.onError(t);
}
if (compareAndSet(REQUESTED, COMPLETED)) {
subscriber.onComplete();
}
}
}
}

@Override
public void cancel() {
set(CANCELED);
}
}
Loading

0 comments on commit dd06bad

Please sign in to comment.