Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
DuplexConnection exposes remoteAddress()
Browse files Browse the repository at this point in the history
Closes gh-735

Signed-off-by: Rossen Stoyanchev <rstoyanchev@vmware.com>
rstoyanchev committed Sep 10, 2020

Verified

This commit was signed with the committer’s verified signature. The key has expired.
kylekurz Kyle Kurz
1 parent 6959390 commit 2d0efed
Showing 14 changed files with 143 additions and 17 deletions.
14 changes: 13 additions & 1 deletion rsocket-core/src/main/java/io/rsocket/DuplexConnection.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -86,6 +87,17 @@ default Mono<Void> sendOne(ByteBuf frame) {
*/
ByteBufAllocator alloc();

/**
* Return the remote address that this connection is connected to. The returned {@link
* SocketAddress} varies by transport type and should be downcast to obtain more detailed
* information. For TCP and WebSocket, the address type is {@link java.net.InetSocketAddress}. For
* local transport, it is {@link io.rsocket.transport.local.LocalSocketAddress}.
*
* @return the address
* @since 1.1
*/
SocketAddress remoteAddress();

@Override
default double availability() {
return isDisposed() ? 0.0 : 1.0;
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
@@ -372,6 +373,11 @@ public ByteBufAllocator alloc() {
return source.alloc();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
}

@Override
public void dispose() {
source.dispose();
Original file line number Diff line number Diff line change
@@ -24,6 +24,7 @@
import io.rsocket.frame.FrameUtil;
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
import io.rsocket.plugins.InitializingInterceptorRegistry;
import java.net.SocketAddress;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -221,6 +222,11 @@ public ByteBufAllocator alloc() {
return source.alloc();
}

@Override
public SocketAddress remoteAddress() {
return source.remoteAddress();
}

@Override
public void dispose() {
source.dispose();
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2019 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@
import io.rsocket.Closeable;
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameHeaderCodec;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.time.Duration;
import java.util.Queue;
@@ -111,6 +112,11 @@ public ByteBufAllocator alloc() {
return curConnection.alloc();
}

@Override
public SocketAddress remoteAddress() {
return curConnection.remoteAddress();
}

public void disconnect() {
DuplexConnection c = this.curConnection;
if (c != null) {
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import java.net.SocketAddress;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
@@ -93,6 +94,11 @@ public ByteBufAllocator alloc() {
return allocator;
}

@Override
public SocketAddress remoteAddress() {
return new TestLocalSocketAddress(name);
}

@Override
public void dispose() {
onClose.onComplete();
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import java.net.SocketAddress;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.reactivestreams.Publisher;
@@ -111,6 +112,11 @@ public ByteBufAllocator alloc() {
return allocator;
}

@Override
public SocketAddress remoteAddress() {
return new TestLocalSocketAddress("TestDuplexConnection");
}

@Override
public double availability() {
return availability;
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.rsocket.test.util;

import java.net.SocketAddress;
import java.util.Objects;

public final class TestLocalSocketAddress extends SocketAddress {

private static final long serialVersionUID = 2608695156052100164L;

private final String name;

/**
* Creates a new instance.
*
* @param name the name representing the address
* @throws NullPointerException if {@code name} is {@code null}
*/
public TestLocalSocketAddress(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

/** Return the name for this connection. */
public String getName() {
return name;
}

@Override
public String toString() {
return "[local address] " + name;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
import io.rsocket.frame.FrameHeaderCodec;
import io.rsocket.frame.FrameType;
import io.rsocket.plugins.DuplexConnectionInterceptor.Type;
import java.net.SocketAddress;
import java.util.Objects;
import java.util.function.Consumer;
import org.reactivestreams.Publisher;
@@ -88,6 +89,11 @@ public ByteBufAllocator alloc() {
return delegate.alloc();
}

@Override
public SocketAddress remoteAddress() {
return delegate.remoteAddress();
}

@Override
public void dispose() {
delegate.dispose();
6 changes: 6 additions & 0 deletions rsocket-test/src/main/java/io/rsocket/test/TransportTest.java
Original file line number Diff line number Diff line change
@@ -33,6 +33,7 @@
import io.rsocket.util.ByteBufPayload;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Executors;
@@ -568,6 +569,11 @@ public ByteBufAllocator alloc() {
return duplexConnection.alloc();
}

@Override
public SocketAddress remoteAddress() {
return duplexConnection.remoteAddress();
}

@Override
public Mono<Void> onClose() {
return duplexConnection.onClose();
Original file line number Diff line number Diff line change
@@ -82,10 +82,13 @@ public Mono<DuplexConnection> connect() {
UnboundedProcessor<ByteBuf> out = new UnboundedProcessor<>();
MonoProcessor<Void> closeNotifier = MonoProcessor.create();

server.apply(new LocalDuplexConnection(allocator, out, in, closeNotifier)).subscribe();
server
.apply(new LocalDuplexConnection(name, allocator, out, in, closeNotifier))
.subscribe();

return Mono.just(
(DuplexConnection) new LocalDuplexConnection(allocator, in, out, closeNotifier));
(DuplexConnection)
new LocalDuplexConnection(name, allocator, in, out, closeNotifier));
});
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -19,6 +19,7 @@
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.rsocket.DuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
@@ -33,6 +34,7 @@
/** An implementation of {@link DuplexConnection} that connects inside the same JVM. */
final class LocalDuplexConnection implements DuplexConnection {

private final LocalSocketAddress address;
private final ByteBufAllocator allocator;
private final Flux<ByteBuf> in;

@@ -43,16 +45,19 @@ final class LocalDuplexConnection implements DuplexConnection {
/**
* Creates a new instance.
*
* @param name the name assigned to this local connection
* @param in the inbound {@link ByteBuf}s
* @param out the outbound {@link ByteBuf}s
* @param onClose the closing notifier
* @throws NullPointerException if {@code in}, {@code out}, or {@code onClose} are {@code null}
*/
LocalDuplexConnection(
String name,
ByteBufAllocator allocator,
Flux<ByteBuf> in,
Subscriber<ByteBuf> out,
MonoProcessor<Void> onClose) {
this.address = new LocalSocketAddress(name);
this.allocator = Objects.requireNonNull(allocator, "allocator must not be null");
this.in = Objects.requireNonNull(in, "in must not be null");
this.out = Objects.requireNonNull(out, "out must not be null");
@@ -100,6 +105,11 @@ public ByteBufAllocator alloc() {
return allocator;
}

@Override
public SocketAddress remoteAddress() {
return address;
}

static class ByteBufReleaserOperator
implements CoreSubscriber<ByteBuf>, Subscription, Fuseable.QueueSubscription<ByteBuf> {

Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,7 +20,7 @@
import java.util.Objects;

/** An implementation of {@link SocketAddress} representing a local connection. */
final class LocalSocketAddress extends SocketAddress {
public final class LocalSocketAddress extends SocketAddress {

private static final long serialVersionUID = -7513338854585475473L;

@@ -32,16 +32,17 @@ final class LocalSocketAddress extends SocketAddress {
* @param name the name representing the address
* @throws NullPointerException if {@code name} is {@code null}
*/
LocalSocketAddress(String name) {
public LocalSocketAddress(String name) {
this.name = Objects.requireNonNull(name, "name must not be null");
}

@Override
public String toString() {
return "[local server] " + name;
/** Return the name for this connection. */
public String getName() {
return name;
}

String getName() {
return name;
@Override
public String toString() {
return "[local address] " + name;
}
}
Original file line number Diff line number Diff line change
@@ -21,6 +21,7 @@
import io.rsocket.DuplexConnection;
import io.rsocket.frame.FrameLengthCodec;
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -54,6 +55,11 @@ public ByteBufAllocator alloc() {
return connection.channel().alloc();
}

@Override
public SocketAddress remoteAddress() {
return connection.channel().remoteAddress();
}

@Override
protected void doOnClose() {
if (!connection.isDisposed()) {
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2018 the original author or authors.
* Copyright 2015-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.rsocket.DuplexConnection;
import io.rsocket.internal.BaseDuplexConnection;
import java.net.SocketAddress;
import java.util.Objects;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
@@ -59,6 +60,11 @@ public ByteBufAllocator alloc() {
return connection.channel().alloc();
}

@Override
public SocketAddress remoteAddress() {
return connection.channel().remoteAddress();
}

@Override
protected void doOnClose() {
if (!connection.isDisposed()) {

0 comments on commit 2d0efed

Please sign in to comment.