Skip to content

Commit

Permalink
Merge pull request #1551 from smallrye/style/smallfixes-pre-2.6
Browse files Browse the repository at this point in the history
Style fixes before 2.6
  • Loading branch information
jponge authored Mar 18, 2024
2 parents e839a59 + 3cc9ac1 commit 01abb16
Show file tree
Hide file tree
Showing 9 changed files with 22 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public static <T> T[] doesNotContainNull(T[] array, String name) {
*/
public static <T extends Collection<?>> T isNotEmpty(T collection, String name) {
nonNull(collection, name);
if (collection.size() == 0) {
if (collection.isEmpty()) {
throw new IllegalArgumentException(String.format("`%s` must not be empty", name));
}
return collection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,16 +286,6 @@ public void onItem(T item) {
index = i + 1;
}

@Override
public void onFailure(Throwable t) {
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
if (subscription != CANCELLED) {
downstream.onFailure(t);
} else {
Infrastructure.handleDroppedException(t);
}
}

@Override
public void onCompletion() {
Subscription subscription = getAndSetUpstreamSubscription(CANCELLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import io.smallrye.mutiny.Context;
import io.smallrye.mutiny.Multi;
import io.smallrye.mutiny.helpers.Subscriptions;
import io.smallrye.mutiny.subscription.ContextSupport;
import io.smallrye.mutiny.subscription.DemandPacer;
import io.smallrye.mutiny.subscription.MultiSubscriber;

Expand Down Expand Up @@ -45,15 +43,6 @@ private static class MultiSubscriptionPacerProcessor<T> extends MultiOperatorPro
this.pacer = pacer;
}

@Override
public Context context() {
if (downstream instanceof ContextSupport) {
return ((ContextSupport) downstream).context();
} else {
return Context.empty();
}
}

private void demandAndSchedule(ScheduledExecutorService executor) {
if (upstream == Subscriptions.CANCELLED) {
return;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
package io.smallrye.mutiny.operators.multi;

import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Predicate;

Expand Down Expand Up @@ -58,11 +57,6 @@ public RepeatProcessor(Multi<? extends T> upstream, MultiSubscriber<? super T> d
this.delay = delay;
}

@Override
public void onSubscribe(Subscription s) {
setOrSwitchUpstream(s);
}

/**
* Subscribes to the source again via trampolining.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public MultiZipOp(Iterable<? extends Publisher<?>> upstreams,

@Override
public void subscribe(MultiSubscriber<? super O> downstream) {
if (upstreams.size() == 0) {
if (upstreams.isEmpty()) {
Subscriptions.complete(downstream);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public void testEmptyQueue() {
assertThat(queue.contains(1)).isFalse();
assertThat(queue.iterator().hasNext()).isFalse();
assertThatThrownBy(queue::element).isInstanceOf(NoSuchElementException.class);
assertThatThrownBy(() -> queue.remove()).isInstanceOf(NoSuchElementException.class);
assertThatThrownBy(queue::remove).isInstanceOf(NoSuchElementException.class);
assertThat(queue.remove(1)).isFalse();
assertThat(queue.containsAll(values)).isFalse();
assertThat(queue.retainAll(values)).isFalse();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public void testAsListsWithDuration() {

@Test
public void testAsListsWithDurationWithNoItems() {
MultiOnCancellationSpy<Long> spy = Spy.onCancellation(Multi.createFrom().<Long> nothing());
MultiOnCancellationSpy<Long> spy = Spy.onCancellation(Multi.createFrom().nothing());
AssertSubscriber<List<Long>> subscriber = spy
.group().intoLists().every(Duration.ofMillis(100), true)
.subscribe().withSubscriber(AssertSubscriber.create(100));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,18 @@
import static org.assertj.core.api.Assertions.fail;

import java.io.IOException;
import java.util.concurrent.*;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.jupiter.api.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;

import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.subscribers.TestSubscriber;
Expand Down Expand Up @@ -144,16 +151,9 @@ public void testCancellationBetweenSubscriptionAndRequest() {

@Test
public void testCancellationBetweenRequestAndValue() {
// TODO This is a very broken implementation of "delay" - to be replace once delay is implemented
executor = Executors.newFixedThreadPool(1);
Publisher<Integer> publisher = Uni.createFrom().item(1).emitOn(executor).map(x -> {
try {
Thread.sleep(100);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
return x;
}).convert().toPublisher();
Publisher<Integer> publisher = Uni.createFrom().item(1)
.onItem().delayIt().by(Duration.ofMillis(100))
.convert().toPublisher();

assertThat(publisher).isNotNull();
TestSubscriber<Integer> test = Flowable.fromPublisher(AdaptersToReactiveStreams.publisher(publisher)).test(0);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ public void verifyOnNextOnErrorThreadSafety() {
.assertSubscribed()
.assertTerminated();

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).containsExactly(1);
} else {
assertThat(subscriber.getFailure()).isEqualTo(failure);
Expand All @@ -201,7 +201,7 @@ public void verifyOnNextOnCompleteThreadSafety() {

subscriber.awaitSubscription().awaitCompletion();

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).containsExactly(1);
}
}
Expand All @@ -226,7 +226,7 @@ public void verifyOnSubscribeOnCompleteThreadSafety() {

subscriber.awaitSubscription().awaitCompletion();

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).containsExactly(1);
}
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void verifyOnFailureOnCompleteThreadSafety() {

await().untilAsserted(subscriber::assertTerminated);

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).containsExactly(1);
}
}
Expand Down Expand Up @@ -334,7 +334,7 @@ public void testRaceBetweenOnNextAndOnComplete() {

subscriber.awaitSubscription().awaitCompletion();

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).contains(1);
}

Expand Down Expand Up @@ -363,7 +363,7 @@ public void testRaceBetweenOnNextAndOnSubscribe() {

subscriber.awaitSubscription().awaitCompletion();

if (subscriber.getItems().size() != 0) {
if (!subscriber.getItems().isEmpty()) {
assertThat(subscriber.getItems()).containsExactly(1, 2, 3, 4, 5);
}
}
Expand Down

0 comments on commit 01abb16

Please sign in to comment.