diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java index f74f3f1e3..035b9cfec 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/ControlPlanePlugin.java @@ -16,6 +16,9 @@ package com.linecorp.centraldogma.xds.internal; +import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; + +import java.io.IOException; import java.util.HashSet; import java.util.Map; import java.util.Set; @@ -31,7 +34,6 @@ import com.fasterxml.jackson.databind.JsonNode; import com.google.common.base.MoreObjects; -import com.google.protobuf.InvalidProtocolBufferException; import com.linecorp.armeria.common.grpc.GrpcJsonMarshaller; import com.linecorp.armeria.common.util.UnmodifiableFuture; @@ -179,23 +181,23 @@ private void init0(PluginInitContext pluginInitContext) { } private void setXdsResources(String path, String contentAsText, String repoName) - throws InvalidProtocolBufferException { + throws IOException { if (path.startsWith(CLUSTERS_DIRECTORY)) { final Cluster.Builder builder = Cluster.newBuilder(); - JsonFormatUtil.parser().merge(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); centralDogmaXdsResources.setCluster(repoName, builder.build()); } else if (path.startsWith(ENDPOINTS_DIRECTORY)) { final ClusterLoadAssignment.Builder builder = ClusterLoadAssignment.newBuilder(); - JsonFormatUtil.parser().merge(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); centralDogmaXdsResources.setEndpoint(repoName, builder.build()); } else if (path.startsWith(LISTENERS_DIRECTORY)) { final Listener.Builder builder = Listener.newBuilder(); - JsonFormatUtil.parser().merge(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); centralDogmaXdsResources.setListener(repoName, builder.build()); } else if (path.startsWith(ROUTES_DIRECTORY)) { final RouteConfiguration.Builder builder = RouteConfiguration.newBuilder(); - JsonFormatUtil.parser().merge(contentAsText, builder); + JSON_MESSAGE_MARSHALLER.mergeValue(contentAsText, builder); centralDogmaXdsResources.setRoute(repoName, builder.build()); } else { // ignore diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/JsonFormatUtil.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/JsonFormatUtil.java deleted file mode 100644 index 79722b351..000000000 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/JsonFormatUtil.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Copyright 2023 LINE Corporation - * - * LINE Corporation licenses this file to you under the Apache License, - * version 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at: - * - * https://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations - * under the License. - */ - -package com.linecorp.centraldogma.xds.internal; - -import com.google.protobuf.util.JsonFormat; -import com.google.protobuf.util.JsonFormat.Parser; -import com.google.protobuf.util.JsonFormat.Printer; -import com.google.protobuf.util.JsonFormat.TypeRegistry; - -import io.envoyproxy.envoy.extensions.filters.http.router.v3.Router; -import io.envoyproxy.envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager; - -final class JsonFormatUtil { - - private static final Printer PRINTER; - private static final Parser PARSER; - - static { - final TypeRegistry typeRegistry = TypeRegistry.newBuilder() - .add(HttpConnectionManager.getDescriptor()) - .add(Router.getDescriptor()) - .build(); - PRINTER = JsonFormat.printer().usingTypeRegistry(typeRegistry); - PARSER = JsonFormat.parser().usingTypeRegistry(typeRegistry); - } - - static Printer printer() { - return PRINTER; - } - - static Parser parser() { - return PARSER; - } - - private JsonFormatUtil() {} -} diff --git a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsServiceUtil.java b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsServiceUtil.java index 506c163c8..0805a87fd 100644 --- a/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsServiceUtil.java +++ b/xds/src/main/java/com/linecorp/centraldogma/xds/internal/XdsServiceUtil.java @@ -52,6 +52,7 @@ public final class XdsServiceUtil { public static final String RESOURCE_ID_PATTERN_STRING = "[a-z](?:[a-z0-9-_/]*[a-z0-9])?"; public static final Pattern RESOURCE_ID_PATTERN = Pattern.compile('^' + RESOURCE_ID_PATTERN_STRING + '$'); + //TODO(minwoox): Automate the registration of the extension message types. public static final MessageMarshaller JSON_MESSAGE_MARSHALLER = MessageMarshaller.builder().omittingInsignificantWhitespace(true) .register(Listener.getDefaultInstance()) diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterServiceTest.java index 0f1253ccb..b716187a3 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/cluster/v1/XdsClusterServiceTest.java @@ -17,10 +17,12 @@ import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.cluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createCluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateCluster; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.IOException; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -40,7 +42,6 @@ import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.centraldogma.xds.cluster.v1.XdsClusterServiceGrpc.XdsClusterServiceBlockingStub; @@ -59,45 +60,29 @@ class XdsClusterServiceTest { @BeforeAll static void setup() { - final AggregatedHttpResponse response = createGroup("groups/foo"); + final AggregatedHttpResponse response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); } - private static AggregatedHttpResponse createGroup(String groupName) { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups").set( - HttpHeaderNames.AUTHORIZATION, "Bearer anonymous").contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}").aggregate() - .join(); - } - @Test void createClusterViaHttp() throws Exception { final Cluster cluster = cluster("this_cluster_name_will_be_ignored_and_replaced", 1); - AggregatedHttpResponse response = createCluster("foo", "@invalid_cluster_id", cluster); + AggregatedHttpResponse response = createCluster("groups/foo", "@invalid_cluster_id", cluster, + dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); - response = createCluster("non-existent-group", "foo-cluster/1", cluster); + response = createCluster("groups/non-existent-group", "foo-cluster/1", cluster, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createCluster("foo", "foo-cluster/1", cluster); + response = createCluster("groups/foo", "foo-cluster/1", cluster, dogma.httpClient()); assertOk(response); final Cluster.Builder clusterBuilder = Cluster.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), clusterBuilder); final Cluster actualCluster = clusterBuilder.build(); final String clusterName = "groups/foo/clusters/foo-cluster/1"; assertThat(actualCluster).isEqualTo(cluster.toBuilder().setName(clusterName).build()); - checkResourceViaDiscoveryRequest(actualCluster, clusterName, true); - } - - private static AggregatedHttpResponse createCluster(String groupName, String clusterId, Cluster cluster) - throws IOException { - final RequestHeaders headers = - RequestHeaders.builder(HttpMethod.POST, - "/api/v1/xds/groups/" + groupName + "/clusters?cluster_id=" + clusterId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(cluster)) - .aggregate().join(); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualCluster, clusterName, true)); } private static void assertOk(AggregatedHttpResponse response) { @@ -142,21 +127,23 @@ public void onCompleted() {} @Test void updateClusterViaHttp() throws Exception { final Cluster cluster = cluster("this_cluster_name_will_be_ignored_and_replaced", 1); - AggregatedHttpResponse response = updateCluster("foo-cluster/2", cluster); + AggregatedHttpResponse response = updateCluster("groups/foo", "foo-cluster/2", + cluster, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createCluster("foo", "foo-cluster/2", cluster); + response = createCluster("groups/foo", "foo-cluster/2", cluster, dogma.httpClient()); assertOk(response); final Cluster.Builder clusterBuilder = Cluster.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), clusterBuilder); final Cluster actualCluster = clusterBuilder.build(); final String clusterName = "groups/foo/clusters/foo-cluster/2"; assertThat(actualCluster).isEqualTo(cluster.toBuilder().setName(clusterName).build()); - checkResourceViaDiscoveryRequest(actualCluster, clusterName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualCluster, clusterName, true)); final Cluster updatingCluster = cluster.toBuilder().setConnectTimeout( Duration.newBuilder().setSeconds(2).build()).setName(clusterName).build(); - response = updateCluster("foo-cluster/2", updatingCluster); + response = updateCluster("groups/foo", "foo-cluster/2", updatingCluster, dogma.httpClient()); assertOk(response); final Cluster.Builder clusterBuilder2 = Cluster.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), clusterBuilder2); @@ -166,15 +153,6 @@ void updateClusterViaHttp() throws Exception { () -> checkResourceViaDiscoveryRequest(actualCluster2, clusterName, true)); } - private static AggregatedHttpResponse updateCluster(String clusterId, Cluster cluster) throws IOException { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.PATCH, - "/api/v1/xds/groups/foo/clusters/" + clusterId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(cluster)) - .aggregate().join(); - } - @Test void deleteClusterViaHttp() throws Exception { final String clusterName = "groups/foo/clusters/foo-cluster/3/4"; @@ -182,11 +160,12 @@ void deleteClusterViaHttp() throws Exception { assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); final Cluster cluster = cluster("this_cluster_name_will_be_ignored_and_replaced", 1); - response = createCluster("foo", "foo-cluster/3/4", cluster); + response = createCluster("groups/foo", "foo-cluster/3/4", cluster, dogma.httpClient()); assertOk(response); final Cluster actualCluster = cluster.toBuilder().setName(clusterName).build(); - checkResourceViaDiscoveryRequest(actualCluster, clusterName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualCluster, clusterName, true)); // Add permission test. diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointServiceTest.java index b730d5f8c..2dcdd3d90 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/endpoint/v1/XdsEndpointServiceTest.java @@ -16,6 +16,8 @@ package com.linecorp.centraldogma.xds.endpoint.v1; import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createEndpoint; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.endpoint; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.loadAssignment; import static org.assertj.core.api.Assertions.assertThat; @@ -61,29 +63,22 @@ class XdsEndpointServiceTest { @BeforeAll static void setup() { - final AggregatedHttpResponse response = createGroup("groups/foo"); + final AggregatedHttpResponse response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); } - private static AggregatedHttpResponse createGroup(String groupName) { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups") - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}") - .aggregate().join(); - } - @Test void createEndpointViaHttp() throws Exception { final ClusterLoadAssignment endpoint = loadAssignment("this_endpoint_name_will_be_ignored_and_replaced", "127.0.0.1", 8080); - AggregatedHttpResponse response = createEndpoint("foo", "@invalid_endpoint_id", endpoint); + AggregatedHttpResponse response = + createEndpoint("groups/foo", "@invalid_endpoint_id", endpoint, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); - response = createEndpoint("non-existent-group", "foo-endpoint/1", endpoint); + response = createEndpoint("groups/non-existent-group", "foo-endpoint/1", endpoint, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createEndpoint("foo", "foo-endpoint/1", endpoint); + response = createEndpoint("groups/foo", "foo-endpoint/1", endpoint, dogma.httpClient()); assertOk(response); final ClusterLoadAssignment.Builder endpointBuilder = ClusterLoadAssignment.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), endpointBuilder); @@ -91,19 +86,8 @@ void createEndpointViaHttp() throws Exception { final String clusterName = "groups/foo/clusters/foo-endpoint/1"; assertThat(actualEndpoint).isEqualTo( endpoint.toBuilder().setClusterName(clusterName).build()); - checkResourceViaDiscoveryRequest(actualEndpoint, clusterName); - } - - private static AggregatedHttpResponse createEndpoint( - String groupName, String endpointId, ClusterLoadAssignment endpoint) throws IOException { - final RequestHeaders headers = - RequestHeaders.builder(HttpMethod.POST, - "/api/v1/xds/groups/" + groupName + "/endpoints?endpoint_id=" + - endpointId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(endpoint)) - .aggregate().join(); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualEndpoint, clusterName)); } private static void assertOk(AggregatedHttpResponse response) { @@ -145,6 +129,7 @@ public void onCompleted() {} final DiscoveryResponse discoveryResponse = queue.poll(300, TimeUnit.MILLISECONDS); assertThat(discoveryResponse).isNull(); } + requestStreamObserver.onCompleted(); } @Test @@ -154,14 +139,15 @@ void updateEndpointViaHttp() throws Exception { AggregatedHttpResponse response = updateEndpoint("foo-endpoint/2", endpoint); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createEndpoint("foo", "foo-endpoint/2", endpoint); + response = createEndpoint("groups/foo", "foo-endpoint/2", endpoint, dogma.httpClient()); assertOk(response); final ClusterLoadAssignment.Builder endpointBuilder = ClusterLoadAssignment.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), endpointBuilder); final ClusterLoadAssignment actualEndpoint = endpointBuilder.build(); final String clusterName = "groups/foo/clusters/foo-endpoint/2"; assertThat(actualEndpoint).isEqualTo(endpoint.toBuilder().setClusterName(clusterName).build()); - checkResourceViaDiscoveryRequest(actualEndpoint, clusterName); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualEndpoint, clusterName)); final ClusterLoadAssignment updatingEndpoint = endpoint.toBuilder() @@ -198,20 +184,20 @@ void deleteEndpointViaHttp() throws Exception { final ClusterLoadAssignment endpoint = loadAssignment("this_endpoint_name_will_be_ignored_and_replaced", "127.0.0.1", 8080); - response = createEndpoint("foo", "foo-endpoint/3/4", endpoint); + response = createEndpoint("groups/foo", "foo-endpoint/3/4", endpoint, dogma.httpClient()); assertOk(response); final ClusterLoadAssignment actualEndpoint = endpoint.toBuilder().setClusterName(clusterName).build(); - checkResourceViaDiscoveryRequest(actualEndpoint, clusterName); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualEndpoint, clusterName)); // Add permission test. response = deleteEndpoint(endpointName); assertOk(response); assertThat(response.contentUtf8()).isEqualTo("{}"); - await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( - () -> checkResourceViaDiscoveryRequest(null, clusterName)); + checkResourceViaDiscoveryRequest(null, clusterName); } private static AggregatedHttpResponse deleteEndpoint(String endpointName) { @@ -230,7 +216,7 @@ void viaStub() throws Exception { .build(XdsEndpointServiceBlockingStub.class); final ClusterLoadAssignment endpoint = loadAssignment("this_endpoint_name_will_be_ignored_and_replaced", "127.0.0.1", 8080); - ClusterLoadAssignment response = client.createEndpoint( + final ClusterLoadAssignment response = client.createEndpoint( CreateEndpointRequest.newBuilder() .setParent("groups/foo") .setEndpointId("foo-endpoint/5/6") @@ -238,7 +224,8 @@ void viaStub() throws Exception { .build()); final String clusterName = "groups/foo/clusters/foo-endpoint/5/6"; assertThat(response).isEqualTo(endpoint.toBuilder().setClusterName(clusterName).build()); - checkResourceViaDiscoveryRequest(response, clusterName); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(response, clusterName)); final ClusterLoadAssignment updatingEndpoint = endpoint.toBuilder() @@ -247,20 +234,20 @@ void viaStub() throws Exception { .setClusterName(clusterName).build(); final String endpointName = "groups/foo/endpoints/foo-endpoint/5/6"; - response = client.updateEndpoint( + final ClusterLoadAssignment response2 = client.updateEndpoint( UpdateEndpointRequest.newBuilder() .setEndpointName(endpointName) .setEndpoint(updatingEndpoint) .build()); - assertThat(response).isEqualTo(updatingEndpoint); - checkResourceViaDiscoveryRequest(response, clusterName); + assertThat(response2).isEqualTo(updatingEndpoint); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(response2, clusterName)); // No exception is thrown. final Empty ignored = client.deleteEndpoint( DeleteEndpointRequest.newBuilder() .setName(endpointName) .build()); - await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( - () -> checkResourceViaDiscoveryRequest(null, clusterName)); + checkResourceViaDiscoveryRequest(null, clusterName); } } diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupServiceTest.java index d2fab1512..1d42b17c6 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/group/v1/XdsGroupServiceTest.java @@ -15,6 +15,8 @@ */ package com.linecorp.centraldogma.xds.group.v1; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.deleteGroup; import static net.javacrumbs.jsonunit.fluent.JsonFluentAssert.assertThatJson; import static org.assertj.core.api.Assertions.assertThat; @@ -26,16 +28,13 @@ import com.linecorp.armeria.client.grpc.GrpcClients; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpHeaderNames; -import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; -import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.centraldogma.xds.group.v1.XdsGroupServiceGrpc.XdsGroupServiceBlockingStub; import io.grpc.Status; -class XdsGroupServiceTest { +final class XdsGroupServiceTest { @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension(); @@ -43,53 +42,37 @@ class XdsGroupServiceTest { @Test void createGroupViaHttp() { // Invalid name. - AggregatedHttpResponse response = createGroup("invalid/foo"); + AggregatedHttpResponse response = createGroup("invalid/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); - response = createGroup("groups/foo"); + response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); assertThat(response.headers().get("grpc-status")).isEqualTo("0"); assertThatJson(response.contentUtf8()).isEqualTo("{\"name\":\"groups/foo\"}"); // Cannot create with the same name. - response = createGroup("groups/foo"); + response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.CONFLICT); assertThat(response.headers().get("grpc-status")) .isEqualTo(Integer.toString(Status.ALREADY_EXISTS.getCode().value())); } - private static AggregatedHttpResponse createGroup(String groupName) { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups") - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}") - .aggregate().join(); - } - @Test void deleteGroupViaHttp() { - AggregatedHttpResponse response = deleteGroup("groups/bar"); + AggregatedHttpResponse response = deleteGroup("groups/bar", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createGroup("groups/bar"); + response = createGroup("groups/bar", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); // Add permission test. - response = deleteGroup("groups/bar"); + response = deleteGroup("groups/bar", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); assertThat(response.headers().get("grpc-status")).isEqualTo("0"); assertThat(response.contentUtf8()).isEqualTo("{}"); } - private static AggregatedHttpResponse deleteGroup(String groupName) { - final RequestHeaders headers = - RequestHeaders.builder(HttpMethod.DELETE, "/api/v1/xds/" + groupName) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .build(); - return dogma.httpClient().execute(headers).aggregate().join(); - } - @Test void createAndDeleteGroupViaStub() { final XdsGroupServiceBlockingStub client = diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingMultipleClientsTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingMultipleClientsTest.java index 7c83b5337..a0cb42a50 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingMultipleClientsTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingMultipleClientsTest.java @@ -18,29 +18,25 @@ import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.CONFIG_SOURCE_CLUSTER_NAME; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.bootstrap; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createClusterAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createEndpointAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createXdsProject; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.cluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createCluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createEndpoint; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.loadAssignment; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateCluster; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.File; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; +import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.xds.ClusterRoot; import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.XdsBootstrap; -import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor; -import com.linecorp.centraldogma.server.management.ServerStatusManager; -import com.linecorp.centraldogma.server.metadata.MetadataService; -import com.linecorp.centraldogma.server.storage.project.ProjectManager; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; @@ -52,29 +48,16 @@ final class CdsStreamingMultipleClientsTest { @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension(); - @TempDir - static File tempDir; - - @SuppressWarnings("NotNullFieldNotInitialized") - static MetadataService metadataService; - - // This method will be remove once XdsProjectService is added in a follow-up PR. - @BeforeAll - static void setup() { - final StandaloneCommandExecutor executor = new StandaloneCommandExecutor( - dogma.projectManager(), ForkJoinPool.commonPool(), - new ServerStatusManager(tempDir), null, null, null); - executor.start().join(); - metadataService = new MetadataService(dogma.projectManager(), executor); - } - @Test void updateClusterRootWithCorrespondingResource() throws Exception { - final String fooXdsProjectName = "foo"; - final String barXdsProjectName = "bar"; - final String fooClusterName = "foo/cluster"; - final String barClusterName = "bar/cluster"; - final Bootstrap bootstrap = bootstrap(dogma.httpClient().uri(), CONFIG_SOURCE_CLUSTER_NAME); + final String fooGroupName = "groups/foo"; + final String barGroupName = "groups/bar"; + final String fooClusterId = "foo-cluster"; + final String barClusterId = "bar-cluster"; + final String fooClusterName = fooGroupName + "/clusters/" + fooClusterId; + final String barClusterName = barGroupName + "/clusters/" + barClusterId; + final WebClient webClient = dogma.httpClient(); + final Bootstrap bootstrap = bootstrap(webClient.uri(), CONFIG_SOURCE_CLUSTER_NAME); try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { final ClusterRoot fooClusterRoot = xdsBootstrap.clusterRoot(fooClusterName); final AtomicReference fooSnapshotCaptor = new AtomicReference<>(); @@ -89,11 +72,12 @@ void updateClusterRootWithCorrespondingResource() throws Exception { return fooSnapshotCaptor.get() == null && barSnapshotCaptor.get() == null; }); - final ProjectManager projectManager = dogma.projectManager(); - createXdsProject(projectManager, metadataService, fooXdsProjectName); - final ClusterLoadAssignment fooEndpoint = - createEndpointAndCommit(fooXdsProjectName, fooClusterName, projectManager); - Cluster fooCluster = createClusterAndCommit(fooXdsProjectName, fooClusterName, 1, projectManager); + createGroup(fooGroupName, webClient); + + final ClusterLoadAssignment fooEndpoint = loadAssignment(fooClusterName, "127.0.0.1", 8080); + createEndpoint(fooGroupName, fooClusterId, fooEndpoint, webClient); + Cluster fooCluster = cluster(fooClusterName, 1); + createCluster(fooGroupName, fooClusterId, fooCluster, webClient); await().until(() -> fooSnapshotCaptor.get() != null); ClusterSnapshot fooClusterSnapshot = fooSnapshotCaptor.getAndSet(null); @@ -102,11 +86,11 @@ void updateClusterRootWithCorrespondingResource() throws Exception { // bar is not updated. await().pollDelay(200, TimeUnit.MILLISECONDS).until(() -> barSnapshotCaptor.get() == null); - createXdsProject(projectManager, metadataService, barXdsProjectName); - final ClusterLoadAssignment barEndpoint = - createEndpointAndCommit(barXdsProjectName, barClusterName, projectManager); - final Cluster barCluster = createClusterAndCommit(barXdsProjectName, barClusterName, 1, - projectManager); + createGroup(barGroupName, webClient); + final ClusterLoadAssignment barEndpoint = loadAssignment(barClusterName, "127.0.0.1", 8081); + createEndpoint(barGroupName, barClusterId, barEndpoint, webClient); + final Cluster barCluster = cluster(barClusterName, 1); + createCluster(barGroupName, barClusterId, barCluster, webClient); await().until(() -> barSnapshotCaptor.get() != null); final ClusterSnapshot barClusterSnapshot = barSnapshotCaptor.getAndSet(null); assertThat(barClusterSnapshot.xdsResource().resource()).isEqualTo(barCluster); @@ -116,7 +100,8 @@ void updateClusterRootWithCorrespondingResource() throws Exception { await().pollDelay(200, TimeUnit.MILLISECONDS).until(() -> fooSnapshotCaptor.get() == null); // Change the configuration. - fooCluster = createClusterAndCommit(fooXdsProjectName, fooClusterName, 2, projectManager); + fooCluster = cluster(fooClusterName, 2); + updateCluster(fooGroupName, fooClusterId, fooCluster, webClient); await().until(() -> fooSnapshotCaptor.get() != null); fooClusterSnapshot = fooSnapshotCaptor.getAndSet(null); assertThat(fooClusterSnapshot.xdsResource().resource()).isEqualTo(fooCluster); diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java index 8325270a9..294908fed 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/CdsStreamingTest.java @@ -16,31 +16,26 @@ package com.linecorp.centraldogma.xds.internal; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createClusterAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createXdsProject; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.removeXdsProject; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.cluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createCluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.deleteGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateCluster; import static org.assertj.core.api.Assertions.assertThat; -import java.io.File; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; import com.google.protobuf.Any; import com.google.protobuf.InvalidProtocolBufferException; +import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.client.grpc.GrpcClients; -import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor; -import com.linecorp.centraldogma.server.management.ServerStatusManager; -import com.linecorp.centraldogma.server.metadata.MetadataService; -import com.linecorp.centraldogma.server.storage.project.ProjectManager; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import io.envoyproxy.controlplane.cache.Resources; @@ -55,32 +50,19 @@ final class CdsStreamingTest { @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension(); - @TempDir - static File tempDir; - - @SuppressWarnings("NotNullFieldNotInitialized") - static MetadataService metadataService; - - // This method will be remove once XdsProjectService is added in a follow-up PR. - @BeforeAll - static void setup() { - final StandaloneCommandExecutor executor = new StandaloneCommandExecutor( - dogma.projectManager(), ForkJoinPool.commonPool(), - new ServerStatusManager(tempDir), null, null, null); - executor.start().join(); - metadataService = new MetadataService(dogma.projectManager(), executor); - } - @Test void cdsStream() throws Exception { - final String fooXdsProjectName = "foo"; - final ProjectManager projectManager = dogma.projectManager(); - createXdsProject(projectManager, metadataService, fooXdsProjectName); + final String fooGroupName = "groups/foo"; + final WebClient webClient = dogma.httpClient(); + createGroup(fooGroupName, webClient); + + final String fooClusterId = "foo-cluster"; + final String fooClusterName = fooGroupName + "/clusters/" + fooClusterId; + Cluster fooCluster = cluster(fooClusterName, 1); + createCluster(fooGroupName, fooClusterId, fooCluster, webClient); - final String fooClusterName = "foo/cluster"; - Cluster fooCluster = createClusterAndCommit(fooXdsProjectName, fooClusterName, 1, projectManager); final ClusterDiscoveryServiceStub client = GrpcClients.newClient( - dogma.httpClient().uri(), ClusterDiscoveryServiceStub.class); + webClient.uri(), ClusterDiscoveryServiceStub.class); final BlockingQueue queue = new ArrayBlockingQueue<>(2); final StreamObserver requestStreamObserver = client.streamClusters( new StreamObserver() { @@ -107,7 +89,8 @@ public void onCompleted() {} assertThat(queue.poll(300, TimeUnit.MILLISECONDS)).isNull(); // Change the configuration. - fooCluster = createClusterAndCommit(fooXdsProjectName, fooClusterName, 2, projectManager); + fooCluster = cluster(fooClusterName, 2); + updateCluster(fooGroupName, fooClusterId, fooCluster, webClient); discoveryResponse = queue.take(); final String versionInfo2 = discoveryResponse.getVersionInfo(); assertThat(versionInfo2).isNotEqualTo(versionInfo1); @@ -118,11 +101,13 @@ public void onCompleted() {} assertThat(queue.poll(300, TimeUnit.MILLISECONDS)).isNull(); // Add another cluster - final String barXdsProjectName = "bar"; - createXdsProject(projectManager, metadataService, barXdsProjectName); - final String barClusterName = "bar/cluster"; - final Cluster barCluster = createClusterAndCommit(barXdsProjectName, barClusterName, 2, - projectManager); + final String barGroupName = "groups/bar"; + createGroup(barGroupName, webClient); + final String barClusterId = "bar-cluster"; + final String barClusterName = barGroupName + "/clusters/" + barClusterId; + final Cluster barCluster = cluster(barClusterName, 1); + createCluster(barGroupName, barClusterId, barCluster, webClient); + discoveryResponse = queue.take(); final String versionInfo3 = discoveryResponse.getVersionInfo(); assertThat(versionInfo3.length()).isEqualTo(64); @@ -142,8 +127,8 @@ public void onCompleted() {} // No more discovery response. assertThat(queue.poll(300, TimeUnit.MILLISECONDS)).isNull(); - // Remove bar xDS project. - removeXdsProject(projectManager, metadataService, barXdsProjectName); + // Remove bar group. + deleteGroup(barGroupName, webClient); discoveryResponse = queue.take(); final String versionInfo4 = discoveryResponse.getVersionInfo(); assertDiscoveryResponse(versionInfo4, discoveryResponse, fooCluster, queue, "3"); diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/DiscoveryServiceItTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/DiscoveryServiceItTest.java index 3dbd6195d..1eeb42d4e 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/DiscoveryServiceItTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/DiscoveryServiceItTest.java @@ -17,25 +17,23 @@ package com.linecorp.centraldogma.xds.internal; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.cluster; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.clusterFileName; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.commit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createXdsProject; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createCluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createListener; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createRoute; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.httpConnectionManager; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.listenerFileName; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.rdsConfigSource; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.routeConfiguration; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.routeFileName; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateRoute; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.File; -import java.util.concurrent.ForkJoinPool; +import java.io.IOException; import java.util.concurrent.TimeUnit; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; import org.testcontainers.containers.Network; @@ -47,9 +45,6 @@ import com.linecorp.armeria.client.WebClient; import com.linecorp.armeria.common.AggregatedHttpResponse; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor; -import com.linecorp.centraldogma.server.management.ServerStatusManager; -import com.linecorp.centraldogma.server.metadata.MetadataService; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import io.envoyproxy.envoy.config.cluster.v3.Cluster; @@ -67,14 +62,14 @@ @Testcontainers(disabledWithoutDocker = true) final class DiscoveryServiceItTest { - private static final String FOO_XDS_PROJECT_NAME = "foo"; - private static final String ECHO_CLUSTER = "foo/echo_cluster"; + private static final String FOO_GROUP_NAME = "groups/foo"; + private static final String ECHO_CLUSTER = "groups/foo/clusters/echo-cluster"; private static final String ECHO_CLUSTER_ADDRESS = "echo_upstream"; - private static final String NO_ECHO_CLUSTER = "foo/no_echo_cluster"; + private static final String NO_ECHO_CLUSTER = "groups/foo/clusters/no-echo-cluster"; private static final String NO_ECHO_CLUSTER_ADDRESS = "no_echo_upstream"; - private static final String ECHO_ROUTE = "foo/echo_route"; - private static final String ECHO_LISTENER = "foo/echo_listener"; + private static final String ECHO_ROUTE = "groups/foo/routes/echo-route"; + private static final String ECHO_LISTENER = "groups/foo/listeners/echo-listener"; // Using 10000 is fine because this port is used in the container. The port is exposed with a different // port number which you can get via ContainerState.getMappedPort(). @@ -89,9 +84,6 @@ protected boolean runForEachTest() { } }; - @TempDir - static File tempDir; - private static final Network NETWORK = Network.newNetwork(); @Container @@ -105,25 +97,16 @@ protected boolean runForEachTest() { .withNetworkAliases(NO_ECHO_CLUSTER_ADDRESS); @BeforeEach - void setUp() { - final StandaloneCommandExecutor executor = new StandaloneCommandExecutor( - dogma.projectManager(), ForkJoinPool.commonPool(), - new ServerStatusManager(tempDir), null, null, null); - executor.start().join(); - final MetadataService metadataService = new MetadataService(dogma.projectManager(), executor); - createXdsProject(dogma.projectManager(), metadataService, FOO_XDS_PROJECT_NAME); - final Cluster echoCluster = createEchoCluster(true); - commit(echoCluster, dogma.projectManager(), FOO_XDS_PROJECT_NAME, ECHO_CLUSTER, - clusterFileName(FOO_XDS_PROJECT_NAME, ECHO_CLUSTER)); - final Cluster noEchoCluster = createEchoCluster(false); - commit(noEchoCluster, dogma.projectManager(), FOO_XDS_PROJECT_NAME, NO_ECHO_CLUSTER, - clusterFileName(FOO_XDS_PROJECT_NAME, NO_ECHO_CLUSTER)); + void setUp() throws IOException { + createGroup(FOO_GROUP_NAME, dogma.httpClient()); + final Cluster echoCluster = echoCluster(true); + createCluster(FOO_GROUP_NAME, "echo-cluster", echoCluster, dogma.httpClient()); + final Cluster noEchoCluster = echoCluster(false); + createCluster(FOO_GROUP_NAME, "no-echo-cluster", noEchoCluster, dogma.httpClient()); final RouteConfiguration route = createEchoRoute(true); - commit(route, dogma.projectManager(), FOO_XDS_PROJECT_NAME, ECHO_ROUTE, - routeFileName(FOO_XDS_PROJECT_NAME, ECHO_ROUTE)); + createRoute(FOO_GROUP_NAME, "echo-route", route, dogma.httpClient()); final Listener listener = createEchoListener(); - commit(listener, dogma.projectManager(), FOO_XDS_PROJECT_NAME, ECHO_LISTENER, - listenerFileName(FOO_XDS_PROJECT_NAME, ECHO_LISTENER)); + createListener(FOO_GROUP_NAME, "echo-listener", listener, dogma.httpClient()); } @AfterAll @@ -133,7 +116,7 @@ static void afterAll() { @CsvSource({ "envoy/xds.config.yaml", "envoy/ads.config.yaml" }) @ParameterizedTest - void validateTestRequestToEchoClusterViaEnvoy(String configFile) throws InterruptedException { + void validateTestRequestToEchoClusterViaEnvoy(String configFile) throws Exception { org.testcontainers.Testcontainers.exposeHostPorts(dogma.serverAddress().getPort()); try (EnvoyContainer envoy = new EnvoyContainer(configFile, () -> dogma.serverAddress().getPort()) @@ -148,11 +131,9 @@ void validateTestRequestToEchoClusterViaEnvoy(String configFile) throws Interrup assertThat(res.headers().status()).isSameAs(HttpStatus.OK); assertThat(res.contentUtf8()).contains("\"body\": \"Hello!\""); - // Change the route to noEchoCluster. + // Change the route to no-echo-cluster. final RouteConfiguration route = createEchoRoute(false); - commit(route, dogma.projectManager(), FOO_XDS_PROJECT_NAME, ECHO_ROUTE, - routeFileName(FOO_XDS_PROJECT_NAME, ECHO_ROUTE)); - + updateRoute(FOO_GROUP_NAME, "echo-route", route, dogma.httpClient()); await().atMost(5, TimeUnit.SECONDS) .ignoreExceptions() .untilAsserted(() -> { @@ -160,6 +141,7 @@ void validateTestRequestToEchoClusterViaEnvoy(String configFile) throws Interrup .post("/", "Hello!") .aggregate().join(); assertThat(res2.headers().status()).isSameAs(HttpStatus.OK); + System.err.println(res2.contentUtf8()); assertThat(res2.content().isEmpty()).isTrue(); }); // Envoy closes the connection so a ClosedSessionException is raised in the Server. @@ -167,7 +149,7 @@ void validateTestRequestToEchoClusterViaEnvoy(String configFile) throws Interrup } } - private static Cluster createEchoCluster(boolean echo) { + private static Cluster echoCluster(boolean echo) { final String clusterName = echo ? ECHO_CLUSTER : NO_ECHO_CLUSTER; final ClusterLoadAssignment loadAssignment = XdsTestUtil.loadAssignment(clusterName, echo ? ECHO_CLUSTER_ADDRESS : NO_ECHO_CLUSTER_ADDRESS, diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/LdsStreamingMultipleClientsTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/LdsStreamingMultipleClientsTest.java index 9495270d9..910c68502 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/LdsStreamingMultipleClientsTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/LdsStreamingMultipleClientsTest.java @@ -18,34 +18,31 @@ import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.CONFIG_SOURCE_CLUSTER_NAME; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.bootstrap; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createClusterAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createEndpointAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createListenerAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createRouteConfigurationAndCommit; -import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createXdsProject; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.cluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createCluster; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createEndpoint; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createListener; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createRoute; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.exampleListener; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.loadAssignment; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.routeConfiguration; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateListener; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.File; import java.util.List; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; -import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.RegisterExtension; -import org.junit.jupiter.api.io.TempDir; import com.linecorp.armeria.xds.ClusterSnapshot; import com.linecorp.armeria.xds.ListenerRoot; import com.linecorp.armeria.xds.ListenerSnapshot; import com.linecorp.armeria.xds.RouteSnapshot; import com.linecorp.armeria.xds.XdsBootstrap; -import com.linecorp.centraldogma.server.command.StandaloneCommandExecutor; -import com.linecorp.centraldogma.server.management.ServerStatusManager; -import com.linecorp.centraldogma.server.metadata.MetadataService; -import com.linecorp.centraldogma.server.storage.project.ProjectManager; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; @@ -59,39 +56,21 @@ final class LdsStreamingMultipleClientsTest { @RegisterExtension static final CentralDogmaExtension dogma = new CentralDogmaExtension(); - @TempDir - static File tempDir; - - @SuppressWarnings("NotNullFieldNotInitialized") - static MetadataService metadataService; - - // This method will be remove once XdsProjectService is added in a follow-up PR. - @BeforeAll - static void setup() { - final StandaloneCommandExecutor executor = new StandaloneCommandExecutor( - dogma.projectManager(), ForkJoinPool.commonPool(), - new ServerStatusManager(tempDir), null, null, null); - executor.start().join(); - metadataService = new MetadataService(dogma.projectManager(), executor); - } - @Test void updateListenerRootWithCorrespondingResource() throws Exception { - final String fooXdsProjectName = "foo"; - final String barXdsProjectName = "bar"; - final String fooListenerName = "foo/listener"; - final String fooRouteName = "foo/route"; - final String fooClusterName = "foo/cluster"; - final String barListenerName = "bar/listener"; - final ProjectManager projectManager = dogma.projectManager(); - createXdsProject(projectManager, metadataService, fooXdsProjectName); - final ClusterLoadAssignment fooEndpoint = - createEndpointAndCommit(fooXdsProjectName, fooClusterName, projectManager); - final Cluster fooCluster = createClusterAndCommit(fooXdsProjectName, fooClusterName, 1, projectManager); - final RouteConfiguration fooRoute = createRouteConfigurationAndCommit(fooXdsProjectName, - fooRouteName, - fooClusterName, - projectManager); + final String fooGroupName = "groups/foo"; + final String barGroupName = "groups/bar"; + final String fooListenerName = "groups/foo/listeners/foo-listener"; + final String fooRouteName = "groups/foo/routes/foo-route"; + final String fooClusterName = "groups/foo/clusters/foo-cluster"; + final String barListenerName = "groups/bar/listeners/bar-listener"; + createGroup(fooGroupName, dogma.httpClient()); + final ClusterLoadAssignment fooEndpoint = loadAssignment(fooClusterName, "127.0.0.1", 8080); + createEndpoint(fooGroupName, "foo-cluster", fooEndpoint, dogma.httpClient()); + final Cluster fooCluster = cluster(fooClusterName, 1); + createCluster(fooGroupName, "foo-cluster", fooCluster, dogma.httpClient()); + final RouteConfiguration fooRoute = routeConfiguration(fooRouteName, fooClusterName); + createRoute(fooGroupName, "foo-route", fooRoute, dogma.httpClient()); final Bootstrap bootstrap = bootstrap(dogma.httpClient().uri(), CONFIG_SOURCE_CLUSTER_NAME); try (XdsBootstrap xdsBootstrap = XdsBootstrap.of(bootstrap)) { @@ -108,9 +87,8 @@ void updateListenerRootWithCorrespondingResource() throws Exception { return fooSnapshotCaptor.get() == null && barSnapshotCaptor.get() == null; }); - Listener fooListener = - createListenerAndCommit(fooXdsProjectName, fooListenerName, fooRouteName, "a", - projectManager); + Listener fooListener = exampleListener(fooListenerName, fooRouteName, "stats-a"); + createListener(fooGroupName, "foo-listener", fooListener, dogma.httpClient()); await().until(() -> fooSnapshotCaptor.get() != null); ListenerSnapshot fooListenerSnapshot = fooSnapshotCaptor.getAndSet(null); @@ -125,18 +103,18 @@ void updateListenerRootWithCorrespondingResource() throws Exception { // bar is not updated. await().pollDelay(200, TimeUnit.MILLISECONDS).until(() -> barSnapshotCaptor.get() == null); - createXdsProject(projectManager, metadataService, barXdsProjectName); - final Listener barListener = - createListenerAndCommit(barXdsProjectName, barListenerName, fooRouteName, "a", - projectManager); + createGroup(barGroupName, dogma.httpClient()); + final Listener barListener = exampleListener(barListenerName, fooRouteName, "stats-a"); + createListener(barGroupName, "bar-listener", barListener, dogma.httpClient()); + await().until(() -> barSnapshotCaptor.get() != null); final ListenerSnapshot barListenerSnapshot = barSnapshotCaptor.getAndSet(null); assertThat(barListenerSnapshot.xdsResource().resource()).isEqualTo(barListener); assertThat(barListenerSnapshot.routeSnapshot().xdsResource().resource()).isEqualTo(fooRoute); // Change the configuration. - fooListener = createListenerAndCommit(fooXdsProjectName, fooListenerName, fooRouteName, "b", - projectManager); + fooListener = exampleListener(fooListenerName, fooRouteName, "stats-b"); + updateListener(fooGroupName, "foo-listener", fooListener, dogma.httpClient()); await().until(() -> fooSnapshotCaptor.get() != null); fooListenerSnapshot = fooSnapshotCaptor.getAndSet(null); assertThat(fooListenerSnapshot.xdsResource().resource()).isEqualTo(fooListener); diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java index 7c02d7f46..869698cf6 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/internal/XdsTestUtil.java @@ -15,27 +15,22 @@ */ package com.linecorp.centraldogma.xds.internal; -import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.CLUSTERS_DIRECTORY; -import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.ENDPOINTS_DIRECTORY; -import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.LISTENERS_DIRECTORY; -import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.ROUTES_DIRECTORY; -import static com.linecorp.centraldogma.xds.internal.ControlPlanePlugin.XDS_CENTRAL_DOGMA_PROJECT; +import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; +import java.io.IOException; import java.net.URI; -import com.fasterxml.jackson.databind.JsonNode; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; import com.google.protobuf.Any; -import com.google.protobuf.InvalidProtocolBufferException; -import com.google.protobuf.MessageOrBuilder; import com.google.protobuf.util.Durations; -import com.linecorp.centraldogma.common.Author; -import com.linecorp.centraldogma.common.Change; -import com.linecorp.centraldogma.common.Revision; -import com.linecorp.centraldogma.server.metadata.MetadataService; -import com.linecorp.centraldogma.server.storage.project.ProjectManager; +import com.linecorp.armeria.client.WebClient; +import com.linecorp.armeria.common.AggregatedHttpResponse; +import com.linecorp.armeria.common.HttpHeaderNames; +import com.linecorp.armeria.common.HttpMethod; +import com.linecorp.armeria.common.MediaType; +import com.linecorp.armeria.common.RequestHeaders; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap; import io.envoyproxy.envoy.config.bootstrap.v3.Bootstrap.DynamicResources; @@ -72,14 +67,20 @@ public final class XdsTestUtil { static final String CONFIG_SOURCE_CLUSTER_NAME = "dogma/cluster"; - static void createXdsProject(ProjectManager pm, MetadataService metadataService, String xdsProjectName) { - pm.get(XDS_CENTRAL_DOGMA_PROJECT).repos().create(xdsProjectName, Author.SYSTEM); - metadataService.addRepo(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, xdsProjectName).join(); + public static AggregatedHttpResponse createGroup(String groupName, WebClient webClient) { + final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups") + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}") + .aggregate().join(); } - static void removeXdsProject(ProjectManager pm, MetadataService metadataService, String xdsProjectName) { - pm.get(XDS_CENTRAL_DOGMA_PROJECT).repos().remove(xdsProjectName); - metadataService.removeRepo(Author.SYSTEM, XDS_CENTRAL_DOGMA_PROJECT, xdsProjectName).join(); + public static AggregatedHttpResponse deleteGroup(String groupName, WebClient webClient) { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.DELETE, "/api/v1/xds/" + groupName) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .build(); + return webClient.execute(headers).aggregate().join(); } public static LbEndpoint endpoint(String address, int port) { @@ -256,72 +257,88 @@ static VirtualHost virtualHost(String name, String... clusterNames) { return builder.build(); } - static ClusterLoadAssignment createEndpointAndCommit(String xdsProjectName, String clusterName, - ProjectManager projectManager) { - final String fileName = endpointFileName(xdsProjectName, clusterName); - final ClusterLoadAssignment endpoint = loadAssignment(clusterName, "localhost", 1); - commit(endpoint, projectManager, xdsProjectName, clusterName, fileName); - return endpoint; + public static AggregatedHttpResponse createEndpoint( + String groupName, String endpointId, + ClusterLoadAssignment endpoint, WebClient webClient) throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.POST, + "/api/v1/xds/" + groupName + "/endpoints?endpoint_id=" + + endpointId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(endpoint)) + .aggregate().join(); } - static Cluster createClusterAndCommit(String xdsProjectName, String clusterName, int connectTimeoutSeconds, - ProjectManager projectManager) { - final String fileName = clusterFileName(xdsProjectName, clusterName); - final Cluster cluster = cluster(clusterName, connectTimeoutSeconds); - commit(cluster, projectManager, xdsProjectName, clusterName, fileName); - return cluster; + public static AggregatedHttpResponse createCluster( + String groupName, String clusterId, Cluster cluster, WebClient webClient) + throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.POST, + "/api/v1/xds/" + groupName + "/clusters?cluster_id=" + clusterId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(cluster)) + .aggregate().join(); } - static RouteConfiguration createRouteConfigurationAndCommit( - String xdsProjectName, String routeName, String clusterName, ProjectManager projectManager) { - final String fileName = routeFileName(xdsProjectName, routeName); - final RouteConfiguration routeConfiguration = routeConfiguration(routeName, clusterName); - commit(routeConfiguration, projectManager, xdsProjectName, routeName, fileName); - return routeConfiguration; + public static AggregatedHttpResponse updateCluster( + String groupName, String clusterId, Cluster cluster, WebClient webClient) throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.PATCH, + "/api/v1/xds/" + groupName + "/clusters/" + clusterId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(cluster)) + .aggregate().join(); } - static Listener createListenerAndCommit(String xdsProjectName, String listenerName, String routeName, - String statPrefix, ProjectManager projectManager) { - final String fileName = listenerFileName(xdsProjectName, listenerName); - final Listener listener = exampleListener(listenerName, routeName, statPrefix); - commit(listener, projectManager, xdsProjectName, routeName, fileName); - return listener; + public static AggregatedHttpResponse createListener( + String groupName, String listenerId, + Listener listener, WebClient webClient) throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.POST, + "/api/v1/xds/" + groupName + "/listeners?listener_id=" + + listenerId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(listener)) + .aggregate().join(); } - static String clusterFileName(String xdsProjectName, String clusterName) { - assert clusterName.startsWith(xdsProjectName + '/'); - return CLUSTERS_DIRECTORY + clusterName.substring(xdsProjectName.length() + 1) + ".json"; + public static AggregatedHttpResponse updateListener( + String groupName, String listenerId, Listener listener, WebClient webClient) throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.PATCH, + "/api/v1/xds/" + groupName + "/listeners/" + listenerId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(listener)) + .aggregate().join(); } - static String endpointFileName(String xdsProjectName, String clusterName) { - assert clusterName.startsWith(xdsProjectName + '/'); - return ENDPOINTS_DIRECTORY + clusterName.substring(xdsProjectName.length() + 1) + ".json"; + public static AggregatedHttpResponse createRoute( + String groupName, String routeId, + RouteConfiguration route, WebClient webClient) throws IOException { + final RequestHeaders headers = + RequestHeaders.builder(HttpMethod.POST, + "/api/v1/xds/" + groupName + "/routes?route_id=" + + routeId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(route)) + .aggregate().join(); } - static String listenerFileName(String xdsProjectName, String listenerName) { - assert listenerName.startsWith(xdsProjectName + '/'); - return LISTENERS_DIRECTORY + listenerName.substring(xdsProjectName.length() + 1) + ".json"; - } - - static String routeFileName(String xdsProjectName, String routeName) { - assert routeName.startsWith(xdsProjectName + '/'); - return ROUTES_DIRECTORY + routeName.substring(xdsProjectName.length() + 1) + ".json"; - } - - static void commit(MessageOrBuilder message, ProjectManager projectManager, - String repoName, String resourceName, String fileName) { - final String json; - try { - json = JsonFormatUtil.printer().print(message); - } catch (InvalidProtocolBufferException e) { - // Should never reach here. - throw new Error(e); - } - final Change xdsResource = Change.ofJsonUpsert(fileName, json); - projectManager.get(XDS_CENTRAL_DOGMA_PROJECT) - .repos() - .get(repoName) - .commit(Revision.HEAD, 0, Author.SYSTEM, "Add " + resourceName, xdsResource).join(); + public static AggregatedHttpResponse updateRoute( + String groupName, String routeId, + RouteConfiguration route, WebClient webClient) throws IOException { + final RequestHeaders headers = RequestHeaders.builder(HttpMethod.PATCH, + "/api/v1/xds/" + groupName + "/routes/" + routeId) + .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") + .contentType(MediaType.JSON_UTF_8).build(); + return webClient.execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(route)) + .aggregate().join(); } private XdsTestUtil() {} diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerServiceTest.java index 3d7b578a4..dce096ee9 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/listener/v1/XdsListenerServiceTest.java @@ -16,11 +16,13 @@ package com.linecorp.centraldogma.xds.listener.v1; import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createListener; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.exampleListener; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateListener; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.IOException; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -39,7 +41,6 @@ import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.centraldogma.xds.listener.v1.XdsListenerServiceGrpc.XdsListenerServiceBlockingStub; @@ -58,48 +59,30 @@ class XdsListenerServiceTest { @BeforeAll static void setup() { - final AggregatedHttpResponse response = createGroup("groups/foo"); + final AggregatedHttpResponse response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); } - private static AggregatedHttpResponse createGroup(String groupName) { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups") - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}") - .aggregate().join(); - } - @Test void createListenerViaHttp() throws Exception { final Listener listener = exampleListener("this_listener_name_will_be_ignored_and_replaced", "groups/foo/routes/foo-route", "stats"); - AggregatedHttpResponse response = createListener("foo", "@invalid_listener_id", listener); + AggregatedHttpResponse response = createListener("groups/foo", "@invalid_listener_id", + listener, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); - response = createListener("non-existent-group", "foo-listener/1", listener); + response = createListener("groups/non-existent-group", "foo-listener/1", listener, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createListener("foo", "foo-listener/1", listener); + response = createListener("groups/foo", "foo-listener/1", listener, dogma.httpClient()); assertOk(response); final Listener.Builder listenerBuilder = Listener.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), listenerBuilder); final Listener actualListener = listenerBuilder.build(); final String listenerName = "groups/foo/listeners/foo-listener/1"; assertThat(actualListener).isEqualTo(listener.toBuilder().setName(listenerName).build()); - checkResourceViaDiscoveryRequest(actualListener, listenerName, true); - } - - private static AggregatedHttpResponse createListener( - String groupName, String listenerId, Listener listener) throws IOException { - final RequestHeaders headers = - RequestHeaders.builder(HttpMethod.POST, - "/api/v1/xds/groups/" + groupName + "/listeners?listener_id=" + - listenerId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(listener)) - .aggregate().join(); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualListener, listenerName, true)); } private static void assertOk(AggregatedHttpResponse response) { @@ -147,22 +130,24 @@ public void onCompleted() {} void updateListenerViaHttp() throws Exception { final Listener listener = exampleListener("this_listener_name_will_be_ignored_and_replaced", "groups/foo/routes/foo-route", "stats"); - AggregatedHttpResponse response = updateListener("foo-listener/2", listener); + AggregatedHttpResponse response = updateListener("groups/foo", "foo-listener/2", listener, + dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createListener("foo", "foo-listener/2", listener); + response = createListener("groups/foo", "foo-listener/2", listener, dogma.httpClient()); assertOk(response); final Listener.Builder listenerBuilder = Listener.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), listenerBuilder); final Listener actualListener = listenerBuilder.build(); final String listenerName = "groups/foo/listeners/foo-listener/2"; assertThat(actualListener).isEqualTo(listener.toBuilder().setName(listenerName).build()); - checkResourceViaDiscoveryRequest(actualListener, listenerName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualListener, listenerName, true)); final Listener updatingListener = listener.toBuilder() .setStatPrefix("updated_stats") .setName(listenerName).build(); - response = updateListener("foo-listener/2", updatingListener); + response = updateListener("groups/foo", "foo-listener/2", updatingListener, dogma.httpClient()); assertOk(response); final Listener.Builder listenerBuilder2 = Listener.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), listenerBuilder2); @@ -172,16 +157,6 @@ void updateListenerViaHttp() throws Exception { () -> checkResourceViaDiscoveryRequest(actualListener2, listenerName, true)); } - private static AggregatedHttpResponse updateListener( - String listenerId, Listener listener) throws IOException { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.PATCH, - "/api/v1/xds/groups/foo/listeners/" + listenerId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(listener)) - .aggregate().join(); - } - @Test void deleteListenerViaHttp() throws Exception { final String listenerName = "groups/foo/listeners/foo-listener/3/4"; @@ -190,11 +165,12 @@ void deleteListenerViaHttp() throws Exception { final Listener listener = exampleListener("this_listener_name_will_be_ignored_and_replaced", "groups/foo/routes/foo-route", "stats"); - response = createListener("foo", "foo-listener/3/4", listener); + response = createListener("groups/foo", "foo-listener/3/4", listener, dogma.httpClient()); assertOk(response); final Listener actualListener = listener.toBuilder().setName(listenerName).build(); - checkResourceViaDiscoveryRequest(actualListener, listenerName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualListener, listenerName, true)); // Add permission test. diff --git a/xds/src/test/java/com/linecorp/centraldogma/xds/route/v1/XdsRouteServiceTest.java b/xds/src/test/java/com/linecorp/centraldogma/xds/route/v1/XdsRouteServiceTest.java index 826877bff..37ba33d6b 100644 --- a/xds/src/test/java/com/linecorp/centraldogma/xds/route/v1/XdsRouteServiceTest.java +++ b/xds/src/test/java/com/linecorp/centraldogma/xds/route/v1/XdsRouteServiceTest.java @@ -16,11 +16,13 @@ package com.linecorp.centraldogma.xds.route.v1; import static com.linecorp.centraldogma.xds.internal.XdsServiceUtil.JSON_MESSAGE_MARSHALLER; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createGroup; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.createRoute; import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.routeConfiguration; +import static com.linecorp.centraldogma.xds.internal.XdsTestUtil.updateRoute; import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; -import java.io.IOException; import java.util.List; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; @@ -39,7 +41,6 @@ import com.linecorp.armeria.common.HttpHeaderNames; import com.linecorp.armeria.common.HttpMethod; import com.linecorp.armeria.common.HttpStatus; -import com.linecorp.armeria.common.MediaType; import com.linecorp.armeria.common.RequestHeaders; import com.linecorp.centraldogma.testing.junit.CentralDogmaExtension; import com.linecorp.centraldogma.xds.route.v1.XdsRouteServiceGrpc.XdsRouteServiceBlockingStub; @@ -58,48 +59,30 @@ class XdsRouteServiceTest { @BeforeAll static void setup() { - final AggregatedHttpResponse response = createGroup("groups/foo"); + final AggregatedHttpResponse response = createGroup("groups/foo", dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.OK); } - private static AggregatedHttpResponse createGroup(String groupName) { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.POST, "/api/v1/xds/groups") - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, "{\"group\": {\"name\":\"" + groupName + "\"}}") - .aggregate().join(); - } - @Test void createRouteViaHttp() throws Exception { final RouteConfiguration route = routeConfiguration("this_route_name_will_be_ignored_and_replaced", "groups/foo/clusters/foo-cluster"); - AggregatedHttpResponse response = createRoute("foo", "@invalid_route_id", route); + AggregatedHttpResponse response = createRoute("groups/foo", "@invalid_route_id", + route, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.BAD_REQUEST); - response = createRoute("non-existent-group", "foo-route/1", route); + response = createRoute("groups/non-existent-group", "foo-route/1", route, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createRoute("foo", "foo-route/1", route); + response = createRoute("groups/foo", "foo-route/1", route, dogma.httpClient()); assertOk(response); final RouteConfiguration.Builder routeBuilder = RouteConfiguration.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), routeBuilder); final RouteConfiguration actualRoute = routeBuilder.build(); final String routeName = "groups/foo/routes/foo-route/1"; assertThat(actualRoute).isEqualTo(route.toBuilder().setName(routeName).build()); - checkResourceViaDiscoveryRequest(actualRoute, routeName, true); - } - - private static AggregatedHttpResponse createRoute( - String groupName, String routeId, RouteConfiguration route) throws IOException { - final RequestHeaders headers = - RequestHeaders.builder(HttpMethod.POST, - "/api/v1/xds/groups/" + groupName + "/routes?route_id=" + - routeId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(route)) - .aggregate().join(); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualRoute, routeName, true)); } private static void assertOk(AggregatedHttpResponse response) { @@ -147,22 +130,23 @@ public void onCompleted() {} void updateRouteViaHttp() throws Exception { final RouteConfiguration route = routeConfiguration("this_route_name_will_be_ignored_and_replaced", "groups/foo/clusters/foo-cluster"); - AggregatedHttpResponse response = updateRoute("foo-route/2", route); + AggregatedHttpResponse response = updateRoute("groups/foo", "foo-route/2", route, dogma.httpClient()); assertThat(response.status()).isSameAs(HttpStatus.NOT_FOUND); - response = createRoute("foo", "foo-route/2", route); + response = createRoute("groups/foo", "foo-route/2", route, dogma.httpClient()); assertOk(response); final RouteConfiguration.Builder routeBuilder = RouteConfiguration.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), routeBuilder); final RouteConfiguration actualRoute = routeBuilder.build(); final String routeName = "groups/foo/routes/foo-route/2"; assertThat(actualRoute).isEqualTo(route.toBuilder().setName(routeName).build()); - checkResourceViaDiscoveryRequest(actualRoute, routeName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualRoute, routeName, true)); final RouteConfiguration updatingRoute = route.toBuilder() .addInternalOnlyHeaders("internal") .setName(routeName).build(); - response = updateRoute("foo-route/2", updatingRoute); + response = updateRoute("groups/foo", "foo-route/2", updatingRoute, dogma.httpClient()); assertOk(response); final RouteConfiguration.Builder routeBuilder2 = RouteConfiguration.newBuilder(); JSON_MESSAGE_MARSHALLER.mergeValue(response.contentUtf8(), routeBuilder2); @@ -172,16 +156,6 @@ void updateRouteViaHttp() throws Exception { () -> checkResourceViaDiscoveryRequest(actualRoute2, routeName, true)); } - private static AggregatedHttpResponse updateRoute( - String routeId, RouteConfiguration route) throws IOException { - final RequestHeaders headers = RequestHeaders.builder(HttpMethod.PATCH, - "/api/v1/xds/groups/foo/routes/" + routeId) - .set(HttpHeaderNames.AUTHORIZATION, "Bearer anonymous") - .contentType(MediaType.JSON_UTF_8).build(); - return dogma.httpClient().execute(headers, JSON_MESSAGE_MARSHALLER.writeValueAsString(route)) - .aggregate().join(); - } - @Test void deleteRouteViaHttp() throws Exception { final String routeName = "groups/foo/routes/foo-route/3/4"; @@ -190,11 +164,12 @@ void deleteRouteViaHttp() throws Exception { final RouteConfiguration route = routeConfiguration("this_route_name_will_be_ignored_and_replaced", "groups/foo/clusters/foo-cluster"); - response = createRoute("foo", "foo-route/3/4", route); + response = createRoute("groups/foo", "foo-route/3/4", route, dogma.httpClient()); assertOk(response); final RouteConfiguration actualRoute = route.toBuilder().setName(routeName).build(); - checkResourceViaDiscoveryRequest(actualRoute, routeName, true); + await().pollInterval(100, TimeUnit.MILLISECONDS).untilAsserted( + () -> checkResourceViaDiscoveryRequest(actualRoute, routeName, true)); // Add permission test.