Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix : BackgroundJob supports isolation #531

Merged
merged 7 commits into from
Jul 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<MasaFrameworkPackageVersion>1.0.0-rc.3.2</MasaFrameworkPackageVersion>
<MasaFrameworkPackageVersion>1.0.0-rc.3.4</MasaFrameworkPackageVersion>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the Apache License. See LICENSE.txt in the project root for license information.

namespace Masa.Mc.Contracts.Admin.Infrastructure;

public class MultiEnvironment
{
public string Environment { get; set; } = string.Empty;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.EventHandler;

public class ExecuteMessageTaskEventHandler
{
public ExecuteMessageTaskEventHandler()
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public ExecuteMessageTaskEventHandler(IMultiEnvironmentContext multiEnvironmentContext)
{
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand All @@ -17,7 +20,8 @@ public async Task HandleEventAsync(ExecuteMessageTaskEvent eto)
MessageTaskId = eto.MessageTaskId,
IsTest = eto.IsTest,
JobId = eto.JobId,
TaskId = eto.TaskId
TaskId = eto.TaskId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.EventHandler;
public class ResolveMessageTaskEventHandler
{
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public ResolveMessageTaskEventHandler(IMessageTaskRepository messageTaskRepository)
public ResolveMessageTaskEventHandler(IMessageTaskRepository messageTaskRepository, IMultiEnvironmentContext multiEnvironmentContext)
{
_messageTaskRepository = messageTaskRepository;
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand All @@ -25,7 +27,8 @@ public async Task HandleEventAsync(ResolveMessageTaskEvent eto)
var args = new ResolveMessageTaskJobArgs()
{
MessageTaskId = eto.MessageTaskId,
OperatorId = eto.OperatorId
OperatorId = eto.OperatorId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,54 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ExecuteMessageTaskJob : BackgroundJobBase<ExecuteMessageTaskJobArgs>
{
private readonly IChannelRepository _channelRepository;
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IDomainEventBus _eventBus;
private readonly MessageTaskDomainService _domainService;
private readonly IMessageTaskJobService _messageTaskJobService;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;

public ExecuteMessageTaskJob(ILogger<BackgroundJobBase<ExecuteMessageTaskJobArgs>>? logger
,IChannelRepository channelRepository
, IMessageTaskRepository messageTaskRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IDomainEventBus eventBus
, MessageTaskDomainService domainService
, IMessageTaskJobService messageTaskJobService
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_channelRepository = channelRepository;
_messageTaskRepository = messageTaskRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_eventBus = eventBus;
_domainService = domainService;
_messageTaskJobService = messageTaskJobService;
_unitOfWork = unitOfWork;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ExecuteMessageTaskJobArgs args)
{
var history = await _messageTaskHistoryRepository.FindWaitSendAsync(args.MessageTaskId, args.IsTest);
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var channelRepository = scope.ServiceProvider.GetRequiredService<IChannelRepository>();
var messageTaskRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var eventBus = scope.ServiceProvider.GetRequiredService<IDomainEventBus>();
var domainService = scope.ServiceProvider.GetRequiredService<MessageTaskDomainService>();
var messageTaskJobService = scope.ServiceProvider.GetRequiredService<IMessageTaskJobService>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

var history = await messageTaskHistoryRepository.FindWaitSendAsync(args.MessageTaskId, args.IsTest);

if (history == null)
{
var messageTask = await _messageTaskRepository.FindAsync(x => x.Id == args.MessageTaskId, false);
var messageTask = await messageTaskRepository.FindAsync(x => x.Id == args.MessageTaskId, false);
if (messageTask == null) return;

Guid userId = Guid.Empty;
await _messageTaskJobService.DisableJobAsync(messageTask.SchedulerJobId, userId);
await messageTaskJobService.DisableJobAsync(messageTask.SchedulerJobId, userId);
return;
}
history.SetTaskId(args.TaskId);
var messageData = await _domainService.GetMessageDataAsync(history.MessageTask, history.MessageTask.Variables);
var messageData = await domainService.GetMessageDataAsync(history.MessageTask, history.MessageTask.Variables);
history.SetSending();

if (!args.IsTest && !history.MessageTask.SendTime.HasValue)
{
history.MessageTask.SetSending();
}

await _messageTaskHistoryRepository.UpdateAsync(history);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskHistoryRepository.UpdateAsync(history);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

var channel = await _channelRepository.FindAsync(x => x.Id == history.MessageTask.ChannelId);
var channel = await channelRepository.FindAsync(x => x.Id == history.MessageTask.ChannelId);

var eto = channel.Type.GetSendMessageEvent(history.MessageTask.ChannelId.Value, messageData, history);
await _eventBus.PublishAsync(eto);
await eventBus.PublishAsync(eto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ExecuteMessageTaskJobArgs
public class ExecuteMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,43 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResendMessageTaskJob : BackgroundJobBase<ResendMessageTaskJobArgs>
{
private readonly IMessageRecordRepository _messageRecordRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IEventBus _eventBus;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;

public ResendMessageTaskJob(ILogger<BackgroundJobBase<ResendMessageTaskJobArgs>>? logger
, IMessageRecordRepository messageRecordRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IEventBus eventBus
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_eventBus = eventBus;
_unitOfWork = unitOfWork;
_messageRecordRepository = messageRecordRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ResendMessageTaskJobArgs args)
{
var records = await (await _messageRecordRepository.WithDetailsAsync()).Where(x => x.MessageTaskId == args.MessageTaskId && x.Success == false).ToListAsync();
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var messageRecordRepository = scope.ServiceProvider.GetRequiredService<IMessageRecordRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

var records = await (await messageRecordRepository.WithDetailsAsync()).Where(x => x.MessageTaskId == args.MessageTaskId && x.Success == false).ToListAsync();

foreach (var item in records)
{
var eto = item.Channel.Type.GetRetryMessageEvent(item.Id);
await _eventBus.PublishAsync(eto);
await eventBus.PublishAsync(eto);
}

await _unitOfWork.SaveChangesAsync();
await unitOfWork.SaveChangesAsync();

var historys = await _messageTaskHistoryRepository.GetListAsync(x => x.MessageTaskId == args.MessageTaskId);
var historys = await messageTaskHistoryRepository.GetListAsync(x => x.MessageTaskId == args.MessageTaskId);
foreach (var item in historys)
{
await _eventBus.PublishAsync(new UpdateMessageTaskHistoryStatusEvent(item.Id));
await eventBus.PublishAsync(new UpdateMessageTaskHistoryStatusEvent(item.Id));
}

await _unitOfWork.SaveChangesAsync();
await unitOfWork.SaveChangesAsync();

await _eventBus.PublishAsync(new UpdateMessageTaskStatusEvent(args.MessageTaskId));
await eventBus.PublishAsync(new UpdateMessageTaskStatusEvent(args.MessageTaskId));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResendMessageTaskJobArgs
public class ResendMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,36 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResolveMessageTaskJob : BackgroundJobBase<ResolveMessageTaskJobArgs>
{
private readonly IChannelUserFinder _channelUserFinder;
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IDomainEventBus _eventBus;
private readonly IUserContext _userContext;
private readonly IMessageTaskJobService _messageTaskJobService;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;


public ResolveMessageTaskJob(ILogger<BackgroundJobBase<ResolveMessageTaskJobArgs>>? logger
, IChannelUserFinder channelUserFinder
, IMessageTaskRepository messageTaskRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IDomainEventBus eventBus
, IUserContext userContext
, IMessageTaskJobService messageTaskJobService
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_channelUserFinder = channelUserFinder;
_messageTaskRepository = messageTaskRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_eventBus = eventBus;
_userContext = userContext;
_messageTaskJobService = messageTaskJobService;
_unitOfWork = unitOfWork;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ResolveMessageTaskJobArgs args)
{
var messageTask = (await _messageTaskRepository.WithDetailsAsync()).FirstOrDefault(x => x.Id == args.MessageTaskId);
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var channelUserFinder = scope.ServiceProvider.GetRequiredService<IChannelUserFinder>();
var messageTaskRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var messageTaskJobService = scope.ServiceProvider.GetRequiredService<IMessageTaskJobService>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var userContext = scope.ServiceProvider.GetRequiredService<IUserContext>();

var messageTask = (await messageTaskRepository.WithDetailsAsync()).FirstOrDefault(x => x.Id == args.MessageTaskId);

if (messageTask == null || messageTask.ReceiverType == ReceiverTypes.Broadcast)
return;

var receiverUsers = await _channelUserFinder.GetReceiverUsersAsync(messageTask.Channel, messageTask.Variables, messageTask.Receivers);
var receiverUsers = await channelUserFinder.GetReceiverUsersAsync(messageTask.Channel, messageTask.Variables, messageTask.Receivers);
messageTask.SetReceiverUsers(receiverUsers.ToList());

await _messageTaskHistoryRepository.RemoveAsync(x => x.MessageTaskId == args.MessageTaskId);
await messageTaskHistoryRepository.RemoveAsync(x => x.MessageTaskId == args.MessageTaskId);

var sendTime = DateTimeOffset.Now;
if (messageTask.SendRules.IsCustom)
Expand All @@ -60,35 +53,35 @@ protected override async Task ExecutingAsync(ResolveMessageTaskJobArgs args)
sendTime = nextExcuteTime.Value;
var historyReceiverUsers = messageTask.GetHistoryReceiverUsers(i, sendingCount);
var history = new MessageTaskHistory(messageTask.Id, historyReceiverUsers, false, sendTime);
await _messageTaskHistoryRepository.AddAsync(history);
await messageTaskHistoryRepository.AddAsync(history);
}
}
}
else
{
var history = new MessageTaskHistory(messageTask.Id, messageTask.ReceiverUsers, false, sendTime);
history.ExecuteTask();
await _messageTaskRepository.UpdateAsync(messageTask);
await _messageTaskHistoryRepository.AddAsync(history);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskRepository.UpdateAsync(messageTask);
await messageTaskHistoryRepository.AddAsync(history);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

return;
}

var userId = _userContext.GetUserId<Guid>();
var userId = userContext.GetUserId<Guid>();
var operatorId = userId == default ? args.OperatorId : userId;

var jobId = await _messageTaskJobService.RegisterJobAsync(messageTask.SchedulerJobId, args.MessageTaskId, messageTask.SendRules.CronExpression, operatorId, messageTask.DisplayName);
var jobId = await messageTaskJobService.RegisterJobAsync(messageTask.SchedulerJobId, args.MessageTaskId, messageTask.SendRules.CronExpression, operatorId, messageTask.DisplayName);
messageTask.SetJobId(jobId);

await _messageTaskRepository.UpdateAsync(messageTask);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskRepository.UpdateAsync(messageTask);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

if (string.IsNullOrEmpty(messageTask.SendRules.CronExpression) && jobId != default)
{
await _messageTaskJobService.StartJobAsync(jobId, operatorId);
await messageTaskJobService.StartJobAsync(jobId, operatorId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResolveMessageTaskJobArgs
public class ResolveMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MessageTaskCommandHandler
private readonly IMessageTemplateRepository _messageTemplateRepository;
private readonly IUnitOfWork _unitOfWork;
private readonly II18n<DefaultResource> _i18n;
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public MessageTaskCommandHandler(IMessageTaskRepository repository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
Expand All @@ -23,7 +24,8 @@ public MessageTaskCommandHandler(IMessageTaskRepository repository
, IChannelRepository channelRepository
, IMessageTemplateRepository messageTemplateRepository
, IUnitOfWork unitOfWork
, II18n<DefaultResource> i18n)
, II18n<DefaultResource> i18n
, IMultiEnvironmentContext multiEnvironmentContext)
{
_repository = repository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
Expand All @@ -34,6 +36,7 @@ public MessageTaskCommandHandler(IMessageTaskRepository repository
_messageTemplateRepository = messageTemplateRepository;
_unitOfWork = unitOfWork;
_i18n = i18n;
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand Down Expand Up @@ -189,7 +192,8 @@ public async Task ResendAsync(ResendMessageTaskCommand command)
{
var args = new ResendMessageTaskJobArgs()
{
MessageTaskId = command.MessageTaskId
MessageTaskId = command.MessageTaskId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Loading