Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ plugins {
id 'org.gradle.test-retry' version '1.3.1' apply false
id 'org.scoverage' version '7.0.0' apply false
id 'com.github.johnrengelman.shadow' version '7.1.2' apply false
id "io.swagger.core.v3.swagger-gradle-plugin" version "2.2.0"
}

spotless {
Expand Down Expand Up @@ -1023,7 +1024,7 @@ project(':core') {
':connect:runtime:genConnectPredicateDocs',
':connect:runtime:genSinkConnectorConfigDocs', ':connect:runtime:genSourceConnectorConfigDocs',
':streams:genStreamsConfigDocs', 'genConsumerMetricsDocs', 'genProducerMetricsDocs',
':connect:runtime:genConnectMetricsDocs'], type: Tar) {
':connect:runtime:genConnectMetricsDocs', ':connect:runtime:genConnectOpenAPIDocs'], type: Tar) {
archiveClassifier = 'site-docs'
compression = Compression.GZIP
from project.file("$rootDir/docs")
Expand Down Expand Up @@ -2442,6 +2443,8 @@ project(':connect:runtime') {
implementation libs.jettyClient
implementation libs.reflections
implementation libs.mavenArtifact
implementation libs.swaggerJaxrs2
implementation libs.swaggerAnnotations

testImplementation project(':clients').sourceSets.test.output
testImplementation project(':core')
Expand Down Expand Up @@ -2522,6 +2525,26 @@ project(':connect:runtime') {
standardOutput = new File(generatedDocsDir, "connect_metrics.html").newOutputStream()
}

task setVersionInOpenAPISpec(type: Copy) {
from "$rootDir/gradle/openapi.template"
into "$buildDir/resources/docs"
rename ('openapi.template', 'openapi.yaml')
expand(kafkaVersion: "$rootProject.version")
}

task genConnectOpenAPIDocs(type: io.swagger.v3.plugins.gradle.tasks.ResolveTask, dependsOn: setVersionInOpenAPISpec) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It also depends on having the connect runtime classes/jar.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe this happens automatically because of classpath = sourceSets.main.runtimeClasspath

classpath = sourceSets.main.runtimeClasspath
buildClasspath = classpath
outputFileName = 'connect_rest'
outputFormat = 'YAML'
prettyPrint = 'TRUE'
sortOutput = 'TRUE'
openApiFile = file("$buildDir/resources/docs/openapi.yaml")
resourcePackages = ['org.apache.kafka.connect.runtime.rest.resources']
if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() }
outputDir = file(generatedDocsDir)
}

}

project(':connect:file') {
Expand Down
1 change: 1 addition & 0 deletions checkstyle/import-control.xml
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@
<allow pkg="org.glassfish.jersey" />
<allow pkg="com.fasterxml.jackson" />
<allow pkg="org.apache.http"/>
<allow pkg="io.swagger.v3.oas.annotations"/>
<subpackage name="resources">
<allow pkg="org.apache.log4j" />
</subpackage>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;

import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.PredicatedTransformation;
Expand Down Expand Up @@ -102,17 +104,18 @@ private <T> void addConnectorPlugins(Collection<PluginDesc<T>> plugins, Collecti
}

@PUT
@Path("/{connectorType}/config/validate")
@Path("/{pluginName}/config/validate")
@Operation(summary = "Validate the provided configuration against the configuration definition for the specified pluginName")
public ConfigInfos validateConfigs(
final @PathParam("connectorType") String connType,
final @PathParam("pluginName") String pluginName,
final Map<String, String> connectorConfig
) throws Throwable {
String includedConnType = connectorConfig.get(ConnectorConfig.CONNECTOR_CLASS_CONFIG);
if (includedConnType != null
&& !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(connType))) {
&& !normalizedPluginName(includedConnType).endsWith(normalizedPluginName(pluginName))) {
throw new BadRequestException(
"Included connector type " + includedConnType + " does not match request type "
+ connType
+ pluginName
);
}

Expand All @@ -133,7 +136,10 @@ public ConfigInfos validateConfigs(

@GET
@Path("/")
public List<PluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("connectorsOnly") boolean connectorsOnly) {
@Operation(summary = "List all connector plugins installed")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should explain the behaivour of the connectorsOnly parameter too

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public List<PluginInfo> listConnectorPlugins(
@DefaultValue("true") @QueryParam("connectorsOnly") @Parameter(description = "Whether to list only connectors instead of all plugins") boolean connectorsOnly
) {
synchronized (this) {
if (connectorsOnly) {
return Collections.unmodifiableList(connectorPlugins.stream()
Expand All @@ -146,8 +152,9 @@ public List<PluginInfo> listConnectorPlugins(@DefaultValue("true") @QueryParam("
}

@GET
@Path("/{name}/config")
public List<ConfigKeyInfo> getConnectorConfigDef(final @PathParam("name") String pluginName) {
@Path("/{pluginName}/config")
@Operation(summary = "Get the configuration definition for the specified pluginName")
public List<ConfigKeyInfo> getConnectorConfigDef(final @PathParam("pluginName") String pluginName) {
synchronized (this) {
return herder.connectorPluginConfig(pluginName);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import javax.ws.rs.core.HttpHeaders;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.Parameter;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.ConnectorConfig;
import org.apache.kafka.connect.runtime.Herder;
Expand Down Expand Up @@ -113,6 +115,7 @@ public static void resetRequestTimeout() {

@GET
@Path("/")
@Operation(summary = "List all active connectors")
public Response listConnectors(
final @Context UriInfo uriInfo,
final @Context HttpHeaders headers
Expand Down Expand Up @@ -150,7 +153,8 @@ public Response listConnectors(

@POST
@Path("/")
public Response createConnector(final @QueryParam("forward") Boolean forward,
@Operation(summary = "Create a new connector")
public Response createConnector(final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final @Context HttpHeaders headers,
final CreateConnectorRequest createRequest) throws Throwable {
// Trim leading and trailing whitespaces from the connector name, replace null with empty string
Expand All @@ -172,43 +176,48 @@ public Response createConnector(final @QueryParam("forward") Boolean forward,

@GET
@Path("/{connector}")
@Operation(summary = "Get the details for the specified connector")
public ConnectorInfo getConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
herder.connectorInfo(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", headers, null, forward);
}

@GET
@Path("/{connector}/config")
@Operation(summary = "Get the configuration for the specified connector")
public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<String, String>> cb = new FutureCallback<>();
herder.connectorConfig(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", headers, null, forward);
}

@GET
@Path("/{connector}/tasks-config")
@Operation(summary = "Get the configuration of all tasks for the specified connector")
public Map<ConnectorTaskId, Map<String, String>> getTasksConfig(
final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Map<ConnectorTaskId, Map<String, String>>> cb = new FutureCallback<>();
herder.tasksConfig(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks-config", "GET", headers, null, forward);
}

@GET
@Path("/{connector}/status")
@Operation(summary = "Get the status for the specified connector")
public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) {
return herder.connectorStatus(connector);
}

@GET
@Path("/{connector}/topics")
@Operation(summary = "Get the list of topics actively used by the specified connector")
public Response getConnectorActiveTopics(final @PathParam("connector") String connector) {
if (isTopicTrackingDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
Expand All @@ -220,6 +229,7 @@ public Response getConnectorActiveTopics(final @PathParam("connector") String co

@PUT
@Path("/{connector}/topics/reset")
@Operation(summary = "Reset the list of topics actively used by the specified connector")
public Response resetConnectorActiveTopics(final @PathParam("connector") String connector, final @Context HttpHeaders headers) {
if (isTopicTrackingDisabled) {
throw new ConnectRestException(Response.Status.FORBIDDEN.getStatusCode(),
Expand All @@ -235,9 +245,10 @@ public Response resetConnectorActiveTopics(final @PathParam("connector") String

@PUT
@Path("/{connector}/config")
@Operation(summary = "Create or reconfigure the specified connector")
public Response putConnectorConfig(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward,
final Map<String, String> connectorConfig) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
checkAndPutConnectorConfigName(connector, connectorConfig);
Expand All @@ -257,11 +268,12 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto

@POST
@Path("/{connector}/restart")
@Operation(summary = "Restart the specified connector")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we mention includeTasks and onlyFailed parameters?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've added descriptions for these parameters

public Response restartConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @DefaultValue("false") @QueryParam("includeTasks") Boolean includeTasks,
final @DefaultValue("false") @QueryParam("onlyFailed") Boolean onlyFailed,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @DefaultValue("false") @QueryParam("includeTasks") @Parameter(description = "Whether to also restart tasks") Boolean includeTasks,
final @DefaultValue("false") @QueryParam("onlyFailed") @Parameter(description = "Whether to only restart failed tasks/connectors")Boolean onlyFailed,
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
RestartRequest restartRequest = new RestartRequest(connector, onlyFailed, includeTasks);
String forwardingPath = "/connectors/" + connector + "/restart";
if (restartRequest.forceRestartConnectorOnly()) {
Expand All @@ -285,23 +297,28 @@ public Response restartConnector(final @PathParam("connector") String connector,

@PUT
@Path("/{connector}/pause")
@Operation(summary = "Pause the specified connector",
description = "This operation is idempotent and has no effects if the connector is already paused")
public Response pauseConnector(@PathParam("connector") String connector, final @Context HttpHeaders headers) {
herder.pauseConnector(connector);
return Response.accepted().build();
}

@PUT
@Path("/{connector}/resume")
@Operation(summary = "Resume the specified connector",
description = "This operation is idempotent and has no effects if the connector is already running")
public Response resumeConnector(@PathParam("connector") String connector) {
herder.resumeConnector(connector);
return Response.accepted().build();
}

@GET
@Path("/{connector}/tasks")
@Operation(summary = "List all tasks for the specified connector")
public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
herder.taskConfigs(connector, cb);
return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", headers, null, new TypeReference<List<TaskInfo>>() {
Expand All @@ -310,6 +327,7 @@ public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connec

@POST
@Path("/{connector}/tasks")
@Operation(hidden = true, summary = "This operation is only for inter-worker communications")
public void putTaskConfigs(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward,
Expand All @@ -322,6 +340,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector,

@GET
@Path("/{connector}/tasks/{task}/status")
@Operation(summary = "Get the state of the specified task for the specified connector")
public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @PathParam("task") Integer task) {
Expand All @@ -330,10 +349,11 @@ public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector")

@POST
@Path("/{connector}/tasks/{task}/restart")
@Operation(summary = "Restart the specified task for the specified connector")
public void restartTask(final @PathParam("connector") String connector,
final @PathParam("task") Integer task,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Void> cb = new FutureCallback<>();
ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
herder.restartTask(taskId, cb);
Expand All @@ -342,9 +362,10 @@ public void restartTask(final @PathParam("connector") String connector,

@DELETE
@Path("/{connector}")
@Operation(summary = "Delete the specified connector")
public void destroyConnector(final @PathParam("connector") String connector,
final @Context HttpHeaders headers,
final @QueryParam("forward") Boolean forward) throws Throwable {
final @Parameter(hidden = true) @QueryParam("forward") Boolean forward) throws Throwable {
FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
herder.deleteConnectorConfig(connector, cb);
completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", headers, null, forward);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;

import io.swagger.v3.oas.annotations.Operation;
import org.apache.kafka.connect.errors.NotFoundException;
import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
import org.apache.log4j.Level;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class LoggingResource {
*/
@GET
@Path("/")
@Operation(summary = "List the current loggers that have their levels explicitly set and their log levels")
public Response listLoggers() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Response return types in this and other Resource classes means there are no schemas generated, which seems less than ideal.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes this is not great. I think this should be fixable without requiring a KIP, but let's do this in a separate PR

Map<String, Map<String, String>> loggers = new TreeMap<>();
Enumeration<Logger> enumeration = currentLoggers();
Expand All @@ -83,6 +85,7 @@ public Response listLoggers() {
*/
@GET
@Path("/{logger}")
@Operation(summary = "Get the log level for the specified logger")
public Response getLogger(final @PathParam("logger") String namedLogger) {
Objects.requireNonNull(namedLogger, "require non-null name");

Expand Down Expand Up @@ -120,6 +123,7 @@ public Response getLogger(final @PathParam("logger") String namedLogger) {
*/
@PUT
@Path("/{logger}")
@Operation(summary = "Set the level for the specified logger")
public Response setLevel(final @PathParam("logger") String namedLogger,
final Map<String, String> levelMap) {
String desiredLevelStr = levelMap.get("level");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.rest.resources;

import io.swagger.v3.oas.annotations.Operation;
import org.apache.kafka.connect.runtime.Herder;
import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;

Expand All @@ -36,6 +37,7 @@ public RootResource(Herder herder) {

@GET
@Path("/")
@Operation(summary = "Get details about this Connect worker and the id of the Kafka cluster it is connected to")
public ServerInfo serverInfo() {
return new ServerInfo(herder.kafkaClusterId());
}
Expand Down
2 changes: 2 additions & 0 deletions docs/connect.html
Original file line number Diff line number Diff line change
Expand Up @@ -327,6 +327,8 @@ <h4><a id="connect_rest" href="#connect_rest">REST API</a></h4>
<li><code>GET /</code>- return basic information about the Kafka Connect cluster such as the version of the Connect worker that serves the REST request (including git commit ID of the source code) and the Kafka cluster ID that is connected to.
</ul>

<p>For the complete specification of the REST API, see the <a href="generated/connect_rest.yaml">OpenAPI documentation</a></p>

<h4><a id="connect_errorreporting" href="#connect_errorreporting">Error Reporting in Connect</a></h4>

<p>Kafka Connect provides error reporting to handle errors encountered along various stages of processing. By default, any error encountered during conversion or within transformations will cause the connector to fail. Each connector configuration can also enable tolerating such errors by skipping them, optionally writing each error and the details of the failed operation and problematic record (with various levels of detail) to the Connect application log. These mechanisms also capture errors when a sink connector is processing the messages consumed from its Kafka topics, and all of the errors can be written to a configurable "dead letter queue" (DLQ) Kafka topic.</p>
Expand Down
Loading