Skip to content

Commit

Permalink
Fix Imap operation timeout bug
Browse files Browse the repository at this point in the history
  • Loading branch information
EricJoy2048 committed May 30, 2023
1 parent f9563ef commit bf8db4a
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.common.utils;

import lombok.NonNull;

import java.io.PrintWriter;
import java.io.StringWriter;

Expand All @@ -38,4 +40,13 @@ public static String getMessage(Throwable e) {
throw new RuntimeException("Failed to print exception logs", e1);
}
}

public static Throwable getRootException(@NonNull Throwable e) {
Throwable cause = e.getCause();
if (cause != null) {
return getRootException(cause);
} else {
return e;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.common.utils;

import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ExceptionUtilsTest {
@Test
public void testGetRootException() {
Exception exception =
new UnsupportedOperationException(
new SeaTunnelException(
new SeaTunnelRuntimeException(
CommonErrorCode.CLASS_NOT_FOUND, "class not fount")));
Throwable throwable = ExceptionUtils.getRootException(exception);
Assertions.assertTrue(throwable instanceof SeaTunnelRuntimeException);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.seatunnel.engine.client.SeaTunnelHazelcastClient;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.Job;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
Expand All @@ -35,7 +36,6 @@
import org.apache.commons.lang3.StringUtils;

import com.hazelcast.client.impl.protocol.ClientMessage;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
import lombok.NonNull;
Expand Down Expand Up @@ -104,8 +104,7 @@ public JobStatus waitForJobComplete() {
100000,
true,
exception ->
exception.getCause()
instanceof OperationTimeoutException,
ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
if (jobResult == null) {
throw new SeaTunnelEngineException("failed to fetch job result");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.apache.seatunnel.engine.common.utils;

import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.function.ConsumerWithException;
import org.apache.seatunnel.common.utils.function.RunnableWithException;
import org.apache.seatunnel.common.utils.function.SupplierWithException;
Expand All @@ -27,6 +28,8 @@

import com.hazelcast.client.impl.protocol.ClientExceptionFactory;
import com.hazelcast.client.impl.protocol.ClientProtocolErrorCodes;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.instance.impl.OutOfMemoryErrorDispatcher;
import lombok.NonNull;

Expand Down Expand Up @@ -142,4 +145,11 @@ public static <R, E extends Throwable> R sneaky(SupplierWithException<R, E> supp
// This method wouldn't be executed.
throw new RuntimeException("Never throw here.");
}

public static boolean isOperationNeedRetryException(@NonNull Throwable e) {
Throwable exception = ExceptionUtils.getRootException(e);
return exception instanceof HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException
|| exception instanceof OperationTimeoutException;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@

import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.core.checkpoint.CheckpointIDCounter;
import org.apache.seatunnel.engine.core.job.PipelineStatus;

import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.map.IMap;
import com.hazelcast.spi.impl.NodeEngine;

Expand Down Expand Up @@ -56,7 +56,7 @@ public void start() throws Exception {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception -> exception instanceof HazelcastInstanceNotActiveException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.server.SeaTunnelServer;
Expand All @@ -41,8 +42,6 @@

import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.Member;
import com.hazelcast.core.HazelcastInstanceNotActiveException;
import com.hazelcast.core.OperationTimeoutException;
import com.hazelcast.flakeidgen.FlakeIdGenerator;
import com.hazelcast.logging.ILogger;
import com.hazelcast.logging.Logger;
Expand Down Expand Up @@ -366,12 +365,7 @@ private boolean turnToEndState(@NonNull ExecutionState endState) {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception
instanceof
HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
Expand Down Expand Up @@ -436,11 +430,7 @@ public boolean updateTaskState(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception
instanceof
HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
Expand Down Expand Up @@ -562,12 +552,7 @@ private void resetExecutionState() {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception
instanceof
HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
} catch (Exception e) {
LOGGER.warning(ExceptionUtils.getMessage(e));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.common.utils.ExceptionUtils;
import org.apache.seatunnel.common.utils.RetryUtils;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.seatunnel.engine.common.utils.ExceptionUtil;
import org.apache.seatunnel.engine.common.utils.PassiveCompletableFuture;
import org.apache.seatunnel.engine.core.job.JobImmutableInformation;
import org.apache.seatunnel.engine.core.job.PipelineExecutionState;
Expand Down Expand Up @@ -293,10 +294,7 @@ private void subPlanDone(PipelineStatus pipelineStatus) throws Exception {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception instanceof HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}

Expand Down Expand Up @@ -331,11 +329,7 @@ private void turnToEndState(@NonNull PipelineStatus endState) throws Exception {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception
instanceof HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = endState;
}
Expand Down Expand Up @@ -390,12 +384,7 @@ public boolean updatePipelineState(
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception
instanceof
HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
this.currPipelineStatus = targetState;
return true;
Expand Down Expand Up @@ -532,10 +521,7 @@ private void resetPipelineState() throws Exception {
new RetryUtils.RetryMaterial(
Constant.OPERATION_RETRY_TIME,
true,
exception ->
exception instanceof OperationTimeoutException
|| exception instanceof HazelcastInstanceNotActiveException
|| exception instanceof InterruptedException,
exception -> ExceptionUtil.isOperationNeedRetryException(exception),
Constant.OPERATION_RETRY_SLEEP));
}

Expand Down

0 comments on commit bf8db4a

Please sign in to comment.