Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove LMAX Disruptor , use queue and Quartz #14422

Merged
merged 39 commits into from
Jan 8, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
f157a29
Remove LMAX Disruptor , use queue and Quartz
mohityadav766 Dec 18, 2023
093dfd7
Merge branch 'main' into addQueue
mohityadav766 Dec 19, 2023
72a8a5f
Spotless Fix
mohityadav766 Dec 19, 2023
06c9b30
Merge branch 'main' into addQueue
mohityadav766 Dec 27, 2023
22cefe0
Spotless formatting
mohityadav766 Dec 27, 2023
edcb5ad
Review Comments
mohityadav766 Dec 31, 2023
0a279bb
Improvements
mohityadav766 Jan 1, 2024
c028984
Improvements to add Retry and DLQ
mohityadav766 Jan 1, 2024
76b9076
Spotless fix
mohityadav766 Jan 1, 2024
5e50d95
Merge branch 'main' into addQueue
mohityadav766 Jan 1, 2024
74d11ea
Update Stats
mohityadav766 Jan 2, 2024
a44cfbd
Fix stats
mohityadav766 Jan 2, 2024
e7ae4c2
Fix delete Failures
mohityadav766 Jan 2, 2024
baf3281
Fix Failing Tests
mohityadav766 Jan 2, 2024
1573165
Merge branch 'main' into addQueue
mohityadav766 Jan 3, 2024
a1af715
Fix Failing Tests
mohityadav766 Jan 3, 2024
7cc2b86
Fix Failing Tests
mohityadav766 Jan 3, 2024
40f8fb2
Merge branch 'main' into addQueue
mohityadav766 Jan 3, 2024
22b8973
Postgres Fix
mohityadav766 Jan 3, 2024
18e7c59
Fix Schema Change for Change Event Table
mohityadav766 Jan 3, 2024
d2a4186
Failing test
mohityadav766 Jan 3, 2024
0c212e1
Merge branch 'main' into addQueue
mohityadav766 Jan 3, 2024
5f3f15f
Fix Error for non ChangeDescriptions
mohityadav766 Jan 3, 2024
d3543c9
Adds Observability On Top of Event Subscription
mohityadav766 Jan 6, 2024
1f8e513
Merge branch 'main' into addQueue
mohityadav766 Jan 6, 2024
2b74ab8
Make List Separate
mohityadav766 Jan 6, 2024
950df98
Add more filters for Test Suite
mohityadav766 Jan 7, 2024
f9f09bf
Populate Default Schema Updates to Look for
mohityadav766 Jan 7, 2024
def4bd5
typo
mohityadav766 Jan 7, 2024
dc81ce6
Merge branch 'main' into addQueue
mohityadav766 Jan 7, 2024
1250c87
Add matchFieldChange and other in paramAdditionalContext
mohityadav766 Jan 7, 2024
82df4d4
Merge branch 'main' into addQueue
mohityadav766 Jan 8, 2024
54f9ede
Review Comments
mohityadav766 Jan 8, 2024
abe9b08
Spotless fix
mohityadav766 Jan 8, 2024
65068ef
Merge branch 'main' into addQueue
mohityadav766 Jan 8, 2024
7c56334
Merge issue
mohityadav766 Jan 8, 2024
c6fab7f
Update Filter as per new Syntax
mohityadav766 Jan 8, 2024
8797770
Fix Failing Test
mohityadav766 Jan 8, 2024
e07f4d6
Merge branch 'main' into addQueue
mohityadav766 Jan 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/mysql/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,24 @@ set json = JSON_INSERT(
)
where name = 'DataInsightsApplication';

-- Update Change Event Table
ALTER TABLE change_event ADD COLUMN offset INT AUTO_INCREMENT PRIMARY KEY;

-- Add new table for event subscription extensions
CREATE TABLE IF NOT EXISTS event_subscription_extension (
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
id VARCHAR(36) NOT NULL,
extension VARCHAR(256) NOT NULL,
jsonSchema VARCHAR(256) NOT NULL,
json JSON NOT NULL,
timestamp BIGINT UNSIGNED GENERATED ALWAYS AS (json ->> '$.timestamp') NOT NULL,
UNIQUE(id, extension)
);

DELETE FROM event_subscription_entity ese where name = 'DataInsightReport';

ALTER TABLE event_subscription_extension ADD COLUMN offset INT AUTO_INCREMENT;


-- Rename NOOP Secret Manager to DB
update metadata_service_entity
set json = JSON_REPLACE(json, '$.connection.config.secretsManagerProvider', 'db')
Expand Down
17 changes: 17 additions & 0 deletions bootstrap/sql/migrations/native/1.3.0/postgres/schemaChanges.sql
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ SET json = jsonb_set(
)
where name = 'DataInsightsApplication';

-- Update Change Event Table
ALTER TABLE change_event ADD COLUMN offset SERIAL PRIMARY KEY;

-- Add new table for event subscription extensions
CREATE TABLE IF NOT EXISTS event_subscription_extension (
id VARCHAR(36) NOT NULL,
extension VARCHAR(256) NOT NULL,
jsonSchema VARCHAR(256) NOT NULL,
json jsonb NOT NULL,
timestamp BIGINT GENERATED ALWAYS AS ((json ->> 'timestamp')::bigint) STORED NOT NULL,
UNIQUE(id, extension)
);

DELETE FROM event_subscription_entity ese where name = 'DataInsightReport';

ALTER TABLE event_subscription_extension ADD COLUMN offset SERIAL;

-- Rename NOOP Secret Manager to DB
update metadata_service_entity
set json = jsonb_set(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,8 @@
import org.openmetadata.service.config.OMWebConfiguration;
import org.openmetadata.service.events.EventFilter;
import org.openmetadata.service.events.EventPubSub;
import org.openmetadata.service.events.scheduled.EventSubscriptionScheduler;
import org.openmetadata.service.events.scheduled.PipelineServiceStatusJobHandler;
import org.openmetadata.service.events.scheduled.ReportsHandler;
import org.openmetadata.service.exception.CatalogGenericExceptionMapper;
import org.openmetadata.service.exception.ConstraintViolationExceptionMapper;
import org.openmetadata.service.exception.JsonMappingExceptionMapper;
Expand Down Expand Up @@ -518,8 +518,8 @@ public void stop() throws InterruptedException, SchedulerException {
LOG.info("Cache with Id Stats {}", EntityRepository.CACHE_WITH_ID.stats());
LOG.info("Cache with name Stats {}", EntityRepository.CACHE_WITH_NAME.stats());
EventPubSub.shutdown();
ReportsHandler.shutDown();
AppScheduler.shutDown();
EventSubscriptionScheduler.shutDown();
LOG.info("Stopping the application");
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openmetadata.service.apps.bundles.changeEvent;

import static org.openmetadata.schema.entity.events.SubscriptionStatus.Status.ACTIVE;
import static org.openmetadata.schema.entity.events.SubscriptionStatus.Status.AWAITING_RETRY;
import static org.openmetadata.schema.entity.events.SubscriptionStatus.Status.FAILED;

import java.util.ArrayList;
import java.util.List;
import lombok.Getter;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.events.errors.RetriableException;
import org.openmetadata.service.events.subscription.AlertUtil;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.DisallowConcurrentExecution;
import org.quartz.Job;
import org.quartz.JobDetail;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;

@Slf4j
@DisallowConcurrentExecution
public abstract class AbstractEventConsumer implements Consumer<ChangeEvent>, Job {
public static final String ALERT_OFFSET_KEY = "alertOffsetKey";
public static final String ALERT_INFO_KEY = "alertInfoKey";
private static final String OFFSET_EXTENSION = "eventSubscription.Offset";
protected static final int BACKOFF_NORMAL = 0;
protected static final int BACKOFF_3_SECONDS = 3 * 1000;
protected static final int BACKOFF_30_SECONDS = 30 * 1000;
protected static final int BACKOFF_5_MINUTES = 5 * 60 * 1000;
protected static final int BACKOFF_1_HOUR = 60 * 60 * 1000;
protected static final int BACKOFF_24_HOUR = 24 * 60 * 60 * 1000;

@Getter protected int currentBackoffTime = BACKOFF_NORMAL;
private int offset = -1;
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved

@Getter @Setter private JobDetail jobDetail;
protected EventSubscription eventSubscription;

protected AbstractEventConsumer() {}

private void init(JobExecutionContext context) {
EventSubscription sub = (EventSubscription) context.getJobDetail().getJobDataMap().get(ALERT_INFO_KEY);
this.jobDetail = context.getJobDetail();
this.eventSubscription = sub;
this.offset = loadInitialOffset();
this.doInit(context);
}

protected abstract void doInit(JobExecutionContext context);

protected void sendAlert(List<ChangeEvent> list) {
/* This method needs to be over-ridden by specific Publisher for sending Alert */

}

@Override
public void handleFailedEvents(List<ChangeEvent> failedEvents) {}

@Override
public void handleException(Exception e) {}

private int loadInitialOffset() {
int eventSubscriptionOffset;
String json =
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.getSubscriberOffset(eventSubscription.getId().toString(), OFFSET_EXTENSION);
if (json != null) {
EventSubscriptionOffset offsetFromDb = JsonUtils.readValue(json, EventSubscriptionOffset.class);
eventSubscriptionOffset = offsetFromDb.getOffset();
} else {
eventSubscriptionOffset = Entity.getCollectionDAO().changeEventDAO().listCount();
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
}
// Update the Job Data Map with the latest offset
return eventSubscriptionOffset;
}

@Override
public boolean publishEvents(List<ChangeEvent> events) throws InterruptedException {
// Publish to the given Alert Actions
// Evaluate Alert Trigger Config

// Filter the Change Events based on Alert Trigger Config
List<ChangeEvent> filteredEvents = new ArrayList<>();
for (ChangeEvent event : events) {
boolean triggerChangeEvent =
AlertUtil.shouldTriggerAlert(event.getEntityType(), eventSubscription.getFilteringRules());

// Evaluate ChangeEvent Alert Filtering
if (eventSubscription.getFilteringRules() != null
&& !AlertUtil.evaluateAlertConditions(event, eventSubscription.getFilteringRules().getRules())) {
triggerChangeEvent = false;
}

if (triggerChangeEvent) {
// Ignore the event since change description is null
if (event.getChangeDescription() != null) {
filteredEvents.add(event);
} else {
LOG.info("Email Publisher Event Will be Ignored Since Change Description is null. Received Event: {}", event);
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
}
}
}

try {
sendAlert(filteredEvents);
return true;
} catch (RetriableException ex) {
setNextBackOff();
LOG.error(
"Failed to publish event in batch {} due to {}, will try again in {} ms",
filteredEvents,
ex,
currentBackoffTime);
Thread.sleep(currentBackoffTime);
} catch (Exception e) {
LOG.error("[AbstractAlertPublisher] error {}", e.getMessage(), e);
}
return false;
}

@Override
public void commitOffset(JobExecutionContext jobExecutionContext, int offset) {
EventSubscriptionOffset eventSubscriptionOffset =
new EventSubscriptionOffset().withOffset(offset).withTimestamp(System.currentTimeMillis());
Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertSubscriberOffset(
eventSubscription.getId().toString(),
OFFSET_EXTENSION,
"eventSubscriptionOffset",
JsonUtils.pojoToJson(eventSubscriptionOffset));

// Update the Job Data Map with the latest offset
jobExecutionContext.getJobDetail().getJobDataMap().put(ALERT_OFFSET_KEY, offset);
}

public synchronized void setErrorStatus(Long attemptTime, Integer statusCode, String reason) {
setStatus(FAILED, attemptTime, statusCode, reason, null);
}

public synchronized void setAwaitingRetry(Long attemptTime, int statusCode, String reason) {
setStatus(AWAITING_RETRY, attemptTime, statusCode, reason, attemptTime + currentBackoffTime);
}

public synchronized void setSuccessStatus(Long updateTime) {
SubscriptionStatus subStatus =
AlertUtil.buildSubscriptionStatus(ACTIVE, updateTime, null, null, null, updateTime, updateTime);
eventSubscription.setStatusDetails(subStatus);
}

protected synchronized void setStatus(
SubscriptionStatus.Status status, Long attemptTime, Integer statusCode, String reason, Long timestamp) {
SubscriptionStatus subStatus =
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
AlertUtil.buildSubscriptionStatus(status, null, attemptTime, statusCode, reason, timestamp, attemptTime);
eventSubscription.setStatusDetails(subStatus);
}

@Override
public List<ChangeEvent> pollEvents(long offset, long batchSize) {
// Read from Change Event Table
List<String> eventJson = Entity.getCollectionDAO().changeEventDAO().list(batchSize, offset);

List<ChangeEvent> changeEvents = new ArrayList<>();
for (String json : eventJson) {
ChangeEvent event = JsonUtils.readValue(json, ChangeEvent.class);
changeEvents.add(event);
}
return changeEvents;
}

@Override
public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException {
// Must Have , Before Execute the Init, Quartz Requires a Non-Arg Constructor
this.init(jobExecutionContext);

try {
List<ChangeEvent> batch = pollEvents(offset, 100);
if (!batch.isEmpty()) {
boolean success = publishEvents(batch);
if (success) {
offset += batch.size();
} else {
handleFailedEvents(batch);
}
}
} catch (InterruptedException e) {
LOG.error("Interrupted while polling events", e);
Thread.currentThread().interrupt();
} catch (Exception e) {
handleException(e);
} finally {
LOG.debug("Committing offset for eventSubscription {} {}", eventSubscription.getName(), offset);
commitOffset(jobExecutionContext, offset);
mohityadav766 marked this conversation as resolved.
Show resolved Hide resolved
}
}

public void setNextBackOff() {
if (currentBackoffTime == BACKOFF_NORMAL) {
currentBackoffTime = BACKOFF_3_SECONDS;
} else if (currentBackoffTime == BACKOFF_3_SECONDS) {
currentBackoffTime = BACKOFF_30_SECONDS;
} else if (currentBackoffTime == BACKOFF_30_SECONDS) {
currentBackoffTime = BACKOFF_5_MINUTES;
} else if (currentBackoffTime == BACKOFF_5_MINUTES) {
currentBackoffTime = BACKOFF_1_HOUR;
} else if (currentBackoffTime == BACKOFF_1_HOUR) {
currentBackoffTime = BACKOFF_24_HOUR;
}
}

public EventSubscription getEventSubscription() {
return (EventSubscription) jobDetail.getJobDataMap().get(ALERT_INFO_KEY);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.openmetadata.service.apps.bundles.changeEvent;

import io.dropwizard.lifecycle.Managed;
import java.util.List;
import org.quartz.JobExecutionContext;

public interface Consumer<T> extends Managed {
List<T> pollEvents(long offset, long batchSize);

boolean publishEvents(List<T> events) throws InterruptedException;

void handleFailedEvents(List<T> failedEvents);

void handleException(Exception e);

void commitOffset(JobExecutionContext jobExecutionContext, int offset);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package org.openmetadata.service.apps.bundles.changeEvent;

public class EventAlertProducer {}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package org.openmetadata.service.events.subscription.email;
package org.openmetadata.service.apps.bundles.changeEvent.email;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.util.List;
Expand Down
Loading
Loading