
Commit

[CYB - 194][UI] Git support for config pipeline. (#84)
vpavlenko-cv authored Nov 27, 2024
1 parent ef5a5ca commit 8748f24
Showing 75 changed files with 1,963 additions and 531 deletions.
6 changes: 6 additions & 0 deletions flink-cyber/caracal-generator/pom.xml
@@ -28,6 +28,12 @@
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
@@ -65,7 +65,7 @@ public void testCustomGenerator() throws Exception {
int count = 100;
JobTester.startTest(createPipeline(ParameterTool.fromMap(ImmutableMap.of(
PARAMS_RECORDS_LIMIT, String.valueOf(count),
PARAMS_GENERATOR_CONFIG, Objects.requireNonNull(getClass().getClassLoader().getResource("config/generator_config.json")).toExternalForm()
PARAMS_GENERATOR_CONFIG, Objects.requireNonNull(getClass().getClassLoader().getResource("config/generator_config_test.json")).toExternalForm()
))));

JobTester.stopTest();
9 changes: 5 additions & 4 deletions flink-cyber/cyber-services/cyber-service-common/pom.xml
@@ -28,6 +28,11 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-compress</artifactId>
@@ -44,10 +49,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>

This file was deleted.

This file was deleted.

@@ -0,0 +1,8 @@
package com.cloudera.service.common.request;

import lombok.Data;

@Data
public abstract class AbstractRequest {
protected final String requestId;
}
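Since the subclasses added later in this commit call super(requestId) without any constructor being declared here, it may help to recall that Lombok's @Data implies @RequiredArgsConstructor for the final field. Roughly the equivalent of the following sketch is generated (it is not part of the commit):

// Approximate shape of the Lombok-generated constructor for the final requestId field:
public AbstractRequest(String requestId) {
    this.requestId = requestId;
}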
@@ -0,0 +1,48 @@
package com.cloudera.service.common.request;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.List;


@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = true)
public class ClusterPipelineRequest extends AbstractRequest {
private final String pipelineName;
private final String branch;
private final String profileName;
private final String parserName;
private final String gitUrl;
private final String userName;
private final String password;
private final List<String> jobs;

@JsonCreator
public ClusterPipelineRequest(
@JsonProperty("requestId") String requestId,
@JsonProperty("pipelineName") String pipelineName,
@JsonProperty("branch") String branch,
@JsonProperty("profileName") String profileName,
@JsonProperty("parserName") String parserName,
@JsonProperty("gitUrl") String gitUrl,
@JsonProperty("userName") String userName,
@JsonProperty("password") String password,
@JsonProperty("jobs") List<String> jobs) {
super(requestId);
this.pipelineName = pipelineName;
this.branch = branch;
this.profileName = profileName;
this.gitUrl = gitUrl;
this.userName = userName;
this.password = password;
this.jobs = jobs;
this.parserName = parserName;
}
}
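As a quick sanity check on the @JsonCreator wiring, here is a minimal sketch of deserializing the new request type with Jackson. The JSON values, the class name of the sketch, and the surrounding main method are illustrative only and not part of the commit.

import com.fasterxml.jackson.databind.ObjectMapper;

public class ClusterPipelineRequestSketch {
    public static void main(String[] args) throws Exception {
        // Invented payload; field names match the @JsonProperty annotations above.
        String json = "{\"requestId\":\"req-1\",\"pipelineName\":\"demo\",\"branch\":\"main\","
                + "\"profileName\":\"default\",\"parserName\":\"squid\",\"gitUrl\":\"https://git.example.com/configs.git\","
                + "\"userName\":\"svc-user\",\"password\":\"secret\",\"jobs\":[\"parser\",\"triage\"]}";
        ClusterPipelineRequest request = new ObjectMapper().readValue(json, ClusterPipelineRequest.class);
        // requestId is populated through super(requestId) declared on AbstractRequest.
        System.out.println(request.getRequestId() + " -> " + request.getJobs());
    }
}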
@@ -0,0 +1,23 @@
package com.cloudera.service.common.request;

import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.util.List;

@Getter
@Setter
@EqualsAndHashCode(callSuper = true)
@ToString
public class ClusterRequest extends AbstractRequest {
private final String clusterId;
private final List<String> jobs;

public ClusterRequest(String requestId, String clusterId, List<String> jobs) {
super(requestId);
this.clusterId = clusterId;
this.jobs = jobs;
}
}
@@ -17,6 +17,7 @@ public class RequestBody {
private String pipelineName;
private String branch;
private String profileName;
private String parserName;
private List<String> jobs;
private byte[] payload;
}
@@ -1,5 +1,5 @@
package com.cloudera.service.common.request;

public enum RequestType {
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, CREATE_EMPTY_PIPELINE, START_ARCHIVE_PIPELINE, UPDATE_JOB_CONFIG_REQUEST
GET_ALL_CLUSTERS_SERVICE_REQUEST, GET_CLUSTER_SERVICE_REQUEST, START_JOB_REQUEST, RESTART_JOB_REQUEST, STOP_JOB_REQUEST, GET_JOB_CONFIG_REQUEST, CREATE_EMPTY_PIPELINE, START_PIPELINE, UPDATE_JOB_CONFIG_REQUEST
}
@@ -19,7 +19,7 @@ public class Job {

private String jobFullName;

private String jobName;
private String confName;

private JobStatus jobState;

@@ -50,7 +50,7 @@ public String[] getScript(Job job) {
switch (this) {
case PROFILE:
case PARSER:
return new String[]{scriptName, job.getJobBranch(), job.getJobPipeline(), job.getJobName()};
return new String[]{scriptName, job.getJobBranch(), job.getJobPipeline(), job.getConfName()};
case INDEX:
case TRIAGE:
return new String[]{scriptName, job.getJobBranch(), job.getJobPipeline()};
@@ -5,14 +5,17 @@
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.List;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Pipeline {
String id;
String name;
String clusterName;
String date;
String userName;
private String id;
private String name;
private String clusterName;
private String date;
private List<String> jobs;
private String userName;
}
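For context, a minimal sketch of constructing the reworked Pipeline with its newly added jobs field through the Lombok builder; all values and the wrapping class are invented for illustration.

import java.util.Arrays;

class PipelineBuilderSketch {
    static Pipeline samplePipeline() {
        // Sketch only: exercises the @Builder with the new jobs field.
        return Pipeline.builder()
                .id("42")
                .name("squid-pipeline")
                .clusterName("cluster-a")
                .date("2024-11-27")
                .jobs(Arrays.asList("parser", "triage"))
                .userName("vpavlenko")
                .build();
    }
}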
@@ -1,5 +1,5 @@
package com.cloudera.service.common.response;

public enum ResponseType {
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, CREATE_EMPTY_PIPELINE_RESPONSE, START_ARCHIVE_PIPELINE_RESPONSE, ERROR_RESPONSE
GET_ALL_CLUSTERS_SERVICE_RESPONSE, GET_CLUSTER_SERVICE_RESPONSE, START_JOB_RESPONSE, RESTART_JOB_RESPONSE, STOP_JOB_RESPONSE, GET_JOB_CONFIG_RESPONSE, UPDATE_JOB_CONFIG_RESPONSE, CREATE_EMPTY_PIPELINE_RESPONSE, START_PIPELINE_RESPONSE, ERROR_RESPONSE
}
@@ -8,6 +8,7 @@
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorOutputStream;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.flink.core.fs.FileStatus;

import java.io.BufferedInputStream;
@@ -37,7 +38,7 @@ public static void compressToTarGzFile(String inputPath, String outputPath) throws IOException {

public static byte[] compressToTarGzInMemory(String inputPath, boolean base64) throws IOException {
final byte[] bytes = compressToTarGzInMemory(inputPath);
if (base64){
if (base64) {
return Base64.getEncoder().encode(bytes);
} else {
return bytes;
@@ -51,6 +52,35 @@ public static byte[] compressToTarGzInMemory(String inputPath) throws IOException {
}
}

public static byte[] compressToTarGzInMemory(List<Pair<String, byte[]>> files) {
try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
if (files == null || files.isEmpty()) {
log.info("There are no files.");
return bos.toByteArray();
}
try (BufferedOutputStream buffOut = new BufferedOutputStream(bos);
GzipCompressorOutputStream gzOut = new GzipCompressorOutputStream(buffOut);
TarArchiveOutputStream tOut = new TarArchiveOutputStream(gzOut)) {
try {
for (Pair<String, byte[]> file : files) {
TarArchiveEntry tarEntry = new TarArchiveEntry(
file.getLeft());
tarEntry.setSize(file.getRight().length);
tOut.putArchiveEntry(tarEntry);
tOut.write(file.getRight());
tOut.closeArchiveEntry();
}
} finally {
tOut.finish();
}
}
return bos.toByteArray();
} catch (IOException e) {
log.error("IOException occurs while processing {}", e.getMessage());
return new byte[0];
}
}

private static void compressToTarGz(String inputPath, OutputStream outputStream) throws IOException {
final List<FileStatus> fileList = FileUtil.listFiles(inputPath, true);
if (fileList == null || fileList.isEmpty()) {
@@ -111,7 +141,7 @@ public static void decompressFromTarGzInMemory(byte[] rawData, String outputPath
throw new IOException("Provided null as .tar.gz data which is not allowed!");
}
final byte[] data;
if (base64){
if (base64) {
data = Base64.getDecoder().decode(rawData);
} else {
data = rawData;
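To make the new in-memory overload above concrete, here is a minimal usage sketch. The file names and contents are invented, and "ArchiveUtil" stands in for the utility class hosting the method, whose actual name is not visible in this capture.

import org.apache.commons.lang3.tuple.Pair;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;

class InMemoryArchiveSketch {
    static byte[] buildConfigArchive() {
        // Sketch only: package two in-memory config files into a tar.gz payload.
        List<Pair<String, byte[]>> files = Arrays.asList(
                Pair.of("parse.json", "{}".getBytes(StandardCharsets.UTF_8)),
                Pair.of("chains/squid.json", "{}".getBytes(StandardCharsets.UTF_8)));
        return ArchiveUtil.compressToTarGzInMemory(files);
    }
}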
10 changes: 7 additions & 3 deletions flink-cyber/cyber-services/cyber-worker-service/pom.xml
@@ -52,15 +52,19 @@
<artifactId>spring-boot-autoconfigure</artifactId>
</dependency>
<dependency>
<groupId>com.cloudera.cyber</groupId>
<artifactId>metron-common</artifactId>
<version>${project.parent.version}</version>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
@@ -1,10 +1,8 @@
package com.cloudera.service.common.config.kafka;
package com.cloudera.cyber.restcli.configuration;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

@Getter
@Setter
@@ -1,6 +1,5 @@
package com.cloudera.cyber.restcli.configuration;

import com.cloudera.service.common.config.kafka.ClouderaKafkaProperties;
import com.cloudera.service.common.request.RequestBody;
import com.cloudera.service.common.response.ResponseBody;
import lombok.AllArgsConstructor;

This file was deleted.

@@ -12,7 +12,6 @@
import com.cloudera.service.common.response.ResponseType;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
@@ -88,12 +87,12 @@ public Message<ResponseBody> handleMessage(RequestBody requestBody, @Header(Kafk
} catch (Exception e) {
return handleErrorResponse(e, replyTo, correlationId);
}
case START_ARCHIVE_PIPELINE:
case START_PIPELINE:
try {
pipelineService.extractPipeline(requestBody.getPayload(), requestBody.getPipelineName(), requestBody.getBranch());
pipelineService.startPipelineJob(requestBody.getPipelineName(), requestBody.getBranch(), requestBody.getProfileName(), requestBody.getJobs());
pipelineService.startPipelineJob(requestBody.getPipelineName(), requestBody.getBranch(), requestBody.getProfileName(), requestBody.getParserName(), requestBody.getJobs());
final ResponseBody responseBody = ResponseBody.builder().build();
return buildResponseMessage(responseBody, ResponseType.START_ARCHIVE_PIPELINE_RESPONSE, replyTo, correlationId);
return buildResponseMessage(responseBody, ResponseType.START_PIPELINE_RESPONSE, replyTo, correlationId);
} catch (Exception e) {
log.error("Exception while processing the Start All request {}", e.getMessage());
return handleErrorResponse(e, replyTo, correlationId);
