From bc95799a397eaf1fda6a0db4927724b85bd2a496 Mon Sep 17 00:00:00 2001 From: Jim Steinebrey Date: Thu, 13 Jun 2024 12:27:17 -0400 Subject: [PATCH] NIFI-13397 Updated PutDatabaseRecord to retry transient ProcessException causes This closes #8964 Signed-off-by: David Handermann --- .../standard/PutDatabaseRecord.java | 2 +- .../standard/PutDatabaseRecordTest.java | 70 +++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java index e0dbf5b38be2..3210c609a421 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/PutDatabaseRecord.java @@ -585,7 +585,7 @@ private void routeOnException(final ProcessContext context, final ProcessSession // When an Exception is thrown, we want to route to 'retry' if we expect that attempting the same request again // might work. Otherwise, route to failure. SQLTransientException is a specific type that indicates that a retry may work. final Relationship relationship; - final Throwable toAnalyze = (e instanceof BatchUpdateException) ? e.getCause() : e; + final Throwable toAnalyze = (e instanceof BatchUpdateException || e instanceof ProcessException) ? e.getCause() : e; if (toAnalyze instanceof SQLTransientException) { relationship = REL_RETRY; flowFile = session.penalize(flowFile); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java index f2678bb3aeed..dd004e7db7f5 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/PutDatabaseRecordTest.java @@ -58,6 +58,7 @@ import java.sql.SQLDataException; import java.sql.SQLException; import java.sql.SQLFeatureNotSupportedException; +import java.sql.SQLTransientException; import java.sql.Statement; import java.time.LocalDate; import java.time.ZoneOffset; @@ -263,6 +264,57 @@ public void testSetAutoCommitFalseFailure() throws InitializationException, SQLE } @Test + public void testProcessExceptionRouteRetry() throws InitializationException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1.getTestCase()); + + // This exception should route to REL_RETRY because its cause is SQLTransientException + dbcp = new DBCPServiceThrowConnectionException(new SQLTransientException("connection failed")); + final Map dbcpProperties = new HashMap<>(); + runner = TestRunners.newTestRunner(processor); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); + + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_RETRY); + } + + @Test + public void testProcessExceptionRouteFailure() throws InitializationException, SQLException { + setRunner(TestCaseEnum.DEFAULT_1.getTestCase()); + + // This exception should route to REL_FAILURE because its cause is NOT SQLTransientException + dbcp = new DBCPServiceThrowConnectionException(new NullPointerException("connection is null")); + final Map dbcpProperties = new HashMap<>(); + runner = TestRunners.newTestRunner(processor); + runner.addControllerService(DBCP_SERVICE_ID, dbcp, dbcpProperties); + runner.enableControllerService(dbcp); + runner.setProperty(PutDatabaseRecord.DBCP_SERVICE, DBCP_SERVICE_ID); + + final MockRecordParser parser = new MockRecordParser(); + runner.addControllerService("parser", parser); + runner.enableControllerService(parser); + + runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, "parser"); + runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.INSERT_TYPE); + runner.setProperty(PutDatabaseRecord.TABLE_NAME, "PERSONS"); + + runner.enqueue(new byte[0]); + runner.run(); + + runner.assertAllFlowFilesTransferred(PutDatabaseRecord.REL_FAILURE); + } + public void testInsertNonRequiredColumnsUnmatchedField() throws InitializationException, ProcessException { setRunner(TestCaseEnum.DEFAULT_2.getTestCase()); @@ -2335,6 +2387,24 @@ SqlAndIncludedColumns generateInsert(RecordSchema recordSchema, String tableName } } + static class DBCPServiceThrowConnectionException extends AbstractControllerService implements DBCPService { + private final Exception rootCause; + + public DBCPServiceThrowConnectionException(final Exception rootCause) { + this.rootCause = rootCause; + } + + @Override + public String getIdentifier() { + return DBCP_SERVICE_ID; + } + + @Override + public Connection getConnection() throws ProcessException { + throw new ProcessException(rootCause); + } + } + static class DBCPServiceAutoCommitTest extends AbstractControllerService implements DBCPService { private final String databaseLocation;