diff --git a/.github/workflows/build_and_it.yml b/.github/workflows/build_and_it.yml index d5fafaf..c7b3edd 100644 --- a/.github/workflows/build_and_it.yml +++ b/.github/workflows/build_and_it.yml @@ -51,7 +51,8 @@ jobs: tar -xvf dtm_1.18.0_linux_amd64.tar.gz pwd mkdir /home/runner/work/client-csharp/client-csharp/logs - nohup ./dtm > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 & + echo "UpdateBranchSync: 1" > ./config.yml + nohup ./dtm -c ./config.yml > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 & sleep 5 curl "127.0.0.1:36789/api/dtmsvr/newGid" - name: Setup Busi Service diff --git a/samples/DtmSample/Controllers/WfTestController.cs b/samples/DtmSample/Controllers/WfTestController.cs index 614e969..c1e7d5a 100644 --- a/samples/DtmSample/Controllers/WfTestController.cs +++ b/samples/DtmSample/Controllers/WfTestController.cs @@ -7,12 +7,10 @@ using Microsoft.Extensions.Options; using System; using System.IO; -using System.Diagnostics; using System.Net.Http; using System.Net.Http.Headers; using System.Text; using System.Text.Json; -using System.Text.Unicode; using System.Threading; using System.Threading.Tasks; using Exception = System.Exception; @@ -257,5 +255,84 @@ public async Task TccRollBack(CancellationToken cancellationToken return Ok(TransResponse.BuildFailureResponse()); } } + + + private static readonly string wfNameForResume = "wfNameForResume"; + + /// + /// + /// + /// + /// + [HttpPost("wf-crash")] + public async Task Crash(CancellationToken cancellationToken) + { + if (!_globalTransaction.Exists(wfNameForResume)) + { + _globalTransaction.Register(wfNameForResume, async (wf, data) => + { + var content = new ByteArrayContent(data); + content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + + var outClient = wf.NewBranch().NewRequest(); + await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content); + + // the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back + // manual stop application + Environment.Exit(0); + + var inClient = wf.NewBranch().NewRequest(); + await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content); + + return null; + }); + } + + var req = JsonSerializer.Serialize(new TransRequest("1", -30)); + await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true); + + return Ok(TransResponse.BuildSucceedResponse()); + } + + [HttpPost("wf-resume")] + public async Task WfResume(CancellationToken cancellationToken) + { + try + { + if (!_globalTransaction.Exists(wfNameForResume)) + { + // register again after manual crash by Environment.Exit(0); + _globalTransaction.Register(wfNameForResume, async (wf, data) => + { + var content = new ByteArrayContent(data); + content.Headers.ContentType = new MediaTypeHeaderValue("application/json"); + + var outClient = wf.NewBranch().NewRequest(); + await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content); + + var inClient = wf.NewBranch().NewRequest(); + await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content); + + return null; + }); + } + + // prepared call ExecuteByQS + using var bodyMemoryStream = new MemoryStream(); + await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken); + byte[] bytes = bodyMemoryStream.ToArray(); + string body = Encoding.UTF8.GetString(bytes); + _logger.LogDebug($"body: {body}"); + + await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray()); + + return Ok(TransResponse.BuildSucceedResponse()); + } + catch (Exception ex) + { + _logger.LogError(ex, "Workflow Error"); + return Ok(TransResponse.BuildFailureResponse()); + } + } } } diff --git a/src/DtmCommon/Barrier/BranchBarrier.cs b/src/DtmCommon/Barrier/BranchBarrier.cs index 7453717..d3f9a05 100644 --- a/src/DtmCommon/Barrier/BranchBarrier.cs +++ b/src/DtmCommon/Barrier/BranchBarrier.cs @@ -42,7 +42,7 @@ public BranchBarrier(string transType, string gid, string branchID, string op, D public int BarrierID { get; set; } - public async Task Call(DbConnection db, Func busiCall) + public async Task<(bool done, string reason)> Call(DbConnection db, Func busiCall) { this.BarrierID = this.BarrierID + 1; var bid = this.BarrierID.ToString().PadLeft(2, '0'); @@ -91,7 +91,7 @@ public async Task Call(DbConnection db, Func busiCall) #else await tx.CommitAsync(); #endif - return; + return (false, isNullCompensation ? "isNullCompensation" : "isDuplicateOrPend"); } await busiCall.Invoke(tx); @@ -118,9 +118,10 @@ public async Task Call(DbConnection db, Func busiCall) throw; } + return (true, string.Empty); } - public async Task Call(DbConnection db, Func busiCall, TransactionScopeOption transactionScope = TransactionScopeOption.Required, IsolationLevel isolationLevel = IsolationLevel.Serializable) + public async Task<(bool done, string reason)> Call(DbConnection db, Func busiCall, TransactionScopeOption transactionScope = TransactionScopeOption.Required, IsolationLevel isolationLevel = IsolationLevel.Serializable) { this.BarrierID = this.BarrierID + 1; var bid = this.BarrierID.ToString().PadLeft(2, '0'); @@ -158,7 +159,7 @@ public async Task Call(DbConnection db, Func busiCall, TransactionScopeOpt if (isNullCompensation || isDuplicateOrPend) { Logger?.LogInformation("Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend); - return; + return (false, isNullCompensation ? "isNullCompensation" : "isDuplicateOrPend"); } await busiCall.Invoke(); scope.Complete(); @@ -174,7 +175,7 @@ public async Task Call(DbConnection db, Func busiCall, TransactionScopeOpt throw; } } - + return (true, string.Empty); } public async Task QueryPrepared(DbConnection db) diff --git a/src/DtmCommon/Constant.cs b/src/DtmCommon/Constant.cs index 71961cb..e3523b5 100644 --- a/src/DtmCommon/Constant.cs +++ b/src/DtmCommon/Constant.cs @@ -67,8 +67,9 @@ public class Barrier public static readonly Dictionary OpDict = new Dictionary() { - { "cancel", "try" }, - { "compensate", "action" }, + { "cancel", "try" }, // tcc + { "compensate", "action" }, // saga + { "rollback", "action" }, // workflow }; public static readonly string REDIS_LUA_CheckAdjustAmount = @" -- RedisCheckAdjustAmount local v = redis.call('GET', KEYS[1]) diff --git a/src/Dtmcli/Constant.cs b/src/Dtmcli/Constant.cs index 3bc3959..a132433 100644 --- a/src/Dtmcli/Constant.cs +++ b/src/Dtmcli/Constant.cs @@ -4,7 +4,8 @@ internal static class Constant { internal const string DtmClientHttpName = "dtmClient"; internal const string BranchClientHttpName = "branchClient"; - + internal const string WorkflowBranchClientHttpName = "WF"; + internal static class Request { internal const string CONTENT_TYPE = "application/json"; diff --git a/src/Dtmcli/DtmClient.cs b/src/Dtmcli/DtmClient.cs index 7a811e3..246b69b 100644 --- a/src/Dtmcli/DtmClient.cs +++ b/src/Dtmcli/DtmClient.cs @@ -148,7 +148,7 @@ public async Task Query(string gid, CancellationToken cancellationT var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName); var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false); var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent); + DtmImp.Utils.CheckStatusCode(response.StatusCode); return JsonSerializer.Deserialize(dtmContent, _jsonOptions); } @@ -167,7 +167,7 @@ public async Task QueryStatus(string gid, CancellationToken cancellation var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName); var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false); var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false); - DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent); + DtmImp.Utils.CheckStatusCode(response.StatusCode); var graph = JsonSerializer.Deserialize(dtmContent, _jsonOptions); return graph.Transaction == null ? string.Empty diff --git a/src/Dtmcli/DtmImp/Utils.cs b/src/Dtmcli/DtmImp/Utils.cs index 0b374e9..54f7f16 100644 --- a/src/Dtmcli/DtmImp/Utils.cs +++ b/src/Dtmcli/DtmImp/Utils.cs @@ -40,6 +40,14 @@ public static void CheckStatus(HttpStatusCode status, string dtmResult) } } + public static void CheckStatusCode(HttpStatusCode status) + { + if (status != HttpStatusCode.OK) + { + throw new DtmException(string.Format(CheckStatusMsgFormat, status.ToString(), string.Empty)); + } + } + /// /// OrString return the first not null or not empty string /// diff --git a/src/Dtmcli/TransGlobal.cs b/src/Dtmcli/TransGlobal.cs index 03f3002..4953604 100644 --- a/src/Dtmcli/TransGlobal.cs +++ b/src/Dtmcli/TransGlobal.cs @@ -28,7 +28,7 @@ public class DtmTransaction { [JsonPropertyName("id")] public int Id { get; set; } - [JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; } + [JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; } [JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; } @@ -64,9 +64,9 @@ public class DtmBranch { [JsonPropertyName("id")] public int Id { get; set; } - [JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; } + [JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; } - [JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; } + [JsonPropertyName("update_time")] public DateTimeOffset? UpdateTime { get; set; } [JsonPropertyName("gid")] public string Gid { get; set; } diff --git a/src/Dtmworkflow/ServiceCollectionExtensions.cs b/src/Dtmworkflow/ServiceCollectionExtensions.cs index 55d759b..c440ff0 100644 --- a/src/Dtmworkflow/ServiceCollectionExtensions.cs +++ b/src/Dtmworkflow/ServiceCollectionExtensions.cs @@ -23,6 +23,8 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services services.TryAddSingleton(); services.TryAddSingleton(); + // AddHttpClient(services); + return services; } @@ -33,8 +35,22 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services services.TryAddSingleton(); services.TryAddSingleton(); + + // AddHttpClient(services); return services; } + + // private static void AddHttpClient(IServiceCollection services /*, DtmOptions options*/) + // { + // services.AddHttpClient(Dtmcli.Constant.WorkflowBranchClientHttpName, client => + // { + // // TODO DtmOptions + // // client.Timeout = TimeSpan.FromMilliseconds(options.BranchTimeout); + // }).AddHttpMessageHandler(); + // + // // TODO how to inject workflow instance? + // services.AddTransient(); + // } } -} +} \ No newline at end of file diff --git a/src/Dtmworkflow/Workflow.Imp.cs b/src/Dtmworkflow/Workflow.Imp.cs index b1264eb..d4e92ed 100644 --- a/src/Dtmworkflow/Workflow.Imp.cs +++ b/src/Dtmworkflow/Workflow.Imp.cs @@ -64,7 +64,7 @@ internal async Task Process(WfFunc2 handler, byte[] data) } err = Utils.GrpcError2DtmError(err); - + if (err != null && err is not DtmCommon.DtmFailureException) throw err; try @@ -164,23 +164,23 @@ private StepResult StepResultFromLocal(byte[] data, Exception err) }; } - private Exception StepResultToGrpc(StepResult r, IMessage reply) + internal Exception StepResultToGrpc(StepResult r, IMessage reply) { if (r.Error == null && r.Status == DtmCommon.Constant.StatusSucceed) { - // Check - + // TODO Check + // dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage)); } return r.Error; } - private StepResult StepResultFromGrpc(IMessage reply, Exception err) + internal StepResult StepResultFromGrpc(IMessage reply, Exception err) { var sr = new StepResult { - // GRPCError2DtmError - Error = null, + // TODO GRPCError2DtmError + Error = Utils.GrpcError2DtmError(err), }; sr.Status = WfErrorToStatus(sr.Error); @@ -196,7 +196,7 @@ private StepResult StepResultFromGrpc(IMessage reply, Exception err) return sr; } - private HttpResponseMessage StepResultToHttp(StepResult r) + internal HttpResponseMessage StepResultToHttp(StepResult r) { if (r.Error != null) { @@ -206,7 +206,7 @@ private HttpResponseMessage StepResultToHttp(StepResult r) return Utils.NewJSONResponse(HttpStatusCode.OK, r.Data); } - private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) + internal StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) { var sr = new StepResult { @@ -215,7 +215,7 @@ private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err) if (err == null) { - // HTTPResp2DtmError + (sr.Data, sr.Error) = Utils.HTTPResp2DtmError(resp); // TODO go used this.Options.HTTPResp2DtmError(resp), for custom sr.Status = WfErrorToStatus(sr.Error); } @@ -237,9 +237,9 @@ private string WfErrorToStatus(Exception err) } - private async Task RecordedDo(Func> fn) + internal async Task RecordedDo(Func> fn) { - var sr = await this.RecordedDoInner(fn); + StepResult sr = await this.RecordedDoInner(fn); // do not compensate the failed branch if !CompensateErrorBranch if (this.Options.CompensateErrorBranch && sr.Status == DtmCommon.Constant.StatusFailed) diff --git a/src/Dtmworkflow/Workflow.cs b/src/Dtmworkflow/Workflow.cs index 930a02c..21d86c7 100644 --- a/src/Dtmworkflow/Workflow.cs +++ b/src/Dtmworkflow/Workflow.cs @@ -3,7 +3,9 @@ using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Net.Http; using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; namespace Dtmworkflow { @@ -32,7 +34,8 @@ public Workflow(IDtmClient httpClient, IDtmgRPCClient grpcClient, Dtmcli.IBranch public System.Net.Http.HttpClient NewRequest() { - return _httpClient.GetHttpClient("WF"); + // return _httpClient.GetHttpClient("WF"); + return new HttpClient(new WorkflowHttpInterceptor(this)); } /// diff --git a/src/Dtmworkflow/WorkflowGlobalTransaction.cs b/src/Dtmworkflow/WorkflowGlobalTransaction.cs index ce4bbec..44214db 100644 --- a/src/Dtmworkflow/WorkflowGlobalTransaction.cs +++ b/src/Dtmworkflow/WorkflowGlobalTransaction.cs @@ -52,7 +52,6 @@ public void Register(string name, WfFunc2 handler, params Action[] cus }); } -#if NET5_0_OR_GREATER public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query, byte[] body) { _ = query.TryGetValue("gid", out var gid); @@ -60,7 +59,6 @@ public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query, await Execute(op, gid, body, true); } -#endif #if DEBUG // for sample only public bool Exists(string name) diff --git a/src/Dtmworkflow/WorkflowGrpcInterceptor.cs b/src/Dtmworkflow/WorkflowGrpcInterceptor.cs new file mode 100644 index 0000000..3f46660 --- /dev/null +++ b/src/Dtmworkflow/WorkflowGrpcInterceptor.cs @@ -0,0 +1,114 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Google.Protobuf; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Microsoft.Extensions.Logging; + +namespace Dtmworkflow; + +// [gRPC interceptors on .NET | Microsoft Learn](https://learn.microsoft.com/en-us/aspnet/core/grpc/interceptors?view=aspnetcore-9.0) +public class WorkflowGrpcInterceptor(Workflow wf, ILogger logger) : Interceptor +{ + public WorkflowGrpcInterceptor(Workflow wf) : this(wf, null) + { + } + + public override AsyncUnaryCall AsyncUnaryCall( + TRequest request, + ClientInterceptorContext context, + AsyncUnaryCallContinuation continuation) + { + logger?.LogDebug($"grpc client calling: {context.Host}{context.Method.FullName}"); + + if (wf == null) + { + return base.AsyncUnaryCall(request, context, continuation); + } + + async Task<(AsyncUnaryCall, TResponse, Status)> Origin() + { + var newContext = Dtmgimp.TransInfo2Ctx(context, wf.TransBase.Gid, wf.TransBase.TransType, wf.WorkflowImp.CurrentBranch, wf.WorkflowImp.CurrentOp, wf.TransBase.Dtm); + + var call = continuation(request, newContext); + TResponse response; + try + { + response = await call.ResponseAsync; + } + catch (Exception e) + { + logger?.LogDebug($"grpc client: {context.Host}{context.Method.FullName} ex: {e}"); + response = null; + } + + Status status = call.GetStatus(); + return ( + new AsyncUnaryCall( + call.ResponseAsync, + call.ResponseHeadersAsync, + call.GetStatus, + call.GetTrailers, + call.Dispose), + response, + status + ); + } + + // intercept phase1 only. CallPhase2 comes with RecordedDo + if (wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction) + { + var (newCall, _, _) = Origin().GetAwaiter().GetResult(); + return newCall; + } + + AsyncUnaryCall call = null; + StepResult sr = wf.RecordedDo(bb => + { + (call, TResponse data, Status status) = Origin().GetAwaiter().GetResult(); + RpcException err = status.StatusCode != StatusCode.OK ? new RpcException(status) : null; + return Task.FromResult(wf.StepResultFromGrpc(data as IMessage, err)); + }).GetAwaiter().GetResult(); + Exception exception = wf.StepResultToGrpc(sr, null); + + return call; + } + + private class Dtmgimp + { + public static ClientInterceptorContext TransInfo2Ctx( + ClientInterceptorContext ctx, + string gid, + string transType, + string branchID, + string op, + string dtm) where TRequest : class where TResponse : class + { + var headers = new Metadata(); + if (ctx.Options.Headers != null) + { + foreach (Metadata.Entry entity in ctx.Options.Headers) + { + headers.Add(entity.Key, entity.Value); + } + } + + const string dtmpre = "dtm-"; + headers.Add(dtmpre + "gid", gid); + headers.Add(dtmpre + "trans_type", transType); + headers.Add(dtmpre + "branch_id", branchID); + headers.Add(dtmpre + "op", op); + headers.Add(dtmpre + "dtm", dtm); + + headers.Add("sub-call-id", $"{op}-{Guid.NewGuid()}"); + + var nctx = new ClientInterceptorContext( + ctx.Method, + ctx.Host, + new CallOptions(headers: headers, deadline: ctx.Options.Deadline, cancellationToken: ctx.Options.CancellationToken)); + + return nctx; + } + } +} \ No newline at end of file diff --git a/src/Dtmworkflow/WorkflowHttpInterceptor.cs b/src/Dtmworkflow/WorkflowHttpInterceptor.cs new file mode 100644 index 0000000..86b4829 --- /dev/null +++ b/src/Dtmworkflow/WorkflowHttpInterceptor.cs @@ -0,0 +1,50 @@ +using System; +using System.Net.Http; +using System.Threading; +using System.Threading.Tasks; +using System.Web; + +namespace Dtmworkflow; + +internal class WorkflowHttpInterceptor : DelegatingHandler +{ + private readonly Workflow _wf; + + public WorkflowHttpInterceptor(Workflow wf) + { + this._wf = wf; + InnerHandler = new HttpClientHandler(); + } + + protected override async Task SendAsync(HttpRequestMessage request, CancellationToken cancellationToken) + { + Func> origin = async (barrier) => + { + var uriBuilder = new UriBuilder(request.RequestUri); + var query = HttpUtility.ParseQueryString(uriBuilder.Query); + query["branch_id"] = _wf.WorkflowImp.CurrentBranch; + query["gid"] = _wf.TransBase.Gid; + query["op"] = _wf.WorkflowImp.CurrentOp; + query["trans_type"] = _wf.TransBase.TransType; + query["dtm"] = _wf.TransBase.Dtm; + uriBuilder.Query = query.ToString(); + request.RequestUri = uriBuilder.Uri; + + var response = await base.SendAsync(request, cancellationToken); + return _wf.StepResultFromHTTP(response, null); + }; + + StepResult sr; + // in phase 2, do not save, because it is saved outer + if (_wf.WorkflowImp.CurrentOp != DtmCommon.Constant.OpAction) + { + sr = await origin(null); + } + else + { + sr = await _wf.RecordedDo(origin); + } + + return _wf.StepResultToHttp(sr); + } +} \ No newline at end of file diff --git a/tests/BusiGrpcService/BusiGrpcService.csproj b/tests/BusiGrpcService/BusiGrpcService.csproj index b896af4..0f66750 100644 --- a/tests/BusiGrpcService/BusiGrpcService.csproj +++ b/tests/BusiGrpcService/BusiGrpcService.csproj @@ -8,7 +8,9 @@ + + @@ -20,4 +22,12 @@ + + + + + + + + diff --git a/tests/BusiGrpcService/Controllers/WorkflowHttpTestController.cs b/tests/BusiGrpcService/Controllers/WorkflowHttpTestController.cs new file mode 100644 index 0000000..34e0d90 --- /dev/null +++ b/tests/BusiGrpcService/Controllers/WorkflowHttpTestController.cs @@ -0,0 +1,30 @@ +using Microsoft.AspNetCore.Mvc; + +namespace BusiGrpcService.Controllers +{ + [ApiController] + public class WorkflowHttpTestController : ControllerBase + { + [HttpGet("test-http-ok1")] + public IActionResult TestHttpOk1() + { + Console.Out.WriteLine($"QueryString: {Request.QueryString}"); + return Content("SUCCESS"); + } + + [HttpGet("test-http-ok2")] + public IActionResult TestHttpOk2() + { + Console.Out.WriteLine($"QueryString: {Request.QueryString}"); + return Content("SUCCESS"); + } + + [HttpGet("409")] + public IActionResult Test409() + { + Console.Out.WriteLine($"QueryString: {Request.QueryString}"); + Response.StatusCode = 409; + return Content("i am body, the http branch is 409"); + } + } +} \ No newline at end of file diff --git a/tests/BusiGrpcService/Program.cs b/tests/BusiGrpcService/Program.cs index be2cc0d..9a18d11 100644 --- a/tests/BusiGrpcService/Program.cs +++ b/tests/BusiGrpcService/Program.cs @@ -8,18 +8,34 @@ { // Setup a HTTP/2 endpoint without TLS. options.ListenLocalhost(5005, o => o.Protocols = HttpProtocols.Http2); + // test for workflow http branch + options.ListenLocalhost(5006, o => o.Protocols = HttpProtocols.Http1); }); builder.Services.AddGrpc(); -builder.Services.AddDtmGrpc(x => -{ - x.DtmGrpcUrl = "http://localhost:36790"; -}); +builder.Services.AddGrpcReflection(); +builder.Services.AddDtmGrpc(x => { x.DtmGrpcUrl = "http://localhost:36790"; }); +builder.Services.AddControllers(); +builder.Services.AddEndpointsApiExplorer(); +builder.Services.AddSwaggerGen(); var app = builder.Build(); // Configure the HTTP request pipeline. app.MapGrpcService(); + +IWebHostEnvironment env = app.Environment; +if (env.IsDevelopment()) +{ + app.MapGrpcReflectionService(); + app.UseSwagger(); + app.UseSwaggerUI(); +} + + +app.MapSwagger(); +app.MapDefaultControllerRoute(); +app.MapControllers(); app.MapGet("/", () => "Communication with gRPC endpoints must be made through a gRPC client. To learn how to create a client, visit: https://go.microsoft.com/fwlink/?linkid=2086909"); -app.Run(); +app.Run(); \ No newline at end of file diff --git a/tests/BusiGrpcService/Services/BusiApiService.cs b/tests/BusiGrpcService/Services/BusiApiService.cs index a8f1370..36fe0ce 100644 --- a/tests/BusiGrpcService/Services/BusiApiService.cs +++ b/tests/BusiGrpcService/Services/BusiApiService.cs @@ -10,7 +10,7 @@ namespace BusiGrpcService.Services { - public class BusiApiService : Busi.BusiBase + public partial class BusiApiService : Busi.BusiBase { private readonly ILogger _logger; @@ -26,28 +26,8 @@ public BusiApiService(ILogger logger, Dtmgrpc.IDtmgRPCClient cli public override async Task TransIn(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransIn req={req}", JsonSerializer.Serialize(request)); - - if (string.IsNullOrWhiteSpace(request.TransInResult) || request.TransInResult.Equals("SUCCESS")) - { - await Task.CompletedTask; - return new Empty(); - } - else if (request.TransInResult.Equals("FAILURE")) - { - throw new Grpc.Core.RpcException(new Status(StatusCode.Aborted, "FAILURE")); - } - else if (request.TransInResult.Equals("ONGOING")) - { - throw new Grpc.Core.RpcException(new Status(StatusCode.FailedPrecondition, "ONGOING")); - } - - throw new Grpc.Core.RpcException(new Status(StatusCode.Internal, $"unknow result {request.TransInResult}")); - } - - public override async Task TransInTcc(BusiReq request, ServerCallContext context) - { - _logger.LogInformation("TransIn req={req}", JsonSerializer.Serialize(request)); + string gid = context.RequestHeaders.Get("dtm-gid")?.Value; + _logger.LogInformation("TransIn gid={gid} req={req}", gid, JsonSerializer.Serialize(request)); if (string.IsNullOrWhiteSpace(request.TransInResult) || request.TransInResult.Equals("SUCCESS")) { @@ -86,16 +66,31 @@ public override async Task TransInRevert(BusiReq request, ServerCallConte public override async Task TransOut(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request)); + string gid = context.RequestHeaders.Get("dtm-gid")?.Value; + _logger.LogInformation("TransOut gid={gid} req={req}", gid, JsonSerializer.Serialize(request)); await Task.CompletedTask; return new Empty(); } public override async Task TransOutTcc(BusiReq request, ServerCallContext context) { - _logger.LogInformation("TransOut req={req}", JsonSerializer.Serialize(request)); - await Task.CompletedTask; - return new Empty(); + _logger.LogInformation("TransOutTry req={req}", JsonSerializer.Serialize(request)); + + if (string.IsNullOrWhiteSpace(request.TransOutResult) || request.TransOutResult.Equals("SUCCESS")) + { + await Task.CompletedTask; + return new Empty(); + } + else if (request.TransOutResult.Equals("FAILURE")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Aborted, "FAILURE")); + } + else if (request.TransOutResult.Equals("ONGOING")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.FailedPrecondition, "ONGOING")); + } + + throw new Grpc.Core.RpcException(new Status(StatusCode.Internal, $"unknow result {request.TransOutResult}")); } public override async Task TransOutConfirm(BusiReq request, ServerCallContext context) diff --git a/tests/BusiGrpcService/Services/BusiApiService_Stream.cs b/tests/BusiGrpcService/Services/BusiApiService_Stream.cs new file mode 100644 index 0000000..d81ca10 --- /dev/null +++ b/tests/BusiGrpcService/Services/BusiApiService_Stream.cs @@ -0,0 +1,177 @@ +using System.Text; +using System.Text.Json; +using System.Transactions; +using busi; +using DtmCommon; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core; +using MySqlConnector; + +namespace BusiGrpcService.Services; + +public partial class BusiApiService +{ + public override async Task StreamTransOutTcc(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + // stream try -> confirm/cancel + await foreach (var request in requestStream.ReadAllAsync()) + { + if (request.DtmBranchTransInfo != null) + { + BranchBarrier branchBarrier = _barrierFactory.CreateBranchBarrier( + request.DtmBranchTransInfo.TransType, + request.DtmBranchTransInfo.Gid, + request.DtmBranchTransInfo.BranchId, + request.DtmBranchTransInfo.Op, _logger); + _logger.LogInformation( + $"{nameof(StreamTransOutTcc)} gid={branchBarrier.Gid} branch_id={branchBarrier.BranchID} op={branchBarrier.Op}, req={JsonSerializer.Serialize(request)}"); + + await using MySqlConnection conn = GetBarrierConn(); + (bool done, string reason) = await branchBarrier.Call(conn, async () => + { + // business logic + await TransOutFn(responseStream, request); + }); + if (!done) + { + _logger.LogInformation($"NOT done, reason:{reason} {nameof(StreamTransOutTcc)} gid={branchBarrier.Gid} branch_id={branchBarrier.BranchID} op={branchBarrier.Op}"); + await responseStream.WriteAsync(new StreamReply { OperateType = request.OperateType, Message = reason }); + } + } + else + { + await TransOutFn(responseStream, request); + } + } + + _logger.LogInformation($"{nameof(StreamTransOutTcc)} completed"); + } + + private static async Task TransOutFn(IServerStreamWriter responseStream, StreamRequest request) + { + switch (request.OperateType) + { + case OperateType.Try: + { + if (string.IsNullOrWhiteSpace(request.BusiRequest.TransOutResult) || request.BusiRequest.TransOutResult.Equals("SUCCESS")) + { + await responseStream.WriteAsync(new StreamReply { OperateType = request.OperateType, Message = "Tried, waiting your confirm..." }); + } + else if (request.BusiRequest.TransOutResult.Equals("FAILURE")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Aborted, "FAILURE")); + } + else if (request.BusiRequest.TransOutResult.Equals("ONGOING")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.FailedPrecondition, "ONGOING")); + } + else + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Internal, $"unknow result {request.BusiRequest.TransOutResult}")); + } + + break; + } + case OperateType.Confirm: + { + await responseStream.WriteAsync(new StreamReply { OperateType = request.OperateType, Message = "Confirmed" }); + break; + } + case OperateType.Cancel: + { + await responseStream.WriteAsync(new StreamReply { OperateType = request.OperateType, Message = "Canceled" }); + break; + } + default: + throw new ArgumentOutOfRangeException(); + } + } + + public override async Task StreamTransInTcc(IAsyncStreamReader requestStream, IServerStreamWriter responseStream, ServerCallContext context) + { + // stream try -> confirm/cancel + await foreach (var request in requestStream.ReadAllAsync()) + { + if (request.DtmBranchTransInfo != null) + { + BranchBarrier branchBarrier = _barrierFactory.CreateBranchBarrier( + request.DtmBranchTransInfo.TransType, + request.DtmBranchTransInfo.Gid, + request.DtmBranchTransInfo.BranchId, + request.DtmBranchTransInfo.Op, _logger); + _logger.LogInformation( + $"{nameof(StreamTransInTcc)} gid={branchBarrier.Gid} branch_id={branchBarrier.BranchID} op={branchBarrier.Op}, req={JsonSerializer.Serialize(request)}"); + + await using MySqlConnection conn = GetBarrierConn(); + (bool done, string reason) = await branchBarrier.Call(conn, async () => + { + // business logic + await TransInFn(responseStream, request); + }); + if (!done) + { + _logger.LogInformation($"NOT done, reason:{reason} {nameof(StreamTransInTcc)} gid={branchBarrier.Gid} branch_id={branchBarrier.BranchID} op={branchBarrier.Op}"); + await responseStream.WriteAsync(new StreamReply { OperateType = request.OperateType, Message = reason }); + } + } + else + { + await TransInFn(responseStream, request); + } + } + + _logger.LogInformation($"{nameof(StreamTransInTcc)} completed"); + } + + private static async Task TransInFn(IServerStreamWriter responseStream, StreamRequest request) + { + switch (request.OperateType) + { + case OperateType.Try: + { + if (string.IsNullOrWhiteSpace(request.BusiRequest.TransInResult) || request.BusiRequest.TransInResult.Equals("SUCCESS")) + { + await responseStream.WriteAsync(new StreamReply + { + OperateType = request.OperateType, + Message = "Tried, waiting your confirm..." + }); + } + else if (request.BusiRequest.TransInResult.Equals("FAILURE")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Aborted, "FAILURE")); + } + else if (request.BusiRequest.TransInResult.Equals("ONGOING")) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.FailedPrecondition, "ONGOING")); + } + else + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Internal, $"unknow result {request.BusiRequest.TransInResult}")); + } + + break; + } + case OperateType.Confirm: + { + await responseStream.WriteAsync(new StreamReply + { + OperateType = request.OperateType, + Message = "Confirmed" + }); + break; + } + case OperateType.Cancel: + { + await responseStream.WriteAsync(new StreamReply + { + OperateType = request.OperateType, + Message = "Canceled" + }); + break; + } + default: + throw new ArgumentOutOfRangeException(); + } + } +} \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/BusiApiServiceTest.cs b/tests/Dtmgrpc.IntegrationTests/BusiApiServiceTest.cs new file mode 100644 index 0000000..f509ef5 --- /dev/null +++ b/tests/Dtmgrpc.IntegrationTests/BusiApiServiceTest.cs @@ -0,0 +1,147 @@ +using System.ComponentModel; +using System.Threading; +using System.Threading.Tasks; +using busi; +using Dtmworkflow; +using Grpc.Core; +using Grpc.Core.Interceptors; +using Grpc.Net.Client; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using Xunit; +using Xunit.Abstractions; + +namespace Dtmgrpc.IntegrationTests; + +// [Call gRPC services with the .NET client | Microsoft Learn](https://learn.microsoft.com/en-us/aspnet/core/grpc/client?view=aspnetcore-9.0#bi-directional-streaming-call) + +public class BusiApiServiceTest(ITestOutputHelper testOutputHelper) +{ + [Fact] + public async Task StreamTransOutTcc_Try_Confirm() + { + var provider = ITTestHelper.AddDtmGrpc(); + Busi.BusiClient busiClient = GetBusiClientWithWf(null, provider); + + using AsyncDuplexStreamingCall call = busiClient.StreamTransOutTcc(); + testOutputHelper.WriteLine("Starting background task to receive messages"); + var myGrpcProcesser = new MyGrpcProcesser(call, testOutputHelper); + var readTask = myGrpcProcesser.HandleResponse(); + + testOutputHelper.WriteLine("Starting to send messages"); + BusiReq busiRequest = ITTestHelper.GenBusiReq(false, false); + + // try + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + BusiRequest = busiRequest, + }); + Grpc.Core.Status tryStatus = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.OK, tryStatus.StatusCode); + + // confirm + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Confirm, + BusiRequest = busiRequest, + }); + // wait Confirm + Grpc.Core.Status confirmStatus = await myGrpcProcesser.GetResult(OperateType.Confirm); + Assert.Equal(StatusCode.OK, confirmStatus.StatusCode); + + await call.RequestStream.CompleteAsync(); + await readTask; + } + + [Fact] + public async Task StreamTransOutTcc_Try_Failed() + { + var provider = ITTestHelper.AddDtmGrpc(); + Busi.BusiClient busiClient = GetBusiClientWithWf(null, provider); + + using AsyncDuplexStreamingCall call = busiClient.StreamTransOutTcc(); + testOutputHelper.WriteLine("Starting background task to receive messages"); + var myGrpcProcesser = new MyGrpcProcesser(call, testOutputHelper); + var readTask = myGrpcProcesser.HandleResponse(); + + testOutputHelper.WriteLine("Starting to send messages"); + BusiReq busiRequest = ITTestHelper.GenBusiReq(true, false); + + // try + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + BusiRequest = busiRequest, + }); + // wait try + var tryStatus = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.Aborted, tryStatus.StatusCode); + Assert.Equal("FAILURE", tryStatus.Detail); + + await call.RequestStream.CompleteAsync(); + await Assert.ThrowsAsync(async () => { await readTask; }); // because try action aborted. + } + + + [Description("try-cancel")] + [Fact] + public async Task StreamTransOutTcc_Try_Cancel() + { + var provider = ITTestHelper.AddDtmGrpc(); + Busi.BusiClient busiClient = GetBusiClientWithWf(null, provider); + + using AsyncDuplexStreamingCall call = busiClient.StreamTransOutTcc(); + testOutputHelper.WriteLine("Starting background task to receive messages"); + var myGrpcProcesser = new MyGrpcProcesser(call, testOutputHelper); + var readTask = myGrpcProcesser.HandleResponse(); + + testOutputHelper.WriteLine("Starting to send messages"); + BusiReq busiRequest = ITTestHelper.GenBusiReq(false, false); + + // try + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + BusiRequest = busiRequest, + }); + // wait try + var tryStatus = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.OK, tryStatus.StatusCode); + + // cancel + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Cancel, + BusiRequest = busiRequest, + }); + // wait cancel + var cancelStatus = await myGrpcProcesser.GetResult(OperateType.Cancel); + Assert.Equal(StatusCode.OK, cancelStatus.StatusCode); + + await call.RequestStream.CompleteAsync(); + await readTask; + } + + private static Busi.BusiClient GetBusiClientWithWf(Workflow wf, ServiceProvider provider) + { + var loggerFactory = provider.GetRequiredService(); + var channel = GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol); + var logger = loggerFactory.CreateLogger(); + + Busi.BusiClient busiClient; + if (wf != null) + { + var interceptor = new WorkflowGrpcInterceptor(wf, logger); // inject client interceptor, and workflow instance + var callInvoker = channel.Intercept(interceptor); + busiClient = new Busi.BusiClient(callInvoker); + } + else + { + busiClient = new Busi.BusiClient(channel); + } + + return busiClient; + } +} \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/MyGrpcProcesser.cs b/tests/Dtmgrpc.IntegrationTests/MyGrpcProcesser.cs new file mode 100644 index 0000000..40da610 --- /dev/null +++ b/tests/Dtmgrpc.IntegrationTests/MyGrpcProcesser.cs @@ -0,0 +1,57 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Threading.Tasks; +using busi; +using Grpc.Core; +using Xunit.Abstractions; + +namespace Dtmgrpc.IntegrationTests; + +public class MyGrpcProcesser(AsyncDuplexStreamingCall call, ITestOutputHelper testOutputHelper) +{ + private TaskCompletionSource _callDisposed = new TaskCompletionSource(); + private readonly ConcurrentDictionary> progress = new(); + + public Task HandleResponse() + { + IAsyncEnumerable asyncEnumerable = call.ResponseStream.ReadAllAsync(); + Task readTask = Task.Run(async () => + { + try + { + await foreach (var response in asyncEnumerable) + { + testOutputHelper.WriteLine($"{response.OperateType}: {response.Message}"); + TaskCompletionSource tcs = progress.GetOrAdd(response.OperateType, type => new TaskCompletionSource()); + tcs.SetResult(new Status(StatusCode.OK, "")); + } + } + catch (RpcException ex) + { + testOutputHelper.WriteLine($"Exception caught: {ex.Status.StatusCode} - {ex.Status.Detail}"); + + // TODO which request does the response correspond to? + var tcs = progress.GetOrAdd(OperateType.Try, type => new TaskCompletionSource()); + tcs.SetResult(ex.Status); + + _callDisposed.SetResult(); + throw; + } + catch (Exception ex) + { + _callDisposed.SetResult(); + throw; + } + }); + return readTask; + } + + public async Task GetResult(OperateType operateType) + { + TaskCompletionSource tcs = progress.GetOrAdd(operateType, type => new TaskCompletionSource()); + + Task.WaitAny(_callDisposed.Task, tcs.Task); + return await tcs.Task; + } +} \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcStreamTest.cs b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcStreamTest.cs new file mode 100644 index 0000000..a1cf6ef --- /dev/null +++ b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcStreamTest.cs @@ -0,0 +1,782 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Net.Http; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using busi; +using Dtmcli; +using DtmCommon; +using Dtmworkflow; +using Grpc.Core; +using Grpc.Net.Client; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using Xunit; +using Xunit.Abstractions; + +namespace Dtmgrpc.IntegrationTests +{ + public class WorkflowGrpcStreamTest + { + private readonly ITestOutputHelper _testOutputHelper; + + public WorkflowGrpcStreamTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TryConfirm() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Confirm, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Confirm); + Assert.Equal(StatusCode.OK, result.StatusCode); + }) + .OnRollback(async (barrier) => // cancel + { + Assert.Fail("should not run OnRollback"); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do(async (barrier) => + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait try + var result = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.OK, result.StatusCode); + return (""u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. local do"); + return ("my result"u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + + using var call2 = call; + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + + await call.RequestStream.CompleteAsync(); + await readTask; + + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + TransGlobal trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action succeed + // 02 action succeed + // 01 commit succeed + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); + } + + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TryCancel() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + Assert.Fail("should not run OnCommit"); + }) + .OnRollback(async (barrier) => // cancel + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Cancel, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Cancel); + Assert.Equal(StatusCode.OK, result.StatusCode); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do(async (barrier) => + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait try + var result = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.OK, result.StatusCode); + return (""u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. db do with throw failed"); + // throw new DtmFailureException("db do failed"); // can't throw + var ex = new DtmFailureException("db do failed"); + return ("my result"u8.ToArray(), ex); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + + using var call2 = call; + // first + + // same gid again + await Assert.ThrowsAsync(async () => + { + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + await call.RequestStream.CompleteAsync(); + await readTask; + + TransGlobal trans = await dtmClient.Query(gid, CancellationToken.None); + + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action failed + // 01 rollback succeed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("failed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[2].Op); + + // same gid again + Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // DtmCommon.DtmFailureException + // db do failed + // at Dtmworkflow.Workflow.Process(WfFunc2 handler, Byte[] data) in src/Dtmworkflow/Workflow.Imp.cs + // at Dtmworkflow.WorkflowGlobalTransaction.Execute(String name, String gid, Byte[] data, Boolean isHttp) in src/Dtmworkflow/WorkflowGlobalTransaction.cs + // at Dtmgrpc.IntegrationTests.WorkflowGrpcTest.Execute_GrpcTccAndDo_Should_DoFailed() in tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs + }); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("failed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[2].Op); + } + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TryServerFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + Assert.Fail("should not run OnCommit"); + }) + .OnRollback(async (barrier) => // cancel + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Cancel, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Cancel); + Assert.Equal(StatusCode.OK, result.StatusCode); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do(async (barrier) => + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait try + var result = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.Aborted, result.StatusCode); + Assert.Equal("FAILURE", result.Detail); + + return (""u8.ToArray(), new DtmFailureException("Try grpc error")); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. local do"); + return ("my result"u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(true, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + + using var call2 = call; + // first + await Assert.ThrowsAsync(async () => + { + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + } + ); + await Assert.ThrowsAsync(async () => { await readTask; }); // grpc aborted by server try method + + var trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action failed + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + + // same gid again + await Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + await call.RequestStream.CompleteAsync(); + await Assert.ThrowsAsync(async () => { await readTask; }); // grpc aborted by server try method + + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action failed + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + } + + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TryClientThrowFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + Assert.Fail("should not run OnCommit"); + }) + .OnRollback(async (barrier) => // cancel + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Cancel, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Cancel); + Assert.Equal(StatusCode.OK, result.StatusCode); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do((barrier) => + { + // throw + throw new DtmFailureException("try do manual client error"); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. local do"); + return ("my result"u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(true, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + + using var call2 = call; + // first + await Assert.ThrowsAsync(async () => + { + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + } + ); + await call.RequestStream.CompleteAsync(); + await readTask; + + var trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 rollback succeed + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("rollback", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + + // same gid again + await Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + await call.RequestStream.CompleteAsync(); + await readTask; + + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 rollback succeed + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("rollback", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + } + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TrySucceed_RegisterBranch_ThenCrash_ThenExecuteByQs_Continue() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Confirm, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Confirm); + Assert.Equal(StatusCode.Aborted, result.StatusCode); + Assert.Equal("FAILURE", result.Detail); + }) + .OnRollback(async (barrier) => // cancel + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Cancel, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Cancel); + Assert.Equal(StatusCode.OK, result.StatusCode); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do((barrier) => + { + // Assert should skip + return Task.FromResult<(byte[], Exception)>(([], new DtmFailureException("should skip branch 01 try"))); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. local do"); + return ("my result"u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(true, false); + var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req)); + + // init: try failed, then crash + var tb = TransBase.NewTransBase(gid, "workflow", "i am dtm", "1"); + // 1. Prepare + await dtmClient.PrepareWorkflow(tb, CancellationToken.None); + // 2. branch 1 action + await dtmClient.TransRegisterBranch(tb, new() + { + { "data", Encoding.UTF8.GetString(data) }, + { "branch_id", "01" }, + { "op", "action" }, + { "status", "succeed" }, + }, "registerBranch", CancellationToken.None); + + // Asserts + var trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("prepared", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + + using var call2 = call; + + // dtm callback executeByQs + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, data); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + await call.RequestStream.CompleteAsync(); + await readTask; + + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + Assert.Equal(2, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("02", trans.Branches[1].BranchId); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, data); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + await call.RequestStream.CompleteAsync(); + await readTask; + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + Assert.Equal(2, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("02", trans.Branches[1].BranchId); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + } + + [Fact] + public async Task Execute_StreamGrpcTccAndDo_TrySucceedButMissed_ThenCrash_ThenExecuteByQs_Continue() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_StreamGrpcTccAndDo_TryConfirm)}-{Guid.NewGuid().ToString("D")[..8]}"; + AsyncDuplexStreamingCall call = null; + MyGrpcProcesser myGrpcProcesser = null; + Task readTask = null; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq busiRequest = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Confirm, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait Confirm + var result = await myGrpcProcesser.GetResult(OperateType.Confirm); + Assert.Equal(StatusCode.Aborted, result.StatusCode); + Assert.Equal("FAILURE", result.Detail); + }) + .OnRollback(async (barrier) => // cancel + { + Assert.Fail("should not run OnRollback"); + }); + Busi.BusiClient busiClient = GetBusiClient(); + call = busiClient.StreamTransOutTcc(); + myGrpcProcesser = new MyGrpcProcesser(call, _testOutputHelper); + readTask = myGrpcProcesser.HandleResponse(); + // try + var (_, stepEx) = await workflow.Do(async (barrier) => + { + await call.RequestStream.WriteAsync(new StreamRequest() + { + OperateType = OperateType.Try, + DtmBranchTransInfo = this.CurrentBranchTransInfo(workflow), + BusiRequest = busiRequest, + }); + // wait try + var result = await myGrpcProcesser.GetResult(OperateType.Try); + Assert.Equal(StatusCode.OK, result.StatusCode); + return (""u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + (_, stepEx) = await workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => + { + _testOutputHelper.WriteLine("2. local do"); + return ("my result"u8.ToArray(), null); + }); + if (stepEx != null) + throw stepEx; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + var data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req)); + + // init: try failed, then crash + var tb = TransBase.NewTransBase(gid, "workflow", "i am dtm", "1"); + // Prepare + await dtmClient.PrepareWorkflow(tb, CancellationToken.None); + + // Asserts + var trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("prepared", trans.Transaction.Status); + Assert.Empty(trans.Branches); + + using var call2 = call; + + // dtm callback executeByQs + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, data); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + await call.RequestStream.CompleteAsync(); + await readTask; + + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + Assert.Equal(2, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("02", trans.Branches[1].BranchId); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, data); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + await call.RequestStream.CompleteAsync(); + await readTask; + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + Assert.Equal(2, trans.Branches.Count); + Assert.Equal("01", trans.Branches[0].BranchId); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("02", trans.Branches[1].BranchId); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + } + + private static Busi.BusiClient GetBusiClient() + { + var channel = GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol); + return new Busi.BusiClient(channel); + } + + private DtmBranchTransInfo CurrentBranchTransInfo(Workflow wf) + { + return new DtmBranchTransInfo() + { + Gid = wf.TransBase.Gid, + TransType = wf.TransBase.TransType, + BranchId = wf.WorkflowImp.CurrentBranch, + Op = wf.WorkflowImp.CurrentOp, + Dtm = wf.TransBase.Dtm, + }; + } + } +} \ No newline at end of file diff --git a/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs index 9d3735e..3f23545 100644 --- a/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs +++ b/tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs @@ -1,19 +1,35 @@ using Microsoft.Extensions.DependencyInjection; using System; +using System.Net; +using System.Net.Http; using System.Text; using System.Threading; using System.Threading.Tasks; using busi; +using Dtmcli; +using DtmCommon; using Dtmworkflow; +using Google.Protobuf.WellKnownTypes; +using Grpc.Core.Interceptors; using Grpc.Net.Client; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using MySqlConnector; using Newtonsoft.Json; using Xunit; +using Xunit.Abstractions; namespace Dtmgrpc.IntegrationTests { public class WorkflowGrpcTest { + private readonly ITestOutputHelper _testOutputHelper; + + public WorkflowGrpcTest(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + [Fact] public async Task Execute_Http_Should_Succeed() { @@ -42,7 +58,7 @@ public async Task Execute_gPRC_Should_Succeed() WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; - workflowGlobalTransaction.Register(wfName1, async (workflow, data) => await Task.FromResult("my result"u8.ToArray())); + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => await Task.FromResult("fmy result"u8.ToArray())); string gid = wfName1 + Guid.NewGuid().ToString()[..8]; var req = ITTestHelper.GenBusiReq(false, false); @@ -53,46 +69,553 @@ public async Task Execute_gPRC_Should_Succeed() } [Fact] - public async Task Execute_Success() + public async Task Execute_DoAndHttp_ShouldSuccess() { var provider = ITTestHelper.AddDtmGrpc(); var workflowFactory = provider.GetRequiredService(); var loggerFactory = provider.GetRequiredService(); WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); - Busi.BusiClient busiClient = new Busi.BusiClient(GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol)); - string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; workflowGlobalTransaction.Register(wfName1, async (workflow, data) => { BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. local + workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + await Task.CompletedTask; + }).Do(async (barrier) => { return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); }); + + // 2. http1, SAGA + HttpResponseMessage httpResult1 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http1 rollback"); + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + // 3. http2, TCC + HttpResponseMessage httpResult2 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 cancel"); + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).OnCommit(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 commit"); + // NOT must use workflow.NewRequest() + await workflow.NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // BranchID Op Status + // 01 action succeed + // 02 action succeed + // 03 action succeed + // 03 commit succeed + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(4, trans.Branches.Count); // 1.Do x1, 2.http, saga x1, 3.Http tcc x2 + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(4, trans.Branches.Count); // 1.Do x1, 2.http, saga x1, 3.Http tcc x2 + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + } + + [Fact] + public async Task Execute_DoAndHttp_Failed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"wf-simple-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + // 1. local workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("1. local rollback"); + await Task.CompletedTask; + }).Do(async (barrier) => { return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); }); + + // 2. http1 + HttpResponseMessage httpResult1 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http1 rollback"); + await Task.CompletedTask; + }).NewRequest().GetAsync("http://localhost:5006/test-http-ok1"); + + // 3. http2 + HttpResponseMessage httpResult2 = await workflow.NewBranch().OnRollback(async (barrier) => + { + _testOutputHelper.WriteLine("4. http2 rollback"); + await Task.CompletedTask; + }).NewRequest().GetAsync("http://localhost:5006/409"); // 409 + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + // same gid again + await Assert.ThrowsAsync(async () => { await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); }); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + // 03 action failed + // 02 rollback succeed + // 01 rollback succeed + Assert.Equal(5, trans.Branches.Count); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("failed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + Assert.Equal("rollback", trans.Branches[4].Op); + Assert.Equal("succeed", trans.Branches[4].Status); + } + + [Fact] + public async Task Execute_DoAndGrpcSAGA_Should_Success() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_DoAndGrpcSAGA_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. local + workflow.NewBranch().OnRollback(async (barrier) => { _testOutputHelper.WriteLine("1. local rollback"); }).Do(async (barrier) => { return ("my result"u8.ToArray(), null); }); + + // 2. grpc1 + Busi.BusiClient busiClient = null; + var wf = workflow.NewBranch().OnRollback(async (barrier) => { await busiClient.TransInRevertAsync(request); + _testOutputHelper.WriteLine("2. grpc1 rollback"); + }); + busiClient = GetBusiClientWithWf(wf, provider); + await busiClient.TransOutAsync(request); + + // 3. grpc2 + wf = workflow.NewBranch().OnRollback(async (barrier) => + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("3. grpc2 rollback"); }); await busiClient.TransInAsync(request); - workflow.NewBranch().OnRollback(async (barrier) => + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); // 1.Do 2.grpc 3.grpc + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("succeed", trans.Branches[2].Status); + + // same gid again + result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + Assert.Equal("my result", Encoding.UTF8.GetString(result)); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); // 1.Do 2.Http 3.Http + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("succeed", trans.Branches[2].Status); + } + + [Fact] + public async Task Execute_DoAndGrpcSAGA_Should_Failed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_DoAndGrpcSAGA_Should_Failed)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. local + workflow.NewBranch().OnRollback(async (barrier) => { _testOutputHelper.WriteLine("1. local rollback"); }).Do(async (barrier) => + { + return await Task.FromResult<(byte[], Exception)>(("my result"u8.ToArray(), null)); + }); + + // 2. grpc1 + Busi.BusiClient busiClient = null; + var wf = workflow.NewBranch().OnRollback(async (barrier) => + { + await busiClient.TransInRevertAsync(request); + _testOutputHelper.WriteLine("2. grpc1 rollback"); + }); + busiClient = GetBusiClientWithWf(wf, provider); + Empty response1 = await busiClient.TransOutAsync(request); + + // 3. grpc2 + wf = workflow.NewBranch().OnRollback(async (barrier) => { await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("3. grpc2 rollback"); }); - await busiClient.TransOutAsync(request); + Empty response2 = await busiClient.TransInAsync(request); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: false, inFailed: true); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("failed", trans.Transaction.Status); + // BranchID Op Status CreateTime UpdateTime Url + // 01 action succeed + // 02 action succeed + // 03 action failed + // 02 rollback succeed + // 01 rollback succeed + Assert.Equal(5, trans.Branches.Count); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[2].Op); + Assert.Equal("failed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[3].Op); + Assert.Equal("succeed", trans.Branches[3].Status); + Assert.Equal("rollback", trans.Branches[4].Op); + Assert.Equal("succeed", trans.Branches[4].Status); + + // same gid again + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + } + + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_Success() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // The construction of busiClient dependence on the Workflow instance, must ugly code + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, maybe SAG, at the end, no need to write the reverse rollback. + workflow.NewBranch() + // .OnRollback(async (barrier) => + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => { return ("my result"u8.ToArray(), null); }); return await Task.FromResult("my result"u8.ToArray()); }); string gid = wfName1 + Guid.NewGuid().ToString()[..8]; var req = ITTestHelper.GenBusiReq(false, false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first byte[] result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); Assert.Equal("my result", Encoding.UTF8.GetString(result)); - string status = await ITTestHelper.GetTranStatus(gid); - Assert.Equal("succeed", status); - - // again + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action succeed + // 02 action succeed + // 01 commit succeed + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); + + // same gid again result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); Assert.Equal("my result", Encoding.UTF8.GetString(result)); - status = await ITTestHelper.GetTranStatus(gid); - Assert.Equal("succeed", status); + trans = await dtmClient.Query(gid, CancellationToken.None); + Assert.Equal("succeed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("succeed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("commit", trans.Branches[2].Op); + } + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_TryFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // busiClient reference Workflow instance + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, it's the tail, rollback is NOT necessary + workflow.NewBranch() + // .OnRollback(async (barrier) => // rollback + // { + // _testOutputHelper.WriteLine("1. local rollback"); + // }) + .Do(async (barrier) => { return ("my result"u8.ToArray(), null); }); + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: true, inFailed: false); // 1. trans out try failed + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action failed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + + // same gid again + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // DtmCommon.DtmFailureException: Status(StatusCode="Aborted", Detail="FAILURE") + // + // DtmCommon.DtmFailureException + // Status(StatusCode="Aborted", Detail="FAILURE") + // at Dtmworkflow.Workflow.Process(WfFunc2 handler, Byte[] data) in src/Dtmworkflow/Workflow.Imp.cs + // at Dtmworkflow.WorkflowGlobalTransaction.Execute(String name, String gid, Byte[] data, Boolean isHttp) in src/Dtmworkflow/WorkflowGlobalTransaction.cs + }); + + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action failed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(1, trans.Branches.Count); + Assert.Equal("failed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + } + + [Fact] + public async Task Execute_GrpcTccAndDo_Should_DoFailed() + { + var provider = ITTestHelper.AddDtmGrpc(); + var workflowFactory = provider.GetRequiredService(); + var loggerFactory = provider.GetRequiredService(); + WorkflowGlobalTransaction workflowGlobalTransaction = new WorkflowGlobalTransaction(workflowFactory, loggerFactory); + + string wfName1 = $"{nameof(this.Execute_GrpcTccAndDo_Should_Success)}-{Guid.NewGuid().ToString("D")[..8]}"; + workflowGlobalTransaction.Register(wfName1, async (workflow, data) => + { + BusiReq request = JsonConvert.DeserializeObject(Encoding.UTF8.GetString(data)); + + // 1. grpc1 TCC + Busi.BusiClient busiClient = null; + Workflow wf = workflow.NewBranch() + .OnCommit(async (barrier) => // confirm + { + await busiClient.TransOutConfirmAsync(request); + }) + .OnRollback(async (barrier) => // cancel + { + await busiClient.TransOutRevertAsync(request); + _testOutputHelper.WriteLine("1. grpc1 cancel"); + }); + busiClient = GetBusiClientWithWf(wf, provider); // busiClient reference Workflow instance + // try + await busiClient.TransOutTccAsync(request); + + // 2. local, it's the tail, rollback is NOT necessary + (byte[] doResult, Exception ex) = await workflow.NewBranch() + .OnRollback(async (barrier) => // rollback + { + _testOutputHelper.WriteLine("1. local rollback"); + }) + .Do(async (barrier) => + { + // throw new DtmFailureException("db do failed"); // can't throw + var ex = new DtmFailureException("db do failed"); + return ("my result"u8.ToArray(), ex); + }); + if (ex != null) + throw ex; + + return await Task.FromResult("my result"u8.ToArray()); + }); + + string gid = wfName1 + Guid.NewGuid().ToString()[..8]; + var req = ITTestHelper.GenBusiReq(outFailed: false, inFailed: false); + + DtmClient dtmClient = new DtmClient(provider.GetRequiredService(), provider.GetRequiredService>()); + TransGlobal trans; + + // first + await Assert.ThrowsAsync(async () => + { + byte[] _ = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + }); + + trans = await dtmClient.Query(gid, CancellationToken.None); + // BranchID Op Status + // 01 action succeed + // 02 action failed + // 01 rollback succeed + Assert.Equal("failed", trans.Transaction.Status); + Assert.Equal(3, trans.Branches.Count); + Assert.Equal("succeed", trans.Branches[0].Status); + Assert.Equal("action", trans.Branches[0].Op); + Assert.Equal("failed", trans.Branches[1].Status); + Assert.Equal("action", trans.Branches[1].Op); + Assert.Equal("succeed", trans.Branches[2].Status); + Assert.Equal("rollback", trans.Branches[2].Op); + + // same gid again + await Assert.ThrowsAsync(async () => + { + var result = await workflowGlobalTransaction.Execute(wfName1, gid, Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(req))); + // DtmCommon.DtmFailureException + // db do failed + // at Dtmworkflow.Workflow.Process(WfFunc2 handler, Byte[] data) in src/Dtmworkflow/Workflow.Imp.cs + // at Dtmworkflow.WorkflowGlobalTransaction.Execute(String name, String gid, Byte[] data, Boolean isHttp) in src/Dtmworkflow/WorkflowGlobalTransaction.cs + // at Dtmgrpc.IntegrationTests.WorkflowGrpcTest.Execute_GrpcTccAndDo_Should_DoFailed() in tests/Dtmgrpc.IntegrationTests/WorkflowGrpcTest.cs + }); + } + + private static Busi.BusiClient GetBusiClientWithWf(Workflow wf, ServiceProvider provider) + { + var loggerFactory = provider.GetRequiredService(); + var channel = GrpcChannel.ForAddress(ITTestHelper.BuisgRPCUrlWithProtocol); + var logger = loggerFactory.CreateLogger(); + var interceptor = new WorkflowGrpcInterceptor(wf, logger); // inject client interceptor, and workflow instance + var callInvoker = channel.Intercept(interceptor); + Busi.BusiClient busiClient = new Busi.BusiClient(callInvoker); + return busiClient; } } } \ No newline at end of file diff --git a/tests/protos/busi.proto b/tests/protos/busi.proto index 932ef1e..b992a42 100644 --- a/tests/protos/busi.proto +++ b/tests/protos/busi.proto @@ -16,6 +16,35 @@ message BusiReq { message BusiReply { string message = 1; } + +enum OperateType { + Try = 0; + Confirm = 1; + Cancel = 2; +} + +message DtmBranchTransInfo { + string Gid = 1; + string TransType = 2; + string BranchId = 3; + string Op = 4; + string Dtm = 5; +} + +enum OperateResult { + Success = 0; + Fail = 1; +} +message StreamRequest { + OperateType OperateType = 1; + DtmBranchTransInfo DtmBranchTransInfo = 2; + BusiReq busiRequest = 3; +} +message StreamReply { + OperateType OperateType = 1; + string Message = 3; +} + // The dtm service definition. service Busi { rpc TransIn(BusiReq) returns (google.protobuf.Empty) {} @@ -47,4 +76,8 @@ service Busi { rpc QueryPrepared(BusiReq) returns (BusiReply) {} rpc QueryPreparedMySqlReal(BusiReq) returns (google.protobuf.Empty) {} rpc QueryPreparedRedis(BusiReq) returns (google.protobuf.Empty) {} + + // stream TCC + rpc StreamTransInTcc(stream StreamRequest) returns (stream StreamReply) {} + rpc StreamTransOutTcc(stream StreamRequest) returns (stream StreamReply) {} } \ No newline at end of file