diff --git a/libp2p/src/main/java/io/libp2p/core/dsl/HostBuilder.java b/libp2p/src/main/java/io/libp2p/core/dsl/HostBuilder.java index 2915b5b2..6eba6a22 100644 --- a/libp2p/src/main/java/io/libp2p/core/dsl/HostBuilder.java +++ b/libp2p/src/main/java/io/libp2p/core/dsl/HostBuilder.java @@ -62,6 +62,12 @@ public final HostBuilder listen(String... addresses) { return this; } + public final HostBuilder builderModifier(Consumer builderModifier) { + this.builderModifier = builderModifier; + return this; + } + + @SuppressWarnings("unchecked") public Host build() { return BuilderJKt.hostJ( defaultMode_.asBuilderDefault(), @@ -74,6 +80,7 @@ public Host build() { muxers_.forEach(m -> b.getMuxers().add(m.get())); b.getProtocols().addAll(protocols_); listenAddresses_.forEach(a -> b.getNetwork().listen(a)); + builderModifier.accept(b); }); } // build @@ -84,4 +91,5 @@ public Host build() { private List> muxers_ = new ArrayList<>(); private List> protocols_ = new ArrayList<>(); private List listenAddresses_ = new ArrayList<>(); + private Consumer builderModifier = b -> {}; } diff --git a/libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt b/libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt index 971d35b0..ce1416df 100644 --- a/libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt +++ b/libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt @@ -23,6 +23,7 @@ import io.libp2p.core.security.SecureChannel import io.libp2p.core.transport.Transport import io.libp2p.etc.types.lazyVar import io.libp2p.etc.types.toProtobuf +import io.libp2p.etc.util.netty.LoggingHandlerShort import io.libp2p.host.HostImpl import io.libp2p.host.MemoryAddressBook import io.libp2p.network.NetworkImpl @@ -273,6 +274,10 @@ class DebugHandlerBuilder(var name: String) { fun addLogger(level: LogLevel, loggerName: String = name) { addNettyHandler(LoggingHandler(loggerName, level)) } + + fun addCompactLogger(level: LogLevel, loggerName: String = name) { + addNettyHandler(LoggingHandlerShort(loggerName, level)) + } } open class Enumeration(val values: MutableList = mutableListOf()) : MutableList by values { diff --git a/libp2p/src/test/java/io/libp2p/core/HostTestJava.java b/libp2p/src/test/java/io/libp2p/core/HostTestJava.java index 1f943159..bd4f509e 100644 --- a/libp2p/src/test/java/io/libp2p/core/HostTestJava.java +++ b/libp2p/src/test/java/io/libp2p/core/HostTestJava.java @@ -7,10 +7,11 @@ import io.libp2p.core.dsl.HostBuilder; import io.libp2p.core.multiformats.Multiaddr; import io.libp2p.core.mux.StreamMuxerProtocol; -import io.libp2p.protocol.Ping; -import io.libp2p.protocol.PingController; +import io.libp2p.protocol.*; +import io.libp2p.security.noise.*; import io.libp2p.security.tls.*; import io.libp2p.transport.tcp.TcpTransport; +import io.netty.handler.logging.LogLevel; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -153,6 +154,72 @@ void largePing() throws Exception { System.out.println("Server stopped"); } + @Test + void largeBlob() throws Exception { + int blobSize = 1024 * 1024; + String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; + + Host clientHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .builderModifier( + b -> b.getDebug().getMuxFramesHandler().addCompactLogger(LogLevel.ERROR, "client")) + .build(); + + Host serverHost = + new HostBuilder() + .transport(TcpTransport::new) + .secureChannel(NoiseXXSecureChannel::new) + .muxer(StreamMuxerProtocol::getYamux) + .protocol(new Blob(blobSize)) + .listen(localListenAddress) + .builderModifier( + b -> b.getDebug().getMuxFramesHandler().addCompactLogger(LogLevel.ERROR, "server")) + .build(); + + CompletableFuture clientStarted = clientHost.start(); + CompletableFuture serverStarted = serverHost.start(); + clientStarted.get(5, TimeUnit.SECONDS); + System.out.println("Client started"); + serverStarted.get(5, TimeUnit.SECONDS); + System.out.println("Server started"); + + Assertions.assertEquals(0, clientHost.listenAddresses().size()); + Assertions.assertEquals(1, serverHost.listenAddresses().size()); + Assertions.assertEquals( + localListenAddress + "/p2p/" + serverHost.getPeerId(), + serverHost.listenAddresses().get(0).toString()); + + StreamPromise blob = + clientHost + .getNetwork() + .connect(serverHost.getPeerId(), new Multiaddr(localListenAddress)) + .thenApply(it -> it.muxerSession().createStream(new Blob(blobSize))) + .join(); + + Stream blobStream = blob.getStream().get(5, TimeUnit.SECONDS); + System.out.println("Blob stream created"); + BlobController blobCtr = blob.getController().get(5, TimeUnit.SECONDS); + System.out.println("Blob controller created"); + + for (int i = 0; i < 10; i++) { + long latency = blobCtr.blob().join(); + System.out.println("Blob round trip is " + latency); + } + blobStream.close().get(5, TimeUnit.SECONDS); + System.out.println("Blob stream closed"); + + Assertions.assertThrows( + ExecutionException.class, () -> blobCtr.blob().get(5, TimeUnit.SECONDS)); + + clientHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Client stopped"); + serverHost.stop().get(5, TimeUnit.SECONDS); + System.out.println("Server stopped"); + } + @Test void addPingAfterHostStart() throws Exception { String localListenAddress = "/ip4/127.0.0.1/tcp/40002"; diff --git a/libp2p/src/test/kotlin/io/libp2p/protocol/Blob.kt b/libp2p/src/test/kotlin/io/libp2p/protocol/Blob.kt new file mode 100644 index 00000000..a763ee60 --- /dev/null +++ b/libp2p/src/test/kotlin/io/libp2p/protocol/Blob.kt @@ -0,0 +1,144 @@ +package io.libp2p.protocol + +import io.libp2p.core.BadPeerException +import io.libp2p.core.ConnectionClosedException +import io.libp2p.core.Libp2pException +import io.libp2p.core.Stream +import io.libp2p.core.multistream.StrictProtocolBinding +import io.libp2p.etc.types.completedExceptionally +import io.libp2p.etc.types.lazyVar +import io.libp2p.etc.types.toByteArray +import io.libp2p.etc.types.toHex +import io.netty.buffer.ByteBuf +import io.netty.channel.ChannelHandlerContext +import io.netty.handler.codec.ByteToMessageCodec +import java.time.Duration +import java.util.Collections +import java.util.Random +import java.util.concurrent.CompletableFuture +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit + +interface BlobController { + fun blob(): CompletableFuture +} + +class Blob(blobSize: Int) : BlobBinding(BlobProtocol(blobSize)) + +open class BlobBinding(blob: BlobProtocol) : + StrictProtocolBinding("/ipfs/blob-echo/1.0.0", blob) + +class BlobTimeoutException : Libp2pException() + +open class BlobProtocol(var blobSize: Int) : ProtocolHandler(Long.MAX_VALUE, Long.MAX_VALUE) { + var timeoutScheduler by lazyVar { Executors.newSingleThreadScheduledExecutor() } + var curTime: () -> Long = { System.currentTimeMillis() } + var random = Random() + var blobTimeout = Duration.ofSeconds(10) + + override fun onStartInitiator(stream: Stream): CompletableFuture { + val handler = BlobInitiator() + stream.pushHandler(BlobCodec()) + stream.pushHandler(handler) + stream.pushHandler(BlobCodec()) + return handler.activeFuture + } + + override fun onStartResponder(stream: Stream): CompletableFuture { + val handler = BlobResponder() + stream.pushHandler(BlobCodec()) + stream.pushHandler(BlobResponder()) + stream.pushHandler(BlobCodec()) + return CompletableFuture.completedFuture(handler) + } + + open class BlobCodec : ByteToMessageCodec() { + override fun encode(ctx: ChannelHandlerContext?, msg: ByteArray, out: ByteBuf) { + println("Codec::encode") + out.writeInt(msg.size) + out.writeBytes(msg) + } + + override fun decode(ctx: ChannelHandlerContext?, msg: ByteBuf, out: MutableList) { + println("Codec::decode " + msg.readableBytes()) + val readerIndex = msg.readerIndex() + if (msg.readableBytes() < 4) { + return + } + val len = msg.readInt() + if (msg.readableBytes() < len) { + // not enough data to read the full array + // will wait for more ... + msg.readerIndex(readerIndex) + return + } + val data = msg.readSlice(len) + out.add(data.toByteArray()) + } + } + + open inner class BlobResponder : ProtocolMessageHandler, BlobController { + override fun onMessage(stream: Stream, msg: ByteArray) { + println("Responder::onMessage") + stream.writeAndFlush(msg) + } + + override fun blob(): CompletableFuture { + throw Libp2pException("This is blob responder only") + } + } + + open inner class BlobInitiator : ProtocolMessageHandler, BlobController { + val activeFuture = CompletableFuture() + val requests = Collections.synchronizedMap(mutableMapOf>>()) + lateinit var stream: Stream + var closed = false + + override fun onActivated(stream: Stream) { + this.stream = stream + activeFuture.complete(this) + } + + override fun onMessage(stream: Stream, msg: ByteArray) { + println("Initiator::onMessage") + val dataS = msg.toHex() + val (sentT, future) = requests.remove(dataS) + ?: throw BadPeerException("Unknown or expired blob data in response: $dataS") + future.complete(curTime() - sentT) + } + + override fun onClosed(stream: Stream) { + synchronized(requests) { + closed = true + requests.values.forEach { it.second.completeExceptionally(ConnectionClosedException()) } + requests.clear() + timeoutScheduler.shutdownNow() + } + activeFuture.completeExceptionally(ConnectionClosedException()) + } + + override fun blob(): CompletableFuture { + val ret = CompletableFuture() + val arr = ByteArray(blobSize) + random.nextBytes(arr) + val dataS = arr.toHex() + + synchronized(requests) { + if (closed) return completedExceptionally(ConnectionClosedException()) + requests[dataS] = curTime() to ret + + timeoutScheduler.schedule( + { + requests.remove(dataS)?.second?.completeExceptionally(BlobTimeoutException()) + }, + blobTimeout.toMillis(), + TimeUnit.MILLISECONDS + ) + } + + println("Sender writing " + blobSize) + stream.writeAndFlush(arr) + return ret + } + } +}