Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-10391][Manager] Supports configuring zk clusters and issuing zk addresses to agents #10392

Merged
merged 4 commits into from
Jun 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.agent;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* The Agent config info.
*/
@Data
public class AgentConfigInfo {

private String zkUrl;

private AgentClusterInfo cluster;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public static class AgentClusterInfo {

private Integer parentId;

private String clusterName;

}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.common.pojo.agent;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* Request for Agent config
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AgentConfigRequest {

private String clusterTag;

private String clusterName;

private String ip;
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class ClusterType {
public static final String SORT_PULSAR = "SORT_PULSAR";
public static final String SORT_KAFKA = "SORT_KAFKA";

public static final String AGENT_ZK = "AGENT_ZK";

private static final Set<String> TYPE_SET = new HashSet<String>() {

{
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.zk;

import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.JsonUtils;

import io.swagger.annotations.ApiModel;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import javax.validation.constraints.NotNull;

/**
* Agent zk cluster info
*/
@Data
@Builder
@NoArgsConstructor
@ApiModel("Agent zk cluster info")
public class AgentZkClusterDTO {

/**
* Get the dto instance from the request
*/
public static AgentZkClusterDTO getFromRequest(AgentZkClusterRequest request) {
return AgentZkClusterDTO.builder()
.build();
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved
}

/**
* Get the dto instance from the JSON string.
*/
public static AgentZkClusterDTO getFromJson(@NotNull String extParams) {
try {
return JsonUtils.parseObject(extParams, AgentZkClusterDTO.class);
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
String.format("parse extParams of agent zk Cluster failure: %s", e.getMessage()));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.zk;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Inlong cluster info for agent ZK
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.AGENT_ZK)
@ApiModel("Inlong cluster info for agent ZK")
public class AgentZkClusterInfo extends ClusterInfo {

public AgentZkClusterInfo() {
this.setType(ClusterType.AGENT_ZK);
}

@Override
public AgentZkClusterRequest genRequest() {
return CommonBeanUtils.copyProperties(this, AgentZkClusterRequest::new);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.cluster.zk;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.util.JsonTypeDefine;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;

import io.swagger.annotations.ApiModel;
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;

/**
* Inlong cluster request for agent ZK
*/
@Data
@ToString(callSuper = true)
@EqualsAndHashCode(callSuper = true)
@JsonTypeDefine(value = ClusterType.AGENT_ZK)
@ApiModel("Inlong cluster request for agent ZK")
public class AgentZkClusterRequest extends ClusterRequest {

public AgentZkClusterRequest() {
this.setType(ClusterType.AGENT_ZK);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.service.cluster;

import org.apache.inlong.manager.common.enums.ClusterType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongClusterEntity;
import org.apache.inlong.manager.pojo.cluster.ClusterInfo;
import org.apache.inlong.manager.pojo.cluster.ClusterRequest;
import org.apache.inlong.manager.pojo.cluster.zk.AgentZkClusterDTO;
import org.apache.inlong.manager.pojo.cluster.zk.AgentZkClusterInfo;
import org.apache.inlong.manager.pojo.cluster.zk.AgentZkClusterRequest;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

@Service
public class AgentZkClusterOperator extends AbstractClusterOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(AgentZkClusterOperator.class);

@Autowired
private ObjectMapper objectMapper;

@Override
public Boolean accept(String clusterType) {
return getClusterType().equals(clusterType);
}

@Override
public String getClusterType() {
return ClusterType.AGENT_ZK;
}

@Override
protected void setTargetEntity(ClusterRequest request, InlongClusterEntity targetEntity) {
AgentZkClusterRequest agentZkClusterRequest = (AgentZkClusterRequest) request;
CommonBeanUtils.copyProperties(agentZkClusterRequest, targetEntity, true);
try {
AgentZkClusterDTO dto = AgentZkClusterDTO.getFromRequest(agentZkClusterRequest);
targetEntity.setExtParams(objectMapper.writeValueAsString(dto));
} catch (Exception e) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_INFO_INCORRECT,
String.format("serialize extParams of agent zk Cluster failure: %s", e.getMessage()));
}
}

@Override
public ClusterInfo getFromEntity(InlongClusterEntity entity) {
if (entity == null) {
throw new BusinessException(ErrorCodeEnum.CLUSTER_NOT_FOUND);
}
AgentZkClusterInfo info = new AgentZkClusterInfo();
CommonBeanUtils.copyProperties(entity, info);
if (StringUtils.isNotBlank(entity.getExtParams())) {
AgentZkClusterDTO dto = AgentZkClusterDTO.getFromJson(entity.getExtParams());
CommonBeanUtils.copyProperties(dto, info);
}
return info;
}

@Override
public Object getClusterInfo(InlongClusterEntity entity) {
AgentZkClusterInfo agentZkClusterInfo = (AgentZkClusterInfo) this.getFromEntity(entity);
Map<String, String> map = new HashMap<>();
map.put("url", agentZkClusterInfo.getUrl());
fuweng11 marked this conversation as resolved.
Show resolved Hide resolved
return map;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.inlong.manager.service.core;

import org.apache.inlong.common.pojo.agent.AgentConfigInfo;
import org.apache.inlong.common.pojo.agent.AgentConfigRequest;
import org.apache.inlong.common.pojo.agent.TaskRequest;
import org.apache.inlong.common.pojo.agent.TaskResult;
import org.apache.inlong.common.pojo.agent.TaskSnapshotRequest;
Expand Down Expand Up @@ -44,6 +46,14 @@ public interface AgentService {
*/
void report(TaskRequest request);

/**
* Agent cluster config.
*
* @param request Request of the agent config.
* @return Agent config info result.
*/
AgentConfigInfo getAgentConfig(AgentConfigRequest request);

/**
* Agent pull task config.
*
Expand Down
Loading
Loading