Skip to content

Commit

Permalink
fix : BackgroundJob supports isolation
Browse files Browse the repository at this point in the history
  • Loading branch information
wzh425 committed Jul 6, 2023
1 parent 524afdf commit a5ec2fa
Show file tree
Hide file tree
Showing 16 changed files with 112 additions and 99 deletions.
2 changes: 1 addition & 1 deletion Directory.Build.props
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
<Project>
<PropertyGroup>
<MasaFrameworkPackageVersion>1.0.0-rc.3.2</MasaFrameworkPackageVersion>
<MasaFrameworkPackageVersion>1.0.0-rc.3.4</MasaFrameworkPackageVersion>
</PropertyGroup>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
// Copyright (c) MASA Stack All rights reserved.
// Licensed under the Apache License. See LICENSE.txt in the project root for license information.

namespace Masa.Mc.Contracts.Admin.Infrastructure;

public class MultiEnvironment
{
public string Environment { get; set; } = string.Empty;
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,11 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.EventHandler;

public class ExecuteMessageTaskEventHandler
{
public ExecuteMessageTaskEventHandler()
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public ExecuteMessageTaskEventHandler(IMultiEnvironmentContext multiEnvironmentContext)
{
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand All @@ -17,7 +20,8 @@ public async Task HandleEventAsync(ExecuteMessageTaskEvent eto)
MessageTaskId = eto.MessageTaskId,
IsTest = eto.IsTest,
JobId = eto.JobId,
TaskId = eto.TaskId
TaskId = eto.TaskId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.EventHandler;
public class ResolveMessageTaskEventHandler
{
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public ResolveMessageTaskEventHandler(IMessageTaskRepository messageTaskRepository)
public ResolveMessageTaskEventHandler(IMessageTaskRepository messageTaskRepository, IMultiEnvironmentContext multiEnvironmentContext)
{
_messageTaskRepository = messageTaskRepository;
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand All @@ -25,7 +27,8 @@ public async Task HandleEventAsync(ResolveMessageTaskEvent eto)
var args = new ResolveMessageTaskJobArgs()
{
MessageTaskId = eto.MessageTaskId,
OperatorId = eto.OperatorId
OperatorId = eto.OperatorId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,61 +5,54 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ExecuteMessageTaskJob : BackgroundJobBase<ExecuteMessageTaskJobArgs>
{
private readonly IChannelRepository _channelRepository;
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IDomainEventBus _eventBus;
private readonly MessageTaskDomainService _domainService;
private readonly IMessageTaskJobService _messageTaskJobService;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;

public ExecuteMessageTaskJob(ILogger<BackgroundJobBase<ExecuteMessageTaskJobArgs>>? logger
,IChannelRepository channelRepository
, IMessageTaskRepository messageTaskRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IDomainEventBus eventBus
, MessageTaskDomainService domainService
, IMessageTaskJobService messageTaskJobService
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_channelRepository = channelRepository;
_messageTaskRepository = messageTaskRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_eventBus = eventBus;
_domainService = domainService;
_messageTaskJobService = messageTaskJobService;
_unitOfWork = unitOfWork;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ExecuteMessageTaskJobArgs args)
{
var history = await _messageTaskHistoryRepository.FindWaitSendAsync(args.MessageTaskId, args.IsTest);
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var channelRepository = scope.ServiceProvider.GetRequiredService<IChannelRepository>();
var messageTaskRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var eventBus = scope.ServiceProvider.GetRequiredService<IDomainEventBus>();
var domainService = scope.ServiceProvider.GetRequiredService<MessageTaskDomainService>();
var messageTaskJobService = scope.ServiceProvider.GetRequiredService<IMessageTaskJobService>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

var history = await messageTaskHistoryRepository.FindWaitSendAsync(args.MessageTaskId, args.IsTest);

if (history == null)
{
var messageTask = await _messageTaskRepository.FindAsync(x => x.Id == args.MessageTaskId, false);
var messageTask = await messageTaskRepository.FindAsync(x => x.Id == args.MessageTaskId, false);
if (messageTask == null) return;

Guid userId = Guid.Empty;
await _messageTaskJobService.DisableJobAsync(messageTask.SchedulerJobId, userId);
await messageTaskJobService.DisableJobAsync(messageTask.SchedulerJobId, userId);
return;
}
history.SetTaskId(args.TaskId);
var messageData = await _domainService.GetMessageDataAsync(history.MessageTask, history.MessageTask.Variables);
var messageData = await domainService.GetMessageDataAsync(history.MessageTask, history.MessageTask.Variables);
history.SetSending();

if (!args.IsTest && !history.MessageTask.SendTime.HasValue)
{
history.MessageTask.SetSending();
}

await _messageTaskHistoryRepository.UpdateAsync(history);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskHistoryRepository.UpdateAsync(history);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

var channel = await _channelRepository.FindAsync(x => x.Id == history.MessageTask.ChannelId);
var channel = await channelRepository.FindAsync(x => x.Id == history.MessageTask.ChannelId);

var eto = channel.Type.GetSendMessageEvent(history.MessageTask.ChannelId.Value, messageData, history);
await _eventBus.PublishAsync(eto);
await eventBus.PublishAsync(eto);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ExecuteMessageTaskJobArgs
public class ExecuteMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,44 +5,43 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResendMessageTaskJob : BackgroundJobBase<ResendMessageTaskJobArgs>
{
private readonly IMessageRecordRepository _messageRecordRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IEventBus _eventBus;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;

public ResendMessageTaskJob(ILogger<BackgroundJobBase<ResendMessageTaskJobArgs>>? logger
, IMessageRecordRepository messageRecordRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IEventBus eventBus
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_eventBus = eventBus;
_unitOfWork = unitOfWork;
_messageRecordRepository = messageRecordRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ResendMessageTaskJobArgs args)
{
var records = await (await _messageRecordRepository.WithDetailsAsync()).Where(x => x.MessageTaskId == args.MessageTaskId && x.Success == false).ToListAsync();
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var messageRecordRepository = scope.ServiceProvider.GetRequiredService<IMessageRecordRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var eventBus = scope.ServiceProvider.GetRequiredService<IEventBus>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();

var records = await (await messageRecordRepository.WithDetailsAsync()).Where(x => x.MessageTaskId == args.MessageTaskId && x.Success == false).ToListAsync();

foreach (var item in records)
{
var eto = item.Channel.Type.GetRetryMessageEvent(item.Id);
await _eventBus.PublishAsync(eto);
await eventBus.PublishAsync(eto);
}

await _unitOfWork.SaveChangesAsync();
await unitOfWork.SaveChangesAsync();

var historys = await _messageTaskHistoryRepository.GetListAsync(x => x.MessageTaskId == args.MessageTaskId);
var historys = await messageTaskHistoryRepository.GetListAsync(x => x.MessageTaskId == args.MessageTaskId);
foreach (var item in historys)
{
await _eventBus.PublishAsync(new UpdateMessageTaskHistoryStatusEvent(item.Id));
await eventBus.PublishAsync(new UpdateMessageTaskHistoryStatusEvent(item.Id));
}

await _unitOfWork.SaveChangesAsync();
await unitOfWork.SaveChangesAsync();

await _eventBus.PublishAsync(new UpdateMessageTaskStatusEvent(args.MessageTaskId));
await eventBus.PublishAsync(new UpdateMessageTaskStatusEvent(args.MessageTaskId));
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResendMessageTaskJobArgs
public class ResendMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,36 @@ namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResolveMessageTaskJob : BackgroundJobBase<ResolveMessageTaskJobArgs>
{
private readonly IChannelUserFinder _channelUserFinder;
private readonly IMessageTaskRepository _messageTaskRepository;
private readonly IMessageTaskHistoryRepository _messageTaskHistoryRepository;
private readonly IDomainEventBus _eventBus;
private readonly IUserContext _userContext;
private readonly IMessageTaskJobService _messageTaskJobService;
private readonly IUnitOfWork _unitOfWork;
private readonly IServiceProvider _serviceProvider;


public ResolveMessageTaskJob(ILogger<BackgroundJobBase<ResolveMessageTaskJobArgs>>? logger
, IChannelUserFinder channelUserFinder
, IMessageTaskRepository messageTaskRepository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
, IDomainEventBus eventBus
, IUserContext userContext
, IMessageTaskJobService messageTaskJobService
, IUnitOfWork unitOfWork) : base(logger)
, IServiceProvider serviceProvider) : base(logger)
{
_channelUserFinder = channelUserFinder;
_messageTaskRepository = messageTaskRepository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
_eventBus = eventBus;
_userContext = userContext;
_messageTaskJobService = messageTaskJobService;
_unitOfWork = unitOfWork;
_serviceProvider = serviceProvider;
}

protected override async Task ExecutingAsync(ResolveMessageTaskJobArgs args)
{
var messageTask = (await _messageTaskRepository.WithDetailsAsync()).FirstOrDefault(x => x.Id == args.MessageTaskId);
await using var scope = _serviceProvider.CreateAsyncScope();
var multiEnvironmentSetter = scope.ServiceProvider.GetRequiredService<IMultiEnvironmentSetter>();
multiEnvironmentSetter.SetEnvironment(args.Environment);
var channelUserFinder = scope.ServiceProvider.GetRequiredService<IChannelUserFinder>();
var messageTaskRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskRepository>();
var messageTaskHistoryRepository = scope.ServiceProvider.GetRequiredService<IMessageTaskHistoryRepository>();
var messageTaskJobService = scope.ServiceProvider.GetRequiredService<IMessageTaskJobService>();
var unitOfWork = scope.ServiceProvider.GetRequiredService<IUnitOfWork>();
var userContext = scope.ServiceProvider.GetRequiredService<IUserContext>();

var messageTask = (await messageTaskRepository.WithDetailsAsync()).FirstOrDefault(x => x.Id == args.MessageTaskId);

if (messageTask == null || messageTask.ReceiverType == ReceiverTypes.Broadcast)
return;

var receiverUsers = await _channelUserFinder.GetReceiverUsersAsync(messageTask.Channel, messageTask.Variables, messageTask.Receivers);
var receiverUsers = await channelUserFinder.GetReceiverUsersAsync(messageTask.Channel, messageTask.Variables, messageTask.Receivers);
messageTask.SetReceiverUsers(receiverUsers.ToList());

await _messageTaskHistoryRepository.RemoveAsync(x => x.MessageTaskId == args.MessageTaskId);
await messageTaskHistoryRepository.RemoveAsync(x => x.MessageTaskId == args.MessageTaskId);

var sendTime = DateTimeOffset.Now;
if (messageTask.SendRules.IsCustom)
Expand All @@ -60,35 +53,35 @@ protected override async Task ExecutingAsync(ResolveMessageTaskJobArgs args)
sendTime = nextExcuteTime.Value;
var historyReceiverUsers = messageTask.GetHistoryReceiverUsers(i, sendingCount);
var history = new MessageTaskHistory(messageTask.Id, historyReceiverUsers, false, sendTime);
await _messageTaskHistoryRepository.AddAsync(history);
await messageTaskHistoryRepository.AddAsync(history);
}
}
}
else
{
var history = new MessageTaskHistory(messageTask.Id, messageTask.ReceiverUsers, false, sendTime);
history.ExecuteTask();
await _messageTaskRepository.UpdateAsync(messageTask);
await _messageTaskHistoryRepository.AddAsync(history);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskRepository.UpdateAsync(messageTask);
await messageTaskHistoryRepository.AddAsync(history);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

return;
}

var userId = _userContext.GetUserId<Guid>();
var userId = userContext.GetUserId<Guid>();
var operatorId = userId == default ? args.OperatorId : userId;

var jobId = await _messageTaskJobService.RegisterJobAsync(messageTask.SchedulerJobId, args.MessageTaskId, messageTask.SendRules.CronExpression, operatorId, messageTask.DisplayName);
var jobId = await messageTaskJobService.RegisterJobAsync(messageTask.SchedulerJobId, args.MessageTaskId, messageTask.SendRules.CronExpression, operatorId, messageTask.DisplayName);
messageTask.SetJobId(jobId);

await _messageTaskRepository.UpdateAsync(messageTask);
await _unitOfWork.SaveChangesAsync();
await _unitOfWork.CommitAsync();
await messageTaskRepository.UpdateAsync(messageTask);
await unitOfWork.SaveChangesAsync();
await unitOfWork.CommitAsync();

if (string.IsNullOrEmpty(messageTask.SendRules.CronExpression) && jobId != default)
{
await _messageTaskJobService.StartJobAsync(jobId, operatorId);
await messageTaskJobService.StartJobAsync(jobId, operatorId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

namespace Masa.Mc.Service.Admin.Application.MessageTasks.Jobs;

public class ResolveMessageTaskJobArgs
public class ResolveMessageTaskJobArgs : MultiEnvironment
{
public Guid MessageTaskId { get; set; }

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ public class MessageTaskCommandHandler
private readonly IMessageTemplateRepository _messageTemplateRepository;
private readonly IUnitOfWork _unitOfWork;
private readonly II18n<DefaultResource> _i18n;
private readonly IMultiEnvironmentContext _multiEnvironmentContext;

public MessageTaskCommandHandler(IMessageTaskRepository repository
, IMessageTaskHistoryRepository messageTaskHistoryRepository
Expand All @@ -23,7 +24,8 @@ public MessageTaskCommandHandler(IMessageTaskRepository repository
, IChannelRepository channelRepository
, IMessageTemplateRepository messageTemplateRepository
, IUnitOfWork unitOfWork
, II18n<DefaultResource> i18n)
, II18n<DefaultResource> i18n
, IMultiEnvironmentContext multiEnvironmentContext)
{
_repository = repository;
_messageTaskHistoryRepository = messageTaskHistoryRepository;
Expand All @@ -34,6 +36,7 @@ public MessageTaskCommandHandler(IMessageTaskRepository repository
_messageTemplateRepository = messageTemplateRepository;
_unitOfWork = unitOfWork;
_i18n = i18n;
_multiEnvironmentContext = multiEnvironmentContext;
}

[EventHandler]
Expand Down Expand Up @@ -189,7 +192,8 @@ public async Task ResendAsync(ResendMessageTaskCommand command)
{
var args = new ResendMessageTaskJobArgs()
{
MessageTaskId = command.MessageTaskId
MessageTaskId = command.MessageTaskId,
Environment = _multiEnvironmentContext.CurrentEnvironment
};

await BackgroundJobManager.EnqueueAsync(args);
Expand Down
Loading

0 comments on commit a5ec2fa

Please sign in to comment.