diff --git a/servicetalk-examples/docs/modules/ROOT/pages/_partials/nav-versioned.adoc b/servicetalk-examples/docs/modules/ROOT/pages/_partials/nav-versioned.adoc index 22ff8df5a5..1994cbcb2f 100644 --- a/servicetalk-examples/docs/modules/ROOT/pages/_partials/nav-versioned.adoc +++ b/servicetalk-examples/docs/modules/ROOT/pages/_partials/nav-versioned.adoc @@ -21,6 +21,7 @@ ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#Compression[Compression] ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#Deadlines[Deadlines] ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#Observer[Observer] +** xref:{page-version}@servicetalk-examples::grpc/index.adoc#Health[Health Checking] ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#errors[Application Errors] ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#execution-strategy[Execution Strategy] ** xref:{page-version}@servicetalk-examples::grpc/index.adoc#route-guide[Route Guide] diff --git a/servicetalk-examples/docs/modules/ROOT/pages/grpc/index.adoc b/servicetalk-examples/docs/modules/ROOT/pages/grpc/index.adoc index 444225dc5f..961770886c 100644 --- a/servicetalk-examples/docs/modules/ROOT/pages/grpc/index.adoc +++ b/servicetalk-examples/docs/modules/ROOT/pages/grpc/index.adoc @@ -106,7 +106,6 @@ Extends the blocking "Hello World" example to demonstrate configuration of debug * link:{source-root}/servicetalk-examples/grpc/debugging/src/main/java/io/servicetalk/examples/grpc/debugging/DebuggingClient.java[DebuggingClient] – Sends hello requests to the server and receives a greeting response. - [#Observer] == Observer This example demonstrates the following: @@ -122,6 +121,20 @@ on the server builder. link:{source-root}/servicetalk-grpc-api/src/main/java/io/servicetalk/grpc/api/GrpcLifecycleObserver.java[GrpcLifecycleObserver] on via a client filter on the client builder. +[#Health] +== Health Checking +This example demonstrates the following: +- Use of +link:{source-root}/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java[DefaultHealthService] +which implements link:https://github.com/grpc/grpc/blob/master/doc/health-checking.md[gRPC health checking] paired with a simple "hello world" service. + +Using the following classes: +* link:{source-root}/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthServerExample.java[HealthServerExample] a server +that installs link:{source-root}/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java[DefaultHealthService] in addition to +a simple "hello world" service. +* link:{source-root}/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthClientExample.java[HealthClientExample] a client +that calls the "hello world" server, the "health check" server, and prints results. + [#errors] == Application Errors The gRPC protocol supports propagating application level errors, and also provides serialization/deserialization of diff --git a/servicetalk-examples/grpc/health/README.adoc b/servicetalk-examples/grpc/health/README.adoc new file mode 100644 index 0000000000..84d18e7ea5 --- /dev/null +++ b/servicetalk-examples/grpc/health/README.adoc @@ -0,0 +1,3 @@ +== ServiceTalk gRPC Health Example + +Extends "Hello World" ServiceTalk gRPC example to demonstrate default health service. \ No newline at end of file diff --git a/servicetalk-examples/grpc/health/build.gradle b/servicetalk-examples/grpc/health/build.gradle new file mode 100644 index 0000000000..1a0faedb7d --- /dev/null +++ b/servicetalk-examples/grpc/health/build.gradle @@ -0,0 +1,81 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +buildscript { + dependencies { + classpath "com.google.protobuf:protobuf-gradle-plugin:$protobufGradlePluginVersion" + } +} + +apply plugin: "java" +apply plugin: "com.google.protobuf" +apply from: "../../gradle/idea.gradle" + +dependencies { + implementation project(":servicetalk-annotations") + implementation project(":servicetalk-grpc-netty") + implementation project(":servicetalk-grpc-protoc") + implementation project(":servicetalk-grpc-protobuf") + implementation project(":servicetalk-grpc-health") + + implementation "org.slf4j:slf4j-api:$slf4jVersion" + runtimeOnly "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion" +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } + + //// REMOVE if outside of ServiceTalk gradle project + def pluginJar = file("${project.rootProject.rootDir}/servicetalk-grpc-protoc/build" + + "/buildExecutable/servicetalk-grpc-protoc-${project.version}-all.jar") + //// REMOVE if outside of ServiceTalk gradle project + + plugins { + servicetalk_grpc { + //// REMOVE if outside of ServiceTalk gradle project - use "artifact" as demonstrated below + //// "path" is used only because we want to use the gradle project local version of the plugin. + path = pluginJar.path + //// REMOVE if outside of ServiceTalk gradle project - use "artifact" as demonstrated below + + // artifact = "io.servicetalk:servicetalk-grpc-protoc:$serviceTalkVersion:all@jar" + } + } + generateProtoTasks { + all().each { task -> + //// REMOVE if outside of ServiceTalk gradle project + task.dependsOn(":servicetalk-grpc-protoc:buildExecutable") // use gradle project local grpc-protoc dependency + + // you may need to manually add the artifact name as an input + task.inputs + .file(pluginJar) + .withNormalizer(ClasspathNormalizer) + .withPropertyName("servicetalkPluginJar") + .withPathSensitivity(PathSensitivity.RELATIVE) + //// REMOVE if outside of ServiceTalk gradle project + + task.plugins { + servicetalk_grpc { + // Need to tell protobuf-gradle-plugin to output in the correct directory if all generated + // code for a single proto goes to a single file (e.g. "java_multiple_files = false" in the .proto). + outputSubDir = "java" + } + } + } + } + generatedFilesBaseDir = "$buildDir/generated/sources/proto" +} diff --git a/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthClientExample.java b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthClientExample.java new file mode 100644 index 0000000000..9374600fbb --- /dev/null +++ b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthClientExample.java @@ -0,0 +1,61 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.examples.grpc.health; + +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.grpc.health.DefaultHealthService; +import io.servicetalk.grpc.netty.GrpcClients; +import io.servicetalk.health.v1.Health; +import io.servicetalk.health.v1.Health.BlockingHealthClient; +import io.servicetalk.health.v1.HealthCheckRequest; +import io.servicetalk.health.v1.HealthCheckResponse; + +import io.grpc.examples.health.Greeter; +import io.grpc.examples.health.Greeter.BlockingGreeterClient; +import io.grpc.examples.health.HelloReply; +import io.grpc.examples.health.HelloRequest; + +/** + * Extends the async "Hello World" example to demonstrate {@link DefaultHealthService} usage. + */ +public final class HealthClientExample { + public static void main(String... args) throws Exception { + final String serviceName = "World"; + try (BlockingGreeterClient client = GrpcClients.forAddress("localhost", 8080) + .buildBlocking(new Greeter.ClientFactory()); + BlockingHealthClient healthClient = GrpcClients.forAddress("localhost", 8080) + .buildBlocking(new Health.ClientFactory())) { + // Check health before + checkHealth(healthClient, serviceName); + + HelloReply reply = client.sayHello(HelloRequest.newBuilder().setName("World").build()); + System.out.println("HelloReply=" + reply.getMessage()); + + // Check the health after to observe it changed. + checkHealth(healthClient, serviceName); + } + } + + private static void checkHealth(BlockingHealthClient healthClient, String serviceName) throws Exception { + try { + HealthCheckResponse response = healthClient.check( + HealthCheckRequest.newBuilder().setService(serviceName).build()); + System.out.println("Service '" + serviceName + "' health=" + response.getStatus()); + } catch (GrpcStatusException e) { + System.out.println("Service '" + serviceName + "' health exception=" + e); + } + } +} diff --git a/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthServerExample.java b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthServerExample.java new file mode 100644 index 0000000000..b1bfa01814 --- /dev/null +++ b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/HealthServerExample.java @@ -0,0 +1,42 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.examples.grpc.health; + +import io.servicetalk.grpc.health.DefaultHealthService; +import io.servicetalk.grpc.netty.GrpcServers; + +import io.grpc.examples.health.Greeter.GreeterService; +import io.grpc.examples.health.HelloReply; + +import static io.servicetalk.concurrent.api.Single.succeeded; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.SERVING; + +/** + * A simple extension of the gRPC "Hello World" example which demonstrates {@link DefaultHealthService}. + */ +public final class HealthServerExample { + public static void main(String... args) throws Exception { + DefaultHealthService healthService = new DefaultHealthService(); + GreeterService greeterService = (ctx, request) -> { + // For demonstration purposes, just use the name as a service and mark it as SERVING. + healthService.setStatus(request.getName(), SERVING); + return succeeded(HelloReply.newBuilder().setMessage("Hello " + request.getName()).build()); + }; + GrpcServers.forPort(8080) + .listenAndAwait(healthService, greeterService) + .awaitShutdown(); + } +} diff --git a/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/package-info.java b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/package-info.java new file mode 100644 index 0000000000..3e3cd1b129 --- /dev/null +++ b/servicetalk-examples/grpc/health/src/main/java/io/servicetalk/examples/grpc/health/package-info.java @@ -0,0 +1,19 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +@ElementsAreNonnullByDefault +package io.servicetalk.examples.grpc.health; + +import io.servicetalk.annotations.ElementsAreNonnullByDefault; diff --git a/servicetalk-examples/grpc/health/src/main/proto/helloworld.proto b/servicetalk-examples/grpc/health/src/main/proto/helloworld.proto new file mode 100644 index 0000000000..2390d6a016 --- /dev/null +++ b/servicetalk-examples/grpc/health/src/main/proto/helloworld.proto @@ -0,0 +1,37 @@ +// Copyright 2015 The gRPC Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +syntax = "proto3"; + +option java_multiple_files = true; +option java_package = "io.grpc.examples.health"; +option java_outer_classname = "HelloWorldProto"; +option objc_class_prefix = "HLW"; + +package helloworld; + +// The greeting service definition. +service Greeter { + // Sends a greeting + rpc SayHello (HelloRequest) returns (HelloReply) {} +} + +// The request message containing the user's name. +message HelloRequest { + string name = 1; +} + +// The response message containing the greetings +message HelloReply { + string message = 1; +} diff --git a/servicetalk-examples/grpc/health/src/main/resources/log4j2.xml b/servicetalk-examples/grpc/health/src/main/resources/log4j2.xml new file mode 100644 index 0000000000..0e67e5569a --- /dev/null +++ b/servicetalk-examples/grpc/health/src/main/resources/log4j2.xml @@ -0,0 +1,35 @@ + + + + + + + + + + + + + + + + + + + + + diff --git a/servicetalk-gradle-plugin-internal/src/main/groovy/io/servicetalk/gradle/plugin/internal/ServiceTalkLibraryPlugin.groovy b/servicetalk-gradle-plugin-internal/src/main/groovy/io/servicetalk/gradle/plugin/internal/ServiceTalkLibraryPlugin.groovy index c79873776d..4601bf1939 100644 --- a/servicetalk-gradle-plugin-internal/src/main/groovy/io/servicetalk/gradle/plugin/internal/ServiceTalkLibraryPlugin.groovy +++ b/servicetalk-gradle-plugin-internal/src/main/groovy/io/servicetalk/gradle/plugin/internal/ServiceTalkLibraryPlugin.groovy @@ -252,6 +252,12 @@ final class ServiceTalkLibraryPlugin extends ServiceTalkCorePlugin { incrementalAnalysis = true ruleSets = [] ruleSetConfig = resources.text.fromString(getClass().getResourceAsStream("pmd/basic.xml").text) + + pmdMain { + excludes = [ + '**/src/generated/**' + ] + } } tasks.withType(Pmd).all { diff --git a/servicetalk-gradle-plugin-internal/src/main/resources/io/servicetalk/gradle/plugin/internal/checkstyle/global-suppressions.xml b/servicetalk-gradle-plugin-internal/src/main/resources/io/servicetalk/gradle/plugin/internal/checkstyle/global-suppressions.xml index ca95408766..045c77d5ac 100644 --- a/servicetalk-gradle-plugin-internal/src/main/resources/io/servicetalk/gradle/plugin/internal/checkstyle/global-suppressions.xml +++ b/servicetalk-gradle-plugin-internal/src/main/resources/io/servicetalk/gradle/plugin/internal/checkstyle/global-suppressions.xml @@ -43,4 +43,7 @@ + + + diff --git a/servicetalk-grpc-health/build.gradle b/servicetalk-grpc-health/build.gradle new file mode 100644 index 0000000000..607681f121 --- /dev/null +++ b/servicetalk-grpc-health/build.gradle @@ -0,0 +1,133 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +buildscript { + dependencies { + classpath "com.google.protobuf:protobuf-gradle-plugin:$protobufGradlePluginVersion" + } +} + +apply plugin: "io.servicetalk.servicetalk-gradle-plugin-internal-library" +apply plugin: "com.google.protobuf" + +dependencies { + api project(":servicetalk-grpc-api") // generated code is exposed + implementation project(":servicetalk-annotations") + implementation project(":servicetalk-grpc-netty") + implementation project(":servicetalk-grpc-protoc") + implementation project(":servicetalk-grpc-protobuf") + + testImplementation enforcedPlatform("org.junit:junit-bom:$junit5Version") + testImplementation project(":servicetalk-concurrent-api-test") + testImplementation testFixtures(project(":servicetalk-transport-netty-internal")) + testImplementation testFixtures(project(":servicetalk-concurrent-internal")) + testImplementation "org.junit.jupiter:junit-jupiter-api" + testImplementation "org.hamcrest:hamcrest:$hamcrestVersion" +} + +// Instead of copy/pasting the .proto files into our repository, fetch them from maven central. +// This will also more likely to raise any API changes earlier so we can adjust. +configurations { + grpcProtos { transitive = false } +} + +dependencies { + grpcProtos "io.grpc:grpc-services:$grpcVersion" +} + +processResources { + duplicatesStrategy = 'include' +} + +def extractedProtoDir = "$buildDir/extracted-protos/main" +def generatedCodeDir = "$projectDir/src/generated" +def generatedJavaPkg = "io.servicetalk.health.v1"; + +task unzipGrpcProtos(type: Copy) { + dependsOn processResources + from zipTree(configurations.grpcProtos.singleFile).matching { + include '**/health.proto' + } + into extractedProtoDir + includeEmptyDirs = false + duplicatesStrategy = 'include' + // Rename the java package name to avoid potential classpath conflicts with grpc-java. + filter { line -> line.replace( + 'option java_package = "io.grpc.health.v1";', + "option java_package = \"$generatedJavaPkg\";") + } +} + +sourceSets { + main { + java { + srcDir generatedCodeDir + } + proto { + srcDir "$extractedProtoDir/main" + } + } +} + +javadoc { + exclude "**/${generatedJavaPkg.replace('.', '/')}/**" +} + +protobuf { + protoc { + artifact = "com.google.protobuf:protoc:$protobufVersion" + } + + //// REMOVE if outside of ServiceTalk gradle project + def pluginJar = file("${project.rootProject.rootDir}/servicetalk-grpc-protoc/build" + + "/buildExecutable/servicetalk-grpc-protoc-${project.version}-all.jar") + //// REMOVE if outside of ServiceTalk gradle project + + plugins { + servicetalk_grpc { + //// REMOVE if outside of ServiceTalk gradle project - use "artifact" as demonstrated below + //// "path" is used only because we want to use the gradle project local version of the plugin. + path = pluginJar.path + //// REMOVE if outside of ServiceTalk gradle project - use "artifact" as demonstrated below + + // artifact = "io.servicetalk:servicetalk-grpc-protoc:$serviceTalkVersion:all@jar" + } + } + generateProtoTasks { + all().each { task -> + task.dependsOn unzipGrpcProtos + //// REMOVE if outside of ServiceTalk gradle project + task.dependsOn(":servicetalk-grpc-protoc:buildExecutable") // use gradle project local grpc-protoc dependency + + // you may need to manually add the artifact name as an input + task.inputs + .file(pluginJar) + .withNormalizer(ClasspathNormalizer) + .withPropertyName("servicetalkPluginJar") + .withPathSensitivity(PathSensitivity.RELATIVE) + //// REMOVE if outside of ServiceTalk gradle project + + task.plugins { + servicetalk_grpc { + // Need to tell protobuf-gradle-plugin to output in the correct directory if all generated + // code for a single proto goes to a single file (e.g. "java_multiple_files = false" in the .proto). + outputSubDir = "java" + } + } + } + } + generatedFilesBaseDir = generatedCodeDir +} diff --git a/servicetalk-grpc-health/gradle/spotbugs/main-exclusions.xml b/servicetalk-grpc-health/gradle/spotbugs/main-exclusions.xml new file mode 100644 index 0000000000..e210392660 --- /dev/null +++ b/servicetalk-grpc-health/gradle/spotbugs/main-exclusions.xml @@ -0,0 +1,22 @@ + + + + + + + + diff --git a/servicetalk-grpc-health/src/README.md b/servicetalk-grpc-health/src/README.md new file mode 100644 index 0000000000..0695fd24c6 --- /dev/null +++ b/servicetalk-grpc-health/src/README.md @@ -0,0 +1,4 @@ +## gRPC Health Checking + +This package contains an implementation of the +[gRPC Health Checking v1 Protocol](https://github.com/grpc/grpc/blob/master/doc/health-checking.md). \ No newline at end of file diff --git a/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java new file mode 100644 index 0000000000..2ae60ed5fc --- /dev/null +++ b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/DefaultHealthService.java @@ -0,0 +1,214 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.health; + +import io.servicetalk.concurrent.PublisherSource.Processor; +import io.servicetalk.concurrent.api.Publisher; +import io.servicetalk.concurrent.api.Single; +import io.servicetalk.grpc.api.GrpcServiceContext; +import io.servicetalk.grpc.api.GrpcStatus; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.health.v1.Health; +import io.servicetalk.health.v1.HealthCheckRequest; +import io.servicetalk.health.v1.HealthCheckResponse; +import io.servicetalk.health.v1.HealthCheckResponse.ServingStatus; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; + +import static io.servicetalk.concurrent.api.Processors.newPublisherProcessorDropHeadOnOverflow; +import static io.servicetalk.concurrent.api.SourceAdapters.fromSource; +import static io.servicetalk.grpc.api.GrpcStatusCode.FAILED_PRECONDITION; +import static io.servicetalk.grpc.api.GrpcStatusCode.NOT_FOUND; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.NOT_SERVING; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.SERVING; +import static io.servicetalk.health.v1.HealthCheckResponse.newBuilder; +import static java.util.Objects.requireNonNull; + +/** + * Implementation of {@link Health.HealthService} which targets + * gRPC health checking that provides + * accessors to set/clear status for arbitrary services. + */ +public final class DefaultHealthService implements Health.HealthService { + /** + * The name of the service corresponding to + * the overall health status. + */ + public static final String OVERALL_SERVICE_NAME = ""; + private final Map serviceToStatusMap = new ConcurrentHashMap<>(); + private final Predicate watchAllowed; + private final Lock lock = new ReentrantLock(); + private boolean terminated; + + /** + * Create a new instance. Service {@link #OVERALL_SERVICE_NAME} state is set to {@link ServingStatus#SERVING}. + */ + public DefaultHealthService() { + this(service -> true); + } + + /** + * Create a new instance. Service {@link #OVERALL_SERVICE_NAME} state is set to {@link ServingStatus#SERVING}. + * @param watchAllowed {@link Predicate} that determines if a {@link #watch(GrpcServiceContext, HealthCheckRequest)} + * request that doesn't match an existing service will succeed or fail with + * {@link GrpcStatusCode#FAILED_PRECONDITION}. This can be used to bound memory by restricting watches to expected + * service names. + */ + public DefaultHealthService(Predicate watchAllowed) { + this.watchAllowed = requireNonNull(watchAllowed); + serviceToStatusMap.put(OVERALL_SERVICE_NAME, new HealthValue(SERVING)); + } + + @Override + public Single check(final GrpcServiceContext ctx, final HealthCheckRequest request) { + HealthValue health = serviceToStatusMap.get(request.getService()); + if (health == null) { + return Single.failed(new GrpcStatus(NOT_FOUND, null, "unknown service: " + request.getService()) + .asException()); + } + return Single.succeeded(health.last); + } + + @Override + public Publisher watch(final GrpcServiceContext ctx, final HealthCheckRequest request) { + // Try a get first to avoid locking with the assumption that most requests will be to watch existing services. + HealthValue healthValue = serviceToStatusMap.get(request.getService()); + if (healthValue == null) { + if (!watchAllowed.test(request.getService())) { + return Publisher.failed(new GrpcStatus(FAILED_PRECONDITION, null, "watch not allowed for service " + + request.getService()).asException()); + } + lock.lock(); + try { + if (terminated) { + return Publisher.from(newBuilder().setStatus(NOT_SERVING).build()); + } + healthValue = serviceToStatusMap.computeIfAbsent(request.getService(), + __ -> new HealthValue(SERVICE_UNKNOWN)); + } finally { + lock.unlock(); + } + } + + return Publisher.from(healthValue.last).concat(healthValue.publisher); + } + + /** + * Updates the status of the server. + * @param service the name of some aspect of the server that is associated with a health status. + * This name can have no relation with the gRPC services that the server is running with. + * It can also be an empty String {@code ""} per the gRPC specification. + * @param status is one of the values {@link ServingStatus#SERVING}, {@link ServingStatus#NOT_SERVING}, + * and {@link ServingStatus#UNKNOWN}. + * @return {@code true} if this change was applied. {@code false} if it was not due to {@link #terminate()}. + */ + public boolean setStatus(String service, ServingStatus status) { + final HealthCheckResponse resp; + final HealthValue healthValue; + lock.lock(); + try { + if (terminated) { + return false; + } + resp = newBuilder().setStatus(status).build(); + healthValue = serviceToStatusMap.computeIfAbsent(service, __ -> new HealthValue(resp)); + } finally { + lock.unlock(); + } + healthValue.next(resp); + return true; + } + + /** + * Clears the health status record of a service. The health service will respond with NOT_FOUND + * error on checking the status of a cleared service. + * @param service the name of some aspect of the server that is associated with a health status. + * This name can have no relation with the gRPC services that the server is running with. + * It can also be an empty String {@code ""} per the gRPC specification. + * @return {@code true} if this call removed a service. {@code false} if service wasn't found. + */ + public boolean clearStatus(String service) { + final HealthValue healthValue = serviceToStatusMap.remove(service); + if (healthValue != null) { + healthValue.completeMultipleTerminalSafe(SERVICE_UNKNOWN); + return true; + } + return false; + } + + /** + * All services will be marked as {@link ServingStatus#NOT_SERVING}, and + * future updates to services will be prohibited. This method is meant to be called prior to server shutdown as a + * way to indicate that clients should redirect their traffic elsewhere. + * @return {@code true} if this call terminated this service. {@code false} if it was not due to previous call to + * this method. + */ + public boolean terminate() { + lock.lock(); + try { + if (terminated) { + return false; + } + terminated = true; + } finally { + lock.unlock(); + } + for (final HealthValue healthValue : serviceToStatusMap.values()) { + healthValue.completeMultipleTerminalSafe(NOT_SERVING); + } + return true; + } + + private static final class HealthValue { + private final Processor processor; + private final Publisher publisher; + private volatile HealthCheckResponse last; + + private HealthValue(final HealthCheckResponse initialState) { + this.processor = newPublisherProcessorDropHeadOnOverflow(4); + this.publisher = fromSource(processor) + // Allow multiple subscribers to Subscribe to the resulting Publisher. + .multicast(1, false); + this.last = initialState; + } + + private HealthValue(final ServingStatus status) { + this(newBuilder().setStatus(status).build()); + } + + void next(HealthCheckResponse response) { + // Set the status here instead of in an operator because we need the status to be updated regardless if + // anyone is consuming the status. + last = response; + processor.onNext(response); + } + + /** + * This method is safe to invoke multiple times. Safety is currently provided by default {@link Processor} + * implementations. + * @param status The last status to set. + */ + void completeMultipleTerminalSafe(ServingStatus status) { + next(newBuilder().setStatus(status).build()); + processor.onComplete(); + } + } +} diff --git a/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/package-info.java b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/package-info.java new file mode 100644 index 0000000000..e5c1dde19d --- /dev/null +++ b/servicetalk-grpc-health/src/main/java/io/servicetalk/grpc/health/package-info.java @@ -0,0 +1,22 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +/** + * gRPC Health Checking Service. + */ +@ElementsAreNonnullByDefault +package io.servicetalk.grpc.health; + +import io.servicetalk.annotations.ElementsAreNonnullByDefault; diff --git a/servicetalk-grpc-health/src/test/java/io/servicetalk/grpc/health/DefaultHealthServiceTest.java b/servicetalk-grpc-health/src/test/java/io/servicetalk/grpc/health/DefaultHealthServiceTest.java new file mode 100644 index 0000000000..9a9de0ebb8 --- /dev/null +++ b/servicetalk-grpc-health/src/test/java/io/servicetalk/grpc/health/DefaultHealthServiceTest.java @@ -0,0 +1,174 @@ +/* + * Copyright © 2022 Apple Inc. and the ServiceTalk project authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.servicetalk.grpc.health; + +import io.servicetalk.concurrent.BlockingIterator; +import io.servicetalk.grpc.api.GrpcStatusCode; +import io.servicetalk.grpc.api.GrpcStatusException; +import io.servicetalk.grpc.netty.GrpcClients; +import io.servicetalk.grpc.netty.GrpcServers; +import io.servicetalk.health.v1.Health; +import io.servicetalk.health.v1.HealthCheckRequest; +import io.servicetalk.health.v1.HealthCheckResponse; +import io.servicetalk.health.v1.HealthCheckResponse.ServingStatus; +import io.servicetalk.transport.api.ServerContext; + +import org.junit.jupiter.api.Test; + +import java.net.InetSocketAddress; + +import static io.servicetalk.concurrent.internal.DeliberateException.DELIBERATE_EXCEPTION; +import static io.servicetalk.grpc.api.GrpcStatusCode.FAILED_PRECONDITION; +import static io.servicetalk.grpc.api.GrpcStatusCode.NOT_FOUND; +import static io.servicetalk.grpc.api.GrpcStatusCode.UNKNOWN; +import static io.servicetalk.grpc.health.DefaultHealthService.OVERALL_SERVICE_NAME; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.NOT_SERVING; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.SERVICE_UNKNOWN; +import static io.servicetalk.health.v1.HealthCheckResponse.ServingStatus.SERVING; +import static io.servicetalk.transport.netty.internal.AddressUtils.localAddress; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.junit.jupiter.api.Assertions.assertThrows; + +final class DefaultHealthServiceTest { + private static final String UNKNOWN_SERVICE_NAME = "unknown"; + + @Test + void defaultCheck() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + assertThat(client.check(newRequest(OVERALL_SERVICE_NAME)).getStatus(), equalTo(SERVING)); + + assertThat(service.setStatus(OVERALL_SERVICE_NAME, NOT_SERVING), equalTo(true)); + assertThat(client.check(newRequest(OVERALL_SERVICE_NAME)).getStatus(), equalTo(NOT_SERVING)); + } + } + } + + @Test + void statusChangeCheck() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + String serviceName = "service"; + ServingStatus serviceStatus = NOT_SERVING; + service.setStatus(serviceName, serviceStatus); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + assertThat(client.check(newRequest(serviceName)).getStatus(), equalTo(serviceStatus)); + } + } + } + + @Test + void notFoundCheck() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + assertThat(assertThrows(GrpcStatusException.class, + () -> client.check(newRequest(UNKNOWN_SERVICE_NAME))).status().code(), + equalTo(NOT_FOUND)); + } + } + } + + @Test + void defaultWatch() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + BlockingIterator itr = client.watch(newRequest(OVERALL_SERVICE_NAME)).iterator(); + assertThat(itr.next().getStatus(), equalTo(SERVING)); + + assertThat(service.setStatus(OVERALL_SERVICE_NAME, NOT_SERVING), equalTo(true)); + assertThat(itr.next().getStatus(), equalTo(NOT_SERVING)); + } + } + } + + @Test + void clearWatch() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + assertThat(service.clearStatus(OVERALL_SERVICE_NAME), equalTo(true)); + BlockingIterator itr = client.watch(newRequest(OVERALL_SERVICE_NAME)).iterator(); + assertThat(itr.next().getStatus(), equalTo(SERVICE_UNKNOWN)); + + assertThat(service.setStatus(OVERALL_SERVICE_NAME, SERVING), equalTo(true)); + assertThat(itr.next().getStatus(), equalTo(SERVING)); + } + } + } + + @Test + void terminateWatchCheck() throws Exception { + DefaultHealthService service = new DefaultHealthService(); + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + BlockingIterator itr = client.watch(newRequest(OVERALL_SERVICE_NAME)).iterator(); + assertThat(itr.next().getStatus(), equalTo(SERVING)); + assertThat(client.check(newRequest(OVERALL_SERVICE_NAME)).getStatus(), equalTo(SERVING)); + + assertThat(service.terminate(), equalTo(true)); + + assertThat(itr.next().getStatus(), equalTo(NOT_SERVING)); + assertThat(itr.hasNext(), equalTo(false)); + assertThat(client.check(newRequest(OVERALL_SERVICE_NAME)).getStatus(), equalTo(NOT_SERVING)); + + assertThat(service.setStatus(OVERALL_SERVICE_NAME, SERVING), equalTo(false)); + + // Clear after terminate verifies that multiple termination doesn't cause issues. + assertThat(service.clearStatus(OVERALL_SERVICE_NAME), equalTo(true)); + assertThat(assertThrows(GrpcStatusException.class, + () -> client.check(newRequest(OVERALL_SERVICE_NAME))).status().code(), + equalTo(NOT_FOUND)); + } + } + } + + @Test + void watchPredicateFalse() throws Exception { + watchFailure(new DefaultHealthService(name -> false), FAILED_PRECONDITION); + } + + @Test + void watchPredicateThrows() throws Exception { + watchFailure(new DefaultHealthService(name -> { + throw DELIBERATE_EXCEPTION; + }), UNKNOWN); + } + + private static void watchFailure(DefaultHealthService service, GrpcStatusCode expectedCode) throws Exception { + try (ServerContext serverCtx = GrpcServers.forAddress(localAddress(0)).listenAndAwait(service)) { + try (Health.BlockingHealthClient client = GrpcClients.forResolvedAddress( + (InetSocketAddress) serverCtx.listenAddress()).buildBlocking(new Health.ClientFactory())) { + assertThat(assertThrows(GrpcStatusException.class, + () -> client.watch(newRequest(UNKNOWN_SERVICE_NAME)).iterator().next()).status().code(), + equalTo(expectedCode)); + } + } + } + + private static HealthCheckRequest newRequest(String service) { + return HealthCheckRequest.newBuilder().setService(service).build(); + } +} diff --git a/settings.gradle b/settings.gradle index 40f166e56c..ac2e54cfc5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -48,6 +48,7 @@ include "servicetalk-annotations", "servicetalk-examples:grpc:debugging", "servicetalk-examples:grpc:execution-strategy", "servicetalk-examples:grpc:observer", + "servicetalk-examples:grpc:health", "servicetalk-examples:http:helloworld", "servicetalk-examples:http:mutual-tls", "servicetalk-examples:http:redirects", @@ -66,6 +67,7 @@ include "servicetalk-annotations", "servicetalk-examples:http:timeout", "servicetalk-gradle-plugin-internal", "servicetalk-grpc-api", + "servicetalk-grpc-health", "servicetalk-grpc-internal", "servicetalk-grpc-netty", "servicetalk-grpc-protobuf", @@ -113,6 +115,7 @@ project(":servicetalk-examples:grpc:debugging").name = "servicetalk-examples-grp project(":servicetalk-examples:grpc:errors").name = "servicetalk-examples-grpc-errors" project(":servicetalk-examples:grpc:execution-strategy").name = "servicetalk-examples-grpc-execution-strategy" project(":servicetalk-examples:grpc:observer").name = "servicetalk-examples-grpc-observer" +project(":servicetalk-examples:grpc:health").name = "servicetalk-examples-grpc-health" project(":servicetalk-examples:http:http2").name = "servicetalk-examples-http-http2" project(":servicetalk-examples:http:helloworld").name = "servicetalk-examples-http-helloworld" project(":servicetalk-examples:http:debugging").name = "servicetalk-examples-http-debugging"