Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improve][Zeta] Add check for submit duplicate job id #7021

Merged
merged 6 commits into from
Jul 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,8 @@ private void submitJob(JobImmutableInformation jobImmutableInformation) {
jobImmutableInformation.getJobId(),
seaTunnelHazelcastClient
.getSerializationService()
.toData(jobImmutableInformation));
.toData(jobImmutableInformation),
jobImmutableInformation.isStartWithSavePoint());
PassiveCompletableFuture<Void> submitJobFuture =
seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(request);
submitJobFuture.join();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,55 @@ public void testSetJobId() throws ExecutionException, InterruptedException {
}
}

@Test
public void testSetJobIdDuplicate() {
    Common.setDeployMode(DeployMode.CLIENT);
    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("testSetJobId");
    String filePath = TestUtils.getResource("/streaming_fake_to_console.conf");
    long jobId = System.currentTimeMillis();
    SeaTunnelClient seaTunnelClient = createSeaTunnelClient();
    JobClient jobClient = seaTunnelClient.getJobClient();
    try {
        // First submission with an explicit job id must succeed.
        ClientJobExecutionEnvironment firstRun =
                seaTunnelClient.createExecutionContext(
                        filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);
        ClientJobProxy firstJobProxy = firstRun.execute();
        Assertions.assertEquals(jobId, firstJobProxy.getJobId());

        // Wait until the streaming job is actually running, then cancel it.
        await().atMost(30000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        "RUNNING", jobClient.getJobStatus(jobId)));
        jobClient.cancelJob(jobId);
        await().atMost(30000, TimeUnit.MILLISECONDS)
                .untilAsserted(
                        () ->
                                Assertions.assertEquals(
                                        "CANCELED", jobClient.getJobStatus(jobId)));

        // Re-submitting the same job id without a savepoint restart must be rejected
        // by the server with the duplicate-job-id error.
        ClientJobExecutionEnvironment secondRun =
                seaTunnelClient.createExecutionContext(
                        filePath, new ArrayList<>(), jobConfig, SEATUNNEL_CONFIG, jobId);
        Exception thrown =
                Assertions.assertThrows(
                        Exception.class, () -> secondRun.execute().waitForJobCompleteV2());
        Assertions.assertEquals(
                String.format(
                        "The job id %s has already been submitted and is not starting with a savepoint.",
                        jobId),
                thrown.getCause().getMessage());
    } catch (Exception e) {
        throw new RuntimeException(e);
    } finally {
        seaTunnelClient.close();
    }
}

@Test
public void testGetJobInfo() {
Common.setDeployMode(DeployMode.CLIENT);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,13 @@
import static com.hazelcast.client.impl.protocol.ClientMessage.RESPONSE_BACKUP_ACKS_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.TYPE_FIELD_OFFSET;
import static com.hazelcast.client.impl.protocol.ClientMessage.UNFRAGMENTED_MESSAGE;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BOOLEAN_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.BYTE_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.INT_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.LONG_SIZE_IN_BYTES;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeBoolean;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.decodeLong;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeBoolean;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeInt;
import static com.hazelcast.client.impl.protocol.codec.builtin.FixedSizeTypesCodec.encodeLong;

Expand All @@ -37,16 +40,19 @@
* to seatunnel-engine/seatunnel-engine-core/src/main/resources/client-protocol-definition/SeaTunnelEngine.yaml
*/

@Generated("ebea440b36898863958c102f47603fee")
/** */
@Generated("9933654790f5fbe98d0ee1c248bc999b")
public final class SeaTunnelSubmitJobCodec {
// hex: 0xDE0200
public static final int REQUEST_MESSAGE_TYPE = 14549504;
// hex: 0xDE0201
public static final int RESPONSE_MESSAGE_TYPE = 14549505;
private static final int REQUEST_JOB_ID_FIELD_OFFSET =
PARTITION_ID_FIELD_OFFSET + INT_SIZE_IN_BYTES;
private static final int REQUEST_INITIAL_FRAME_SIZE =
private static final int REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET =
REQUEST_JOB_ID_FIELD_OFFSET + LONG_SIZE_IN_BYTES;
private static final int REQUEST_INITIAL_FRAME_SIZE =
REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET + BOOLEAN_SIZE_IN_BYTES;
private static final int RESPONSE_INITIAL_FRAME_SIZE =
RESPONSE_BACKUP_ACKS_FIELD_OFFSET + BYTE_SIZE_IN_BYTES;

Expand All @@ -57,10 +63,14 @@ public static class RequestParameters {
public long jobId;

public com.hazelcast.internal.serialization.Data jobImmutableInformation;

public boolean isStartWithSavePoint;
}

public static ClientMessage encodeRequest(
long jobId, com.hazelcast.internal.serialization.Data jobImmutableInformation) {
long jobId,
com.hazelcast.internal.serialization.Data jobImmutableInformation,
boolean isStartWithSavePoint) {
ClientMessage clientMessage = ClientMessage.createForEncode();
clientMessage.setRetryable(false);
clientMessage.setOperationName("SeaTunnel.SubmitJob");
Expand All @@ -69,6 +79,10 @@ public static ClientMessage encodeRequest(
encodeInt(initialFrame.content, TYPE_FIELD_OFFSET, REQUEST_MESSAGE_TYPE);
encodeInt(initialFrame.content, PARTITION_ID_FIELD_OFFSET, -1);
encodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET, jobId);
encodeBoolean(
initialFrame.content,
REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET,
isStartWithSavePoint);
clientMessage.add(initialFrame);
DataCodec.encode(clientMessage, jobImmutableInformation);
return clientMessage;
Expand All @@ -80,6 +94,8 @@ public static SeaTunnelSubmitJobCodec.RequestParameters decodeRequest(
RequestParameters request = new RequestParameters();
ClientMessage.Frame initialFrame = iterator.next();
request.jobId = decodeLong(initialFrame.content, REQUEST_JOB_ID_FIELD_OFFSET);
request.isStartWithSavePoint =
decodeBoolean(initialFrame.content, REQUEST_IS_START_WITH_SAVE_POINT_FIELD_OFFSET);
request.jobImmutableInformation = DataCodec.decode(iterator);
return request;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ methods:
nullable: false
since: 2.0
doc: ''
- name: isStartWithSavePoint
type: boolean
nullable: false
since: 2.0
doc: ''
response: {}

- id: 3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class CoordinatorService {
* <p>This IMap is used to recovery runningJobStateIMap in JobMaster when a new master node
* active
*/
IMap<Object, Object> runningJobStateIMap;
private IMap<Object, Object> runningJobStateIMap;

/**
* IMap key is one of jobId {@link
Expand All @@ -130,13 +130,13 @@ public class CoordinatorService {
* <p>This IMap is used to recovery runningJobStateTimestampsIMap in JobMaster when a new master
* node active
*/
IMap<Object, Long[]> runningJobStateTimestampsIMap;
private IMap<Object, Long[]> runningJobStateTimestampsIMap;

/**
* key: job id; <br>
* value: job master;
*/
private Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();
private final Map<Long, JobMaster> runningJobMasterMap = new ConcurrentHashMap<>();

/**
* IMap key is {@link PipelineLocation}
Expand Down Expand Up @@ -210,8 +210,7 @@ private JobEventProcessor createJobEventProcessor(
handlers.add(httpReportHandler);
}
logger.info("Loaded event handlers: " + handlers);
JobEventProcessor eventProcessor = new JobEventProcessor(handlers);
return eventProcessor;
return new JobEventProcessor(handlers);
}

public JobHistoryService getJobHistoryService() {
Expand Down Expand Up @@ -437,7 +436,8 @@ public ResourceManager getResourceManager() {
}

/** call by client to submit job */
public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInformation) {
public PassiveCompletableFuture<Void> submitJob(
long jobId, Data jobImmutableInformation, boolean isStartWithSavePoint) {
CompletableFuture<Void> jobSubmitFuture = new CompletableFuture<>();

// Check if the current jobID is already running. If so, complete the submission
Expand Down Expand Up @@ -468,6 +468,13 @@ public PassiveCompletableFuture<Void> submitJob(long jobId, Data jobImmutableInf
executorService.submit(
() -> {
try {
if (!isStartWithSavePoint
&& getJobHistoryService().getJobMetrics(jobId) != null) {
throw new JobException(
String.format(
"The job id %s has already been submitted and is not starting with a savepoint.",
jobId));
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this step to ClientJobExecutionEnvironment or RestJobExecutionEnvironment?

image

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forget this — it can't check the case where the same job id is submitted twice at the same time.

runningJobInfoIMap.put(
jobId,
new JobInfo(System.currentTimeMillis(), jobImmutableInformation));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,15 @@

public class SubmitJobOperation extends AbstractJobAsyncOperation {
private Data jobImmutableInformation;
private boolean isStartWithSavePoint;

/** No-arg constructor required by Hazelcast serialization; fields are filled in by readInternal. */
public SubmitJobOperation() {}

/**
 * Creates a submit-job operation to be sent to the master node.
 *
 * @param jobId the id the job will run under
 * @param jobImmutableInformation the serialized immutable job information
 * @param isStartWithSavePoint whether this submission restores the job from a savepoint;
 *     when false the server rejects a job id that was already submitted
 */
public SubmitJobOperation(
        long jobId, @NonNull Data jobImmutableInformation, boolean isStartWithSavePoint) {
    super(jobId);
    this.jobImmutableInformation = jobImmutableInformation;
    this.isStartWithSavePoint = isStartWithSavePoint;
}

@Override
Expand All @@ -48,17 +51,21 @@ public int getClassId() {
// Serializes this operation's fields. Field order is the wire contract and must
// stay exactly in sync with readInternal.
protected void writeInternal(ObjectDataOutput out) throws IOException {
// Base class writes its own fields (including jobId) first.
super.writeInternal(out);
IOUtil.writeData(out, jobImmutableInformation);
// New flag appended last to keep the preceding layout unchanged.
out.writeBoolean(isStartWithSavePoint);
}

@Override
// Deserializes this operation's fields; read order must mirror writeInternal exactly.
protected void readInternal(ObjectDataInput in) throws IOException {
// Base class reads its own fields (including jobId) first.
super.readInternal(in);
jobImmutableInformation = IOUtil.readData(in);
// Flag was appended last by writeInternal, so it is read last here.
isStartWithSavePoint = in.readBoolean();
}

@Override
// Executes on the master node: forwards the submission to the coordinator service.
// Note: the diff view had retained the old two-argument submitJob call alongside the
// new one, leaving an unreachable duplicate return; only the new call is kept.
protected PassiveCompletableFuture<?> doRun() throws Exception {
    SeaTunnelServer seaTunnelServer = getService();
    return seaTunnelServer
            .getCoordinatorService()
            .submitJob(jobId, jobImmutableInformation, isStartWithSavePoint);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ protected SubmitJobTask(ClientMessage clientMessage, Node node, Connection conne

@Override
// Builds the server-side operation from the decoded client message parameters.
// Note: the diff view had retained the old two-argument constructor call alongside
// the new one, leaving a duplicate return; only the new three-argument call is kept.
protected Operation prepareOperation() {
    return new SubmitJobOperation(
            parameters.jobId,
            parameters.jobImmutableInformation,
            parameters.isStartWithSavePoint);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,9 @@ private void handleSubmitJob(HttpPostCommand httpPostCommand, String uri)
NodeEngineUtil.sendOperationToMasterNode(
getNode().nodeEngine,
new SubmitJobOperation(
jobId, getNode().nodeEngine.toData(jobImmutableInformation)))
jobId,
getNode().nodeEngine.toData(jobImmutableInformation),
jobImmutableInformation.isStartWithSavePoint()))
.join();

} else {
Expand Down Expand Up @@ -231,7 +233,9 @@ private void submitJob(
.toData(jobImmutableInformation);
PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
coordinatorService.submitJob(
Long.parseLong(jobConfig.getJobContext().getJobId()), data);
Long.parseLong(jobConfig.getJobContext().getJobId()),
data,
jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ protected void startJob(Long jobId, String path, boolean isStartWithSavePoint) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
server.getCoordinatorService()
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,9 @@ public void testRestoreWhenMasterNodeSwitch() throws InterruptedException, IOExc

Data data = instance1.getSerializationService().toData(jobImmutableInformation);

coordinatorService.submitJob(jobId, data).join();
coordinatorService
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
.join();

// waiting for job status turn to running
await().atMost(20000, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,9 @@ public void testClearCoordinatorService() {
Data data =
coordinatorServiceTest.getSerializationService().toData(jobImmutableInformation);

coordinatorService.submitJob(jobId, data).join();
coordinatorService
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
.join();

// waiting for job status turn to running
await().atMost(10000, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -174,7 +176,9 @@ public void testJobRestoreWhenMasterNodeSwitch() throws InterruptedException {

Data data = instance1.getSerializationService().toData(jobImmutableInformation);

coordinatorService.submitJob(jobId, data).join();
coordinatorService
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint())
.join();

// waiting for job status turn to running
await().atMost(20000, TimeUnit.MILLISECONDS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,8 @@ private void startJob(Long jobid, String path) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobid, data);
server.getCoordinatorService()
.submitJob(jobid, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,8 @@ public void testTask() throws MalformedURLException {
jobImmutableInformation.getJobId(),
nodeEngine
.getSerializationService()
.toData(jobImmutableInformation));
.toData(jobImmutableInformation),
jobImmutableInformation.isStartWithSavePoint());

Assertions.assertNotNull(voidPassiveCompletableFuture);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,8 @@ private void startJob(Long jobid, String path) {
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobid, data);
server.getCoordinatorService()
.submitJob(jobid, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private JobMaster newJobInstanceWithRunningState(long jobId, boolean restore)
Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation);

PassiveCompletableFuture<Void> voidPassiveCompletableFuture =
server.getCoordinatorService().submitJob(jobId, data);
server.getCoordinatorService()
.submitJob(jobId, data, jobImmutableInformation.isStartWithSavePoint());
voidPassiveCompletableFuture.join();

JobMaster jobMaster = server.getCoordinatorService().getJobMaster(jobId);
Expand Down
Loading