Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Host.getProtocols() and fix addProtocolHandler #299

Merged
merged 7 commits into from
Aug 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/core/Host.kt
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ interface Host {
*/
fun addProtocolHandler(protocolBinding: ProtocolBinding<Any>)

fun getProtocols(): List<ProtocolBinding<Any>>

/**
* Removes the handler added with [addProtocolHandler]
*/
Expand Down
6 changes: 4 additions & 2 deletions libp2p/src/main/kotlin/io/libp2p/core/dsl/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import io.libp2p.transport.tcp.TcpTransport
import io.netty.channel.ChannelHandler
import io.netty.handler.logging.LogLevel
import io.netty.handler.logging.LoggingHandler
import java.util.concurrent.CopyOnWriteArrayList

typealias TransportCtor = (ConnectionUpgrader) -> Transport
typealias SecureChannelCtor = (PrivKey, List<StreamMuxer>) -> SecureChannel
Expand Down Expand Up @@ -173,7 +174,8 @@ open class Builder {
}
}

val muxers = muxers.map { it.createMuxer(streamMultistreamProtocol, protocols.values) }
val updatableProtocols: MutableList<ProtocolBinding<Any>> = CopyOnWriteArrayList(protocols.values)
val muxers = muxers.map { it.createMuxer(streamMultistreamProtocol, updatableProtocols) }

val secureChannels = secureChannels.values.map { it(privKey, muxers) }

Expand Down Expand Up @@ -201,7 +203,7 @@ open class Builder {
networkImpl,
addressBook,
network.listen.map { Multiaddr(it) },
protocols.values,
updatableProtocols,
broadcastConnHandler,
streamVisitors
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,5 @@ interface Multistream<TController> : P2PChannelHandler<TController> {
* For _initiator_ role this is the list of protocols the initiator wants to instantiate.
* Basically this is either a single protocol or a protocol versions
*/
val bindings: MutableList<ProtocolBinding<TController>>
val bindings: List<ProtocolBinding<TController>>
}
4 changes: 4 additions & 0 deletions libp2p/src/main/kotlin/io/libp2p/host/HostImpl.kt
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,10 @@ class HostImpl(
protocolHandlers -= protocolBinding
}

override fun getProtocols(): List<ProtocolBinding<Any>> {
return protocolHandlers
}

override fun addConnectionHandler(handler: ConnectionHandler) {
connectionHandlers += handler
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,14 @@ import io.libp2p.core.multistream.Multistream
import io.libp2p.core.multistream.ProtocolBinding
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CopyOnWriteArrayList

class MultistreamImpl<TController>(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For simplicity can just add the bindings to the constructor

class MultistreamImpl<TController>(
    override val bindings: MutableList<ProtocolBinding<TController>>,
    val preHandler: P2PChannelHandler<*>? = null,
    val postHandler: P2PChannelHandler<*>? = null,
    val negotiationTimeLimit: Duration = DEFAULT_NEGOTIATION_TIME_LIMIT
)

and remove
override val bindings: List<ProtocolBinding<TController>> = initList

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, it's not that right to pass a List (which is implicitly immutable) which is mutated behind the scene and is expected to have effect

Copy link
Contributor Author

@ianopolous ianopolous Aug 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, I can make that change, though it ripples through the classes into a much bigger change.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I tried to do this and it goes down a long rabbit hole that I couldn't get to work.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think I have already come up with some more or less acceptable change. I'll try to polish it tomorrow

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh... TBH passing the MutableList also doesn't look like a right pattern. Passing MutableList makes sense only if the implementation is expected to update the list, which is not the case. Something like ObservableList would be the most appropriate imo 🤔
The overall design appears to be not quite good here, but that's the matter of large refactoring.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I would suggest to merge that variant with a couple of updates: Peergos#13

initList: List<ProtocolBinding<TController>> = listOf(),
override val bindings: List<ProtocolBinding<TController>>,
val preHandler: P2PChannelHandler<*>? = null,
val postHandler: P2PChannelHandler<*>? = null,
val negotiationTimeLimit: Duration = DEFAULT_NEGOTIATION_TIME_LIMIT
) : Multistream<TController> {

override val bindings: MutableList<ProtocolBinding<TController>> =
CopyOnWriteArrayList(initList)

override fun initChannel(ch: P2PChannel): CompletableFuture<TController> {
return with(ch) {
preHandler?.also {
Expand Down
63 changes: 63 additions & 0 deletions libp2p/src/test/java/io/libp2p/core/HostTestJava.java
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,69 @@ void largePing() throws Exception {
System.out.println("Server stopped");
}

@Test
void addPingAfterHostStart() throws Exception {
String localListenAddress = "/ip4/127.0.0.1/tcp/40002";

Host clientHost = new HostBuilder()
.transport(TcpTransport::new)
.secureChannel((k, m) -> new TlsSecureChannel(k, m, "ECDSA"))
.muxer(StreamMuxerProtocol::getYamux)
.build();

Host serverHost = new HostBuilder()
.transport(TcpTransport::new)
.secureChannel(TlsSecureChannel::new)
.muxer(StreamMuxerProtocol::getYamux)
.listen(localListenAddress)
.build();

CompletableFuture<Void> clientStarted = clientHost.start();
CompletableFuture<Void> serverStarted = serverHost.start();
clientStarted.get(5, TimeUnit.SECONDS);
System.out.println("Client started");
serverStarted.get(5, TimeUnit.SECONDS);
System.out.println("Server started");

Assertions.assertEquals(0, clientHost.listenAddresses().size());
Assertions.assertEquals(1, serverHost.listenAddresses().size());
Assertions.assertEquals(
localListenAddress + "/p2p/" + serverHost.getPeerId(),
serverHost.listenAddresses().get(0).toString()
);

serverHost.addProtocolHandler(new Ping());

StreamPromise<PingController> ping =
clientHost.getNetwork().connect(
serverHost.getPeerId(),
new Multiaddr(localListenAddress)
).thenApply(
it -> it.muxerSession().createStream(new Ping())
)
.get(5, TimeUnit.SECONDS);

Stream pingStream = ping.getStream().get(5, TimeUnit.SECONDS);
System.out.println("Ping stream created");
PingController pingCtr = ping.getController().get(5, TimeUnit.SECONDS);
System.out.println("Ping controller created");

for (int i = 0; i < 10; i++) {
long latency = pingCtr.ping().get(1, TimeUnit.SECONDS);
System.out.println("Ping is " + latency);
}
pingStream.close().get(5, TimeUnit.SECONDS);
System.out.println("Ping stream closed");

Assertions.assertThrows(ExecutionException.class, () ->
pingCtr.ping().get(5, TimeUnit.SECONDS));

clientHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Client stopped");
serverHost.stop().get(5, TimeUnit.SECONDS);
System.out.println("Server stopped");
}

@Test
void keyPairGeneration() {
Pair<PrivKey, PubKey> pair = KeyKt.generateKeyPair(KEY_TYPE.SECP256K1);
Expand Down
4 changes: 4 additions & 0 deletions libp2p/src/testFixtures/kotlin/io/libp2p/tools/NullHost.kt
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ open class NullHost : Host {
TODO("not implemented")
}

override fun getProtocols(): List<ProtocolBinding<Any>> {
TODO("not implemented")
}

override fun addConnectionHandler(handler: ConnectionHandler) {
TODO("not implemented")
}
Expand Down