Skip to content

Commit

Permalink
create EventSubscriptionDiagnosticInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
Siddhanttimeline committed Oct 10, 2024
1 parent 4233abf commit bd2d6a5
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,11 @@ public abstract class AbstractEventConsumer
public static final String ALERT_INFO_KEY = "alertInfoKey";
public static final String OFFSET_EXTENSION = "eventSubscription.Offset";
public static final String METRICS_EXTENSION = "eventSubscription.metrics";
public static final String FAILED_EVENT_EXTENSION = "eventSubscription.failedEvent";
public static final String FAILED_EVENT_EXTENSION_PUBLISHER =
"eventSubscription.failedEvent.publisher";
public static final String FAILED_EVENT_EXTENSION_SUBSCRIBER =
"eventSubscription.failedEvent.subscriber";

private long offset = -1;
private AlertMetrics alertMetrics;

Expand All @@ -78,7 +82,7 @@ protected void doInit(JobExecutionContext context) {
}

@Override
public void handleFailedEvent(EventPublisherException ex) {
public void handleFailedEvent(EventPublisherException ex, boolean errorOnSub) {
UUID failingSubscriptionId = ex.getChangeEventWithSubscription().getLeft();
ChangeEvent changeEvent = ex.getChangeEventWithSubscription().getRight();
LOG.debug(
Expand All @@ -87,11 +91,14 @@ public void handleFailedEvent(EventPublisherException ex) {
failingSubscriptionId,
changeEvent);

String extension =
errorOnSub ? FAILED_EVENT_EXTENSION_SUBSCRIBER : FAILED_EVENT_EXTENSION_PUBLISHER;

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertFailedEvent(
eventSubscription.getId().toString(),
String.format("%s-%s", FAILED_EVENT_EXTENSION, changeEvent.getId()),
String.format("%s-%s", extension, changeEvent.getId()),
JsonUtils.pojoToJson(
new FailedEvent()
.withFailingSubscriptionId(failingSubscriptionId)
Expand Down Expand Up @@ -164,7 +171,7 @@ public void publishEvents(Map<ChangeEvent, Set<UUID>> events) {
alertMetrics.withSuccessEvents(alertMetrics.getSuccessEvents() + 1);
} catch (EventPublisherException e) {
alertMetrics.withFailedEvents(alertMetrics.getFailedEvents() + 1);
handleFailedEvent(e);
handleFailedEvent(e, false);
}
}
}
Expand All @@ -176,13 +183,15 @@ public void commit(JobExecutionContext jobExecutionContext) {
// Upsert Offset
EventSubscriptionOffset eventSubscriptionOffset =
new EventSubscriptionOffset().withOffset(offset).withTimestamp(currentTime);

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertSubscriberExtension(
eventSubscription.getId().toString(),
OFFSET_EXTENSION,
"eventSubscriptionOffset",
JsonUtils.pojoToJson(eventSubscriptionOffset));

jobExecutionContext
.getJobDetail()
.getJobDataMap()
Expand All @@ -195,13 +204,15 @@ public void commit(JobExecutionContext jobExecutionContext) {
.withFailedEvents(alertMetrics.getFailedEvents())
.withSuccessEvents(alertMetrics.getSuccessEvents())
.withTimestamp(currentTime);

Entity.getCollectionDAO()
.eventSubscriptionDAO()
.upsertSubscriberExtension(
eventSubscription.getId().toString(),
METRICS_EXTENSION,
"alertMetrics",
JsonUtils.pojoToJson(metrics));

jobExecutionContext.getJobDetail().getJobDataMap().put(METRICS_EXTENSION, alertMetrics);

// Populate the Destination map
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,11 @@ public void sendAlert(UUID receiverId, ChangeEvent event) throws EventPublisherE
if (destinationMap.containsKey(receiverId)) {
Destination<ChangeEvent> destination = destinationMap.get(receiverId);
if (Boolean.TRUE.equals(destination.getEnabled())) {
destination.sendMessage(event);
try {
destination.sendMessage(event);
} catch (EventPublisherException ex) {
handleFailedEvent(ex, true);
}
} else {
LOG.debug(
"Event Subscription:{} Skipping sending message since, disabled subscription with Id: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ public interface Consumer<T> {

void publishEvents(Map<ChangeEvent, Set<UUID>> events);

void handleFailedEvent(EventPublisherException e);
void handleFailedEvent(EventPublisherException e, boolean errorOnSub);

void commit(JobExecutionContext jobExecutionContext);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,25 @@
import static org.openmetadata.service.apps.bundles.changeEvent.AbstractEventConsumer.ALERT_OFFSET_KEY;
import static org.openmetadata.service.events.subscription.AlertUtil.getStartingOffset;

import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.sqlobject.transaction.Transaction;
import org.openmetadata.schema.EntityInterface;
import org.openmetadata.schema.api.events.EventSubscriptionDiagnosticInfo;
import org.openmetadata.schema.entity.events.EventSubscription;
import org.openmetadata.schema.entity.events.EventSubscriptionOffset;
import org.openmetadata.schema.entity.events.SubscriptionDestination;
import org.openmetadata.schema.entity.events.SubscriptionStatus;
import org.openmetadata.schema.type.ChangeEvent;
import org.openmetadata.service.Entity;
import org.openmetadata.service.apps.bundles.changeEvent.AlertPublisher;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.util.JsonUtils;
import org.quartz.JobBuilder;
import org.quartz.JobDataMap;
import org.quartz.JobDetail;
Expand Down Expand Up @@ -156,62 +162,144 @@ public void deleteEventSubscriptionPublisher(EventSubscription deletedEntity)
}

public SubscriptionStatus getStatusForEventSubscription(UUID subscriptionId, UUID destinationId) {
EventSubscription eventSubscription = getEventSubscriptionFromScheduledJob(subscriptionId);
if (eventSubscription == null) {
EntityRepository<? extends EntityInterface> subscriptionRepository =
Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);
EventSubscription subscription =
(EventSubscription)
subscriptionRepository.get(
null, subscriptionId, subscriptionRepository.getFields("id"));
if (subscription != null && (Boolean.FALSE.equals(subscription.getEnabled()))) {
return new SubscriptionStatus().withStatus(SubscriptionStatus.Status.DISABLED);
}
} else {
List<SubscriptionDestination> subscriptions =
eventSubscription.getDestinations().stream()
.filter(sub -> sub.getId().equals(destinationId))
.toList();
if (subscriptions.size() == 1) {
// We have unique Ids per destination
return subscriptions.get(0).getStatusDetails();
}
Optional<EventSubscription> eventSubscriptionOpt =
getEventSubscriptionFromScheduledJob(subscriptionId);

if (eventSubscriptionOpt.isPresent()) {
return eventSubscriptionOpt.get().getDestinations().stream()
.filter(destination -> destination.getId().equals(destinationId))
.map(SubscriptionDestination::getStatusDetails)
.findFirst()
.orElse(null);
}

EntityRepository<? extends EntityInterface> subscriptionRepository =
Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);

// If the event subscription was not found in the scheduled job, check the repository
Optional<EventSubscription> subscriptionOpt =
Optional.ofNullable(
(EventSubscription)
subscriptionRepository.get(
null, subscriptionId, subscriptionRepository.getFields("id")));

return subscriptionOpt
.filter(subscription -> Boolean.FALSE.equals(subscription.getEnabled()))
.map(
subscription -> new SubscriptionStatus().withStatus(SubscriptionStatus.Status.DISABLED))
.orElse(null);
}

public List<SubscriptionDestination> listAlertDestinations(UUID subscriptionId) {
Optional<EventSubscription> eventSubscriptionOpt =
getEventSubscriptionFromScheduledJob(subscriptionId);

// If the EventSubscription is not found in the scheduled job, retrieve it from the repository
EventSubscription eventSubscription =
eventSubscriptionOpt.orElseGet(
() -> {
EntityRepository<? extends EntityInterface> subscriptionRepository =
Entity.getEntityRepository(Entity.EVENT_SUBSCRIPTION);

return (EventSubscription)
subscriptionRepository.get(
null,
subscriptionId,
subscriptionRepository.getFields("id,destinations,enabled"));
});

if (eventSubscription != null && Boolean.FALSE.equals(eventSubscription.getEnabled())) {
return Collections.emptyList();
}
return null;

return eventSubscription.getDestinations();
}

public EventSubscriptionDiagnosticInfo getEventSubscriptionDiagnosticInfo(UUID subscriptionId) {
boolean isAllEventsPublished = checkIfPublisherPublishedAllEvents(subscriptionId);
EventSubscriptionOffset latestOffset = getLatestOffset();
long currentOffset =
getEventSubscriptionOffset(subscriptionId)
.map(EventSubscriptionOffset::getOffset)
.orElse(
latestOffset.getOffset()); // Fallback to latest offset if current offset not found
long unpublishedEventCount = getUnpublishedEventCount(subscriptionId);
List<ChangeEvent> unprocessedEvents =
Optional.ofNullable(getUnpublishedEvents(subscriptionId)).orElse(Collections.emptyList());

return new EventSubscriptionDiagnosticInfo()
.withCurrentOffset(currentOffset)
.withLatestOffset(latestOffset.getOffset())
.withHasProcessedAllEvents(isAllEventsPublished)
.withUnprocessedEventsCount(unpublishedEventCount)
.withUnprocessedEventsList(unprocessedEvents);
}

public EventSubscription getEventSubscriptionFromScheduledJob(UUID id) {
public static EventSubscriptionOffset getLatestOffset() {
return new EventSubscriptionOffset()
.withOffset(Entity.getCollectionDAO().changeEventDAO().getLatestOffset());
}

public boolean checkIfPublisherPublishedAllEvents(UUID subscriptionID) {
long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();

return getEventSubscriptionOffset(subscriptionID)
.map(offset -> offset.getOffset() == countOfEvents)
.orElse(false);
}

public long getUnpublishedEventCount(UUID subscriptionID) {
long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();

return getEventSubscriptionOffset(subscriptionID)
.map(offset -> Math.abs(countOfEvents - offset.getOffset()))
.orElse(countOfEvents);
}

public List<ChangeEvent> getUnpublishedEvents(UUID subscriptionId) {
long offset =
getEventSubscriptionOffset(subscriptionId)
.map(EventSubscriptionOffset::getOffset)
.orElse(Entity.getCollectionDAO().changeEventDAO().getLatestOffset());

List<String> unprocessedEventJsonList =
Entity.getCollectionDAO().changeEventDAO().listUnprocessedEvents(offset);

return unprocessedEventJsonList.stream()
.map(eventJson -> JsonUtils.readValue(eventJson, ChangeEvent.class))
.collect(Collectors.toList());
}

public Optional<EventSubscription> getEventSubscriptionFromScheduledJob(UUID id) {
try {
JobDetail jobDetail =
alertsScheduler.getJobDetail(new JobKey(id.toString(), ALERT_JOB_GROUP));
if (jobDetail != null) {
return ((EventSubscription) jobDetail.getJobDataMap().get(ALERT_INFO_KEY));
}

return Optional.ofNullable(jobDetail)
.map(detail -> (EventSubscription) detail.getJobDataMap().get(ALERT_INFO_KEY));

} catch (SchedulerException ex) {
LOG.error("Failed to get Event Subscription from Job, Subscription Id : {}", id);
LOG.error("Failed to get Event Subscription from Job, Subscription Id : {}", id, ex);
}
return null;

return Optional.empty();
}

public boolean checkIfPublisherPublishedAllEvents(UUID subscriptionID) {
long countOfEvents = Entity.getCollectionDAO().changeEventDAO().getLatestOffset();
public Optional<EventSubscriptionOffset> getEventSubscriptionOffset(UUID subscriptionID) {
try {
JobDetail jobDetail =
alertsScheduler.getJobDetail(new JobKey(subscriptionID.toString(), ALERT_JOB_GROUP));
if (jobDetail != null) {
EventSubscriptionOffset offset =
((EventSubscriptionOffset) jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY));
if (offset != null) {
return offset.getOffset() == countOfEvents;
}
return Optional.ofNullable(
(EventSubscriptionOffset) jobDetail.getJobDataMap().get(ALERT_OFFSET_KEY));
}
} catch (Exception ex) {
LOG.error(
"Failed to get Event Subscription from Job, Subscription Id : {}, Exception: ",
subscriptionID.toString(),
ex);
}
return false;
return Optional.empty();
}

public static void shutDown() throws SchedulerException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3769,6 +3769,10 @@ default User findEntityByName(String fqn, Include include) {
}

interface ChangeEventDAO {
@SqlQuery(
"SELECT json FROM change_event ce where ce.offset > :offset ORDER BY ce.eventTime ASC")
List<String> listUnprocessedEvents(@Bind("offset") long offset);

@ConnectionAwareSqlUpdate(
value = "INSERT INTO change_event (json) VALUES (:json)",
connectionType = MYSQL)
Expand Down
Loading

0 comments on commit bd2d6a5

Please sign in to comment.