diff --git a/build.gradle b/build.gradle index f7fe0ea046dc3..3e1a3a91f3be4 100644 --- a/build.gradle +++ b/build.gradle @@ -41,6 +41,7 @@ plugins { id 'org.gradle.test-retry' version '1.3.1' apply false id 'org.scoverage' version '7.0.0' apply false id 'com.github.johnrengelman.shadow' version '7.1.2' apply false + id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0" } spotless { @@ -1023,7 +1024,7 @@ project(':core') { ':connect:runtime:genConnectPredicateDocs', ':connect:runtime:genSinkConnectorConfigDocs', ':connect:runtime:genSourceConnectorConfigDocs', ':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs', - ':connect:runtime:genConnectMetricsDocs'], type: Tar) { + ':connect:runtime:genConnectMetricsDocs', ':connect:runtime:genConnectOpenAPIDocs'], type: Tar) { archiveClassifier = 'site-docs' compression = Compression.GZIP from project.file("$rootDir/docs") @@ -2442,6 +2443,8 @@ project(':connect:runtime') { implementation libs.jettyClient implementation libs.reflections implementation libs.mavenArtifact + implementation libs.swaggerJaxrs2 + implementation libs.swaggerAnnotations testImplementation project(':clients').sourceSets.test.output testImplementation project(':core') @@ -2522,6 +2525,26 @@ project(':connect:runtime') { standardOutput = new File(generatedDocsDir, "connect_metrics.html").newOutputStream() } + task setVersionInOpenAPISpec(type: Copy) { + from "$rootDir/gradle/openapi.template" + into "$buildDir/resources/docs" + rename ('openapi.template', 'openapi.yaml') + expand(kafkaVersion: "$rootProject.version") + } + + task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) { + classpath = sourceSets.main.runtimeClasspath + buildClasspath = classpath + outputFileName = 'connect_rest' + outputFormat = 'YAML' + prettyPrint = 'TRUE' + sortOutput = 'TRUE' + openApiFile = file("$buildDir/resources/docs/openapi.yaml") + resourcePackages = ['org.apache.kafka.connect.runtime.rest.resources'] + if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } + outputDir = file(generatedDocsDir) + } + } project(':connect:file') { diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 070d22c14c358..d81a66362502a 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -560,6 +560,7 @@ + diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java index 2beda9fb8a142..269d4471a5660 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.PredicatedTransformation; @@ -102,17 +104,18 @@ private void addConnectorPlugins(Collection> plugins, Collecti } @PUT - @Path("/{connectorType}/config/validate") + @Path("/{pluginName}/config/validate") + @Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName") public ConfigInfos validateConfigs( - final @PathParam("connectorType") String connType, + final @PathParam("pluginName") String pluginName, final Map connectorConfig ) throws Throwable { String includedConnType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG); if (includedConnType != null - && !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(connType))) { + && !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(pluginName))) { throw new BadRequestException( "Included connector type " + includedConnType + " does not match request type " - + connType + + pluginName ); } @@ -133,7 +136,10 @@ public ConfigInfos validateConfigs( @GET @Path("/") - public List listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) { + @Operation(summary = "List all connector plugins installed") + public List listConnectorPlugins( + @DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly + ) { synchronized (this) { if (connectorsOnly) { return Collections.unmodifiableList(connectorPlugins.stream() @@ -146,8 +152,9 @@ public List listConnectorPlugins(@DefaultValue("true") @QueryParam(" } @GET - @Path("/{name}/config") - public List getConnectorConfigDef(final @PathParam("name") String pluginName) { + @Path("/{pluginName}/config") + @Operation(summary = "Get the configuration definition for the specified pluginName") + public List getConnectorConfigDef(final @PathParam("pluginName") String pluginName) { synchronized (this) { return herder.connectorPluginConfig(pluginName); } diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java index dbf246f00ef1c..fac582de61265 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -22,6 +22,8 @@ import javax.ws.rs.core.HttpHeaders; import com.fasterxml.jackson.databind.ObjectMapper; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.Parameter; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.ConnectorConfig; import org.apache.kafka.connect.runtime.Herder; @@ -113,6 +115,7 @@ public static void resetRequestTimeout() { @GET @Path("/") + @Operation(summary = "List all active connectors") public Response listConnectors( final @Context UriInfo uriInfo, final @Context HttpHeaders headers @@ -150,7 +153,8 @@ public Response listConnectors( @POST @Path("/") - public Response createConnector(final @QueryParam("forward") Boolean forward, + @Operation(summary = "Create a new connector") + public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final @Context HttpHeaders headers, final CreateConnectorRequest createRequest) throws Throwable { // Trim leading and trailing whitespaces from the connector name, replace null with empty string @@ -172,9 +176,10 @@ public Response createConnector(final @QueryParam("forward") Boolean forward, @GET @Path("/{connector}") + @Operation(summary = "Get the details for the specified connector") public ConnectorInfo getConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); herder.connectorInfo(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward); @@ -182,9 +187,10 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector @GET @Path("/{connector}/config") + @Operation(summary = "Get the configuration for the specified connector") public Map getConnectorConfig(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.connectorConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward); @@ -192,10 +198,11 @@ public Map getConnectorConfig(final @PathParam("connector") Stri @GET @Path("/{connector}/tasks-config") + @Operation(summary = "Get the configuration of all tasks for the specified connector") public Map> getTasksConfig( final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback>> cb = new FutureCallback<>(); herder.tasksConfig(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward); @@ -203,12 +210,14 @@ public Map> getTasksConfig( @GET @Path("/{connector}/status") + @Operation(summary = "Get the status for the specified connector") public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) { return herder.connectorStatus(connector); } @GET @Path("/{connector}/topics") + @Operation(summary = "Get the list of topics actively used by the specified connector") public Response getConnectorActiveTopics(final @PathParam("connector") String connector) { if (isTopicTrackingDisabled) { throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), @@ -220,6 +229,7 @@ public Response getConnectorActiveTopics(final @PathParam("connector") String co @PUT @Path("/{connector}/topics/reset") + @Operation(summary = "Reset the list of topics actively used by the specified connector") public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) { if (isTopicTrackingDisabled) { throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(), @@ -235,9 +245,10 @@ public Response resetConnectorActiveTopics(final @PathParam("connector") String @PUT @Path("/{connector}/config") + @Operation(summary = "Create or reconfigure the specified connector") public Response putConnectorConfig(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward, + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward, final Map connectorConfig) throws Throwable { FutureCallback> cb = new FutureCallback<>(); checkAndPutConnectorConfigName(connector, connectorConfig); @@ -257,11 +268,12 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto @POST @Path("/{connector}/restart") + @Operation(summary = "Restart the specified connector") public Response restartConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @DefaultValue("false") @QueryParam("includeTasks") Boolean includeTasks, - final @DefaultValue("false") @QueryParam("onlyFailed") Boolean onlyFailed, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @DefaultValue("false") @QueryParam("includeTasks") @Parameter(description = "Whether to also restart tasks") Boolean includeTasks, + final @DefaultValue("false") @QueryParam("onlyFailed") @Parameter(description = "Whether to only restart failed tasks/connectors")Boolean onlyFailed, + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks); String forwardingPath = "/connectors/" + connector + "/restart"; if (restartRequest.forceRestartConnectorOnly()) { @@ -285,6 +297,8 @@ public Response restartConnector(final @PathParam("connector") String connector, @PUT @Path("/{connector}/pause") + @Operation(summary = "Pause the specified connector", + description = "This operation is idempotent and has no effects if the connector is already paused") public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) { herder.pauseConnector(connector); return Response.accepted().build(); @@ -292,6 +306,8 @@ public Response pauseConnector(@PathParam("connector") String connector, final @ @PUT @Path("/{connector}/resume") + @Operation(summary = "Resume the specified connector", + description = "This operation is idempotent and has no effects if the connector is already running") public Response resumeConnector(@PathParam("connector") String connector) { herder.resumeConnector(connector); return Response.accepted().build(); @@ -299,9 +315,10 @@ public Response resumeConnector(@PathParam("connector") String connector) { @GET @Path("/{connector}/tasks") + @Operation(summary = "List all tasks for the specified connector") public List getTaskConfigs(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.taskConfigs(connector, cb); return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference>() { @@ -310,6 +327,7 @@ public List getTaskConfigs(final @PathParam("connector") String connec @POST @Path("/{connector}/tasks") + @Operation(hidden = true, summary = "This operation is only for inter-worker communications") public void putTaskConfigs(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @QueryParam("forward") Boolean forward, @@ -322,6 +340,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector, @GET @Path("/{connector}/tasks/{task}/status") + @Operation(summary = "Get the state of the specified task for the specified connector") public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector, final @Context HttpHeaders headers, final @PathParam("task") Integer task) { @@ -330,10 +349,11 @@ public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") @POST @Path("/{connector}/tasks/{task}/restart") + @Operation(summary = "Restart the specified task for the specified connector") public void restartTask(final @PathParam("connector") String connector, final @PathParam("task") Integer task, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback cb = new FutureCallback<>(); ConnectorTaskId taskId = new ConnectorTaskId(connector, task); herder.restartTask(taskId, cb); @@ -342,9 +362,10 @@ public void restartTask(final @PathParam("connector") String connector, @DELETE @Path("/{connector}") + @Operation(summary = "Delete the specified connector") public void destroyConnector(final @PathParam("connector") String connector, final @Context HttpHeaders headers, - final @QueryParam("forward") Boolean forward) throws Throwable { + final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable { FutureCallback> cb = new FutureCallback<>(); herder.deleteConnectorConfig(connector, cb); completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java index ce9ce14e97488..cab9e4a57619c 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/LoggingResource.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; import org.apache.kafka.connect.errors.NotFoundException; import org.apache.kafka.connect.runtime.rest.errors.BadRequestException; import org.apache.log4j.Level; @@ -59,6 +60,7 @@ public class LoggingResource { */ @GET @Path("/") + @Operation(summary = "List the current loggers that have their levels explicitly set and their log levels") public Response listLoggers() { Map> loggers = new TreeMap<>(); Enumeration enumeration = currentLoggers(); @@ -83,6 +85,7 @@ public Response listLoggers() { */ @GET @Path("/{logger}") + @Operation(summary = "Get the log level for the specified logger") public Response getLogger(final @PathParam("logger") String namedLogger) { Objects.requireNonNull(namedLogger, "require non-null name"); @@ -120,6 +123,7 @@ public Response getLogger(final @PathParam("logger") String namedLogger) { */ @PUT @Path("/{logger}") + @Operation(summary = "Set the level for the specified logger") public Response setLevel(final @PathParam("logger") String namedLogger, final Map levelMap) { String desiredLevelStr = levelMap.get("level"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java index 9666bf15954f9..be0c2811d5b00 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.runtime.rest.resources; +import io.swagger.v3.oas.annotations.Operation; import org.apache.kafka.connect.runtime.Herder; import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; @@ -36,6 +37,7 @@ public RootResource(Herder herder) { @GET @Path("/") + @Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to") public ServerInfo serverInfo() { return new ServerInfo(herder.kafkaClusterId()); } diff --git a/docs/connect.html b/docs/connect.html index be6a2ac46137a..d13d25d31393c 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -327,6 +327,8 @@

REST API

  • GET /- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to. +

    For the complete specification of the REST API, see the OpenAPI documentation

    +

    Error Reporting in Connect

    Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic.

    diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle index 066d62bc854fe..c6433d16af893 100644 --- a/gradle/dependencies.gradle +++ b/gradle/dependencies.gradle @@ -116,6 +116,8 @@ versions += [ slf4j: "1.7.36", snappy: "1.1.8.4", spotbugs: "4.2.2", + swaggerAnnotations: "2.2.0", + swaggerJaxrs2: "2.2.0", zinc: "1.3.5", zookeeper: "3.6.3", zstd: "1.5.2-1" @@ -199,6 +201,8 @@ libs += [ slf4jApi: "org.slf4j:slf4j-api:$versions.slf4j", slf4jlog4j: "org.slf4j:slf4j-log4j12:$versions.slf4j", snappy: "org.xerial.snappy:snappy-java:$versions.snappy", + swaggerAnnotations: "io.swagger.core.v3:swagger-annotations:$versions.swaggerAnnotations", + swaggerJaxrs2: "io.swagger.core.v3:swagger-jaxrs2:$versions.swaggerJaxrs2", zookeeper: "org.apache.zookeeper:zookeeper:$versions.zookeeper", jfreechart: "jfreechart:jfreechart:$versions.jfreechart", mavenArtifact: "org.apache.maven:maven-artifact:$versions.mavenArtifact", diff --git a/gradle/openapi.template b/gradle/openapi.template new file mode 100644 index 0000000000000..d15c40c0070a1 --- /dev/null +++ b/gradle/openapi.template @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +openapi: 3.0.0 +info: + version: $kafkaVersion + title: Kafka Connect REST API + description: "This is the documentation of the [Apache Kafka](https://kafka.apache.org) Connect REST API." + contact: + email: dev@kafka.apache.org + license: + name: Apache 2.0 + url: https://www.apache.org/licenses/LICENSE-2.0.html