Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Multi defaultIfEmpty #3592

Merged
merged 1 commit into from
Nov 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,17 @@ default Multi<T> defaultIfEmpty(T defaultItem) {
return new MultiDefaultIfEmpty<>(this, defaultItem);
}

/**
* Signals the default item supplied by specified supplier if the upstream is empty.
* @param supplier of the default value
* @return Multi
* @throws NullPointerException if {@code supplier} is {@code null}
*/
default Multi<T> defaultIfEmpty(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return new MultiDeferredDefaultIfEmpty<>(this, supplier);
}

/**
* Filter out all duplicates.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Signal an item if the source is empty.
* @param <T> the element type
*/
final class MultiDeferredDefaultIfEmpty<T> implements Multi<T> {

private final Multi<T> source;

private final Supplier<? extends T> defaultItemSupplier;

MultiDeferredDefaultIfEmpty(Multi<T> source, Supplier<? extends T> defaultItem) {
this.source = source;
this.defaultItemSupplier = defaultItem;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new DefaultIfEmptySubscriber<>(subscriber, defaultItemSupplier));
}

static final class DefaultIfEmptySubscriber<T>
implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Supplier<? extends T> defaultItemSupplier;

private final AtomicLong requested;

private final AtomicReference<Flow.Subscription> fallback;

private Flow.Subscription upstream;

private boolean nonEmpty;

DefaultIfEmptySubscriber(Flow.Subscriber<? super T> downstream, Supplier<? extends T> defaultItem) {
this.downstream = downstream;
this.defaultItemSupplier = defaultItem;
this.requested = new AtomicLong();
this.fallback = new AtomicReference<>();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
nonEmpty = true;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
if (nonEmpty) {
downstream.onComplete();
} else {
SubscriptionHelper.deferredSetOnce(fallback, requested,
new SingleDeferredSubscription<>(defaultItemSupplier, downstream));
}
}

@Override
public void request(long n) {
upstream.request(n);
SubscriptionHelper.deferredRequest(fallback, requested, n);
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallback);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,18 @@ static <T> Single<T> just(T item) {
return new SingleJust<>(item);
}

/**
* Create a {@link Single} instance that publishes result of the given supplier to its subscriber(s).
*
* @param <T> item type
* @param supplier item supplier to publish
* @return Single
* @throws NullPointerException if item is {@code null}
*/
static <T> Single<T> create(Supplier<? extends T> supplier) {
return new SingleDeferredJust<>(supplier);
}

/**
* Get a {@link Single} instance that never completes.
*
Expand Down Expand Up @@ -296,6 +308,17 @@ default Single<T> defaultIfEmpty(T defaultItem) {
return new SingleDefaultIfEmpty<>(this, defaultItem);
}

/**
* Signals the default item supplied by specified supplier if the upstream is empty.
* @param supplier of the default value
* @return Multi
* @throws NullPointerException if {@code supplier} is {@code null}
*/
default Single<T> defaultIfEmpty(Supplier<? extends T> supplier) {
Objects.requireNonNull(supplier, "supplier is null");
return new SingleDeferredDefaultIfEmpty<>(this, supplier);
}

/**
* Map this {@link Single} instance to a publisher using the given {@link Mapper}.
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package io.helidon.common.reactive;

import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/**
* Signal an item if the source is empty.
* @param <T> the element type
*/
final class SingleDeferredDefaultIfEmpty<T> extends CompletionSingle<T> {

private final Single<T> source;

private final Supplier<? extends T> defaultItem;

SingleDeferredDefaultIfEmpty(Single<T> source, Supplier<? extends T> defaultItem) {
this.source = source;
this.defaultItem = defaultItem;
}

@Override
public void subscribe(Flow.Subscriber<? super T> subscriber) {
source.subscribe(new DefaultIfEmptySubscriber<>(subscriber, defaultItem));
}

static final class DefaultIfEmptySubscriber<T> implements Flow.Subscriber<T>, Flow.Subscription {

private final Flow.Subscriber<? super T> downstream;

private final Supplier<? extends T> defaultItemSupplier;

private final AtomicLong requested;

private final AtomicReference<Flow.Subscription> fallback;

private Flow.Subscription upstream;

DefaultIfEmptySubscriber(Flow.Subscriber<? super T> downstream, Supplier<? extends T> defaultItem) {
this.downstream = downstream;
this.defaultItemSupplier = defaultItem;
this.requested = new AtomicLong();
this.fallback = new AtomicReference<>();
}

@Override
public void onSubscribe(Flow.Subscription subscription) {
SubscriptionHelper.validate(upstream, subscription);
upstream = subscription;
downstream.onSubscribe(this);
}

@Override
public void onNext(T item) {
upstream = SubscriptionHelper.CANCELED;
downstream.onNext(item);
}

@Override
public void onError(Throwable throwable) {
downstream.onError(throwable);
}

@Override
public void onComplete() {
if (upstream == SubscriptionHelper.CANCELED) {
downstream.onComplete();
} else {
SubscriptionHelper.deferredSetOnce(fallback, requested,
new SingleDeferredSubscription<>(defaultItemSupplier, downstream));
}
}

@Override
public void request(long n) {
upstream.request(n);
SubscriptionHelper.deferredRequest(fallback, requested, n);
}

@Override
public void cancel() {
upstream.cancel();
SubscriptionHelper.cancel(fallback);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.Objects;
import java.util.concurrent.Flow.Subscriber;
import java.util.function.Supplier;

/**
* Implementation of {@link Single} that represents a non {@code null} value.
*
* @param <T> item type
*/
final class SingleDeferredJust<T> extends CompletionSingle<T> {

private final Supplier<? extends T> supplier;

SingleDeferredJust(Supplier<? extends T> supplier) {
this.supplier = Objects.requireNonNull(supplier, "supplier cannot be null!");
}

@Override
public void subscribe(Subscriber<? super T> subscriber) {
subscriber.onSubscribe(new SingleDeferredSubscription<>(supplier, subscriber));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Copyright (c) 2021 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.common.reactive;

import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
* Single item subscription.
*
* {@code this} represents the current state changed atomically upon interacting with the Subscription interface.
*/
class SingleDeferredSubscription<T> extends AtomicInteger implements Subscription {

private final Supplier<? extends T> value;
private final Subscriber<? super T> subscriber;

static final int FRESH = 0;
static final int REQUESTED = 1;
static final int COMPLETED = 2;
static final int CANCELED = 3;
static final int ERRORED = 4;

SingleDeferredSubscription(Supplier<? extends T> valueSupplier, Subscriber<? super T> subscriber) {
super(FRESH);
this.value = valueSupplier;
this.subscriber = subscriber;
}

@Override
public void request(long n) {
if (n <= 0L) {
cancel();
subscriber.onError(new IllegalArgumentException("Rule §3.9 violated: non-positive requests are forbidden."));
} else {
if (compareAndSet(FRESH, REQUESTED)) {
try {
subscriber.onNext(value.get());
} catch (Throwable t) {
set(ERRORED);
subscriber.onError(t);
}
if (compareAndSet(REQUESTED, COMPLETED)) {
subscriber.onComplete();
}
}
}
}

@Override
public void cancel() {
set(CANCELED);
}
}
Loading