Skip to content

Feature/workflow record #101

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c3da18b
feat(OpDict): add rollback operation and update comments
wooln Dec 24, 2024
c7ba719
feat(workflow): add support for HTTP branches interceptor in workflows
wooln Mar 12, 2025
f872c46
ci: update DTM server UpdateBranchSync: 1
wooln Mar 12, 2025
b0f192d
test: simplify workflow test
wooln Mar 13, 2025
dc5300f
test(Dtmgrpc.IntegrationTests): enhance workflow test to cover SAGA a…
wooln Mar 13, 2025
9e49ec2
refactor(workflow): fix github action CI warnings
wooln Mar 13, 2025
07dc028
test(Dtmgrpc.IntegrationTests): add test for workflow grpc success
wooln Mar 13, 2025
5fd5cd2
Execute_DoAndGrpc_Should_Success勉强通过
wooln Mar 13, 2025
16dd6ec
feat: workflow interceptor
wooln Mar 14, 2025
50e9f43
test(dtm): Add tests for Do and TCC workflow success and failure
wooln Apr 17, 2025
db8efab
refactor: rollback AddHttpClient method in ServiceCollectionExtensions
wooln Apr 21, 2025
b2cb647
feat(BusiGrpcService): implement stream TCC gRPC service and add basi…
wooln Apr 22, 2025
8786c31
test(dtm): add stream grpc tcc and do integration tests for workflow
wooln Apr 23, 2025
b10f066
refactor(Dtmgrpc.IntegrationTests): improve gRPC streaming test with …
wooln Apr 25, 2025
0f49350
refactor(grpc): stream unt test tcc
wooln Apr 25, 2025
4ac3642
替换grpc stream的双向流拦截器的实现,改为直接用workflow.Do实现分支事务实录
wooln Apr 28, 2025
1f8d88f
test(Dtmgrpc.IntegrationTests): fix expected DtmFailureException in m…
wooln May 6, 2025
c838892
refactor(Dtmgrpc.IntegrationTests): Fix MyGrpcProcesser multi-threade…
wooln May 6, 2025
387f4ad
feat(test): add DtmBranchTransInfo to StreamRequest and update relate…
wooln May 6, 2025
5379b89
refactor(BusiGrpcService): Add branchBarrier.Call demo with Transacti…
wooln May 6, 2025
a2fa337
fix: fix workflow http interceptor missed adding query strings
wooln May 6, 2025
7719c96
feat(BusiGrpcService): refactor HTTP tests and integrate Swagger
wooln May 7, 2025
ea28bcd
feature(BusiGrpcService): streamline gRPC service methods BranchBarri…
wooln May 7, 2025
dc7f64e
refactor(tests): Chinese comments to English
wooln May 7, 2025
c0fd208
test: simplify BusiClient initialization
wooln May 7, 2025
68871fb
refactor: remove NET5_0_OR_GREATER conditional compilation(project is…
wooln May 9, 2025
420f454
sample: sample for workflow ExecuteByQS(http)
wooln Dec 27, 2024
6cef550
(DrefactortmCommon): modify BranchBarrier.Call method to return execu…
wooln May 12, 2025
b2fa155
fix(BusiGrpcService): handle null compensation duplicate or pend in s…
wooln May 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/build_and_it.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ jobs:
tar -xvf dtm_1.18.0_linux_amd64.tar.gz
pwd
mkdir /home/runner/work/client-csharp/client-csharp/logs
nohup ./dtm > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 &
echo "UpdateBranchSync: 1" > ./config.yml
nohup ./dtm -c ./config.yml > /home/runner/work/client-csharp/client-csharp/logs/dtm.log 2>&1 &
sleep 5
curl "127.0.0.1:36789/api/dtmsvr/newGid"
- name: Setup Busi Service
Expand Down
81 changes: 79 additions & 2 deletions samples/DtmSample/Controllers/WfTestController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,10 @@
using Microsoft.Extensions.Options;
using System;
using System.IO;
using System.Diagnostics;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Text;
using System.Text.Json;
using System.Text.Unicode;
using System.Threading;
using System.Threading.Tasks;
using Exception = System.Exception;
Expand Down Expand Up @@ -257,5 +255,84 @@ public async Task<IActionResult> TccRollBack(CancellationToken cancellationToken
return Ok(TransResponse.BuildFailureResponse());
}
}


private static readonly string wfNameForResume = "wfNameForResume";

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
[HttpPost("wf-crash")]
public async Task<IActionResult> Crash(CancellationToken cancellationToken)
{
if (!_globalTransaction.Exists(wfNameForResume))
{
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

// the first branch succeed, then crashed, the dtm server will call back the flowing wf-call-back
// manual stop application
Environment.Exit(0);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

var req = JsonSerializer.Serialize(new TransRequest("1", -30));
await _globalTransaction.Execute(wfNameForResume, Guid.NewGuid().ToString("N"), Encoding.UTF8.GetBytes(req), true);

return Ok(TransResponse.BuildSucceedResponse());
}

[HttpPost("wf-resume")]
public async Task<IActionResult> WfResume(CancellationToken cancellationToken)
{
try
{
if (!_globalTransaction.Exists(wfNameForResume))
{
// register again after manual crash by Environment.Exit(0);
_globalTransaction.Register(wfNameForResume, async (wf, data) =>
{
var content = new ByteArrayContent(data);
content.Headers.ContentType = new MediaTypeHeaderValue("application/json");

var outClient = wf.NewBranch().NewRequest();
await outClient.PostAsync(_settings.BusiUrl + "/TransOut", content);

var inClient = wf.NewBranch().NewRequest();
await inClient.PostAsync(_settings.BusiUrl + "/TransIn", content);

return null;
});
}

// prepared call ExecuteByQS
using var bodyMemoryStream = new MemoryStream();
await Request.Body.CopyToAsync(bodyMemoryStream, cancellationToken);
byte[] bytes = bodyMemoryStream.ToArray();
string body = Encoding.UTF8.GetString(bytes);
_logger.LogDebug($"body: {body}");

await _globalTransaction.ExecuteByQS(Request.Query, bodyMemoryStream.ToArray());

return Ok(TransResponse.BuildSucceedResponse());
}
catch (Exception ex)
{
_logger.LogError(ex, "Workflow Error");
return Ok(TransResponse.BuildFailureResponse());
}
}
}
}
11 changes: 6 additions & 5 deletions src/DtmCommon/Barrier/BranchBarrier.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@

public int BarrierID { get; set; }

public async Task Call(DbConnection db, Func<DbTransaction, Task> busiCall)
public async Task<(bool done, string reason)> Call(DbConnection db, Func<DbTransaction, Task> busiCall)
{
this.BarrierID = this.BarrierID + 1;
var bid = this.BarrierID.ToString().PadLeft(2, '0');
Expand Down Expand Up @@ -91,7 +91,7 @@
#else
await tx.CommitAsync();
#endif
return;
return (false, isNullCompensation ? "isNullCompensation" : "isDuplicateOrPend");
}

await busiCall.Invoke(tx);
Expand All @@ -118,9 +118,10 @@

throw;
}
return (true, string.Empty);
}

public async Task Call(DbConnection db, Func<Task> busiCall, TransactionScopeOption transactionScope = TransactionScopeOption.Required, IsolationLevel isolationLevel = IsolationLevel.Serializable)
public async Task<(bool done, string reason)> Call(DbConnection db, Func<Task> busiCall, TransactionScopeOption transactionScope = TransactionScopeOption.Required, IsolationLevel isolationLevel = IsolationLevel.Serializable)
{
this.BarrierID = this.BarrierID + 1;
var bid = this.BarrierID.ToString().PadLeft(2, '0');
Expand Down Expand Up @@ -158,7 +159,7 @@
if (isNullCompensation || isDuplicateOrPend)
{
Logger?.LogInformation("Will not exec busiCall, isNullCompensation={isNullCompensation}, isDuplicateOrPend={isDuplicateOrPend}", isNullCompensation, isDuplicateOrPend);
return;
return (false, isNullCompensation ? "isNullCompensation" : "isDuplicateOrPend");
}
await busiCall.Invoke();
scope.Complete();
Expand All @@ -174,7 +175,7 @@
throw;
}
}

return (true, string.Empty);

Check warning on line 178 in src/DtmCommon/Barrier/BranchBarrier.cs

View check run for this annotation

Codecov / codecov/patch

src/DtmCommon/Barrier/BranchBarrier.cs#L178

Added line #L178 was not covered by tests
}

public async Task<string> QueryPrepared(DbConnection db)
Expand Down
5 changes: 3 additions & 2 deletions src/DtmCommon/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,9 @@ public class Barrier

public static readonly Dictionary<string, string> OpDict = new Dictionary<string, string>()
{
{ "cancel", "try" },
{ "compensate", "action" },
{ "cancel", "try" }, // tcc
{ "compensate", "action" }, // saga
{ "rollback", "action" }, // workflow
};
public static readonly string REDIS_LUA_CheckAdjustAmount = @" -- RedisCheckAdjustAmount
local v = redis.call('GET', KEYS[1])
Expand Down
3 changes: 2 additions & 1 deletion src/Dtmcli/Constant.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@ internal static class Constant
{
internal const string DtmClientHttpName = "dtmClient";
internal const string BranchClientHttpName = "branchClient";

internal const string WorkflowBranchClientHttpName = "WF";

internal static class Request
{
internal const string CONTENT_TYPE = "application/json";
Expand Down
4 changes: 2 additions & 2 deletions src/Dtmcli/DtmClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ public async Task<TransGlobal> Query(string gid, CancellationToken cancellationT
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
DtmImp.Utils.CheckStatusCode(response.StatusCode);
return JsonSerializer.Deserialize<TransGlobal>(dtmContent, _jsonOptions);
}

Expand All @@ -167,7 +167,7 @@ public async Task<string> QueryStatus(string gid, CancellationToken cancellation
var client = _httpClientFactory.CreateClient(Constant.DtmClientHttpName);
var response = await client.GetAsync(url, cancellationToken).ConfigureAwait(false);
var dtmContent = await response.Content.ReadAsStringAsync().ConfigureAwait(false);
DtmImp.Utils.CheckStatus(response.StatusCode, dtmContent);
DtmImp.Utils.CheckStatusCode(response.StatusCode);
var graph = JsonSerializer.Deserialize<TransGlobalForStatus>(dtmContent, _jsonOptions);
return graph.Transaction == null
? string.Empty
Expand Down
8 changes: 8 additions & 0 deletions src/Dtmcli/DtmImp/Utils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@ public static void CheckStatus(HttpStatusCode status, string dtmResult)
}
}

public static void CheckStatusCode(HttpStatusCode status)
{
if (status != HttpStatusCode.OK)
{
throw new DtmException(string.Format(CheckStatusMsgFormat, status.ToString(), string.Empty));
}
}

/// <summary>
/// OrString return the first not null or not empty string
/// </summary>
Expand Down
6 changes: 3 additions & 3 deletions src/Dtmcli/TransGlobal.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class DtmTransaction
{
[JsonPropertyName("id")] public int Id { get; set; }

[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }

[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }

Expand Down Expand Up @@ -64,9 +64,9 @@ public class DtmBranch
{
[JsonPropertyName("id")] public int Id { get; set; }

[JsonPropertyName("create_time")] public DateTimeOffset CreateTime { get; set; }
[JsonPropertyName("create_time")] public DateTimeOffset? CreateTime { get; set; }

[JsonPropertyName("update_time")] public DateTimeOffset UpdateTime { get; set; }
[JsonPropertyName("update_time")] public DateTimeOffset? UpdateTime { get; set; }

[JsonPropertyName("gid")] public string Gid { get; set; }

Expand Down
18 changes: 17 additions & 1 deletion src/Dtmworkflow/ServiceCollectionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services
services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

// AddHttpClient(services);

return services;
}

Expand All @@ -33,8 +35,22 @@ public static IServiceCollection AddDtmWorkflow(this IServiceCollection services

services.TryAddSingleton<IWorkflowFactory, WorkflowFactory>();
services.TryAddSingleton<WorkflowGlobalTransaction>();

// AddHttpClient(services);

return services;
}

// private static void AddHttpClient(IServiceCollection services /*, DtmOptions options*/)
// {
// services.AddHttpClient(Dtmcli.Constant.WorkflowBranchClientHttpName, client =>
// {
// // TODO DtmOptions
// // client.Timeout = TimeSpan.FromMilliseconds(options.BranchTimeout);
// }).AddHttpMessageHandler<WorkflowHttpInterceptor>();
//
// // TODO how to inject workflow instance?
// services.AddTransient<WorkflowHttpInterceptor>();
// }
}
}
}
24 changes: 12 additions & 12 deletions src/Dtmworkflow/Workflow.Imp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@
}

err = Utils.GrpcError2DtmError(err);

if (err != null && err is not DtmCommon.DtmFailureException) throw err;

try
Expand Down Expand Up @@ -164,23 +164,23 @@
};
}

private Exception StepResultToGrpc(StepResult r, IMessage reply)
internal Exception StepResultToGrpc(StepResult r, IMessage reply)
{
if (r.Error == null && r.Status == DtmCommon.Constant.StatusSucceed)
{
// Check

// TODO Check
// dtmgimp.MustProtoUnmarshal(s.Data, reply.(protoreflect.ProtoMessage));
}

return r.Error;
}

private StepResult StepResultFromGrpc(IMessage reply, Exception err)
internal StepResult StepResultFromGrpc(IMessage reply, Exception err)
{
var sr = new StepResult
{
// GRPCError2DtmError
Error = null,
// TODO GRPCError2DtmError
Error = Utils.GrpcError2DtmError(err),

Check warning on line 183 in src/Dtmworkflow/Workflow.Imp.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/Workflow.Imp.cs#L182-L183

Added lines #L182 - L183 were not covered by tests
};

sr.Status = WfErrorToStatus(sr.Error);
Expand All @@ -196,7 +196,7 @@
return sr;
}

private HttpResponseMessage StepResultToHttp(StepResult r)
internal HttpResponseMessage StepResultToHttp(StepResult r)
{
if (r.Error != null)
{
Expand All @@ -206,7 +206,7 @@
return Utils.NewJSONResponse(HttpStatusCode.OK, r.Data);
}

private StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
internal StepResult StepResultFromHTTP(HttpResponseMessage resp, Exception err)
{
var sr = new StepResult
{
Expand All @@ -215,7 +215,7 @@

if (err == null)
{
// HTTPResp2DtmError
(sr.Data, sr.Error) = Utils.HTTPResp2DtmError(resp); // TODO go used this.Options.HTTPResp2DtmError(resp), for custom

Check warning on line 218 in src/Dtmworkflow/Workflow.Imp.cs

View check run for this annotation

Codecov / codecov/patch

src/Dtmworkflow/Workflow.Imp.cs#L218

Added line #L218 was not covered by tests
sr.Status = WfErrorToStatus(sr.Error);
}

Expand All @@ -237,9 +237,9 @@
}


private async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
internal async Task<StepResult> RecordedDo(Func<DtmCommon.BranchBarrier, Task<StepResult>> fn)
{
var sr = await this.RecordedDoInner(fn);
StepResult sr = await this.RecordedDoInner(fn);

// do not compensate the failed branch if !CompensateErrorBranch
if (this.Options.CompensateErrorBranch && sr.Status == DtmCommon.Constant.StatusFailed)
Expand Down
5 changes: 4 additions & 1 deletion src/Dtmworkflow/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;

namespace Dtmworkflow
{
Expand Down Expand Up @@ -32,7 +34,8 @@ public Workflow(IDtmClient httpClient, IDtmgRPCClient grpcClient, Dtmcli.IBranch

public System.Net.Http.HttpClient NewRequest()
{
return _httpClient.GetHttpClient("WF");
// return _httpClient.GetHttpClient("WF");
return new HttpClient(new WorkflowHttpInterceptor(this));
}

/// <summary>
Expand Down
2 changes: 0 additions & 2 deletions src/Dtmworkflow/WorkflowGlobalTransaction.cs
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,13 @@ public void Register(string name, WfFunc2 handler, params Action<Workflow>[] cus
});
}

#if NET5_0_OR_GREATER
public async Task ExecuteByQS(Microsoft.AspNetCore.Http.IQueryCollection query, byte[] body)
{
_ = query.TryGetValue("gid", out var gid);
_ = query.TryGetValue("op", out var op);

await Execute(op, gid, body, true);
}
#endif

#if DEBUG // for sample only
public bool Exists(string name)
Expand Down
Loading
Loading