Skip to content
This repository has been archived by the owner on Mar 26, 2023. It is now read-only.
/ rio Public archive

Commit

Permalink
#27 - using asto rx-file as vertx provider
Browse files Browse the repository at this point in the history
  • Loading branch information
g4s8 committed Sep 28, 2020
1 parent 3ba0178 commit ccee546
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 97 deletions.
66 changes: 39 additions & 27 deletions benchmarks/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ T_RIO := org.cqfn.rio.bench.RioTarget
T_VTX := org.cqfn.rio.bench.VertxTarget
TEST_FILES = test.1000 test.10000 test.100000 test.1000000

.PHONY: all clean bench_dummy bench_read bench_copy bench_write bench_all

all: benchmarks

$(TARGET):
Expand All @@ -22,62 +24,72 @@ clean:
rm -vf test.*
mvn clean

define bench_dummy
define _bench_dummy
@$(J) $(TARGET) -p $(T_RIO) --size $(1) -c $(2) -w $(3)
@$(J) $(TARGET) -p $(T_VTX) --size $(1) -c $(2) -w $(3)
endef

define bench_read
define _bench_read
@$(J) $(TARGET) -p $(T_RIO) -s test.$(1) -c $(2) -w $(3)
@$(J) $(TARGET) -p $(T_VTX) -s test.$(1) -c $(2) -w $(3)
endef

define bench_copy
define _bench_copy
@$(J) $(TARGET) -p $(T_RIO) -s test.$(1) -d test.$(1).cpy.rio -c $(2) -w $(3)
@$(J) $(TARGET) -p $(T_VTX) -s test.$(1) -d test.$(1).cpy.vtx -c $(2) -w $(3)
endef

define bench_write
define _bench_write
@$(J) $(TARGET) -p $(T_RIO) --size $(1) -d test.$(1).write.rio -c $(2) -w $(3)
@$(J) $(TARGET) -p $(T_VTX) --size $(1) -d test.$(1).write.vtx -c $(2) -w $(3)
endef

benchmarks: $(TARGET) $(TEST_FILES)
bench_dummy: $(TARGET) $(TEST_FILES)
@echo "=============== Dummy benchmarks =============="
@echo "Dummy 1M"
$(call bench_dummy,1000,1000,100)
$(call _bench_dummy,1000,1000,100)
@echo "Dummy 10M"
$(call bench_dummy,10000,1000,100)
$(call _bench_dummy,10000,1000,100)
@echo "Dummy 100M"
$(call bench_dummy,100000,500,50)
$(call _bench_dummy,100000,500,50)
@echo "Dummy 1G"
$(call bench_dummy,1000000,100,10)
$(call _bench_dummy,1000000,100,10)

bench_read: $(TARGET) $(TEST_FILES)
@echo "=============== Read benchmarks ==============="
@echo "Read 1M"
$(call bench_read,1000,1000,100)
$(call _bench_read,1000,1000,100)
@echo "Read 10M"
$(call bench_read,10000,1000,100)
$(call _bench_read,10000,1000,100)
@echo "Read 100M"
$(call bench_read,100000,500,50)
$(call _bench_read,100000,500,50)
@echo "Read 1G"
$(call bench_read,1000000,100,10)
@echo "=============== Copy benchmarks ==============="
@echo "Copy 1M"
$(call bench_copy,1000,1000,100)
@echo "Copy 10M"
$(call bench_copy,10000,1000,100)
@echo "Copy 100M"
$(call bench_copy,100000,500,50)
@echo "Copy 1G"
$(call bench_copy,1000000,100,10)
$(call _bench_read,1000000,100,10)

bench_write: $(TARGET) $(TEST_FILES)
@echo "=============== Write benchmarks =============="
@echo "Write 1M"
$(call bench_write,1000,1000,100)
$(call _bench_write,1000,1000,100)
@echo "Wrte 10M"
$(call bench_write,10000,1000,100)
$(call _bench_write,10000,1000,100)
@echo "Write 100M"
$(call bench_write,100000,500,50)
$(call _bench_write,100000,500,50)
@echo "Write 1G"
$(call bench_write,1000000,100,10)
@echo "===================== End ====================="
$(call _bench_write,1000000,100,10)


bench_copy: $(TARGET) $(TEST_FILES)
@echo "=============== Copy benchmarks ==============="
@echo "Copy 1M"
$(call _bench_copy,1000,1000,100)
@echo "Copy 10M"
$(call _bench_copy,10000,1000,100)
@echo "Copy 100M"
$(call _bench_copy,100000,500,50)
@echo "Copy 1G"
$(call _bench_copy,1000000,100,10)

bench_all: bench_dummy bench_read bench_write bench_copy

benchmarks: bench_all

16 changes: 3 additions & 13 deletions benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,19 +104,9 @@ OTHER DEALINGS IN THE SOFTWARE.

<!-- io.vertx.vertx provider -->
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-reactive-streams</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<version>${vertx.version}</version>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
<version>${vertx.version}</version>
<groupId>com.artipie</groupId>
<artifactId>asto</artifactId>
<version>0.29</version>
</dependency>
<dependency>
<groupId>com.github.akarnokd</groupId>
Expand Down
63 changes: 6 additions & 57 deletions benchmarks/src/main/java/org/cqfn/rio/bench/VertxTarget.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package org.cqfn.rio.bench;

import com.artipie.asto.fs.VertxRxFile;
import hu.akarnokd.rxjava2.interop.CompletableInterop;
import io.reactivex.Completable;
import io.reactivex.Flowable;
import io.vertx.core.file.OpenOptions;
import io.vertx.reactivex.core.Promise;
import io.vertx.reactivex.core.Vertx;
import io.vertx.reactivex.core.buffer.Buffer;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
Expand All @@ -21,62 +18,14 @@ public final class VertxTarget implements BenchmarkTarget {

@Override
public Publisher<ByteBuffer> read(final Path path) {
return this.vertx.fileSystem()
.rxOpen(path.toString(), new OpenOptions().setRead(true))
.flatMapPublisher(
file -> {
final Promise<Void> promise = Promise.promise();
final Completable completable = Completable.create(
emitter ->
promise.future().onComplete(
event -> {
if (event.succeeded()) {
emitter.onComplete();
} else {
emitter.onError(event.cause());
}
}
)
);
return file.toFlowable().map(
buffer -> ByteBuffer.wrap(buffer.getBytes())
).doOnTerminate(() -> file.rxClose().subscribe(promise::complete))
.mergeWith(completable);
}
);
return new VertxRxFile(path, this.vertx).flow();
}

@Override
public CompletableFuture<?> write(final Path path, final Publisher<ByteBuffer> data) {
return this.vertx.fileSystem().rxOpen(
path.toString(),
new OpenOptions()
.setCreate(true)
.setWrite(true)
.setTruncateExisting(true)
)
.flatMapCompletable(
file -> Completable.create(
emitter -> Flowable.fromPublisher(data)
.map(buf -> Buffer.buffer(bytes(buf)))
.onErrorResumeNext(
thr -> {
return file.rxClose().andThen(Flowable.error(thr));
}
)
.subscribe(file.toSubscriber()
.onWriteStreamEnd(emitter::onComplete)
.onWriteStreamError(emitter::onError)
.onWriteStreamEndError(emitter::onError)
.onError(emitter::onError)
)
)
).to(CompletableInterop.await()).toCompletableFuture();
}

private static byte[] bytes(final ByteBuffer buf) {
final byte[] bytes = new byte[buf.remaining()];
buf.get(bytes);
return bytes;
return new VertxRxFile(path, this.vertx)
.save(Flowable.fromPublisher(data))
.to(CompletableInterop.await())
.toCompletableFuture();
}
}

0 comments on commit ccee546

Please sign in to comment.