Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test PR to see if removing enum from lists helps serialise Job type #390

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.datastax.mgmtapi.rpc.RpcParam;
import com.datastax.mgmtapi.rpc.RpcRegistry;
import com.datastax.mgmtapi.util.Job;
import com.datastax.mgmtapi.util.JobDto;
import com.datastax.mgmtapi.util.JobExecutor;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
Expand Down Expand Up @@ -79,22 +80,29 @@ public synchronized void unregister() {
}

@Rpc(name = "jobStatus")
public Map<String, String> getJobStatus(@RpcParam(name = "job_id") String jobId) {
Map<String, String> resultMap = new HashMap<>();
public JobDto getJobStatus(@RpcParam(name = "job_id") String jobId) {
Job jobWithId = service.getJobWithId(jobId);
if (jobWithId == null) {
return resultMap;
}
resultMap.put("id", jobWithId.getJobId());
resultMap.put("type", jobWithId.getJobType());
resultMap.put("status", jobWithId.getStatus().name());
resultMap.put("submit_time", String.valueOf(jobWithId.getSubmitTime()));
resultMap.put("end_time", String.valueOf(jobWithId.getFinishedTime()));
return new JobDto(null, null);
}
JobDto jobStatus = new JobDto(jobWithId.getJobType(), jobWithId.getJobId());
jobStatus.setStatus(jobWithId.getStatus().name());
jobWithId.statusChanges.stream()
.forEach(
statusChange -> {
JobDto.StatusChange changeToAdd =
jobStatus.new StatusChange(statusChange.getStatus(), statusChange.message);
changeToAdd.changeTime = statusChange.getChangeTime();
jobStatus.statusChanges.add(changeToAdd);
});
jobStatus.submitTime = jobWithId.getSubmitTime();
jobStatus.finishedTime = jobWithId.getFinishedTime();
jobStatus.startTime = jobWithId.startTime;
if (jobWithId.getStatus() == Job.JobStatus.ERROR) {
resultMap.put("error", jobWithId.getError().getLocalizedMessage());
jobStatus.setError(jobWithId.getError());
}

return resultMap;
return jobStatus;
}

@Rpc(name = "setFullQuerylog")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,19 @@ public enum JobStatus {
WAITING;
}

private String jobId;
private String jobType;
private JobStatus status;
private long submitTime;
private long startTime;
private long finishedTime;
private Throwable error;
public String jobId;
public String jobType;
public JobStatus status;
public long submitTime;
public long startTime;
public long finishedTime;
public Throwable error;

public class StatusChange {
ProgressEventType status;
long changeTime;
public ProgressEventType status;
public long changeTime;

String message;
public String message;

public StatusChange(ProgressEventType type, String message) {
changeTime = System.currentTimeMillis();
Expand All @@ -50,7 +50,7 @@ public String getMessage() {
}
}

private List<StatusChange> statusChanges;
public List<StatusChange> statusChanges;

public Job(String jobType, String jobId) {
this.jobType = jobType;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
/*
* Copyright DataStax, Inc.
*
* Please see the included license file for details.
*/
package com.datastax.mgmtapi.util;

import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import org.apache.cassandra.utils.progress.ProgressEventType;

public class JobDto {

public String jobId;
public String jobType;
public String status;
public long submitTime;
public long startTime;
public long finishedTime;
public Throwable error;

public class StatusChange {
public ProgressEventType status;
public long changeTime;

public String message;

public StatusChange(ProgressEventType type, String message) {
changeTime = System.currentTimeMillis();
status = type;
this.message = message;
}

public ProgressEventType getStatus() {
return status;
}

public long getChangeTime() {
return changeTime;
}

public String getMessage() {
return message;
}
}

public List<StatusChange> statusChanges;

public JobDto(String jobType, String jobId) {
this.jobType = jobType;
this.jobId = jobId;
submitTime = System.currentTimeMillis();
status = "WAITING";
statusChanges = new ArrayList<>();
}

@VisibleForTesting
// This method is only for testing purposes
public void setJobId(String jobId) {
this.jobId = jobId;
}

public String getJobId() {
return jobId;
}

public String getJobType() {
return jobType;
}

public String getStatus() {
return status;
}

public void setStatus(String status) {
this.status = status;
}

public void setStatusChange(ProgressEventType type, String message) {
statusChanges.add(new StatusChange(type, message));
}

public List<StatusChange> getStatusChanges() {
return statusChanges;
}

public long getSubmitTime() {
return submitTime;
}

public long getFinishedTime() {
return finishedTime;
}

public void setFinishedTime(long finishedTime) {
this.finishedTime = finishedTime;
}

public Throwable getError() {
return error;
}

public void setError(Throwable error) {
this.error = error;
}

public void setStartTime(long startTime) {
this.startTime = startTime;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ public abstract class BaseDockerIntegrationTest {
protected static final String BASE_PATH = "http://localhost:8080/api/v0";
protected static final String BASE_PATH_V1 = "http://localhost:8080/api/v1";
protected static final String BASE_PATH_V2 = "http://localhost:8080/api/v2";
protected static final String BASE_HOST = "http://localhost:8080";
protected static final URL BASE_URL;
protected static final ObjectMapper JSON_MAPPER = new ObjectMapper();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/
package com.datastax.mgmtapi;

import static java.util.concurrent.TimeUnit.SECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatCode;
import static org.awaitility.Awaitility.await;
Expand All @@ -31,6 +32,8 @@
import com.datastax.mgmtapi.resources.models.ScrubRequest;
import com.datastax.mgmtapi.resources.models.Table;
import com.datastax.mgmtapi.resources.models.TakeSnapshotRequest;
import com.datastax.mgmtapi.resources.v2.models.RepairParallelism;
import com.datastax.mgmtapi.resources.v2.models.RepairRequestResponse;
import com.datastax.oss.driver.api.core.metadata.schema.ClusteringOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
Expand All @@ -49,7 +52,6 @@
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.http.HttpStatus;
import org.apache.http.client.utils.URIBuilder;
Expand Down Expand Up @@ -112,7 +114,7 @@ public static void ensureStarted() throws IOException {

if (ready) break;

Uninterruptibles.sleepUninterruptibly(10, TimeUnit.SECONDS);
Uninterruptibles.sleepUninterruptibly(10, SECONDS);
}

logger.info("CASSANDRA ALIVE: {}", ready);
Expand Down Expand Up @@ -1033,4 +1035,63 @@ public void testMoveNode() throws IOException, URISyntaxException {
"status", value -> assertThat(value).isIn("COMPLETED", "ERROR"));
});
}

@Test
public void testEnsureStatusChanges() throws Exception {
assumeTrue(IntegrationTestUtils.shouldRun());
ensureStarted();
NettyHttpClient client = new NettyHttpClient(BASE_URL);

com.datastax.mgmtapi.resources.v2.models.RepairRequest req =
new com.datastax.mgmtapi.resources.v2.models.RepairRequest(
"system_distributed",
null,
true,
true,
Collections.singletonList(
new com.datastax.mgmtapi.resources.v2.models.RingRange(-1L, 100L)),
RepairParallelism.SEQUENTIAL,
null,
null);

logger.info("Sending repair request: {}", req);
URI repairUri = new URIBuilder(BASE_PATH_V2 + "/repairs").build();
Pair<Integer, String> repairResp =
client
.put(repairUri.toURL(), new ObjectMapper().writeValueAsString(req))
.thenApply(this::responseAsCodeAndBody)
.join();
System.out.println("repairResp was " + repairResp);
String jobID =
new ObjectMapper().readValue(repairResp.getRight(), RepairRequestResponse.class).repairID;
Integer repairID =
Integer.parseInt(
jobID.substring(7) // Trimming off "repair-" prefix.
);
logger.info("Repair ID: {}", repairID);
assertThat(repairID).isNotNull();
assertThat(repairID).isGreaterThan(0);

URI statusUri =
new URIBuilder(BASE_PATH_V2 + "/ops/executor/job").addParameter("job_id", jobID).build();
Pair<Integer, String> statusResp =
client.get(statusUri.toURL()).thenApply(this::responseAsCodeAndBody).join();
logger.info("Repair job status: {}", statusResp);
Job jobStatus = new ObjectMapper().readValue(statusResp.getRight(), Job.class);

assertThat(jobStatus.getStatus()).isNotNull();
assertThat(jobStatus.getStatusChanges()).isNotNull();
await()
.atMost(5, SECONDS)
.until(
() -> {
Pair<Integer, String> statusResp2 =
client.get(statusUri.toURL()).thenApply(this::responseAsCodeAndBody).join();
logger.info("Repair job status: {}", statusResp);
Job jobStatus2 = new ObjectMapper().readValue(statusResp.getRight(), Job.class);
return jobStatus2.getStatusChanges().size() > 0
&& jobStatus2.getStatus()
== com.datastax.mgmtapi.resources.models.Job.JobStatus.COMPLETED;
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,36 @@ public CompletableFuture<FullHttpResponse> post(
return result;
}

public CompletableFuture<FullHttpResponse> put(
URL url, final CharSequence body, String contentType) throws UnsupportedEncodingException {
CompletableFuture<FullHttpResponse> result = new CompletableFuture<>();

if (!activeRequestFuture.compareAndSet(null, result))
throw new RuntimeException("outstanding request");

DefaultFullHttpRequest request =
new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, url.getFile());
request.headers().set(HttpHeaders.Names.CONTENT_TYPE, contentType);
request.headers().set(HttpHeaderNames.HOST, url.getHost());

if (body != null) {
request.content().writeBytes(body.toString().getBytes(CharsetUtil.UTF_8.name()));
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, request.content().readableBytes());
} else {
request.headers().set(HttpHeaders.Names.CONTENT_LENGTH, 0);
}

// Send the HTTP request.
client.writeAndFlush(request);

return result;
}

public CompletableFuture<FullHttpResponse> put(URL url, final CharSequence body)
throws UnsupportedEncodingException {
return post(url, body, "application/json");
}

public CompletableFuture<FullHttpResponse> delete(URL url) {
return buildAndSendRequest(HttpMethod.DELETE, url);
}
Expand Down