From e9f84083ebdeceee17c40f143c2ced4bd5395896 Mon Sep 17 00:00:00 2001 From: David Handermann Date: Tue, 27 Aug 2024 12:38:58 -0500 Subject: [PATCH] NIFI-13684 Set Cluster Status HTTP Code based on Connection (#9202) --- .../ManagementServerBootstrapCommand.java | 1 + .../ManagementServerBootstrapCommandTest.java | 150 ++++++++++++++++++ .../runtime/HealthClusterHttpHandler.java | 16 +- .../runtime/StandardManagementServerTest.java | 56 ++++++- 4 files changed, 218 insertions(+), 5 deletions(-) create mode 100644 nifi-bootstrap/src/test/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommandTest.java diff --git a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommand.java b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommand.java index e462249ac2eb..296ee062945c 100644 --- a/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommand.java +++ b/nifi-bootstrap/src/main/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommand.java @@ -141,6 +141,7 @@ protected void onResponseStatus(final ProcessHandle applicationProcessHandle, fi } else { commandStatus = CommandStatus.COMMUNICATION_FAILED; getCommandLogger().warn("Application Process [{}] Command Status [{}] HTTP {}", pid, commandStatus, statusCode); + responseStreamHandler.onResponseStream(responseStream); } } diff --git a/nifi-bootstrap/src/test/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommandTest.java b/nifi-bootstrap/src/test/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommandTest.java new file mode 100644 index 000000000000..8c32b3a5050d --- /dev/null +++ b/nifi-bootstrap/src/test/java/org/apache/nifi/bootstrap/command/ManagementServerBootstrapCommandTest.java @@ -0,0 +1,150 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.bootstrap.command; + +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.nifi.bootstrap.command.io.LoggerResponseStreamHandler; +import org.apache.nifi.bootstrap.command.io.ResponseStreamHandler; +import org.apache.nifi.bootstrap.command.process.ProcessHandleProvider; +import org.apache.nifi.bootstrap.configuration.ManagementServerPath; +import org.apache.nifi.bootstrap.configuration.SystemProperty; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.OutputStream; +import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class ManagementServerBootstrapCommandTest { + private static final Logger logger = LoggerFactory.getLogger(ManagementServerBootstrapCommandTest.class); + + private static final String LOCALHOST = "127.0.0.1"; + + private static final int RANDOM_PORT = 0; + + private static final int BACKLOG = 10; + + private static final int STOP_DELAY = 0; + + private static final String ADDRESS_ARGUMENT = "-D%s=%s:%d"; + + private static final String RESPONSE_STATUS = "Status"; + + @Mock + private ProcessHandleProvider processHandleProvider; + + @Mock + private ProcessHandle processHandle; + + @Mock + private ProcessHandle.Info processHandleInfo; + + @Test + void testStopped() { + final ResponseStreamHandler responseStreamHandler = new LoggerResponseStreamHandler(logger); + + final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler); + + command.run(); + + final CommandStatus commandStatus = command.getCommandStatus(); + assertEquals(CommandStatus.STOPPED, commandStatus); + } + + @Test + void testClusterHealthStatusConnected() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final ResponseStreamHandler responseStreamHandler = responseStream -> { + try { + responseStream.transferTo(outputStream); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + }; + + final HttpServer httpServer = startServer(HttpURLConnection.HTTP_OK); + + final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler); + command.run(); + + final CommandStatus commandStatus = command.getCommandStatus(); + assertEquals(CommandStatus.SUCCESS, commandStatus); + assertEquals(RESPONSE_STATUS, outputStream.toString()); + + httpServer.stop(STOP_DELAY); + } + + @Test + void testClusterHealthStatusDisconnected() throws IOException { + final ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + final ResponseStreamHandler responseStreamHandler = responseStream -> { + try { + responseStream.transferTo(outputStream); + } catch (final IOException e) { + throw new UncheckedIOException(e); + } + }; + + final HttpServer httpServer = startServer(HttpURLConnection.HTTP_UNAVAILABLE); + + final ManagementServerBootstrapCommand command = new ManagementServerBootstrapCommand(processHandleProvider, ManagementServerPath.HEALTH_CLUSTER, responseStreamHandler); + command.run(); + + final CommandStatus commandStatus = command.getCommandStatus(); + assertEquals(CommandStatus.COMMUNICATION_FAILED, commandStatus); + assertEquals(RESPONSE_STATUS, outputStream.toString()); + + httpServer.stop(STOP_DELAY); + } + + private HttpServer startServer(final int responseCode) throws IOException { + final InetSocketAddress bindAddress = new InetSocketAddress(LOCALHOST, RANDOM_PORT); + final HttpServer httpServer = HttpServer.create(bindAddress, BACKLOG); + final HttpHandler httpHandler = exchange -> { + exchange.sendResponseHeaders(responseCode, RESPONSE_STATUS.length()); + try (OutputStream responseBody = exchange.getResponseBody()) { + responseBody.write(RESPONSE_STATUS.getBytes(StandardCharsets.UTF_8)); + } + }; + httpServer.createContext(ManagementServerPath.HEALTH_CLUSTER.getPath(), httpHandler); + httpServer.start(); + + final InetSocketAddress serverAddress = httpServer.getAddress(); + final String addressArgument = ADDRESS_ARGUMENT.formatted(SystemProperty.MANAGEMENT_SERVER_ADDRESS.getProperty(), LOCALHOST, serverAddress.getPort()); + final String[] arguments = new String[]{addressArgument}; + + when(processHandleProvider.findApplicationProcessHandle()).thenReturn(Optional.of(processHandle)); + when(processHandle.info()).thenReturn(processHandleInfo); + when(processHandleInfo.arguments()).thenReturn(Optional.of(arguments)); + + return httpServer; + } +} diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java index 6a11031a1b10..a5d472701e41 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/main/java/org/apache/nifi/runtime/HealthClusterHttpHandler.java @@ -31,6 +31,7 @@ import static java.net.HttpURLConnection.HTTP_ACCEPTED; import static java.net.HttpURLConnection.HTTP_BAD_METHOD; import static java.net.HttpURLConnection.HTTP_OK; +import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; /** * HTTP Handler for Cluster Health status operations @@ -65,7 +66,8 @@ public void handle(final HttpExchange exchange) throws IOException { final ConnectionState connectionState = getConnectionState(); final String status = STATUS.formatted(connectionState); final byte[] response = status.getBytes(StandardCharsets.UTF_8); - exchange.sendResponseHeaders(HTTP_OK, response.length); + final int responseCode = getResponseCode(connectionState); + exchange.sendResponseHeaders(responseCode, response.length); responseBody.write(response); } else if (DELETE_METHOD.contentEquals(requestMethod)) { startDecommission(); @@ -91,6 +93,18 @@ private void startDecommission() { }); } + private int getResponseCode(final ConnectionState connectionState) { + final int responseCode; + + if (ConnectionState.CONNECTED == connectionState || ConnectionState.CONNECTING == connectionState) { + responseCode = HTTP_OK; + } else { + responseCode = HTTP_UNAVAILABLE; + } + + return responseCode; + } + private ConnectionState getConnectionState() { final ConnectionState connectionState; diff --git a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/runtime/StandardManagementServerTest.java b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/runtime/StandardManagementServerTest.java index 1a6711338da3..0167a1d364df 100644 --- a/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/runtime/StandardManagementServerTest.java +++ b/nifi-framework-bundle/nifi-framework/nifi-runtime/src/test/java/org/apache/nifi/runtime/StandardManagementServerTest.java @@ -17,6 +17,8 @@ package org.apache.nifi.runtime; import org.apache.nifi.NiFiServer; +import org.apache.nifi.cluster.ClusterDetailsFactory; +import org.apache.nifi.cluster.ConnectionState; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; @@ -34,6 +36,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.mockito.Mockito.when; @ExtendWith(MockitoExtension.class) class StandardManagementServerTest { @@ -41,6 +44,8 @@ class StandardManagementServerTest { private static final String HEALTH_URI = "http://%s:%d/health"; + private static final String HEALTH_CLUSTER_URI = "http://%s:%d/health/cluster"; + private static final String GET_METHOD = "GET"; private static final String DELETE_METHOD = "DELETE"; @@ -50,6 +55,9 @@ class StandardManagementServerTest { @Mock private NiFiServer server; + @Mock + private ClusterDetailsFactory clusterDetailsFactory; + @Test void testStartStop() { final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0); @@ -76,7 +84,47 @@ void testGetHealth() throws Exception { final InetSocketAddress serverAddress = standardManagementServer.getServerAddress(); assertNotSame(initialBindAddress.getPort(), serverAddress.getPort()); - assertResponseStatusCode(serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK); + assertResponseStatusCode(HEALTH_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK); + } finally { + standardManagementServer.stop(); + } + } + + @Test + void testGetHealthClusterDisconnected() throws Exception { + final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0); + final StandardManagementServer standardManagementServer = new StandardManagementServer(initialBindAddress, server); + + when(server.getClusterDetailsFactory()).thenReturn(clusterDetailsFactory); + when(clusterDetailsFactory.getConnectionState()).thenReturn(ConnectionState.DISCONNECTED); + + try { + standardManagementServer.start(); + + final InetSocketAddress serverAddress = standardManagementServer.getServerAddress(); + assertNotSame(initialBindAddress.getPort(), serverAddress.getPort()); + + assertResponseStatusCode(HEALTH_CLUSTER_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_UNAVAILABLE); + } finally { + standardManagementServer.stop(); + } + } + + @Test + void testGetHealthClusterConnected() throws Exception { + final InetSocketAddress initialBindAddress = new InetSocketAddress(LOCALHOST, 0); + final StandardManagementServer standardManagementServer = new StandardManagementServer(initialBindAddress, server); + + when(server.getClusterDetailsFactory()).thenReturn(clusterDetailsFactory); + when(clusterDetailsFactory.getConnectionState()).thenReturn(ConnectionState.CONNECTED); + + try { + standardManagementServer.start(); + + final InetSocketAddress serverAddress = standardManagementServer.getServerAddress(); + assertNotSame(initialBindAddress.getPort(), serverAddress.getPort()); + + assertResponseStatusCode(HEALTH_CLUSTER_URI, serverAddress, GET_METHOD, HttpURLConnection.HTTP_OK); } finally { standardManagementServer.stop(); } @@ -94,14 +142,14 @@ void testDeleteHealth() throws Exception { final InetSocketAddress serverAddress = standardManagementServer.getServerAddress(); assertNotSame(initialBindAddress.getPort(), serverAddress.getPort()); - assertResponseStatusCode(serverAddress, DELETE_METHOD, HttpURLConnection.HTTP_BAD_METHOD); + assertResponseStatusCode(HEALTH_URI, serverAddress, DELETE_METHOD, HttpURLConnection.HTTP_BAD_METHOD); } finally { standardManagementServer.stop(); } } - private void assertResponseStatusCode(final InetSocketAddress serverAddress, final String method, final int responseStatusCode) throws IOException, InterruptedException { - final URI healthUri = URI.create(HEALTH_URI.formatted(serverAddress.getHostString(), serverAddress.getPort())); + private void assertResponseStatusCode(final String uri, final InetSocketAddress serverAddress, final String method, final int responseStatusCode) throws IOException, InterruptedException { + final URI healthUri = URI.create(uri.formatted(serverAddress.getHostString(), serverAddress.getPort())); try (HttpClient httpClient = HttpClient.newBuilder().connectTimeout(TIMEOUT).build()) { final HttpRequest request = HttpRequest.newBuilder(healthUri)