Skip to content

Commit

Permalink
NIFI-13397 Updated PutDatabaseRecord to retry transient ProcessExcept…
Browse files Browse the repository at this point in the history
…ion causes

This closes apache#8964

Signed-off-by: David Handermann <exceptionfactory@apache.org>
  • Loading branch information
jrsteinebrey authored and exceptionfactory committed Jun 17, 2024
1 parent a52d6a8 commit bc95799
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -585,7 +585,7 @@ private void routeOnException(final ProcessContext context, final ProcessSession
// When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again
// might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work.
final Relationship relationship;
final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e;
final Throwable toAnalyze = (e instanceof BatchUpdateException || e instanceof ProcessException) ? e.getCause() : e;
if (toAnalyze instanceof SQLTransientException) {
relationship = REL_RETRY;
flowFile = session.penalize(flowFile);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@
import java.sql.SQLDataException;
import java.sql.SQLException;
import java.sql.SQLFeatureNotSupportedException;
import java.sql.SQLTransientException;
import java.sql.Statement;
import java.time.LocalDate;
import java.time.ZoneOffset;
Expand Down Expand Up @@ -263,6 +264,57 @@ public void testSetAutoCommitFalseFailure() throws InitializationException, SQLE
}

@Test
public void testProcessExceptionRouteRetry() throws InitializationException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());

// This exception should route to REL_RETRY because its cause is SQLTransientException
dbcp = new DBCPServiceThrowConnectionException(new SQLTransientException("connection failed"));
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);

final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_RETRY);
}

@Test
public void testProcessExceptionRouteFailure() throws InitializationException, SQLException {
setRunner(TestCaseEnum.DEFAULT_1.getTestCase());

// This exception should route to REL_FAILURE because its cause is NOT SQLTransientException
dbcp = new DBCPServiceThrowConnectionException(new NullPointerException("connection is null"));
final Map<String, String> dbcpProperties = new HashMap<>();
runner = TestRunners.newTestRunner(processor);
runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties);
runner.enableControllerService(dbcp);
runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID);

final MockRecordParser parser = new MockRecordParser();
runner.addControllerService("parser", parser);
runner.enableControllerService(parser);

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser");
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE);
runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS");

runner.enqueue(new byte[0]);
runner.run();

runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE);
}

public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException {
setRunner(TestCaseEnum.DEFAULT_2.getTestCase());

Expand Down Expand Up @@ -2335,6 +2387,24 @@ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName
}
}

static class DBCPServiceThrowConnectionException extends AbstractControllerService implements DBCPService {
private final Exception rootCause;

public DBCPServiceThrowConnectionException(final Exception rootCause) {
this.rootCause = rootCause;
}

@Override
public String getIdentifier() {
return DBCP_SERVICE_ID;
}

@Override
public Connection getConnection() throws ProcessException {
throw new ProcessException(rootCause);
}
}

static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService {
private final String databaseLocation;

Expand Down

0 comments on commit bc95799

Please sign in to comment.