diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/ConverterTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/ConverterTest.java index 209f2723c7..e3fc353379 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/ConverterTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/ConverterTest.java @@ -16,18 +16,15 @@ package com.oceanbase.odc.service.notification; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.when; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.Optional; import java.util.stream.Collectors; -import org.apache.commons.compress.utils.Lists; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -36,11 +33,13 @@ import org.springframework.boot.test.mock.mockito.MockBean; import com.oceanbase.odc.ServiceTestEnv; +import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.core.shared.constant.TaskType; import com.oceanbase.odc.metadb.notification.ChannelEntity; import com.oceanbase.odc.metadb.notification.ChannelRepository; import com.oceanbase.odc.metadb.notification.EventEntity; import com.oceanbase.odc.metadb.notification.EventRepository; +import com.oceanbase.odc.metadb.notification.NotificationChannelRelationEntity; import com.oceanbase.odc.metadb.notification.NotificationPolicyChannelRelationRepository; import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository; @@ -95,11 +94,11 @@ public void testConvert_Success() { } eventRepository.saveAll(events); - when(policyRepository.findByOrganizationIdAndMatchExpression(anyLong(), anyString())) - .thenReturn(Optional.of(getPolicyEntity())); - when(policyChannelRepository.findByOrganizationIdAndNotificationPolicyId(anyLong(), anyLong())) - .thenReturn(Lists.newArrayList()); - when(channelRepository.findAllById(any())) + when(policyRepository.findByOrganizationIds(any())) + .thenReturn(Collections.singletonList(getNotificationPolicy())); + when(policyChannelRepository.findByNotificationPolicyIds(any())) + .thenReturn(Collections.singletonList(getNotificationChannelRelationEntity())); + when(channelRepository.findByIdIn(any())) .thenReturn(Arrays.asList(getChannelEntity())); List notifications = @@ -143,4 +142,20 @@ private ChannelEntity getChannelEntity() { return entity; } + private NotificationPolicyEntity getNotificationPolicy() { + NotificationPolicyEntity policy = new NotificationPolicyEntity(); + policy.setId(1L); + policy.setMatchExpression(JsonUtils.toJson(getLabels())); + policy.setOrganizationId(ORGANIZATION_ID); + return policy; + } + + private NotificationChannelRelationEntity getNotificationChannelRelationEntity() { + NotificationChannelRelationEntity entity = new NotificationChannelRelationEntity(); + entity.setOrganizationId(ORGANIZATION_ID); + entity.setChannelId(1L); + entity.setNotificationPolicyId(1L); + return entity; + } + } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/EventFilterTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/EventFilterTest.java index 5209eed5b7..4cd2ab7ba2 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/EventFilterTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/EventFilterTest.java @@ -15,14 +15,13 @@ */ package com.oceanbase.odc.service.notification; -import static org.mockito.ArgumentMatchers.anyLong; -import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.doReturn; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.Set; import java.util.stream.Collectors; import org.junit.After; @@ -33,9 +32,11 @@ import org.springframework.boot.test.mock.mockito.MockBean; import com.oceanbase.odc.ServiceTestEnv; +import com.oceanbase.odc.common.json.JsonUtils; import com.oceanbase.odc.core.shared.constant.TaskType; import com.oceanbase.odc.metadb.notification.EventEntity; import com.oceanbase.odc.metadb.notification.EventRepository; +import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository; import com.oceanbase.odc.service.notification.helper.EventMapper; import com.oceanbase.odc.service.notification.helper.EventUtils; @@ -78,14 +79,11 @@ public void testFilter_Success() { events.add((mapper.toEntity(getEvent()))); } List entities = eventRepository.saveAll(events); - - when(policyRepository.existsByOrganizationIdAndMatchExpression(anyLong(), anyString())) - .thenReturn(Boolean.TRUE); + doReturn(Collections.singletonList(getNotificationPolicy())).when(policyRepository) + .findByOrganizationIds(any()); List filtered = filter.filter(entities.stream().map(entity -> mapper.fromEntity(entity)).collect(Collectors.toList())); Assert.assertEquals(eventCount, filtered.size()); - Set statusSet = filtered.stream().map(Event::getStatus).collect(Collectors.toSet()); - Assert.assertTrue(statusSet.contains(EventStatus.CONVERTED) && statusSet.size() == 1); } private Event getEvent() { @@ -98,6 +96,13 @@ private Event getEvent() { return event; } + private NotificationPolicyEntity getNotificationPolicy() { + NotificationPolicyEntity policy = new NotificationPolicyEntity(); + policy.setMatchExpression(JsonUtils.toJson(getLabels())); + policy.setOrganizationId(ORGANIZATION_ID); + return policy; + } + private EventLabels getLabels() { return EventUtils.buildEventLabels(TaskType.ASYNC, "failed", 1L); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationDispatcherTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationDispatcherTest.java index b54e3616bf..699afa4c91 100644 --- a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationDispatcherTest.java +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationDispatcherTest.java @@ -69,7 +69,7 @@ public void tearDown() { public void testDispatch_SendFailed() { MessageEntity entity = messageRepository.save(getMessage().toEntity()); - dispatcher.dispatch(Arrays.asList(getNotification(entity))); + dispatcher.dispatch(getNotification(entity)); Assert.assertEquals(MessageSendingStatus.SENT_FAILED, messageRepository.findAll().get(0).getStatus()); } diff --git a/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationFilterTest.java b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationFilterTest.java new file mode 100644 index 0000000000..ee4fc363b6 --- /dev/null +++ b/server/integration-test/src/test/java/com/oceanbase/odc/service/notification/NotificationFilterTest.java @@ -0,0 +1,60 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.odc.service.notification; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; +import org.junit.Test; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; +import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter; +import com.oceanbase.odc.service.notification.model.EventLabels; + +public class NotificationFilterTest { + + @Test + public void testFilterNotificationPolicies_Succeed() { + EventLabels labels = new EventLabels(); + labels.put("action", "failed"); + + List filtered = NotificationPolicyFilter.filter(labels, getPolicies()); + Assert.assertEquals(1, filtered.size()); + } + + @Test + public void testFilterNotificationPolicies_Fail() { + EventLabels labels = new EventLabels(); + labels.put("action", "succeed"); + + List filtered = NotificationPolicyFilter.filter(labels, getPolicies()); + Assert.assertEquals(0, filtered.size()); + } + + List getPolicies() { + Map conditions = new HashMap<>(); + conditions.put("action", "failed"); + NotificationPolicyEntity policy = new NotificationPolicyEntity(); + policy.setMatchExpression(JsonUtils.toJson(conditions)); + return Collections.singletonList(policy); + } + +} diff --git a/server/odc-migrate/src/main/resources/migrate/common/V_4_2_2_6__alter_notification_event_add_index.sql b/server/odc-migrate/src/main/resources/migrate/common/V_4_2_2_6__alter_notification_event_add_index.sql new file mode 100644 index 0000000000..0ad100b9b1 --- /dev/null +++ b/server/odc-migrate/src/main/resources/migrate/common/V_4_2_2_6__alter_notification_event_add_index.sql @@ -0,0 +1,3 @@ +alter table `notification_event` add index `notification_event_status_time`(`status`, `trigger_time`); + +alter table `notification_message` add index `idx_status_retry_times_max_retry_times`(`status`, `retry_times`, `max_retry_times`); \ No newline at end of file diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/ChannelRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/ChannelRepository.java index 39404165b2..83e27bbf98 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/ChannelRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/ChannelRepository.java @@ -15,9 +15,15 @@ */ package com.oceanbase.odc.metadb.notification; +import java.util.Collection; +import java.util.List; + import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; public interface ChannelRepository extends JpaRepository, JpaSpecificationExecutor { + + List findByIdIn(Collection ids); + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/EventRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/EventRepository.java index ab2bc6a754..f8de6470ad 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/EventRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/EventRepository.java @@ -15,6 +15,7 @@ */ package com.oceanbase.odc.metadb.notification; +import java.util.Collection; import java.util.List; import java.util.Optional; @@ -50,5 +51,10 @@ public interface EventRepository extends JpaRepository, nativeQuery = true) List findNByStatusForUpdate(@Param("status") EventStatus status, @Param("limit") Integer limit); + @Transactional + @Modifying + @Query(value = "update notification_event set `status`=:#{#status.name()} where `id` in (:ids)", nativeQuery = true) + int updateStatusByIds(@Param("status") EventStatus status, @Param("ids") Collection ids); + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/MessageRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/MessageRepository.java index 07c0c291b7..22a8284750 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/MessageRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/MessageRepository.java @@ -46,4 +46,5 @@ int updateStatusById(@Param("id") Long id, nativeQuery = true) List findNByStatusForUpdate(@Param("status") MessageSendingStatus status, @Param("limit") Integer limit); + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyChannelRelationRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyChannelRelationRepository.java index 954e303b25..a0f05b5caf 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyChannelRelationRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyChannelRelationRepository.java @@ -15,10 +15,13 @@ */ package com.oceanbase.odc.metadb.notification; +import java.util.Collection; import java.util.List; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; public interface NotificationPolicyChannelRelationRepository extends JpaRepository, @@ -27,4 +30,8 @@ public interface NotificationPolicyChannelRelationRepository List findByOrganizationIdAndNotificationPolicyId(Long organizationId, Long notificationPolicyId); + + @Query(value = "select * from notification_policy_channel_relation where notification_policy_id in (:ids)", + nativeQuery = true) + List findByNotificationPolicyIds(@Param("ids") Collection ids); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyRepository.java b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyRepository.java index 2ed16a09ca..f86b70a6ff 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyRepository.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/metadb/notification/NotificationPolicyRepository.java @@ -15,15 +15,24 @@ */ package com.oceanbase.odc.metadb.notification; +import java.util.Collection; +import java.util.List; import java.util.Optional; import org.springframework.data.jpa.repository.JpaRepository; import org.springframework.data.jpa.repository.JpaSpecificationExecutor; +import org.springframework.data.jpa.repository.Query; +import org.springframework.data.repository.query.Param; public interface NotificationPolicyRepository extends JpaRepository, JpaSpecificationExecutor { Optional findByOrganizationIdAndMatchExpression(Long organizationId, String matchExpression); + List findByOrganizationId(Long organizationId); + + @Query(value = "select * from notification_policy where organization_id in (:organizationIds)", nativeQuery = true) + List findByOrganizationIds(@Param("organizationIds") Collection ids); + boolean existsByOrganizationIdAndMatchExpression(Long organizationId, String matchExpression); } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java index a490b51d8a..3de984669a 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/connection/ConnectionService.java @@ -713,7 +713,8 @@ private ConnectionConfig internalGet(Long id) { return connection; } - private ConnectionConfig internalGetSkipUserCheck(Long id, boolean withEnvironment) { + @SkipAuthorize("odc internal usage") + public ConnectionConfig internalGetSkipUserCheck(Long id, boolean withEnvironment) { ConnectionConfig config = entityToModel(getEntity(id), withEnvironment); List entities = this.attributeRepository.findByConnectionId(config.getId()); config.setAttributes(attrEntitiesToMap(entities)); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java index fa69b0b94b..b2058ab1bd 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/flow/task/BaseODCFlowTaskDelegate.java @@ -29,6 +29,7 @@ import org.springframework.beans.factory.annotation.Autowired; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.oceanbase.odc.common.util.SystemUtils; import com.oceanbase.odc.core.flow.exception.BaseFlowException; import com.oceanbase.odc.core.shared.Verify; import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository; @@ -246,46 +247,36 @@ protected boolean isTimeout() { */ protected void onFailure(Long taskId, TaskService taskService) { if (notificationProperties.isEnabled()) { - TaskEntity taskEntity = taskService.detail(taskId); - ConnectionConfig connection = connectionService.getWithoutPermissionCheck(taskEntity.getConnectionId()); - EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "failed", - taskEntity.getConnectionId()); - Map extend = new HashMap<>(); - extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName()); - extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName()); - labels.addLabels(extend); - broker.enqueueEvent(Event.builder() - .status(EventStatus.CREATED) - .creatorId(taskEntity.getCreatorId()) - .organizationId(taskEntity.getOrganizationId()) - .triggerTime(new Date(System.currentTimeMillis())) - .labels(labels) - .build()); + try { + TaskEntity taskEntity = taskService.detail(taskId); + ConnectionConfig connection = + connectionService.internalGetSkipUserCheck(taskEntity.getConnectionId(), true); + EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "failed", + taskEntity.getConnectionId()); + Map extend = new HashMap<>(); + extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName()); + extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName()); + extend.put(EventLabelKeys.VARIABLE_KEY_TASK_ID, taskId + ""); + extend.put(EventLabelKeys.VARIABLE_KEY_REGION, SystemUtils.getEnvOrProperty("OB_ARN_PARTITION")); + labels.addLabels(extend); + broker.enqueueEvent(Event.builder() + .status(EventStatus.CREATED) + .creatorId(taskEntity.getCreatorId()) + .organizationId(taskEntity.getOrganizationId()) + .triggerTime(new Date(System.currentTimeMillis())) + .labels(labels) + .build()); + } catch (Exception e) { + log.warn("Failed to enqueue event.", e); + } } + } /** * The callback method when the task is successful, used to update the status and other operations */ - protected void onSuccessful(Long taskId, TaskService taskService) { - if (notificationProperties.isEnabled()) { - TaskEntity taskEntity = taskService.detail(taskId); - ConnectionConfig connection = connectionService.getWithoutPermissionCheck(taskEntity.getConnectionId()); - EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "succeed", - taskEntity.getConnectionId()); - Map extend = new HashMap<>(); - extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName()); - extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName()); - labels.addLabels(extend); - broker.enqueueEvent(Event.builder() - .status(EventStatus.CREATED) - .creatorId(taskEntity.getCreatorId()) - .organizationId(taskEntity.getOrganizationId()) - .triggerTime(new Date(System.currentTimeMillis())) - .labels(labels) - .build()); - } - } + protected void onSuccessful(Long taskId, TaskService taskService) {} /** * The callback method of the task execution timeout, which is used to update the status and other diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Broker.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Broker.java index 5f19a7ec83..257bf1f163 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Broker.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Broker.java @@ -22,11 +22,14 @@ import org.springframework.transaction.annotation.Transactional; import com.oceanbase.odc.core.authority.util.SkipAuthorize; +import com.oceanbase.odc.metadb.notification.MessageRepository; import com.oceanbase.odc.service.notification.model.Event; import com.oceanbase.odc.service.notification.model.EventStatus; import com.oceanbase.odc.service.notification.model.MessageSendingStatus; import com.oceanbase.odc.service.notification.model.Notification; +import lombok.extern.slf4j.Slf4j; + /** * @Author: Lebie * @Date: 2023/3/20 14:45 @@ -34,6 +37,7 @@ */ @Service @SkipAuthorize("currently not in use") +@Slf4j public class Broker { @Autowired private EventQueue eventQueue; @@ -53,6 +57,9 @@ public class Broker { @Autowired private NotificationProperties notificationProperties; + @Autowired + private MessageRepository messageRepository; + @Transactional(rollbackFor = Exception.class) public void dequeueEvent(EventStatus eventStatus) { // 从事件队列中拉取事件 @@ -70,11 +77,17 @@ public void enqueueEvent(Event event) { eventQueue.offer(event); } - - @Transactional(rollbackFor = Exception.class) public void dequeueNotification(MessageSendingStatus status) { List notifications = notificationQueue.peek(notificationProperties.getNotificationDequeueBatchSize(), status); - notificationDispatcher.dispatch(notifications); + for (Notification notification : notifications) { + try { + notificationDispatcher.dispatch(notification); + } catch (Exception e) { + messageRepository.updateStatusAndRetryTimesById(notification.getMessage().getId(), + MessageSendingStatus.SENT_FAILED); + log.warn("Send notification failed.", e); + } + } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Converter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Converter.java index 1ce6ac4996..202ea79383 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Converter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/Converter.java @@ -17,10 +17,10 @@ import java.util.ArrayList; import java.util.List; -import java.util.Optional; +import java.util.Map; +import java.util.Objects; import java.util.stream.Collectors; -import org.apache.commons.collections.ListUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.util.CollectionUtils; @@ -32,8 +32,8 @@ import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository; import com.oceanbase.odc.service.notification.helper.ChannelMapper; -import com.oceanbase.odc.service.notification.helper.EventUtils; import com.oceanbase.odc.service.notification.helper.MessageTemplateProcessor; +import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter; import com.oceanbase.odc.service.notification.model.ChannelConfig; import com.oceanbase.odc.service.notification.model.Event; import com.oceanbase.odc.service.notification.model.Message; @@ -64,50 +64,72 @@ public class Converter { private NotificationProperties notificationProperties; public List convert(List events) { + List notifications = new ArrayList<>(); if (CollectionUtils.isEmpty(events)) { - return ListUtils.EMPTY_LIST; + return notifications; } - List notifications = new ArrayList<>(); + + List policies = notificationPolicyRepository.findByOrganizationIds( + events.stream().map(Event::getOrganizationId).collect(Collectors.toSet())); + if (CollectionUtils.isEmpty(policies)) { + return notifications; + } + Map> mappedPolicies = policies.stream() + .collect(Collectors.groupingBy(NotificationPolicyEntity::getOrganizationId)); + + List policyChannelEntities = + policyChannelRepository.findByNotificationPolicyIds( + policies.stream().map(NotificationPolicyEntity::getId).collect(Collectors.toSet())); + if (CollectionUtils.isEmpty(policyChannelEntities)) { + return notifications; + } + + Map> mappedChannels = channelRepository.findByIdIn( + policyChannelEntities.stream() + .map(NotificationChannelRelationEntity::getChannelId).collect(Collectors.toSet())) + .stream().map(entity -> channelMapper.fromEntity(entity)) + .collect(Collectors.groupingBy(channel -> { + for (NotificationChannelRelationEntity entity : policyChannelEntities) { + if (Objects.equals(entity.getChannelId(), channel.getId())) { + return entity.getNotificationPolicyId(); + } + } + return null; + })); + for (Event event : events) { - Optional policyOpt = - notificationPolicyRepository.findByOrganizationIdAndMatchExpression(event.getOrganizationId(), - EventUtils.generateMatchExpression(event.getLabels())); - if (!policyOpt.isPresent()) { - return null; + List matched = NotificationPolicyFilter.filter(event.getLabels(), + mappedPolicies.get(event.getOrganizationId())); + if (matched.isEmpty()) { + continue; } - NotificationPolicyEntity policy = policyOpt.get(); - List policyChannelEntity = - policyChannelRepository.findByOrganizationIdAndNotificationPolicyId(event.getOrganizationId(), - policy.getId()); - List channels = channelRepository.findAllById( - policyChannelEntity.stream() - .map(NotificationChannelRelationEntity::getChannelId) - .collect(Collectors.toSet())) - .stream().map(entity -> channelMapper.fromEntity(entity)).collect(Collectors.toList()); + for (NotificationPolicyEntity policy : matched) { + List channels = mappedChannels.get(policy.getId()); - if (CollectionUtils.isEmpty(channels)) { - return null; - } - channels.stream().forEach(channel -> { - Notification notification = new Notification(); + if (CollectionUtils.isEmpty(channels)) { + return null; + } + channels.forEach(channel -> { + Notification notification = new Notification(); - Message message = new Message(); - message.setTitle( - MessageTemplateProcessor.replaceVariables(policy.getTitleTemplate(), event.getLabels())); - message.setContent( - MessageTemplateProcessor.replaceVariables(policy.getContentTemplate(), event.getLabels())); - message.setOrganizationId(policy.getOrganizationId()); - message.setCreatorId(event.getCreatorId()); - message.setEventId(event.getId()); - message.setChannelId(channel.getId()); - message.setStatus(MessageSendingStatus.CREATED); - message.setRetryTimes(0); - message.setMaxRetryTimes(notificationProperties.getMaxResendTimes()); - message.setToRecipients(policy.getToRecipients()); - message.setCcRecipients(policy.getCcRecipients()); - notification.setMessage(message); - notifications.add(notification); - }); + Message message = new Message(); + message.setTitle( + MessageTemplateProcessor.replaceVariables(policy.getTitleTemplate(), event.getLabels())); + message.setContent( + MessageTemplateProcessor.replaceVariables(policy.getContentTemplate(), event.getLabels())); + message.setOrganizationId(policy.getOrganizationId()); + message.setCreatorId(event.getCreatorId()); + message.setEventId(event.getId()); + message.setChannelId(channel.getId()); + message.setStatus(MessageSendingStatus.CREATED); + message.setRetryTimes(0); + message.setMaxRetryTimes(notificationProperties.getMaxResendTimes()); + message.setToRecipients(policy.getToRecipients()); + message.setCcRecipients(policy.getCcRecipients()); + notification.setMessage(message); + notifications.add(notification); + }); + } } return notifications; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/DingTalkBotChannel.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/DingTalkBotChannel.java index 722cc30850..773be9e97e 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/DingTalkBotChannel.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/DingTalkBotChannel.java @@ -17,6 +17,8 @@ import java.util.stream.Collectors; +import org.apache.commons.collections4.CollectionUtils; + import com.dingtalk.api.DefaultDingTalkClient; import com.dingtalk.api.DingTalkClient; import com.dingtalk.api.request.OapiRobotSendRequest; @@ -61,13 +63,14 @@ public boolean send(Message message) { DingTalkClient client = new DefaultDingTalkClient(this.serviceUri); OapiRobotSendRequest request = new OapiRobotSendRequest(); OapiRobotSendRequest.At at = new OapiRobotSendRequest.At(); - at.setAtUserIds(userRepository - .findByIdIn(message.getToRecipients().stream().map(recipient -> Long.valueOf(recipient)) - .collect(Collectors.toSet())) - .stream().map( - userEntity -> ChannelUtils.getRecipientAttributeName(this.atUserIdsAttributeName, - userEntity)) - .collect(Collectors.toList())); + if (CollectionUtils.isNotEmpty(message.getToRecipients())) { + at.setAtUserIds(userRepository.findByIdIn(message.getToRecipients() + .stream() + .map(Long::valueOf).collect(Collectors.toSet())) + .stream() + .map(userEntity -> ChannelUtils.getRecipientAttributeName(this.atUserIdsAttributeName, userEntity)) + .collect(Collectors.toList())); + } at.setIsAtAll(this.atAll); request.setAt(at); request.setMsgtype("text"); diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/EventFilter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/EventFilter.java index 9cb542a4b0..a6b10704e1 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/EventFilter.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/EventFilter.java @@ -17,8 +17,10 @@ import java.util.ArrayList; import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; -import org.apache.commons.collections.ListUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; import org.springframework.transaction.annotation.Transactional; @@ -26,9 +28,10 @@ import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.metadb.notification.EventRepository; +import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository; import com.oceanbase.odc.service.notification.helper.EventMapper; -import com.oceanbase.odc.service.notification.helper.EventUtils; +import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter; import com.oceanbase.odc.service.notification.model.Event; import com.oceanbase.odc.service.notification.model.EventStatus; @@ -53,20 +56,28 @@ public class EventFilter { public List filter(List events) { List filtered = new ArrayList<>(); if (CollectionUtils.isEmpty(events)) { - return ListUtils.EMPTY_LIST; + return filtered; } + Set organizationIds = events.stream().map(Event::getOrganizationId).collect(Collectors.toSet()); + Map> policies = notificationPolicyRepository + .findByOrganizationIds(organizationIds).stream() + .collect(Collectors.groupingBy(NotificationPolicyEntity::getOrganizationId)); + List thrown = new ArrayList<>(); for (Event event : events) { - String matchExpression = EventUtils.generateMatchExpression(event.getLabels()); - EventStatus status; - if (notificationPolicyRepository.existsByOrganizationIdAndMatchExpression(event.getOrganizationId(), - matchExpression)) { - status = EventStatus.CONVERTED; - filtered.add(event); + List matched = NotificationPolicyFilter.filter(event.getLabels(), + policies.get(event.getOrganizationId())); + if (matched.isEmpty()) { + thrown.add(event.getId()); } else { - status = EventStatus.THROWN; + filtered.add(event); } - event.setStatus(status); - eventRepository.updateStatusById(event.getId(), status); + } + if (!CollectionUtils.isEmpty(thrown)) { + eventRepository.updateStatusByIds(EventStatus.THROWN, thrown); + } + if (!CollectionUtils.isEmpty(filtered)) { + eventRepository.updateStatusByIds(EventStatus.CONVERTED, filtered.stream().map(Event::getId).collect( + Collectors.toSet())); } return filtered; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcEventQueue.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcEventQueue.java index 8486e9f62f..fb334ba776 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcEventQueue.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcEventQueue.java @@ -48,9 +48,6 @@ public class JdbcEventQueue implements EventQueue { @Autowired private EventMapper eventMapper; - @Autowired - private NotificationProperties notificationProperties; - @Override @Transactional(rollbackFor = Exception.class) public boolean offer(Event event) { @@ -67,16 +64,13 @@ public Event peek(EventStatus status) { @Override public List peek(int batchSize, EventStatus status) { - List events; try { - events = eventRepository.findNByStatusForUpdate(status, - notificationProperties.getEventDequeueBatchSize()).stream() - .map(entity -> eventMapper.fromEntity(entity)).collect(Collectors.toList()); + return eventRepository.findNByStatusForUpdate(status, batchSize) + .stream().map(entity -> eventMapper.fromEntity(entity)).collect(Collectors.toList()); } catch (Exception ex) { log.warn("peek events failed, ", ex); return ListUtils.EMPTY_LIST; } - return events; } @Override diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcNotificationQueue.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcNotificationQueue.java index d9c5d0616a..257196be4b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcNotificationQueue.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/JdbcNotificationQueue.java @@ -20,9 +20,9 @@ import java.util.Optional; import java.util.stream.Collectors; -import org.apache.commons.collections.ListUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; import org.springframework.util.CollectionUtils; import com.oceanbase.odc.core.authority.util.SkipAuthorize; @@ -67,30 +67,26 @@ public boolean offer(List notifications) { } @Override + @Transactional(rollbackFor = Exception.class) public List peek(int batchSize, MessageSendingStatus status) { List notifications = new ArrayList<>(); - try { - List messageEntities = messageRepository.findNByStatusForUpdate(status, batchSize); - if (CollectionUtils.isEmpty(messageEntities)) { - return notifications; - } - messageEntities.stream().forEach(messageEntity -> { - Optional channelOpt = channelRepository.findById(messageEntity.getChannelId()); - if (!channelOpt.isPresent()) { - messageEntity.setStatus(MessageSendingStatus.THROWN); - messageRepository.save(messageEntity); - } else { - Notification notification = new Notification(); - ChannelEntity channel = channelOpt.get(); - notification.setMessage(Message.fromEntity(messageEntity)); - notification.setChannel(channelMapper.fromEntity(channel)); - notifications.add(notification); - } - }); - } catch (Exception ex) { - log.warn("peek notifications failed, ", ex); - return ListUtils.EMPTY_LIST; + List messageEntities = messageRepository.findNByStatusForUpdate(status, batchSize); + if (CollectionUtils.isEmpty(messageEntities)) { + return notifications; } + messageEntities.forEach(messageEntity -> { + Optional channelOpt = channelRepository.findById(messageEntity.getChannelId()); + if (!channelOpt.isPresent()) { + messageRepository.updateStatusById(messageEntity.getId(), MessageSendingStatus.THROWN); + } else { + messageRepository.updateStatusById(messageEntity.getId(), MessageSendingStatus.CONVERTING); + Notification notification = new Notification(); + ChannelEntity channel = channelOpt.get(); + notification.setMessage(Message.fromEntity(messageEntity)); + notification.setChannel(channelMapper.fromEntity(channel)); + notifications.add(notification); + } + }); return notifications; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationDispatcher.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationDispatcher.java index 200712c83c..600d937ca4 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationDispatcher.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationDispatcher.java @@ -15,11 +15,8 @@ */ package com.oceanbase.odc.service.notification; -import java.util.List; - import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; -import org.springframework.transaction.annotation.Transactional; import com.oceanbase.odc.core.authority.util.SkipAuthorize; import com.oceanbase.odc.metadb.notification.MessageRepository; @@ -27,8 +24,6 @@ import com.oceanbase.odc.service.notification.model.MessageSendingStatus; import com.oceanbase.odc.service.notification.model.Notification; -import lombok.NonNull; - /** * @Author: Lebie * @Date: 2023/3/20 14:45 @@ -43,18 +38,15 @@ public class NotificationDispatcher { @Autowired private ChannelFactory channelFactory; - @Transactional(rollbackFor = Exception.class) - public void dispatch(@NonNull List notifications) { - notifications.stream().forEach(notification -> { - ChannelConfig channelConfig = notification.getChannel(); - Channel channel = channelFactory.generate(channelConfig); - if (channel.send(notification.getMessage())) { - messageRepository.updateStatusById(notification.getMessage().getId(), - MessageSendingStatus.SENT_SUCCESSFULLY); - } else { - messageRepository.updateStatusAndRetryTimesById(notification.getMessage().getId(), - MessageSendingStatus.SENT_FAILED); - } - }); + public void dispatch(Notification notification) { + ChannelConfig channelConfig = notification.getChannel(); + Channel channel = channelFactory.generate(channelConfig); + if (channel.send(notification.getMessage())) { + messageRepository.updateStatusById(notification.getMessage().getId(), + MessageSendingStatus.SENT_SUCCESSFULLY); + } else { + messageRepository.updateStatusAndRetryTimesById(notification.getMessage().getId(), + MessageSendingStatus.SENT_FAILED); + } } } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationProperties.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationProperties.java index 002a71cbb1..8c12058b10 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationProperties.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationProperties.java @@ -39,4 +39,10 @@ public class NotificationProperties { private int maxResendTimes; + private int dequeueEventFixedDelayMillis; + + private int dequeueCreatedNotificationFixedDelayMillis; + + private int dequeueFailedNotificationFixedDelayMillis; + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationScheduleConfiguration.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationScheduleConfiguration.java new file mode 100644 index 0000000000..dc43b728b3 --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationScheduleConfiguration.java @@ -0,0 +1,72 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.odc.service.notification; + +import java.time.Duration; +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.Executors; +import java.util.function.Supplier; + +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.Trigger; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.scheduling.annotation.SchedulingConfigurer; +import org.springframework.scheduling.config.ScheduledTaskRegistrar; + +import com.oceanbase.odc.service.notification.model.EventStatus; +import com.oceanbase.odc.service.notification.model.MessageSendingStatus; + +@EnableScheduling +@Configuration +@ConditionalOnProperty(value = "odc.notification.enabled", havingValue = "true") +public class NotificationScheduleConfiguration implements SchedulingConfigurer { + @Autowired + private NotificationProperties notificationProperties; + + @Autowired + private Broker broker; + + @Override + public void configureTasks(ScheduledTaskRegistrar taskRegistrar) { + taskRegistrar.setScheduler(Executors.newScheduledThreadPool(3)); + + taskRegistrar.addTriggerTask(() -> broker.dequeueEvent(EventStatus.CREATED), + getTrigger(() -> Duration.ofMillis(notificationProperties.getDequeueEventFixedDelayMillis()))); + taskRegistrar.addTriggerTask(() -> broker.dequeueNotification(MessageSendingStatus.CREATED), + getTrigger(() -> Duration + .ofMillis(notificationProperties.getDequeueCreatedNotificationFixedDelayMillis()))); + taskRegistrar.addTriggerTask(() -> broker.dequeueNotification(MessageSendingStatus.SENT_FAILED), + getTrigger(() -> Duration + .ofMillis(notificationProperties.getDequeueFailedNotificationFixedDelayMillis()))); + } + + private Trigger getTrigger(Supplier durationSupplier) { + + return triggerContext -> { + Calendar nextExecutionTime = Calendar.getInstance(); + Date lastActualExecutionTime = triggerContext.lastActualExecutionTime(); + nextExecutionTime.setTime(lastActualExecutionTime != null ? lastActualExecutionTime : new Date()); + nextExecutionTime.add(Calendar.MILLISECOND, (int) durationSupplier.get().toMillis()); + return nextExecutionTime.getTime(); + }; + + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationSchedules.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationSchedules.java deleted file mode 100644 index 9e2fe2ed02..0000000000 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/NotificationSchedules.java +++ /dev/null @@ -1,59 +0,0 @@ -/* - * Copyright (c) 2023 OceanBase. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.oceanbase.odc.service.notification; - -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; -import org.springframework.context.annotation.Configuration; -import org.springframework.scheduling.annotation.Scheduled; - -import com.oceanbase.odc.service.notification.model.EventStatus; -import com.oceanbase.odc.service.notification.model.MessageSendingStatus; - -import lombok.extern.slf4j.Slf4j; - -/** - * @Author: Lebie - * @Date: 2023/3/22 15:50 - * @Description: [] - */ -@Slf4j -@Configuration -@ConditionalOnProperty(value = "odc.notification.enabled", havingValue = "true") -public class NotificationSchedules { - @Autowired - private Broker broker; - - @Autowired - private NotificationProperties notificationProperties; - - @Scheduled(fixedDelayString = "${odc.notification.dequeue-event-fixed-delay-millis:30000}") - public void convertEventToNotification() { - broker.dequeueEvent(EventStatus.CREATED); - } - - @Scheduled(fixedDelayString = "${odc.notification.dequeue-created-notification-fixed-delay-millis:30000}") - public void sendNotification() { - broker.dequeueNotification(MessageSendingStatus.CREATED); - } - - @Scheduled(fixedDelayString = "${odc.notification.dequeue-failed-notification-fixed-delay-millis:30000}") - public void resendNotification() { - broker.dequeueNotification(MessageSendingStatus.SENT_FAILED); - } - - -} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/constant/EventLabelKeys.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/constant/EventLabelKeys.java index ce754e637a..5a2a96aaa5 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/constant/EventLabelKeys.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/constant/EventLabelKeys.java @@ -31,4 +31,8 @@ public class EventLabelKeys { public static final String VARIABLE_KEY_TENANT_NAME = "tenantName"; + public static final String VARIABLE_KEY_REGION = "region"; + + public static final String VARIABLE_KEY_TASK_ID = "taskId"; + } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/NotificationPolicyFilter.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/NotificationPolicyFilter.java new file mode 100644 index 0000000000..745c809afd --- /dev/null +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/helper/NotificationPolicyFilter.java @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2023 OceanBase. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.oceanbase.odc.service.notification.helper; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import org.apache.commons.collections4.CollectionUtils; +import org.apache.commons.collections4.MapUtils; + +import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.util.StringUtils; +import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity; +import com.oceanbase.odc.service.notification.model.EventLabels; + +public class NotificationPolicyFilter { + + public static List filter(EventLabels labels, List policies) { + List filtered = new ArrayList<>(); + if (CollectionUtils.isEmpty(policies) || MapUtils.isEmpty(labels)) { + return filtered; + } + for (NotificationPolicyEntity policy : policies) { + Map conditions = + JsonUtils.fromJsonMap(policy.getMatchExpression(), String.class, String.class); + if (conditions.entrySet().stream().allMatch(entry -> labels.containsKey(entry.getKey()) + && StringUtils.equals(entry.getValue(), labels.get(entry.getKey())))) { + filtered.add(policy); + } + } + return filtered; + } + +} diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/model/MessageSendingStatus.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/model/MessageSendingStatus.java index f5d1287e1c..8a00e560b8 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/model/MessageSendingStatus.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/notification/model/MessageSendingStatus.java @@ -27,11 +27,10 @@ public enum MessageSendingStatus { THROWN("THROWN"), - PROCESSING("SENDING"); + CONVERTING("CONVERTING"); private String name; - MessageSendingStatus(String name) { this.name = name; } diff --git a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java index 80eacf182c..b011faaa3b 100644 --- a/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java +++ b/server/odc-service/src/main/java/com/oceanbase/odc/service/quartz/OdcJobListener.java @@ -15,10 +15,12 @@ */ package com.oceanbase.odc.service.quartz; +import java.util.Date; import java.util.List; import java.util.Objects; import java.util.Optional; +import org.quartz.JobDataMap; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; @@ -29,6 +31,7 @@ import org.springframework.stereotype.Component; import com.oceanbase.odc.common.json.JsonUtils; +import com.oceanbase.odc.common.util.SystemUtils; import com.oceanbase.odc.core.shared.constant.ResourceType; import com.oceanbase.odc.core.shared.constant.TaskStatus; import com.oceanbase.odc.core.shared.exception.NotFoundException; @@ -41,6 +44,12 @@ import com.oceanbase.odc.service.iam.UserService; import com.oceanbase.odc.service.iam.model.User; import com.oceanbase.odc.service.iam.util.SecurityContextUtils; +import com.oceanbase.odc.service.notification.Broker; +import com.oceanbase.odc.service.notification.NotificationProperties; +import com.oceanbase.odc.service.notification.constant.EventLabelKeys; +import com.oceanbase.odc.service.notification.model.Event; +import com.oceanbase.odc.service.notification.model.EventLabels; +import com.oceanbase.odc.service.notification.model.EventStatus; import com.oceanbase.odc.service.quartz.util.ScheduleTaskUtils; import com.oceanbase.odc.service.schedule.model.JobType; import com.oceanbase.odc.service.schedule.model.QuartzKeyGenerator; @@ -67,6 +76,10 @@ public class OdcJobListener implements JobListener { private UserService userService; @Autowired private HostProperties hostProperties; + @Autowired + private Broker broker; + @Autowired + private NotificationProperties notificationProperties; private static final String ODC_JOB_LISTENER = "ODC_JOB_LISTENER"; @Override @@ -129,6 +142,26 @@ public void jobExecutionVetoed(JobExecutionContext context) { @Override public void jobWasExecuted(JobExecutionContext context, JobExecutionException jobException) { + if (notificationProperties.isEnabled() && jobException != null) { + try { + JobDataMap dataMap = context.getMergedJobDataMap(); + EventLabels labels = new EventLabels(); + labels.put(EventLabelKeys.IDENTIFIER_KEY_TASK_TYPE, context.getJobInstance().getClass().getName()); + labels.put(EventLabelKeys.IDENTIFIER_KEY_ACTION, "failed"); + labels.put(EventLabelKeys.VARIABLE_KEY_REGION, SystemUtils.getEnvOrProperty("OB_ARN_PARTITION")); + labels.put("taskInfo", JsonUtils.toJson(context.getMergedJobDataMap())); + labels.put("errorMessage", jobException.getMessage()); + broker.enqueueEvent(Event.builder() + .status(EventStatus.CREATED) + .creatorId(dataMap.getLongFromString("creatorId")) + .organizationId(dataMap.getLongFromString("organizationId")) + .triggerTime(new Date(System.currentTimeMillis())) + .labels(labels) + .build()); + } catch (Exception e) { + log.warn("Enqueue event failed, jobException:{}", jobException, e); + } + } List jobTriggers; Optional scheduleEntityOptional = scheduleRepository.findById(ScheduleTaskUtils.getScheduleId(context));