Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(notification): support send notification when schedule job failed #711

Merged
merged 31 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
1aff355
fix NPE when no recipients configured
LuckyPickleZZ Nov 3, 2023
0446dbe
add NotificationPolicyFilter to match expression
LuckyPickleZZ Nov 3, 2023
4defcae
enqueue event when job failed
LuckyPickleZZ Nov 3, 2023
d5e1c04
fix message would be resent on exception
LuckyPickleZZ Nov 3, 2023
9d7dc1b
remove unused imports
LuckyPickleZZ Nov 3, 2023
9deaeba
restore debug
LuckyPickleZZ Nov 3, 2023
b65b8f6
add region into message
LuckyPickleZZ Nov 3, 2023
e7ea78c
add UT
LuckyPickleZZ Nov 3, 2023
aa841df
format code
LuckyPickleZZ Nov 3, 2023
ec4cdd0
fix UT failure
LuckyPickleZZ Nov 6, 2023
4672d2c
Merge branch 'dev/4.2.2' into wenmu_422_feat_schedule_failed_notifica…
LuckyPickleZZ Nov 7, 2023
4f09cdc
resolve conflicts
LuckyPickleZZ Nov 7, 2023
b366b16
fix UT
LuckyPickleZZ Nov 7, 2023
899043f
format code
LuckyPickleZZ Nov 7, 2023
261ac6e
a failed notification shouldn't block others
LuckyPickleZZ Nov 8, 2023
50a99c2
optimize performance
LuckyPickleZZ Nov 17, 2023
016667a
format code
LuckyPickleZZ Nov 17, 2023
6cd0f00
response to CR
LuckyPickleZZ Nov 17, 2023
41c2eca
use NotificationScheduleConfiguration to replace NotificationSchedules
LuckyPickleZZ Nov 17, 2023
13f84e0
fix JPA error
LuckyPickleZZ Nov 17, 2023
fe19c93
fix UT
LuckyPickleZZ Nov 18, 2023
7c16975
format code
LuckyPickleZZ Nov 18, 2023
5d1419c
format code
LuckyPickleZZ Nov 18, 2023
2ae7d82
ignore UT
LuckyPickleZZ Nov 18, 2023
245dc74
Merge branch 'dev/4.2.2' into wenmu_422_feat_schedule_failed_notifica…
LuckyPickleZZ Nov 18, 2023
757f4a3
fix
LuckyPickleZZ Nov 19, 2023
cc08238
rename script
LuckyPickleZZ Nov 20, 2023
9f8edd4
remove async
LuckyPickleZZ Nov 20, 2023
e42b8d6
add `taskId` into labels
LuckyPickleZZ Nov 20, 2023
ea0bdfc
add `region` into labels
LuckyPickleZZ Nov 20, 2023
21e9eef
remove unused code
LuckyPickleZZ Nov 20, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public void tearDown() {
public void testDispatch_SendFailed() {
MessageEntity entity = messageRepository.save(getMessage().toEntity());

dispatcher.dispatch(Arrays.asList(getNotification(entity)));
dispatcher.dispatch(getNotification(entity));

Assert.assertEquals(MessageSendingStatus.SENT_FAILED, messageRepository.findAll().get(0).getStatus());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.odc.service.notification;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter;
import com.oceanbase.odc.service.notification.model.EventLabels;

public class NotificationFilterTest {

@Test
public void testFilterNotificationPolicies_Succeed() {
EventLabels labels = new EventLabels();
labels.put("action", "failed");

List<NotificationPolicyEntity> filtered = NotificationPolicyFilter.filter(labels, getPolicies());
Assert.assertEquals(1, filtered.size());
}

@Test
public void testFilterNotificationPolicies_Fail() {
EventLabels labels = new EventLabels();
labels.put("action", "succeed");

List<NotificationPolicyEntity> filtered = NotificationPolicyFilter.filter(labels, getPolicies());
Assert.assertEquals(0, filtered.size());
}

List<NotificationPolicyEntity> getPolicies() {
Map<String, String> conditions = new HashMap<>();
conditions.put("action", "failed");
NotificationPolicyEntity policy = new NotificationPolicyEntity();
policy.setMatchExpression(JsonUtils.toJson(conditions));
return Collections.singletonList(policy);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.oceanbase.odc.metadb.notification;

import java.util.List;
import java.util.Optional;

import org.springframework.data.jpa.repository.JpaRepository;
Expand All @@ -25,5 +26,7 @@ public interface NotificationPolicyRepository extends JpaRepository<Notification
Optional<NotificationPolicyEntity> findByOrganizationIdAndMatchExpression(Long organizationId,
String matchExpression);

List<NotificationPolicyEntity> findByOrganizationId(Long organizationId);

boolean existsByOrganizationIdAndMatchExpression(Long organizationId, String matchExpression);
}
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ public void enqueueEvent(Event event) {
}


@Transactional(rollbackFor = Exception.class)
public void dequeueNotification(MessageSendingStatus status) {
List<Notification> notifications =
notificationQueue.peek(notificationProperties.getNotificationDequeueBatchSize(), status);
notificationDispatcher.dispatch(notifications);
for (Notification notification : notifications) {
notificationDispatcher.dispatch(notification);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,10 @@
package com.oceanbase.odc.service.notification;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

import org.apache.commons.collections.ListUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
Expand All @@ -32,8 +31,8 @@
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository;
import com.oceanbase.odc.service.notification.helper.ChannelMapper;
import com.oceanbase.odc.service.notification.helper.EventUtils;
import com.oceanbase.odc.service.notification.helper.MessageTemplateProcessor;
import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter;
import com.oceanbase.odc.service.notification.model.ChannelConfig;
import com.oceanbase.odc.service.notification.model.Event;
import com.oceanbase.odc.service.notification.model.Message;
Expand Down Expand Up @@ -65,49 +64,49 @@ public class Converter {

public List<Notification> convert(List<Event> events) {
if (CollectionUtils.isEmpty(events)) {
return ListUtils.EMPTY_LIST;
return Collections.emptyList();
}
List<Notification> notifications = new ArrayList<>();
for (Event event : events) {
Optional<NotificationPolicyEntity> policyOpt =
notificationPolicyRepository.findByOrganizationIdAndMatchExpression(event.getOrganizationId(),
EventUtils.generateMatchExpression(event.getLabels()));
if (!policyOpt.isPresent()) {
return null;
List<NotificationPolicyEntity> policies = NotificationPolicyFilter.filter(event.getLabels(),
notificationPolicyRepository.findByOrganizationId(event.getOrganizationId()));
if (policies.isEmpty()) {
continue;
}
NotificationPolicyEntity policy = policyOpt.get();
List<NotificationChannelRelationEntity> policyChannelEntity =
policyChannelRepository.findByOrganizationIdAndNotificationPolicyId(event.getOrganizationId(),
policy.getId());
List<ChannelConfig> channels = channelRepository.findAllById(
policyChannelEntity.stream()
.map(NotificationChannelRelationEntity::getChannelId)
.collect(Collectors.toSet()))
.stream().map(entity -> channelMapper.fromEntity(entity)).collect(Collectors.toList());
for (NotificationPolicyEntity policy : policies) {
List<NotificationChannelRelationEntity> policyChannelEntity =
policyChannelRepository.findByOrganizationIdAndNotificationPolicyId(event.getOrganizationId(),
policy.getId());
List<ChannelConfig> channels = channelRepository.findAllById(
policyChannelEntity.stream()
.map(NotificationChannelRelationEntity::getChannelId)
.collect(Collectors.toSet()))
.stream().map(entity -> channelMapper.fromEntity(entity)).collect(Collectors.toList());

if (CollectionUtils.isEmpty(channels)) {
return null;
}
channels.stream().forEach(channel -> {
Notification notification = new Notification();
if (CollectionUtils.isEmpty(channels)) {
return null;
}
channels.forEach(channel -> {
Notification notification = new Notification();

Message message = new Message();
message.setTitle(
MessageTemplateProcessor.replaceVariables(policy.getTitleTemplate(), event.getLabels()));
message.setContent(
MessageTemplateProcessor.replaceVariables(policy.getContentTemplate(), event.getLabels()));
message.setOrganizationId(policy.getOrganizationId());
message.setCreatorId(event.getCreatorId());
message.setEventId(event.getId());
message.setChannelId(channel.getId());
message.setStatus(MessageSendingStatus.CREATED);
message.setRetryTimes(0);
message.setMaxRetryTimes(notificationProperties.getMaxResendTimes());
message.setToRecipients(policy.getToRecipients());
message.setCcRecipients(policy.getCcRecipients());
notification.setMessage(message);
notifications.add(notification);
});
Message message = new Message();
message.setTitle(
MessageTemplateProcessor.replaceVariables(policy.getTitleTemplate(), event.getLabels()));
message.setContent(
MessageTemplateProcessor.replaceVariables(policy.getContentTemplate(), event.getLabels()));
message.setOrganizationId(policy.getOrganizationId());
message.setCreatorId(event.getCreatorId());
message.setEventId(event.getId());
message.setChannelId(channel.getId());
message.setStatus(MessageSendingStatus.CREATED);
message.setRetryTimes(0);
message.setMaxRetryTimes(notificationProperties.getMaxResendTimes());
message.setToRecipients(policy.getToRecipients());
message.setCcRecipients(policy.getCcRecipients());
notification.setMessage(message);
notifications.add(notification);
});
}
}
return notifications;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

import java.util.stream.Collectors;

import org.apache.commons.collections4.CollectionUtils;

import com.dingtalk.api.DefaultDingTalkClient;
import com.dingtalk.api.DingTalkClient;
import com.dingtalk.api.request.OapiRobotSendRequest;
Expand Down Expand Up @@ -61,13 +63,14 @@ public boolean send(Message message) {
DingTalkClient client = new DefaultDingTalkClient(this.serviceUri);
OapiRobotSendRequest request = new OapiRobotSendRequest();
OapiRobotSendRequest.At at = new OapiRobotSendRequest.At();
at.setAtUserIds(userRepository
.findByIdIn(message.getToRecipients().stream().map(recipient -> Long.valueOf(recipient))
.collect(Collectors.toSet()))
.stream().map(
userEntity -> ChannelUtils.getRecipientAttributeName(this.atUserIdsAttributeName,
userEntity))
.collect(Collectors.toList()));
if (CollectionUtils.isNotEmpty(message.getToRecipients())) {
at.setAtUserIds(userRepository.findByIdIn(message.getToRecipients()
.stream()
.map(Long::valueOf).collect(Collectors.toSet()))
.stream()
.map(userEntity -> ChannelUtils.getRecipientAttributeName(this.atUserIdsAttributeName, userEntity))
.collect(Collectors.toList()));
}
at.setIsAtAll(this.atAll);
request.setAt(at);
request.setMsgtype("text");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
package com.oceanbase.odc.service.notification;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.commons.collections.ListUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -28,7 +28,7 @@
import com.oceanbase.odc.metadb.notification.EventRepository;
import com.oceanbase.odc.metadb.notification.NotificationPolicyRepository;
import com.oceanbase.odc.service.notification.helper.EventMapper;
import com.oceanbase.odc.service.notification.helper.EventUtils;
import com.oceanbase.odc.service.notification.helper.NotificationPolicyFilter;
import com.oceanbase.odc.service.notification.model.Event;
import com.oceanbase.odc.service.notification.model.EventStatus;

Expand All @@ -53,17 +53,16 @@ public class EventFilter {
public List<Event> filter(List<Event> events) {
List<Event> filtered = new ArrayList<>();
if (CollectionUtils.isEmpty(events)) {
return ListUtils.EMPTY_LIST;
return Collections.emptyList();
}
for (Event event : events) {
String matchExpression = EventUtils.generateMatchExpression(event.getLabels());
EventStatus status;
if (notificationPolicyRepository.existsByOrganizationIdAndMatchExpression(event.getOrganizationId(),
matchExpression)) {
if (NotificationPolicyFilter.filter(event.getLabels(),
notificationPolicyRepository.findByOrganizationId(event.getOrganizationId())).isEmpty()) {
status = EventStatus.THROWN;
} else {
status = EventStatus.CONVERTED;
filtered.add(event);
} else {
status = EventStatus.THROWN;
}
event.setStatus(status);
eventRepository.updateStatusById(event.getId(), status);
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.commons.collections.ListUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.CollectionUtils;

import com.oceanbase.odc.core.authority.util.SkipAuthorize;
Expand Down Expand Up @@ -67,6 +68,7 @@ public boolean offer(List<Notification> notifications) {
}

@Override
@Transactional(rollbackFor = Exception.class)
public List<Notification> peek(int batchSize, MessageSendingStatus status) {
List<Notification> notifications = new ArrayList<>();
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
*/
package com.oceanbase.odc.service.notification;

import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
Expand All @@ -27,8 +25,6 @@
import com.oceanbase.odc.service.notification.model.MessageSendingStatus;
import com.oceanbase.odc.service.notification.model.Notification;

import lombok.NonNull;

/**
* @Author: Lebie
* @Date: 2023/3/20 14:45
Expand All @@ -44,17 +40,15 @@ public class NotificationDispatcher {
private ChannelFactory channelFactory;

@Transactional(rollbackFor = Exception.class)
LuckyPickleZZ marked this conversation as resolved.
Show resolved Hide resolved
public void dispatch(@NonNull List<Notification> notifications) {
notifications.stream().forEach(notification -> {
ChannelConfig channelConfig = notification.getChannel();
Channel channel = channelFactory.generate(channelConfig);
if (channel.send(notification.getMessage())) {
messageRepository.updateStatusById(notification.getMessage().getId(),
MessageSendingStatus.SENT_SUCCESSFULLY);
} else {
messageRepository.updateStatusAndRetryTimesById(notification.getMessage().getId(),
MessageSendingStatus.SENT_FAILED);
}
});
public void dispatch(Notification notification) {
ChannelConfig channelConfig = notification.getChannel();
Channel channel = channelFactory.generate(channelConfig);
if (channel.send(notification.getMessage())) {
messageRepository.updateStatusById(notification.getMessage().getId(),
MessageSendingStatus.SENT_SUCCESSFULLY);
} else {
messageRepository.updateStatusAndRetryTimesById(notification.getMessage().getId(),
MessageSendingStatus.SENT_FAILED);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Copyright (c) 2023 OceanBase.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.oceanbase.odc.service.notification.helper;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;

import com.oceanbase.odc.common.json.JsonUtils;
import com.oceanbase.odc.common.util.StringUtils;
import com.oceanbase.odc.metadb.notification.NotificationPolicyEntity;
import com.oceanbase.odc.service.notification.model.EventLabels;

public class NotificationPolicyFilter {

public static List<NotificationPolicyEntity> filter(EventLabels labels, List<NotificationPolicyEntity> policies) {
if (CollectionUtils.isEmpty(policies) || MapUtils.isEmpty(labels)) {
return policies;
}
List<NotificationPolicyEntity> filtered = new ArrayList<>();
for (NotificationPolicyEntity policy : policies) {
Map<String, String> conditions =
JsonUtils.fromJsonMap(policy.getMatchExpression(), String.class, String.class);
if (conditions.entrySet().stream().allMatch(entry -> labels.containsKey(entry.getKey())
&& StringUtils.equals(entry.getValue(), labels.get(entry.getKey())))) {
filtered.add(policy);
}
}
return filtered;
}

}
Loading
Loading