Skip to content

Commit

Permalink
[INLONG-9960][Manager] Support to config kafka data node (#10005)
Browse files Browse the repository at this point in the history
Co-authored-by: castorqin <qhj00725@qq.com>
Co-authored-by: Charles Zhang <dockerzhang@apache.org>
  • Loading branch information
3 people authored Apr 19, 2024
1 parent 27d5777 commit fbf1ffc
Show file tree
Hide file tree
Showing 9 changed files with 331 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ public enum ErrorCodeEnum {
DATA_NODE_NOT_FOUND(1150, "Data node information does not exist"),
DATA_NODE_TYPE_NOT_SUPPORTED(1151, "Data node type '%s' not supported"),
DATA_NODE_ID_CHANGED(1152, "Data node information's id not equals"),
DATA_NODE_INFO_INCORRECT(1153, "Data node info was incorrect"),

STREAM_NOT_FOUND(1201, "Inlong stream does not exist/no operation permission"),
STREAM_ID_DUPLICATE(1202, "The current inlong group has a inlong stream with the same ID"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -39,6 +41,10 @@
@ApiModel("Inlong cluster info for Kafka")
public class KafkaClusterInfo extends ClusterInfo {

@JsonProperty("bootstrap.servers")
@ApiModelProperty(value = "Kafka bootstrap servers' URL")
private String bootstrapServers;

public KafkaClusterInfo() {
this.setType(ClusterType.KAFKA);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
Expand All @@ -36,6 +38,10 @@
@ApiModel("Inlong cluster request for Kafka")
public class KafkaClusterRequest extends ClusterRequest {

@JsonProperty("bootstrap.servers")
@ApiModelProperty(value = "Kafka bootstrap servers' URL")
private String bootstrapServers;

public KafkaClusterRequest() {
this.setType(ClusterType.KAFKA);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
@AllArgsConstructor
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.ELASTICSEARCH)
@JsonTypeDefine(value = DataNodeType.CLS)
@ApiModel("Cloud log service data node info")
public class ClsDataNodeInfo extends DataNodeInfo {

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.kafka;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.commons.lang3.StringUtils;

import javax.validation.constraints.NotNull;

/**
* Kafka data node info
*/
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@ApiModel("Kafka data node info")
public class KafkaDataNodeDTO {

@ApiModelProperty("kafka bootstrapServers")
private String bootstrapServers;

@ApiModelProperty("kafka client id")
private String clientId;

@ApiModelProperty(value = "kafka produce confirmation mechanism")
private String ack;

@ApiModelProperty("audit set name")
private String auditSetName;

/**
* Get the dto instance from the request
*/
public static KafkaDataNodeDTO getFromRequest(KafkaDataNodeRequest request, String extParams) {
KafkaDataNodeDTO dto = StringUtils.isNotBlank(extParams)
? KafkaDataNodeDTO.getFromJson(extParams)
: new KafkaDataNodeDTO();
return CommonBeanUtils.copyProperties(request, dto, true);
}

/**
* Get the dto instance from the JSON string.
*/
public static KafkaDataNodeDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, KafkaDataNodeDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_INFO_INCORRECT,
String.format("Failed to parse extParams for kafka node: %s", e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.kafka;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Kafka data node info
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.KAFKA)
@ApiModel("Kafka data node info")
public class KafkaDataNodeInfo extends DataNodeInfo {

@ApiModelProperty("kafka bootstrapServers")
private String bootstrapServers;

@ApiModelProperty("kafka client id")
private String clientId;

@ApiModelProperty(value = "kafka produce confirmation mechanism")
private String ack;

@ApiModelProperty("audit set name")
private String auditSetName;

public KafkaDataNodeInfo() {
this.setType(DataNodeType.KAFKA);
}

@Override
public KafkaDataNodeRequest genRequest() {
return CommonBeanUtils.copyProperties(this, KafkaDataNodeRequest::new);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.node.kafka;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;

import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Kafka data node request
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = DataNodeType.KAFKA)
@ApiModel("Kafka data node request")
public class KafkaDataNodeRequest extends DataNodeRequest {

@ApiModelProperty("kafka bootstrapServers")
private String bootstrapServers;

@ApiModelProperty("kafka client id")
private String clientId;

@ApiModelProperty(value = "kafka produce confirmation mechanism")
private String ack;

@ApiModelProperty("audit set name")
private String auditSetName;

public KafkaDataNodeRequest() {
this.setType(DataNodeType.KAFKA);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.node.kafka;

import org.apache.inlong.manager.common.consts.DataNodeType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.Preconditions;
import org.apache.inlong.manager.dao.entity.DataNodeEntity;
import org.apache.inlong.manager.pojo.cluster.kafka.KafkaClusterInfo;
import org.apache.inlong.manager.pojo.node.DataNodeInfo;
import org.apache.inlong.manager.pojo.node.DataNodeRequest;
import org.apache.inlong.manager.pojo.node.kafka.KafkaDataNodeDTO;
import org.apache.inlong.manager.pojo.node.kafka.KafkaDataNodeInfo;
import org.apache.inlong.manager.pojo.node.kafka.KafkaDataNodeRequest;
import org.apache.inlong.manager.service.node.AbstractDataNodeOperator;
import org.apache.inlong.manager.service.resource.queue.kafka.KafkaUtils;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

/**
* Kafka data node operator
*/
@Service
public class KafkaDataNodeOperator extends AbstractDataNodeOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDataNodeOperator.class);

@Autowired
private ObjectMapper objectMapper;

@Autowired
private RestTemplate restTemplate;

@Override
public Boolean accept(String dataNodeType) {
return getDataNodeType().equals(dataNodeType);
}

@Override
public String getDataNodeType() {
return DataNodeType.KAFKA;
}

@Override
public DataNodeInfo getFromEntity(DataNodeEntity entity) {
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_NOT_FOUND);
}

KafkaDataNodeInfo kafkaDataNodeInfo = new KafkaDataNodeInfo();
CommonBeanUtils.copyProperties(entity, kafkaDataNodeInfo);
if (StringUtils.isNotBlank(entity.getExtParams())) {
KafkaDataNodeDTO dto = KafkaDataNodeDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(dto, kafkaDataNodeInfo);
}
return kafkaDataNodeInfo;
}

@Override
protected void setTargetEntity(DataNodeRequest request, DataNodeEntity targetEntity) {
KafkaDataNodeRequest nodeRequest = (KafkaDataNodeRequest) request;
CommonBeanUtils.copyProperties(nodeRequest, targetEntity, true);
try {
KafkaDataNodeDTO dto = KafkaDataNodeDTO.getFromRequest(nodeRequest, targetEntity.getExtParams());
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.DATA_NODE_INFO_INCORRECT,
String.format("Failed to build extParams for kafka node: %s", e.getMessage()));
}
}

@Override
public Boolean testConnection(DataNodeRequest request) {
KafkaDataNodeRequest kafkaDataNodeRequest = (KafkaDataNodeRequest) request;
String bootstrapServers = kafkaDataNodeRequest.getBootstrapServers();
Preconditions.expectNotBlank(bootstrapServers, ErrorCodeEnum.INVALID_PARAMETER,
"connection bootstrapServers cannot be empty");
if (getKafkaConnection(bootstrapServers)) {
LOGGER.info("kafka connection success for bootstrapServers={}",
bootstrapServers);
}
return true;
}

private boolean getKafkaConnection(String bootstrapServers) {
KafkaClusterInfo kafkaClusterInfo = KafkaClusterInfo.builder().bootstrapServers(bootstrapServers).build();
try {

// test connect for kafka adminUrl
KafkaUtils.getAdminClient(kafkaClusterInfo);
return true;
} catch (Exception e) {
String errMsg = String.format("Kafka connection failed for bootstrapServers=%s",
kafkaClusterInfo.getBootstrapServers());
LOGGER.error(errMsg, e);
throw new BusinessException(errMsg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ public class KafkaUtils {
public static AdminClient getAdminClient(KafkaClusterInfo kafkaClusterInfo) {
Properties properties = new Properties();
// Configure the access address and port number of the Kafka service
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getUrl());
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaClusterInfo.getBootstrapServers());
// Create AdminClient instance
return AdminClient.create(properties);
}
Expand Down

0 comments on commit fbf1ffc

Please sign in to comment.