Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notification): support send notification when schedule job failed #711

Merged
merged 31 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1aff355
fix NPE when no recipients configured
LuckyPickleZZ Nov 3, 2023
0446dbe
add NotificationPolicyFilter to match expression
LuckyPickleZZ Nov 3, 2023
4defcae
enqueue event when job failed
LuckyPickleZZ Nov 3, 2023
d5e1c04
fix message would be resent on exception
LuckyPickleZZ Nov 3, 2023
9d7dc1b
remove unused imports
LuckyPickleZZ Nov 3, 2023
9deaeba
restore debug
LuckyPickleZZ Nov 3, 2023
b65b8f6
add region into message
LuckyPickleZZ Nov 3, 2023
e7ea78c
add UT
LuckyPickleZZ Nov 3, 2023
aa841df
format code
LuckyPickleZZ Nov 3, 2023
ec4cdd0
fix UT failure
LuckyPickleZZ Nov 6, 2023
4672d2c
Merge branch 'dev/4.2.2' into wenmu_422_feat_schedule_failed_notifica…
LuckyPickleZZ Nov 7, 2023
4f09cdc
resolve conflicts
LuckyPickleZZ Nov 7, 2023
b366b16
fix UT
LuckyPickleZZ Nov 7, 2023
899043f
format code
LuckyPickleZZ Nov 7, 2023
261ac6e
a failed notification shouldn't block others
LuckyPickleZZ Nov 8, 2023
50a99c2
optimize performance
LuckyPickleZZ Nov 17, 2023
016667a
format code
LuckyPickleZZ Nov 17, 2023
6cd0f00
response to CR
LuckyPickleZZ Nov 17, 2023
41c2eca
use NotificationScheduleConfiguration to replace NotificationSchedules
LuckyPickleZZ Nov 17, 2023
13f84e0
fix JPA error
LuckyPickleZZ Nov 17, 2023
fe19c93
fix UT
LuckyPickleZZ Nov 18, 2023
7c16975
format code
LuckyPickleZZ Nov 18, 2023
5d1419c
format code
LuckyPickleZZ Nov 18, 2023
2ae7d82
ignore UT
LuckyPickleZZ Nov 18, 2023
245dc74
Merge branch 'dev/4.2.2' into wenmu_422_feat_schedule_failed_notifica…
LuckyPickleZZ Nov 18, 2023
757f4a3
fix
LuckyPickleZZ Nov 19, 2023
cc08238
rename script
LuckyPickleZZ Nov 20, 2023
9f8edd4
remove async
LuckyPickleZZ Nov 20, 2023
e42b8d6
add `taskId` into labels
LuckyPickleZZ Nov 20, 2023
ea0bdfc
add `region` into labels
LuckyPickleZZ Nov 20, 2023
21e9eef
remove unused code
LuckyPickleZZ Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,15 @@
package com.oceanbase.odc.service.notification;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.compress.utils.Lists;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
Expand All @@ -36,11 +33,13 @@
import org.springframework.boot.test.mock.mockito.MockBean;

import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.metadb.notification.ChannelEntity;
import com.oceanbase.odc.metadb.notification.ChannelRepository;
import com.oceanbase.odc.metadb.notification.EventEntity;
import com.oceanbase.odc.metadb.notification.EventRepository;
import com.oceanbase.odc.metadb.notification.NotificationChannelRelationEntity;
import com.oceanbase.odc.metadb.notification.NotificationPolicyChannelRelationRepository;
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository;
Expand Down Expand Up @@ -95,11 +94,11 @@ public void testConvert_Success() {
}
eventRepository.saveAll(events);

when(policyRepository.findByOrganizationIdAndMatchExpression(anyLong(), anyString()))
.thenReturn(Optional.of(getPolicyEntity()));
when(policyChannelRepository.findByOrganizationIdAndNotificationPolicyId(anyLong(), anyLong()))
.thenReturn(Lists.newArrayList());
when(channelRepository.findAllById(any()))
when(policyRepository.findByOrganizationIds(any()))
.thenReturn(Collections.singletonList(getNotificationPolicy()));
when(policyChannelRepository.findByNotificationPolicyIds(any()))
.thenReturn(Collections.singletonList(getNotificationChannelRelationEntity()));
when(channelRepository.findByIdIn(any()))
.thenReturn(Arrays.asList(getChannelEntity()));

List<Notification> notifications =
Expand Down Expand Up @@ -143,4 +142,20 @@ private ChannelEntity getChannelEntity() {
return entity;
}

private NotificationPolicyEntity getNotificationPolicy() {
NotificationPolicyEntity policy = new NotificationPolicyEntity();
policy.setId(1L);
policy.setMatchExpression(JsonUtils.toJson(getLabels()));
policy.setOrganizationId(ORGANIZATION_ID);
return policy;
}

private NotificationChannelRelationEntity getNotificationChannelRelationEntity() {
NotificationChannelRelationEntity entity = new NotificationChannelRelationEntity();
entity.setOrganizationId(ORGANIZATION_ID);
entity.setChannelId(1L);
entity.setNotificationPolicyId(1L);
return entity;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,13 @@
*/
package com.oceanbase.odc.service.notification;

import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.when;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

import org.junit.After;
Expand All @@ -33,9 +32,11 @@
import org.springframework.boot.test.mock.mockito.MockBean;

import com.oceanbase.odc.ServiceTestEnv;
import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.core.shared.constant.TaskType;
import com.oceanbase.odc.metadb.notification.EventEntity;
import com.oceanbase.odc.metadb.notification.EventRepository;
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository;
import com.oceanbase.odc.service.notification.helper.EventMapper;
import com.oceanbase.odc.service.notification.helper.EventUtils;
Expand Down Expand Up @@ -78,14 +79,11 @@ public void testFilter_Success() {
events.add((mapper.toEntity(getEvent())));
}
List<EventEntity> entities = eventRepository.saveAll(events);

when(policyRepository.existsByOrganizationIdAndMatchExpression(anyLong(), anyString()))
.thenReturn(Boolean.TRUE);
doReturn(Collections.singletonList(getNotificationPolicy())).when(policyRepository)
.findByOrganizationIds(any());
List<Event> filtered =
filter.filter(entities.stream().map(entity -> mapper.fromEntity(entity)).collect(Collectors.toList()));
Assert.assertEquals(eventCount, filtered.size());
Set<EventStatus> statusSet = filtered.stream().map(Event::getStatus).collect(Collectors.toSet());
Assert.assertTrue(statusSet.contains(EventStatus.CONVERTED) && statusSet.size() == 1);
}

private Event getEvent() {
Expand All @@ -98,6 +96,13 @@ private Event getEvent() {
return event;
}

private NotificationPolicyEntity getNotificationPolicy() {
NotificationPolicyEntity policy = new NotificationPolicyEntity();
policy.setMatchExpression(JsonUtils.toJson(getLabels()));
policy.setOrganizationId(ORGANIZATION_ID);
return policy;
}

private EventLabels getLabels() {
return EventUtils.buildEventLabels(TaskType.ASYNC, "failed", 1L);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void tearDown() {
public void testDispatch_SendFailed() {
MessageEntity entity = messageRepository.save(getMessage().toEntity());

dispatcher.dispatch(Arrays.asList(getNotification(entity)));
dispatcher.dispatch(getNotification(entity));

Assert.assertEquals(MessageSendingStatus.SENT_FAILED, messageRepository.findAll().get(0).getStatus());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.odc.service.notification;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter;
import com.oceanbase.odc.service.notification.model.EventLabels;

public class NotificationFilterTest {

@Test
public void testFilterNotificationPolicies_Succeed() {
EventLabels labels = new EventLabels();
labels.put("action", "failed");

List<NotificationPolicyEntity> filtered = NotificationPolicyFilter.filter(labels, getPolicies());
Assert.assertEquals(1, filtered.size());
}

@Test
public void testFilterNotificationPolicies_Fail() {
EventLabels labels = new EventLabels();
labels.put("action", "succeed");

List<NotificationPolicyEntity> filtered = NotificationPolicyFilter.filter(labels, getPolicies());
Assert.assertEquals(0, filtered.size());
}

List<NotificationPolicyEntity> getPolicies() {
Map<String, String> conditions = new HashMap<>();
conditions.put("action", "failed");
NotificationPolicyEntity policy = new NotificationPolicyEntity();
policy.setMatchExpression(JsonUtils.toJson(conditions));
return Collections.singletonList(policy);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
alter table `notification_event` add index `notification_event_status_time`(`status`, `trigger_time`);

alter table `notification_message` add index `idx_status_retry_times_max_retry_times`(`status`, `retry_times`, `max_retry_times`);
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,15 @@
*/
package com.oceanbase.odc.metadb.notification;

import java.util.Collection;
import java.util.List;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;

public interface ChannelRepository extends JpaRepository<ChannelEntity, Long>,
JpaSpecificationExecutor<ChannelEntity> {

List<ChannelEntity> findByIdIn(Collection<Long> ids);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.metadb.notification;

import java.util.Collection;
import java.util.List;
import java.util.Optional;

Expand Down Expand Up @@ -50,5 +51,10 @@ public interface EventRepository extends JpaRepository<EventEntity, Long>,
nativeQuery = true)
List<EventEntity> findNByStatusForUpdate(@Param("status") EventStatus status, @Param("limit") Integer limit);

@Transactional
@Modifying
@Query(value = "update notification_event set `status`=:#{#status.name()} where `id` in (:ids)", nativeQuery = true)
int updateStatusByIds(@Param("status") EventStatus status, @Param("ids") Collection<Long> ids);


}
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,5 @@ int updateStatusById(@Param("id") Long id,
nativeQuery = true)
List<MessageEntity> findNByStatusForUpdate(@Param("status") MessageSendingStatus status,
@Param("limit") Integer limit);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,13 @@
*/
package com.oceanbase.odc.metadb.notification;

import java.util.Collection;
import java.util.List;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface NotificationPolicyChannelRelationRepository
extends JpaRepository<NotificationChannelRelationEntity, Long>,
Expand All @@ -27,4 +30,8 @@ public interface NotificationPolicyChannelRelationRepository

List<NotificationChannelRelationEntity> findByOrganizationIdAndNotificationPolicyId(Long organizationId,
Long notificationPolicyId);

@Query(value = "select * from notification_policy_channel_relation where notification_policy_id in (:ids)",
nativeQuery = true)
List<NotificationChannelRelationEntity> findByNotificationPolicyIds(@Param("ids") Collection<Long> ids);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,24 @@
*/
package com.oceanbase.odc.metadb.notification;

import java.util.Collection;
import java.util.List;
import java.util.Optional;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.JpaSpecificationExecutor;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

public interface NotificationPolicyRepository extends JpaRepository<NotificationPolicyEntity, Long>,
JpaSpecificationExecutor<NotificationPolicyEntity> {
Optional<NotificationPolicyEntity> findByOrganizationIdAndMatchExpression(Long organizationId,
String matchExpression);

List<NotificationPolicyEntity> findByOrganizationId(Long organizationId);

@Query(value = "select * from notification_policy where organization_id in (:organizationIds)", nativeQuery = true)
List<NotificationPolicyEntity> findByOrganizationIds(@Param("organizationIds") Collection<Long> ids);

boolean existsByOrganizationIdAndMatchExpression(Long organizationId, String matchExpression);
}
Original file line number Diff line number Diff line change
Expand Up @@ -713,7 +713,8 @@ private ConnectionConfig internalGet(Long id) {
return connection;
}

private ConnectionConfig internalGetSkipUserCheck(Long id, boolean withEnvironment) {
@SkipAuthorize("odc internal usage")
public ConnectionConfig internalGetSkipUserCheck(Long id, boolean withEnvironment) {
ConnectionConfig config = entityToModel(getEntity(id), withEnvironment);
List<ConnectionAttributeEntity> entities = this.attributeRepository.findByConnectionId(config.getId());
config.setAttributes(attrEntitiesToMap(entities));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.springframework.beans.factory.annotation.Autowired;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.oceanbase.odc.common.util.SystemUtils;
import com.oceanbase.odc.core.flow.exception.BaseFlowException;
import com.oceanbase.odc.core.shared.Verify;
import com.oceanbase.odc.metadb.flow.ServiceTaskInstanceRepository;
Expand Down Expand Up @@ -246,46 +247,36 @@ protected boolean isTimeout() {
*/
protected void onFailure(Long taskId, TaskService taskService) {
if (notificationProperties.isEnabled()) {
TaskEntity taskEntity = taskService.detail(taskId);
ConnectionConfig connection = connectionService.getWithoutPermissionCheck(taskEntity.getConnectionId());
EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "failed",
taskEntity.getConnectionId());
Map<String, String> extend = new HashMap<>();
extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName());
extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName());
labels.addLabels(extend);
broker.enqueueEvent(Event.builder()
.status(EventStatus.CREATED)
.creatorId(taskEntity.getCreatorId())
.organizationId(taskEntity.getOrganizationId())
.triggerTime(new Date(System.currentTimeMillis()))
.labels(labels)
.build());
try {
TaskEntity taskEntity = taskService.detail(taskId);
ConnectionConfig connection =
connectionService.internalGetSkipUserCheck(taskEntity.getConnectionId(), true);
EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "failed",
taskEntity.getConnectionId());
Map<String, String> extend = new HashMap<>();
extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName());
extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName());
extend.put(EventLabelKeys.VARIABLE_KEY_TASK_ID, taskId + "");
extend.put(EventLabelKeys.VARIABLE_KEY_REGION, SystemUtils.getEnvOrProperty("OB_ARN_PARTITION"));
labels.addLabels(extend);
broker.enqueueEvent(Event.builder()
.status(EventStatus.CREATED)
.creatorId(taskEntity.getCreatorId())
.organizationId(taskEntity.getOrganizationId())
.triggerTime(new Date(System.currentTimeMillis()))
.labels(labels)
.build());
} catch (Exception e) {
log.warn("Failed to enqueue event.", e);
}
}

}

/**
* The callback method when the task is successful, used to update the status and other operations
*/
protected void onSuccessful(Long taskId, TaskService taskService) {
if (notificationProperties.isEnabled()) {
TaskEntity taskEntity = taskService.detail(taskId);
ConnectionConfig connection = connectionService.getWithoutPermissionCheck(taskEntity.getConnectionId());
EventLabels labels = EventUtils.buildEventLabels(taskEntity.getTaskType(), "succeed",
taskEntity.getConnectionId());
Map<String, String> extend = new HashMap<>();
extend.put(EventLabelKeys.VARIABLE_KEY_CLUSTER_NAME, connection.getClusterName());
extend.put(EventLabelKeys.VARIABLE_KEY_TENANT_NAME, connection.getTenantName());
labels.addLabels(extend);
broker.enqueueEvent(Event.builder()
.status(EventStatus.CREATED)
.creatorId(taskEntity.getCreatorId())
.organizationId(taskEntity.getOrganizationId())
.triggerTime(new Date(System.currentTimeMillis()))
.labels(labels)
.build());
}
}
protected void onSuccessful(Long taskId, TaskService taskService) {}

/**
* The callback method of the task execution timeout, which is used to update the status and other
Expand Down
Loading