Skip to content

Commit

Permalink
Fix Expression Parsing (#14753)
Browse files Browse the repository at this point in the history
  • Loading branch information
mohityadav766 authored Jan 17, 2024
1 parent ddf7a0d commit 7dcdc1a
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -273,37 +273,42 @@ public List<ChangeEvent> pollEvents(long offset, long batchSize) {
@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor
this.init(jobExecutionContext);

// Poll Events from Change Event Table
List<ChangeEvent> batch = pollEvents(offset, 100);
int batchSize = batch.size();

// Retry Failed Events
Set<FailedEvent> failedEventsList =
JsonUtils.convertValue(
jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {});
if (failedEventsList != null) {
List<ChangeEvent> failedChangeEvents =
failedEventsList.stream()
.filter(failedEvent -> failedEvent.getRetriesLeft() > 0)
.map(FailedEvent::getChangeEvent)
.toList();
batch.addAll(failedChangeEvents);
}
try {

if (!batch.isEmpty()) {
// Publish Events
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size());
publishEvents(batch);
this.init(jobExecutionContext);

// Commit the Offset
offset += batchSize;
commit(jobExecutionContext);
}
// Poll Events from Change Event Table
List<ChangeEvent> batch = pollEvents(offset, 100);
int batchSize = batch.size();

// Retry Failed Events
Set<FailedEvent> failedEventsList =
JsonUtils.convertValue(
jobDetail.getJobDataMap().get(FAILED_EVENT_EXTENSION), new TypeReference<>() {});
if (failedEventsList != null) {
List<ChangeEvent> failedChangeEvents =
failedEventsList.stream()
.filter(failedEvent -> failedEvent.getRetriesLeft() > 0)
.map(FailedEvent::getChangeEvent)
.toList();
batch.addAll(failedChangeEvents);
}

if (!batch.isEmpty()) {
// Publish Events
alertMetrics.withTotalEvents(alertMetrics.getTotalEvents() + batch.size());
publishEvents(batch);

// Call stop to close the client
this.stop();
// Commit the Offset
offset += batchSize;
commit(jobExecutionContext);
}
} catch (Exception e) {
LOG.error("Error in executing the Job : {} ", e.getMessage());
} finally {
// Call stop to close the client
this.stop();
}
}

public EventSubscription getEventSubscription() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,10 +183,10 @@ const AddAlertPage = () => {

const requestFilteringRules = filteringRules?.rules?.map((curr) => ({
...curr,
condition: `${curr.name}(${map(
condition: `${curr.name}({${map(
curr.condition,
(v: string) => `'${v}'`
)?.join(', ')})`,
)?.join(', ')}})`,
}));

try {
Expand Down

0 comments on commit 7dcdc1a

Please sign in to comment.