Skip to content

Commit

Permalink
NIFI-13684 Set Cluster Status HTTP Code based on Connection (apache#9202
Browse files Browse the repository at this point in the history
)
  • Loading branch information
exceptionfactory authored Aug 27, 2024
1 parent d9c5a19 commit e9f8408
Show file tree
Hide file tree
Showing 4 changed files with 218 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,7 @@ protected void onResponseStatus(final ProcessHandle applicationProcessHandle, fi
} else {
commandStatus = CommandStatus.COMMUNICATION_FAILED;
getCommandLogger().warn("Application Process [{}] Command Status [{}] HTTP {}", pid, commandStatus, statusCode);
responseStreamHandler.onResponseStream(responseStream);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.bootstrap.command;

import com.sun.net.httpserver.HttpHandler;
import com.sun.net.httpserver.HttpServer;
import org.apache.nifi.bootstrap.command.io.LoggerResponseStreamHandler;
import org.apache.nifi.bootstrap.command.io.ResponseStreamHandler;
import org.apache.nifi.bootstrap.command.process.ProcessHandleProvider;
import org.apache.nifi.bootstrap.configuration.ManagementServerPath;
import org.apache.nifi.bootstrap.configuration.SystemProperty;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class ManagementServerBootstrapCommandTest {
private static final Logger logger = LoggerFactory.getLogger(ManagementServerBootstrapCommandTest.class);

private static final String LOCALHOST = "127.0.0.1";

private static final int RANDOM_PORT = 0;

private static final int BACKLOG = 10;

private static final int STOP_DELAY = 0;

private static final String ADDRESS_ARGUMENT = "-D%s=%s:%d";

private static final String RESPONSE_STATUS = "Status";

@Mock
private ProcessHandleProvider processHandleProvider;

@Mock
private ProcessHandle processHandle;

@Mock
private ProcessHandle.Info processHandleInfo;

@Test
void testStopped() {
final ResponseStreamHandler responseStreamHandler = new LoggerResponseStreamHandler(logger);

final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler);

command.run();

final CommandStatus commandStatus = command.getCommandStatus();
assertEquals(CommandStatus.STOPPED, commandStatus);
}

@Test
void testClusterHealthStatusConnected() throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final ResponseStreamHandler responseStreamHandler = responseStream -> {
try {
responseStream.transferTo(outputStream);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
};

final HttpServer httpServer = startServer(HttpURLConnection.HTTP_OK);

final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler);
command.run();

final CommandStatus commandStatus = command.getCommandStatus();
assertEquals(CommandStatus.SUCCESS, commandStatus);
assertEquals(RESPONSE_STATUS, outputStream.toString());

httpServer.stop(STOP_DELAY);
}

@Test
void testClusterHealthStatusDisconnected() throws IOException {
final ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
final ResponseStreamHandler responseStreamHandler = responseStream -> {
try {
responseStream.transferTo(outputStream);
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
};

final HttpServer httpServer = startServer(HttpURLConnection.HTTP_UNAVAILABLE);

final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler);
command.run();

final CommandStatus commandStatus = command.getCommandStatus();
assertEquals(CommandStatus.COMMUNICATION_FAILED, commandStatus);
assertEquals(RESPONSE_STATUS, outputStream.toString());

httpServer.stop(STOP_DELAY);
}

private HttpServer startServer(final int responseCode) throws IOException {
final InetSocketAddress bindAddress = new InetSocketAddress(LOCALHOST, RANDOM_PORT);
final HttpServer httpServer = HttpServer.create(bindAddress, BACKLOG);
final HttpHandler httpHandler = exchange -> {
exchange.sendResponseHeaders(responseCode, RESPONSE_STATUS.length());
try (OutputStream responseBody = exchange.getResponseBody()) {
responseBody.write(RESPONSE_STATUS.getBytes(StandardCharsets.UTF_8));
}
};
httpServer.createContext(ManagementServerPath.HEALTH_CLUSTER.getPath(), httpHandler);
httpServer.start();

final InetSocketAddress serverAddress = httpServer.getAddress();
final String addressArgument = ADDRESS_ARGUMENT.formatted(SystemProperty.MANAGEMENT_SERVER_ADDRESS.getProperty(), LOCALHOST, serverAddress.getPort());
final String[] arguments = new String[]{addressArgument};

when(processHandleProvider.findApplicationProcessHandle()).thenReturn(Optional.of(processHandle));
when(processHandle.info()).thenReturn(processHandleInfo);
when(processHandleInfo.arguments()).thenReturn(Optional.of(arguments));

return httpServer;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import static java.net.HttpURLConnection.HTTP_ACCEPTED;
import static java.net.HttpURLConnection.HTTP_BAD_METHOD;
import static java.net.HttpURLConnection.HTTP_OK;
import static java.net.HttpURLConnection.HTTP_UNAVAILABLE;

/**
* HTTP Handler for Cluster Health status operations
Expand Down Expand Up @@ -65,7 +66,8 @@ public void handle(final HttpExchange exchange) throws IOException {
final ConnectionState connectionState = getConnectionState();
final String status = STATUS.formatted(connectionState);
final byte[] response = status.getBytes(StandardCharsets.UTF_8);
exchange.sendResponseHeaders(HTTP_OK, response.length);
final int responseCode = getResponseCode(connectionState);
exchange.sendResponseHeaders(responseCode, response.length);
responseBody.write(response);
} else if (DELETE_METHOD.contentEquals(requestMethod)) {
startDecommission();
Expand All @@ -91,6 +93,18 @@ private void startDecommission() {
});
}

private int getResponseCode(final ConnectionState connectionState) {
final int responseCode;

if (ConnectionState.CONNECTED == connectionState || ConnectionState.CONNECTING == connectionState) {
responseCode = HTTP_OK;
} else {
responseCode = HTTP_UNAVAILABLE;
}

return responseCode;
}

private ConnectionState getConnectionState() {
final ConnectionState connectionState;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.nifi.runtime;

import org.apache.nifi.NiFiServer;
import org.apache.nifi.cluster.ClusterDetailsFactory;
import org.apache.nifi.cluster.ConnectionState;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.Mock;
Expand All @@ -34,13 +36,16 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.mockito.Mockito.when;

@ExtendWith(MockitoExtension.class)
class StandardManagementServerTest {
private static final String LOCALHOST = "127.0.0.1";

private static final String HEALTH_URI = "http://%s:%d/health";

private static final String HEALTH_CLUSTER_URI = "http://%s:%d/health/cluster";

private static final String GET_METHOD = "GET";

private static final String DELETE_METHOD = "DELETE";
Expand All @@ -50,6 +55,9 @@ class StandardManagementServerTest {
@Mock
private NiFiServer server;

@Mock
private ClusterDetailsFactory clusterDetailsFactory;

@Test
void testStartStop() {
final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0);
Expand All @@ -76,7 +84,47 @@ void testGetHealth() throws Exception {
final InetSocketAddress serverAddress = standardManagementServer.getServerAddress();
assertNotSame(initialBindAddress.getPort(), serverAddress.getPort());

assertResponseStatusCode(serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK);
assertResponseStatusCode(HEALTH_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK);
} finally {
standardManagementServer.stop();
}
}

@Test
void testGetHealthClusterDisconnected() throws Exception {
final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0);
final StandardManagementServer standardManagementServer = new StandardManagementServer(initialBindAddress, server);

when(server.getClusterDetailsFactory()).thenReturn(clusterDetailsFactory);
when(clusterDetailsFactory.getConnectionState()).thenReturn(ConnectionState.DISCONNECTED);

try {
standardManagementServer.start();

final InetSocketAddress serverAddress = standardManagementServer.getServerAddress();
assertNotSame(initialBindAddress.getPort(), serverAddress.getPort());

assertResponseStatusCode(HEALTH_CLUSTER_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_UNAVAILABLE);
} finally {
standardManagementServer.stop();
}
}

@Test
void testGetHealthClusterConnected() throws Exception {
final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0);
final StandardManagementServer standardManagementServer = new StandardManagementServer(initialBindAddress, server);

when(server.getClusterDetailsFactory()).thenReturn(clusterDetailsFactory);
when(clusterDetailsFactory.getConnectionState()).thenReturn(ConnectionState.CONNECTED);

try {
standardManagementServer.start();

final InetSocketAddress serverAddress = standardManagementServer.getServerAddress();
assertNotSame(initialBindAddress.getPort(), serverAddress.getPort());

assertResponseStatusCode(HEALTH_CLUSTER_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK);
} finally {
standardManagementServer.stop();
}
Expand All @@ -94,14 +142,14 @@ void testDeleteHealth() throws Exception {
final InetSocketAddress serverAddress = standardManagementServer.getServerAddress();
assertNotSame(initialBindAddress.getPort(), serverAddress.getPort());

assertResponseStatusCode(serverAddress, DELETE_METHOD, HttpURLConnection.HTTP_BAD_METHOD);
assertResponseStatusCode(HEALTH_URI, serverAddress, DELETE_METHOD, HttpURLConnection.HTTP_BAD_METHOD);
} finally {
standardManagementServer.stop();
}
}

private void assertResponseStatusCode(final InetSocketAddress serverAddress, final String method, final int responseStatusCode) throws IOException, InterruptedException {
final URI healthUri = URI.create(HEALTH_URI.formatted(serverAddress.getHostString(), serverAddress.getPort()));
private void assertResponseStatusCode(final String uri, final InetSocketAddress serverAddress, final String method, final int responseStatusCode) throws IOException, InterruptedException {
final URI healthUri = URI.create(uri.formatted(serverAddress.getHostString(), serverAddress.getPort()));

try (HttpClient httpClient = HttpClient.newBuilder().connectTimeout(TIMEOUT).build()) {
final HttpRequest request = HttpRequest.newBuilder(healthUri)
Expand Down

0 comments on commit e9f8408

Please sign in to comment.