Skip to content
This repository has been archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Commit

Permalink
Added optional subscriber executor service
Browse files Browse the repository at this point in the history
Closes #34 - added optional executor service for subscriber of
read channel publisher, and other implementers. Refactored by moving
IO exec from parameter to field, using fork join pool as a default
instead of cached thread pool for enabling job stealing.
  • Loading branch information
g4s8 committed Feb 21, 2021
1 parent 6f7d735 commit c4d9d22
Show file tree
Hide file tree
Showing 10 changed files with 180 additions and 55 deletions.
14 changes: 2 additions & 12 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ OTHER DEALINGS IN THE SOFTWARE.
<parent>
<groupId>com.artipie</groupId>
<artifactId>ppom</artifactId>
<version>0.3.9</version>
<version>0.4.5</version>
</parent>
<url>https://github.com/cqfn/rio</url>
<licenses>
Expand All @@ -40,16 +40,6 @@ OTHER DEALINGS IN THE SOFTWARE.
<url>https://github.com/cqfn/rio/blob/master/LICENSE.txt</url>
</license>
</licenses>
<repositories>
<repository>
<name>Artipie central</name>
<id>central.artipie.com</id>
<url>https://central.artipie.com/cqfn/maven</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<developers>
<developer>
<id>g4s8</id>
Expand Down Expand Up @@ -106,7 +96,7 @@ OTHER DEALINGS IN THE SOFTWARE.
<dependency>
<groupId>com.artipie</groupId>
<artifactId>asto</artifactId>
<version>0.29</version>
<version>0.33.7</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ OTHER DEALINGS IN THE SOFTWARE.
<limit>
<counter>BRANCH</counter>
<value>COVEREDRATIO</value>
<minimum>0.58</minimum>
<minimum>0.55</minimum>
</limit>
<limit>
<counter>COMPLEXITY</counter>
Expand Down
98 changes: 98 additions & 0 deletions src/main/java/org/cqfn/rio/channel/AsyncSubscriber.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* MIT License
*
* Copyright (c) 2020 cqfn.org
*
* Permission is hereby granted, free of charge, to any person obtaining
* a copy of this software and associated documentation files
* (the "Software"), to deal in the Software without restriction,
* including without limitation the rights * to use, copy, modify,
* merge, publish, distribute, sublicense, and/or sell copies of the Software,
* and to permit persons to whom the Software is furnished to do so,
* subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE,
* ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
* OTHER DEALINGS IN THE SOFTWARE.
*/
package org.cqfn.rio.channel;

import java.util.concurrent.ExecutorService;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;

/**
* Async subscriber delegates all events to origin subscriber using
* executor service.
* @param <T> Subscriber target type
* @since 0.2
*/
final class AsyncSubscriber<T> implements Subscriber<T> {

/**
* Origin subscriber.
*/
private final Subscriber<T> origin;

/**
* Executor service.
*/
private final ExecutorService execs;

/**
* Decorates subscriber.
* @param origin Subscriber to decorate
* @param exec Executor service
*/
AsyncSubscriber(final Subscriber<T> origin, final ExecutorService exec) {
this.origin = origin;
this.execs = exec;
}

@Override
public void onSubscribe(final Subscription subscription) {
this.exec(() -> this.origin.onSubscribe(subscription), false);
}

@Override
public void onNext(final T next) {
this.exec(() -> this.origin.onNext(next), false);
}

@Override
public void onError(final Throwable err) {
this.exec(() -> this.origin.onError(err), true);
}

@Override
public void onComplete() {
this.exec(() -> this.origin.onComplete(), true);
}

/**
* Execute with a service.
* @param task To execute
* @param close To shutdown a service after exec
*/
private void exec(final Runnable task, final boolean close) {
if (this.execs == null) {
task.run();
} else {
this.execs.submit(
() -> {
task.run();
if (close) {
this.execs.shutdown();
}
}
);
}
}
}
35 changes: 33 additions & 2 deletions src/main/java/org/cqfn/rio/channel/ReadableChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import org.cqfn.rio.Buffers;
import org.reactivestreams.Publisher;

Expand All @@ -37,27 +38,57 @@
*/
public final class ReadableChannel {

/**
* Default IO executor service.
*/
static final ExecutorService DEFAULT_IO = new ForkJoinPool();

/**
* Source channel.
*/
private final ChannelSource<? extends ReadableByteChannel> chan;

/**
* IO exec.
*/
private final ExecutorService ioexec;

/**
* Extends channel with publisher providers methods.
* @param chan Source channel
*/
public ReadableChannel(final ChannelSource<? extends ReadableByteChannel> chan) {
this(chan, ReadableChannel.DEFAULT_IO);
}

/**
* Extends channel with publisher providers methods.
* @param chan Source channel
* @param exec IO executor service
*/
public ReadableChannel(final ChannelSource<? extends ReadableByteChannel> chan,
final ExecutorService exec) {
this.chan = chan;
this.ioexec = exec;
}

/**
* Read channel reactively as a publisher.
* @param buf Buffer allocation strategy
* @param exec Executor service
* @param exec Executor service for subscriber, use same thread if null
* @return Publisher of byte buffers
*/
public Publisher<ByteBuffer> read(final Buffers buf, final ExecutorService exec) {
return new ReadableChannelPublisher(this.chan, buf, exec);
return new ReadableChannelPublisher(this.chan, buf, this.ioexec, exec);
}

/**
* Read channel reactively as a publisher.
* @param buf Buffer allocation strategy
* @return Publisher of byte buffers
*/
public Publisher<ByteBuffer> read(final Buffers buf) {
return new ReadableChannelPublisher(this.chan, buf, this.ioexec, null);
}
}

19 changes: 15 additions & 4 deletions src/main/java/org/cqfn/rio/channel/ReadableChannelPublisher.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
/**
* File read flow publisher.
* @since 0.2
* @checkstyle ParameterNumberCheck (500 lines)
*/
final class ReadableChannelPublisher implements Publisher<ByteBuffer> {

Expand Down Expand Up @@ -70,28 +71,38 @@ public void cancel() {
*/
private final ExecutorService exec;

/**
* Executor service for subscribers.
*/
private final ExecutorService subex;

/**
* Ctor.
* @param src Channel
* @param buffers Buffers allocation strategy
* @param exec Executor service for IO operations
* @param subex Executor service of subscriber
*/
ReadableChannelPublisher(final ReadableByteChannel src,
final Buffers buffers, final ExecutorService exec) {
this(() -> src, buffers, exec);
final Buffers buffers, final ExecutorService exec,
final ExecutorService subex) {
this(() -> src, buffers, exec, subex);
}

/**
* Ctor.
* @param src Source of channel
* @param buffers Buffers allocation strategy
* @param exec Executor service for IO operations
* @param subex Executor service of subscriber
*/
ReadableChannelPublisher(final ChannelSource<? extends ReadableByteChannel> src,
final Buffers buffers, final ExecutorService exec) {
final Buffers buffers, final ExecutorService exec,
final ExecutorService subex) {
this.src = src;
this.buffers = buffers;
this.exec = exec;
this.subex = subex;
}

@Override
Expand All @@ -106,7 +117,7 @@ public void subscribe(final Subscriber<? super ByteBuffer> subscriber) {
return;
}
final ReadSubscriberState<? super ByteBuffer> wrap =
new ReadSubscriberState<>(subscriber);
new ReadSubscriberState<>(new AsyncSubscriber<>(subscriber, this.subex));
wrap.onSubscribe(
new ReadSubscription(
wrap, this.buffers,
Expand Down
23 changes: 19 additions & 4 deletions src/main/java/org/cqfn/rio/channel/WritableChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,24 +43,39 @@ public final class WritableChannel {
*/
private final ChannelSource<? extends WritableByteChannel> src;

/**
* ExecutorService.
*/
private final ExecutorService exec;

/**
* Extend writable channel with methods to accept reactive publishers.
* @param src Writable channel source
*/
public WritableChannel(final ChannelSource<? extends WritableByteChannel> src) {
this(src, ReadableChannel.DEFAULT_IO);
}

/**
* Extend writable channel with methods to accept reactive publishers.
* @param src Writable channel source
* @param exec IO executor service
*/
public WritableChannel(final ChannelSource<? extends WritableByteChannel> src,
final ExecutorService exec) {
this.src = src;
this.exec = exec;
}

/**
* Write data from publisher into the channel.
* @param data Source
* @param greed Of data consumer
* @param exec Executor service
* @return Completable future for write operation and cancellation support
*/
public CompletionStage<Void> write(final Publisher<ByteBuffer> data,
final WriteGreed greed, final ExecutorService exec) {
final WritableChannelSubscriber sub = new WritableChannelSubscriber(this.src, greed, exec);
public CompletionStage<Void> write(final Publisher<ByteBuffer> data, final WriteGreed greed) {
final WritableChannelSubscriber sub =
new WritableChannelSubscriber(this.src, greed, this.exec);
sub.acceptAsync(data);
return sub;
}
Expand Down
29 changes: 5 additions & 24 deletions src/main/java/org/cqfn/rio/file/File.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import java.nio.file.StandardOpenOption;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.cqfn.rio.Buffers;
import org.cqfn.rio.WriteGreed;
import org.cqfn.rio.channel.ReadableChannel;
Expand All @@ -44,11 +43,6 @@
*/
public final class File {

/**
* Default executor.
*/
private static final ExecutorService EXEC_DEFAULT = Executors.newCachedThreadPool();

/**
* File path.
*/
Expand Down Expand Up @@ -76,7 +70,7 @@ public Publisher<ByteBuffer> content() {
* @return Content publisher
*/
public Publisher<ByteBuffer> content(final Buffers buf) {
return this.content(buf, File.EXEC_DEFAULT);
return this.content(buf, null);
}

/**
Expand All @@ -91,7 +85,7 @@ public Publisher<ByteBuffer> content(final ExecutorService exec) {
/**
* File's content.
* @param buf Buffers policy
* @param exec Executor service to perform IO operations
* @param exec Executor service to listen subscriber
* @return Content publisher
*/
public Publisher<ByteBuffer> content(final Buffers buf, final ExecutorService exec) {
Expand Down Expand Up @@ -120,35 +114,22 @@ public CompletionStage<Void> write(final Publisher<ByteBuffer> data, final OpenO
public CompletionStage<Void> write(final Publisher<ByteBuffer> data,
final ExecutorService exec,
final OpenOption... opts) {
return this.write(data, WriteGreed.SYSTEM, exec, opts);
}

/**
* Write data to file.
* @param data Data publisher
* @param greed Greed level of consumer
* @param opts Options
* @return Future
*/
public CompletionStage<Void> write(final Publisher<ByteBuffer> data, final WriteGreed greed,
final OpenOption... opts) {
return this.write(data, greed, File.EXEC_DEFAULT, opts);
return this.write(data, WriteGreed.SYSTEM, opts);
}

/**
* Write data to file.
* @param data Data publisher
* @param greed Greed level of consumer
* @param exec Executor service to perform IO operations
* @param opts Options
* @return Future
* @checkstyle ParameterNumberCheck (7 lines)
*/
@SuppressWarnings("PMD.OnlyOneReturn")
public CompletionStage<Void> write(final Publisher<ByteBuffer> data,
final WriteGreed greed, final ExecutorService exec, final OpenOption... opts) {
final WriteGreed greed, final OpenOption... opts) {
return new WritableChannel(() -> FileChannel.open(this.path, writeOpts(opts)))
.write(data, greed, exec);
.write(data, greed);
}

/**
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/org/cqfn/rio/stream/ReactiveInputStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ public ReactiveInputStream(final InputStream src) {
* @return Publisher of bute buffers
*/
public Publisher<ByteBuffer> read(final Buffers buf, final ExecutorService exec) {
return new ReadableChannel(() -> Channels.newChannel(this.src))
.read(buf, exec);
return new ReadableChannel(() -> Channels.newChannel(this.src)).read(buf, exec);
}
}
Loading

0 comments on commit c4d9d22

Please sign in to comment.