Skip to content

Commit

Permalink
NIFI-13869: Enhance QuerySalesforceObject Processor to Support Queryi…
Browse files Browse the repository at this point in the history
…ng Deleted Records

- Added a new boolean property `Include Deleted Records` to allow users to include deleted (soft-deleted) records in Salesforce queries.
- Modified the query construction to include the `IsDeleted` field and use the `queryAll` API endpoint when `Include Deleted Records` is enabled.
- Updated `SalesforceRestClient` to support the `queryAll` API by adding a new `queryAll` method.
- Ensured backward compatibility by defaulting `Include Deleted Records` to `false`, maintaining existing behavior unless explicitly enabled.
- Updated processor documentation and annotations to reflect the new functionality.
- Adjusted state management to reset state when `Include Deleted Records` property is modified.

Signed-off-by: Pierre Villard <pierre.villard.fr@gmail.com>

This closes apache#9434.
  • Loading branch information
Nicolae93 authored and pvillard31 committed Oct 22, 2024
1 parent a44fb52 commit 3521905
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,9 @@
+ " It's also possible to define an initial cutoff value for the age, filtering out all older records"
+ " even for the first run. In case of 'Property Based Query' this processor should run on the Primary Node only."
+ " FlowFile attribute 'record.count' indicates how many records were retrieved and written to the output."
+ " The processor can accept an optional input FlowFile and reference the FlowFile attributes in the query.")
+ " The processor can accept an optional input FlowFile and reference the FlowFile attributes in the query."
+ " When 'Include Deleted Records' is true, the processor will include deleted records (soft-deletes) in the results by using the 'queryAll' API."
+ " The 'IsDeleted' field will be automatically included in the results when querying deleted records.")
@Stateful(scopes = Scope.CLUSTER, description = "When 'Age Field' is set, after performing a query the time of execution is stored. Subsequent queries will be augmented"
+ " with an additional condition so that only records that are newer than the stored execution time (adjusted with the optional value of 'Age Delay') will be retrieved."
+ " State is stored across the cluster so that this Processor can be run on Primary Node only and if a new Primary Node is selected,"
Expand Down Expand Up @@ -232,6 +234,16 @@ public class QuerySalesforceObject extends AbstractProcessor {
.dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();

static final PropertyDescriptor INCLUDE_DELETED_RECORDS = new PropertyDescriptor.Builder()
.name("include-deleted-records")
.displayName("Include Deleted Records")
.description("If true, the processor will include deleted records (IsDeleted = true) in the query results. When enabled, the processor will use the 'queryAll' API.")
.required(true)
.defaultValue("false")
.allowableValues("true", "false")
.dependsOn(QUERY_TYPE, PROPERTY_BASED_QUERY)
.build();

static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("For FlowFiles created as a result of a successful query.")
Expand Down Expand Up @@ -306,6 +318,7 @@ public void onScheduled(ProcessContext context) {
INITIAL_AGE_FILTER,
AGE_DELAY,
CUSTOM_WHERE_CONDITION,
INCLUDE_DELETED_RECORDS,
READ_TIMEOUT,
CREATE_ZERO_RECORD_FILES,
TOKEN_PROVIDER
Expand Down Expand Up @@ -339,7 +352,8 @@ public void onPropertyModified(PropertyDescriptor descriptor, String oldValue, S
|| descriptor.equals(SOBJECT_NAME)
|| descriptor.equals(AGE_FIELD)
|| descriptor.equals(INITIAL_AGE_FILTER)
|| descriptor.equals(CUSTOM_WHERE_CONDITION))
|| descriptor.equals(CUSTOM_WHERE_CONDITION)
|| descriptor.equals(INCLUDE_DELETED_RECORDS))
) {
getLogger().debug("A property that require resetting state was modified - {} oldValue {} newValue {}",
descriptor.getDisplayName(), oldValue, newValue);
Expand All @@ -365,10 +379,11 @@ private void processQuery(ProcessContext context, ProcessSession session, FlowFi
String customWhereClause = context.getProperty(CUSTOM_WHERE_CONDITION).evaluateAttributeExpressions(originalFlowFile).getValue();
RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
boolean createZeroRecordFlowFiles = context.getProperty(CREATE_ZERO_RECORD_FILES).asBoolean();
boolean includeDeletedRecords = context.getProperty(INCLUDE_DELETED_RECORDS).asBoolean();

StateMap state = getState(session);
IncrementalContext incrementalContext = new IncrementalContext(context, state);
SalesforceSchemaHolder salesForceSchemaHolder = getConvertedSalesforceSchema(sObject, fields);
SalesforceSchemaHolder salesForceSchemaHolder = getConvertedSalesforceSchema(sObject, fields, includeDeletedRecords);

if (StringUtils.isBlank(fields)) {
fields = salesForceSchemaHolder.getSalesforceObject().getFields()
Expand All @@ -377,6 +392,14 @@ private void processQuery(ProcessContext context, ProcessSession session, FlowFi
.collect(Collectors.joining(","));
}

// Add IsDeleted to fields if Include Deleted Records is true
if (includeDeletedRecords) {
List<String> fieldList = Arrays.stream(fields.split("\\s*,\\s*")).collect(Collectors.toList());
if (fieldList.stream().noneMatch(f -> f.equalsIgnoreCase("IsDeleted"))) {
fields = fields + ", IsDeleted";
}
}

String querySObject = new SalesforceQueryBuilder(incrementalContext)
.buildQuery(sObject, fields, customWhereClause);

Expand All @@ -396,7 +419,7 @@ private void processQuery(ProcessContext context, ProcessSession session, FlowFi
AtomicInteger recordCountHolder = new AtomicInteger();
try {
outgoingFlowFile = session.write(outgoingFlowFile, processRecordsCallback(session, nextRecordsUrl, writerFactory, state, incrementalContext,
salesForceSchemaHolder, querySObject, originalAttributes, attributes, recordCountHolder));
salesForceSchemaHolder, querySObject, originalAttributes, attributes, recordCountHolder, includeDeletedRecords));
int recordCount = recordCountHolder.get();

if (createZeroRecordFlowFiles || recordCount != 0) {
Expand Down Expand Up @@ -428,10 +451,10 @@ private void processQuery(ProcessContext context, ProcessSession session, FlowFi
private OutputStreamCallback processRecordsCallback(ProcessSession session, AtomicReference<String> nextRecordsUrl, RecordSetWriterFactory writerFactory,
StateMap state, IncrementalContext incrementalContext, SalesforceSchemaHolder salesForceSchemaHolder,
String querySObject, Map<String, String> originalAttributes, Map<String, String> attributes,
AtomicInteger recordCountHolder) {
AtomicInteger recordCountHolder, boolean includeDeletedRecords) {
return out -> {
try {
handleRecordSet(out, nextRecordsUrl, querySObject, writerFactory, salesForceSchemaHolder, originalAttributes, attributes, recordCountHolder);
handleRecordSet(out, nextRecordsUrl, querySObject, writerFactory, salesForceSchemaHolder, originalAttributes, attributes, recordCountHolder, includeDeletedRecords);

if (incrementalContext.getAgeFilterUpper() != null) {
Map<String, String> newState = new HashMap<>(state.toMap());
Expand All @@ -446,9 +469,9 @@ private OutputStreamCallback processRecordsCallback(ProcessSession session, Atom

private void handleRecordSet(OutputStream out, AtomicReference<String> nextRecordsUrl, String querySObject, RecordSetWriterFactory writerFactory,
SalesforceSchemaHolder salesForceSchemaHolder, Map<String, String> originalAttributes, Map<String, String> attributes,
AtomicInteger recordCountHolder) throws Exception {
AtomicInteger recordCountHolder, boolean includeDeletedRecords) throws Exception {
try (
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject);
InputStream querySObjectResultInputStream = getResultInputStream(nextRecordsUrl.get(), querySObject, includeDeletedRecords);
JsonTreeRowRecordReader jsonReader = createJsonReader(querySObjectResultInputStream, salesForceSchemaHolder.getRecordSchema());
RecordSetWriter writer = createRecordSetWriter(writerFactory, originalAttributes, out, salesForceSchemaHolder.getRecordSchema())
) {
Expand Down Expand Up @@ -506,8 +529,9 @@ private void processCustomQuery(ProcessContext context, ProcessSession session,
AtomicBoolean isOriginalTransferred = new AtomicBoolean(false);
List<FlowFile> outgoingFlowFiles = new ArrayList<>();
long startNanos = System.nanoTime();
boolean includeDeletedRecords = context.getProperty(INCLUDE_DELETED_RECORDS).asBoolean();
do {
try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery)) {
try (InputStream response = getResultInputStream(nextRecordsUrl.get(), customQuery, includeDeletedRecords)) {
FlowFile outgoingFlowFile = createOutgoingFlowFile(session, originalFlowFile);
outgoingFlowFiles.add(outgoingFlowFile);
outgoingFlowFile = session.write(outgoingFlowFile, parseCustomQueryResponse(response, nextRecordsUrl, totalSize));
Expand Down Expand Up @@ -574,14 +598,18 @@ private boolean nextTokenIs(JsonParser jsonParser, String value) throws IOExcept
.equals(value) && jsonParser.nextToken() != null;
}

private InputStream getResultInputStream(String nextRecordsUrl, String querySObject) {
private InputStream getResultInputStream(String nextRecordsUrl, String querySObject, boolean includeDeletedRecords) {
if (nextRecordsUrl == null) {
return salesforceRestService.query(querySObject);
if (includeDeletedRecords) {
return salesforceRestService.queryAll(querySObject);
} else {
return salesforceRestService.query(querySObject);
}
}
return salesforceRestService.getNextRecords(nextRecordsUrl);
}

private SalesforceSchemaHolder getConvertedSalesforceSchema(String sObject, String fields) {
private SalesforceSchemaHolder getConvertedSalesforceSchema(String sObject, String fields, boolean includeDeletedRecords) {
try (InputStream describeSObjectResult = salesforceRestService.describeSObject(sObject)) {
return convertSchema(describeSObjectResult, fields);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,14 @@ public InputStream query(String query) {
return executeRequest(request);
}

public InputStream queryAll(String query) {
HttpUrl httpUrl = HttpUrl.get(getUrl("/queryAll")).newBuilder()
.addQueryParameter("q", query)
.build();
Request request = buildGetRequest(httpUrl.toString());
return executeRequest(request);
}

public InputStream getNextRecords(String nextRecordsUrl) {
HttpUrl httpUrl = HttpUrl.get(configuration.getInstanceUrl() + nextRecordsUrl).newBuilder().build();
Request request = buildGetRequest(httpUrl.toString());
Expand Down

0 comments on commit 3521905

Please sign in to comment.