Skip to content

Commit

Permalink
Merge #6240
Browse files Browse the repository at this point in the history
6240: [Backport stable/0.26] fix(broker): use correct address for binding and advertising r=npepinpe a=github-actions[bot]

# Description
Backport of #6234 to `stable/0.26`.

closes #5426 

Co-authored-by: Deepthi Devaki Akkoorath <deepthidevaki@gmail.com>
  • Loading branch information
zeebe-bors[bot] and deepthidevaki authored Feb 1, 2021
2 parents 3706cd4 + 6ec0ae7 commit 00f7ead
Show file tree
Hide file tree
Showing 11 changed files with 306 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import io.atomix.utils.net.Address;
import java.time.Duration;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BiConsumer;
Expand All @@ -27,12 +28,20 @@
public interface MessagingService {

/**
* Returns the local messaging service address.
* Returns the local messaging service address. This is the address used by other nodes to
* communicate to this service.
*
* @return the local address
* @return the address remote nodes use to communicate to this node
*/
Address address();

/**
* Returns the interfaces to which the local messaging service is bind.
*
* @return the address the messaging service is bound to
*/
Collection<Address> bindingAddresses();

/**
* Sends a message asynchronously to the specified communication address. The message is specified
* using the type and payload.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import static io.atomix.utils.concurrent.Threads.namedThreads;

import com.google.common.base.Throwables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import io.atomix.cluster.messaging.ManagedMessagingService;
Expand Down Expand Up @@ -52,6 +51,8 @@
import io.netty.util.concurrent.Future;
import java.net.InetAddress;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -69,16 +70,17 @@
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Netty based MessagingService. */
public class NettyMessagingService implements ManagedMessagingService {

private final Logger log = LoggerFactory.getLogger(getClass());
private final Address returnAddress;
private final Address advertisedAddress;
private final Collection<Address> bindingAddresses = new ArrayList<>();
private final int preamble;
private final MessagingConfig config;
private final ProtocolVersion protocolVersion;
private final AtomicBoolean started = new AtomicBoolean(false);
private final HandlerRegistry handlers = new HandlerRegistry();
Expand All @@ -95,33 +97,52 @@ public class NettyMessagingService implements ManagedMessagingService {
private final List<CompletableFuture> openFutures;

public NettyMessagingService(
final String cluster, final Address address, final MessagingConfig config) {
this(cluster, address, config, ProtocolVersion.latest());
final String cluster, final Address advertisedAddress, final MessagingConfig config) {
this(cluster, advertisedAddress, config, ProtocolVersion.latest());
}

NettyMessagingService(
final String cluster,
final Address address,
final Address advertisedAddress,
final MessagingConfig config,
final ProtocolVersion protocolVersion) {
preamble = cluster.hashCode();
returnAddress = address;
this.config = config;
this.advertisedAddress = advertisedAddress;
this.protocolVersion = protocolVersion;
openFutures = new CopyOnWriteArrayList<>();
channelPool = new ChannelPool(this::openChannel, config.getConnectionPoolSize());
initAddresses(config);
}

private void initAddresses(final MessagingConfig config) {
final int port = config.getPort() != null ? config.getPort() : advertisedAddress.port();
if (config.getInterfaces().isEmpty()) {
bindingAddresses.add(Address.from(advertisedAddress.host(), port));
} else {
final List<Address> addresses =
config.getInterfaces().stream()
.map(iface -> Address.from(iface, port))
.collect(Collectors.toList());
bindingAddresses.addAll(addresses);
}
}

@Override
public Address address() {
return returnAddress;
return advertisedAddress;
}

@Override
public Collection<Address> bindingAddresses() {
return bindingAddresses;
}

@Override
public CompletableFuture<Void> sendAsync(
final Address address, final String type, final byte[] payload, final boolean keepAlive) {
final long messageId = messageIdGenerator.incrementAndGet();
final ProtocolRequest message = new ProtocolRequest(messageId, returnAddress, type, payload);
final ProtocolRequest message =
new ProtocolRequest(messageId, advertisedAddress, type, payload);
return executeOnPooledConnection(
address, type, c -> c.sendAsync(message), MoreExecutors.directExecutor());
}
Expand Down Expand Up @@ -167,7 +188,8 @@ public CompletableFuture<byte[]> sendAndReceive(
}

final long messageId = messageIdGenerator.incrementAndGet();
final ProtocolRequest message = new ProtocolRequest(messageId, returnAddress, type, payload);
final ProtocolRequest message =
new ProtocolRequest(messageId, advertisedAddress, type, payload);
if (keepAlive) {
return executeOnPooledConnection(
address, type, c -> c.sendAndReceive(message, timeout), executor);
Expand Down Expand Up @@ -237,7 +259,7 @@ public void unregisterHandler(final String type) {
@Override
public CompletableFuture<MessagingService> start() {
if (started.get()) {
log.warn("Already running at local address: {}", returnAddress);
log.warn("Already running at local address: {}", advertisedAddress);
return CompletableFuture.completedFuture(this);
}

Expand Down Expand Up @@ -370,7 +392,7 @@ private <T> void executeOnPooledConnection(
final Function<ClientConnection, CompletableFuture<T>> callback,
final Executor executor,
final CompletableFuture<T> future) {
if (address.equals(returnAddress)) {
if (address.equals(advertisedAddress)) {
callback
.apply(localConnection)
.whenComplete(
Expand Down Expand Up @@ -435,7 +457,7 @@ private <T> CompletableFuture<T> executeOnTransientConnection(
final Function<ClientConnection, CompletableFuture<T>> callback,
final Executor executor) {
final CompletableFuture<T> future = new CompletableFuture<>();
if (address.equals(returnAddress)) {
if (address.equals(advertisedAddress)) {
callback
.apply(localConnection)
.whenComplete(
Expand Down Expand Up @@ -574,46 +596,38 @@ private CompletableFuture<Void> bootstrapServer() {
*/
private CompletableFuture<Void> bind(final ServerBootstrap bootstrap) {
final CompletableFuture<Void> future = new CompletableFuture<>();
final int port = config.getPort() != null ? config.getPort() : returnAddress.port();
if (config.getInterfaces().isEmpty()) {
bind(bootstrap, Lists.newArrayList("0.0.0.0").iterator(), port, future);
} else {
bind(bootstrap, config.getInterfaces().iterator(), port, future);
}

bind(bootstrap, bindingAddresses.iterator(), future);

return future;
}

/**
* Recursively binds the given bootstrap to the given interfaces.
*
* @param bootstrap the bootstrap to bind
* @param ifaces an iterator of interfaces to which to bind
* @param port the port to which to bind
* @param addressIterator an iterator of Addresses to which to bind
* @param future the future to completed once the bootstrap has been bound to all provided
* interfaces
*/
private void bind(
final ServerBootstrap bootstrap,
final Iterator<String> ifaces,
final int port,
final Iterator<Address> addressIterator,
final CompletableFuture<Void> future) {
if (ifaces.hasNext()) {
final String iface = ifaces.next();
if (addressIterator.hasNext()) {
final Address address = addressIterator.next();
bootstrap
.bind(iface, port)
.bind(address.host(), address.port())
.addListener(
(ChannelFutureListener)
f -> {
if (f.isSuccess()) {
log.info("TCP server listening for connections on {}:{}", iface, port);
log.info("TCP server listening for connections on {}", address);
serverChannel = f.channel();
bind(bootstrap, ifaces, port, future);
bind(bootstrap, addressIterator, future);
} else {
log.warn(
"Failed to bind TCP server to port {}:{} due to {}",
iface,
port,
f.cause());
"Failed to bind TCP server to port {} due to {}", address, f.cause());
future.completeExceptionally(f.cause());
}
});
Expand Down Expand Up @@ -695,7 +709,7 @@ void activateProtocolVersion(
final ChannelHandlerContext context,
final Connection<M> connection,
final ProtocolVersion protocolVersion) {
final MessagingProtocol protocol = protocolVersion.createProtocol(returnAddress);
final MessagingProtocol protocol = protocolVersion.createProtocol(advertisedAddress);
context.pipeline().remove(this);
context.pipeline().addLast("encoder", protocol.newEncoder());
context.pipeline().addLast("decoder", protocol.newDecoder());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
import com.google.common.util.concurrent.Uninterruptibles;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.MessagingService;
import io.atomix.utils.net.Address;
import io.zeebe.test.util.socket.SocketUtil;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand Down Expand Up @@ -69,6 +71,7 @@ public class NettyMessagingServiceTest {
Address addressv21;
Address addressv22;
Address invalidAddress;
private MessagingService messagingService;

@Before
public void setUp() throws Exception {
Expand Down Expand Up @@ -424,4 +427,22 @@ public void testVersionNegotiation() throws Exception {
response = nettyv12.sendAndReceive(addressv22, subject, payload).get(10, TimeUnit.SECONDS);
assertArrayEquals(payload, response);
}

@Test
public void shouldNotBindToAdvertisedAddress() {
// given
final var bindingAddress = Address.from(SocketUtil.getNextAddress().getPort());
final MessagingConfig config = new MessagingConfig();
config.setInterfaces(List.of(bindingAddress.host()));
config.setPort(bindingAddress.port());

// when
final Address nonBindableAddress = new Address("invalid.host", 1);
final var startFuture = new NettyMessagingService("test", nonBindableAddress, config).start();

// then - should not fail by using advertisedAddress for binding
messagingService = startFuture.join();
assertThat(messagingService.bindingAddresses()).contains(bindingAddress);
assertThat(messagingService.address()).isEqualTo(nonBindableAddress);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.atomix.utils.net.Address;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -98,6 +100,11 @@ public Address address() {
return address;
}

@Override
public Collection<Address> bindingAddresses() {
return List.of(address);
}

@Override
public CompletableFuture<Void> sendAsync(
final Address address, final String type, final byte[] payload, final boolean keepAlive) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import io.atomix.utils.net.Address;
import java.net.ConnectException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
Expand Down Expand Up @@ -98,6 +100,11 @@ public Address address() {
return address;
}

@Override
public Collection<Address> bindingAddresses() {
return List.of(address);
}

@Override
public CompletableFuture<Void> sendAsync(
final Address address, final String type, final byte[] payload, final boolean keepAlive) {
Expand Down
41 changes: 28 additions & 13 deletions broker/src/main/java/io/zeebe/broker/Broker.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package io.zeebe.broker;

import io.atomix.cluster.MemberId;
import io.atomix.cluster.messaging.ManagedMessagingService;
import io.atomix.cluster.messaging.MessagingConfig;
import io.atomix.cluster.messaging.impl.NettyMessagingService;
import io.atomix.core.Atomix;
Expand All @@ -32,6 +33,7 @@
import io.zeebe.broker.system.configuration.ClusterCfg;
import io.zeebe.broker.system.configuration.DataCfg;
import io.zeebe.broker.system.configuration.NetworkCfg;
import io.zeebe.broker.system.configuration.SocketBindingCfg;
import io.zeebe.broker.system.configuration.backpressure.BackpressureCfg;
import io.zeebe.broker.system.management.BrokerAdminService;
import io.zeebe.broker.system.management.BrokerAdminServiceImpl;
Expand Down Expand Up @@ -206,7 +208,10 @@ private StartProcess initStart() {
startContext.addStep("actor scheduler", this::actorSchedulerStep);
startContext.addStep("membership and replication protocol", () -> atomixCreateStep(brokerCfg));
startContext.addStep(
"command api transport", () -> commandApiTransportStep(clusterCfg, localBroker));
"command api transport",
() ->
commandApiTransportStep(
clusterCfg, brokerCfg.getNetwork().getCommandApi(), localBroker));
startContext.addStep(
"command api handler", () -> commandApiHandlerStep(brokerCfg, localBroker));
startContext.addStep("subscription api", () -> subscriptionAPIStep(localBroker));
Expand Down Expand Up @@ -273,27 +278,37 @@ private AutoCloseable atomixCreateStep(final BrokerCfg brokerCfg) {
}

private AutoCloseable commandApiTransportStep(
final ClusterCfg clusterCfg, final BrokerInfo localBroker) {

final var nettyMessagingService =
new NettyMessagingService(
clusterCfg.getClusterName(),
Address.from(localBroker.getCommandApiAddress()),
new MessagingConfig());

nettyMessagingService.start().join();
LOG.debug("Bound command API to {} ", nettyMessagingService.address());
final ClusterCfg clusterCfg,
final SocketBindingCfg commpandApiConfig,
final BrokerInfo localBroker) {
final var messagingService = createMessagingService(clusterCfg, commpandApiConfig);
messagingService.start().join();
LOG.debug(
"Bound command API to {}, using advertised address {} ",
messagingService.bindingAddresses(),
messagingService.address());

final var transportFactory = new TransportFactory(scheduler);
serverTransport =
transportFactory.createServerTransport(localBroker.getNodeId(), nettyMessagingService);
transportFactory.createServerTransport(localBroker.getNodeId(), messagingService);

return () -> {
serverTransport.close();
nettyMessagingService.stop().join();
messagingService.stop().join();
};
}

private ManagedMessagingService createMessagingService(
final ClusterCfg clusterCfg, final SocketBindingCfg socketCfg) {
final var messagingConfig = new MessagingConfig();
messagingConfig.setInterfaces(List.of(socketCfg.getHost()));
messagingConfig.setPort(socketCfg.getPort());
return new NettyMessagingService(
clusterCfg.getClusterName(),
Address.from(socketCfg.getAdvertisedHost(), socketCfg.getAdvertisedPort()),
messagingConfig);
}

private AutoCloseable commandApiHandlerStep(
final BrokerCfg brokerCfg, final BrokerInfo localBroker) {

Expand Down
Loading

0 comments on commit 00f7ead

Please sign in to comment.