@@ -31,135 +31,143 @@ public record CodeflowGraphCommit(
3131 DateTimeOffset CommitDate ,
3232 string Author ,
3333 string Description ,
34- CodeflowGraphCommit ? IncomingCodeflow ) ;
34+ string ? IncomingCodeflowSha ) ;
3535
36- public class CodeflowHistoryManager : ICodeflowHistoryManager
36+ public class CodeflowHistoryManager (
37+ IRemoteFactory remoteFactory ,
38+ IConnectionMultiplexer connection ) : ICodeflowHistoryManager
3739{
38- private readonly IRedisCacheFactory _redisCacheFactory ;
39- private readonly IRemoteFactory _remoteFactory ;
40+ private readonly IRemoteFactory _remoteFactory = remoteFactory ;
41+ private readonly IConnectionMultiplexer _connection = connection ;
4042
41- public CodeflowHistoryManager (
42- IRedisCacheFactory cacheFactory ,
43- IRemoteFactory remoteFactory )
43+ public async Task < CodeflowHistory ? > GetCachedCodeflowHistoryAsync ( string subscriptionId , int commitFetchCount )
4444 {
45- _redisCacheFactory = cacheFactory ;
46- _remoteFactory = remoteFactory ;
47- }
45+ if ( commitFetchCount < 1 )
46+ {
47+ throw new ArgumentOutOfRangeException ( nameof ( commitFetchCount ) ) ;
48+ }
4849
49- public async Task < CodeflowHistory ? > GetCachedCodeflowHistoryAsync ( Subscription subscription )
50- {
51- string id = subscription . Id . ToString ( ) ! ;
52- var cache = _redisCacheFactory . Create < CodeflowGraphCommit > ( id ) ;
53- return await cache . TryGetStateAsync ( ) ;
54- }
50+ var cache = _connection . GetDatabase ( ) ;
5551
56- public async Task < CodeflowHistory ? > GetCachedCodeflowHistoryAsync (
57- Subscription subscription ,
58- string commitSha ,
59- int commitFetchCount )
60- {
61- // todo this method returns the codeflow history starting from commitSha.
62- // It only reads from redis and never modifies the cache
63- }
52+ var commitShas = await cache . SortedSetRangeByRankWithScores (
53+ key : subscriptionId ,
54+ start : 0 ,
55+ stop : commitFetchCount - 1 ,
56+ order : Order . Descending ( ) )
57+ . Select ( e => ( string ) e . Element )
58+ . ToList ( ) ;
6459
60+ return await cache . StringGetAsync ( commitShas )
61+ . Select ( )
62+ }
6563
66- // get cached commits
67- // fetched fresh commits & fresh codeflows
68- // erase old if no connection
69- // persist new
7064 public async Task < CodeflowHistory ? > FetchLatestCodeflowHistoryAsync (
7165 Subscription subscription ,
7266 int commitFetchCount )
7367 {
74- //todo acquire lock on the redis Zset here
75- // (or not ? Maybe the unique commit SHA as the zset key ensures that commits can't be added twice)
76- // in that case, we'd only have to check that when a write fails due to the commit already being cached,
77- // we don't fail the flow
7868 var cachedCommits = await GetCachedCodeflowHistoryAsync ( subscription . Id ) ;
7969
8070 var remote = await _remoteFactory . CreateRemoteAsync ( subscription . TargetRepository ) ;
8171
82- latestCachedCommitSha = cachedCommits ? . Commits . FirstOrDefault ( ) ? . CommitSha ;
72+ latestCachedCommitSha = cachedCommits ?
73+ . Commits
74+ . FirstOrDefault ( ) ?
75+ . CommitSha ;
8376
84- var latestCommits = await remote . FetchNewerRepoCommitsAsync (
77+ var newCommits = await remote . FetchNewerRepoCommitsAsync (
8578 subscription . TargetBranch ,
8679 subscription . TargetBranch ,
8780 latestCachedCommitSha ,
8881 commitFetchCount ) ;
8982
90- if ( latestCommits . Count == commitFetchCount &&
91- latestCommits . LastOrDefault ( ) ? . CommitSha != latestCachedCommitSha )
92- {
93- // we have a gap in the history - throw away cache because we can't form a continuous history
94- cachedCommits = [ ] ;
95- }
96- else
83+ if ( newCommits . Count == commitFetchCount
84+ && latestCommits . LastOrDefault ( ) ? . CommitSha != latestCachedCommitSha )
9785 {
98- latestCommits = latestCommits
99- . Where ( commit => commit . CommitSha != latestCachedCommitSha )
100- . ToList ( ) ;
86+ // there's a gap between the new and cached commits. clear the cache and start from scratch.
87+ ClearCodeflowCacheAsync ( subscription . Id ) ;
10188 }
10289
103- var latestCachedCodeflow = cachedCommits ? . Commits . FirstOrDefault (
104- commit => commit . IncomingCodeflows != null ) ;
90+ newCommits . Remove ( latestCachedCommitSha ) ;
10591
106- var codeFlows = await FetchLatestIncomingCodeflows (
92+ var codeFlows = await EnrichCommitsWithCodeflowDataAsync (
10793 subscription . TargetRepository ,
10894 subscription . TargetBranch ,
10995 ! string . IsNullOrEmpty ( subscription . TargetDirectory ) ,
11096 latestCommits ,
11197 remote ) ;
11298
113- foreach ( var commit in latestCommits )
114- {
115- string ? sourceCommitSha = codeflows . GetCodeflowSourceCommit ( commit . CommitSha ) ;
116- commit . IncomingCodeflow = sourceCommitSha ;
117- }
118-
119- // todo cache fresh commits and release lock on the Zset
120- await CacheCommits ( latestCommits ) ;
99+ await CacheCommitsAsync ( latestCommits ) ;
121100
122101 return null ;
123102 }
124103
125- private async Task < GraphCodeflows > FetchLatestIncomingCodeflows (
104+ private async Task < GraphCodeflows > EnrichCommitsWithCodeflowDataAsync (
126105 string repo ,
127106 string branch ,
128107 bool isForwardFlow ,
129- List < CodeflowGraphCommit > latestCommits ,
130- IRemote ? remote )
108+ List < CodeflowGraphCommit > commits )
131109 {
132- remote ??= await _remoteFactory . CreateRemoteAsync ( repo ) ;
133-
134- string ? lastFlowSha = null ;
135- string ? lastCachedFlowSha = latestCommits
136- . FirstOrDefault ( commit => commit . IncomingCodeflow != null )
137- ? . IncomingCodeflow
138- ? . CommitSha ;
139-
140- while ( last )
141- if ( isForwardFlow )
110+ if ( commits . Count == 0 )
111+ {
112+ return [ ] ;
113+ }
114+
115+ remote = await _remoteFactory . CreateRemoteAsync ( repo ) ;
116+
117+ var lastCommitSha = commits
118+ . First ( )
119+ . CommitSha ;
120+
121+ var commitLookups = commits . ToDictionary ( c => c . CommitSha , c => c ) ;
122+
123+ while ( true )
124+ {
125+ var lastFlow = isForwardFlow
126+ ? await _remoteFactory . GetLastVmrIncomingCodeflowAsync ( branch , lastCommitSha )
127+ : await _remoteFactory . GetLastRepoIncomingCodeflowAsync ( branch , lastCommitSha ) ;
128+
129+ if ( commitLookups . Contains ( lastFlow . TargetCommitSha ) )
142130 {
143- var lastFlow = remote . GetVmrLastIncomingCodeflowAsync ( branch , latestCachedCommit ? . CommitSha ) ;
131+ commitLookups [ lastFlow . TargetCommitSha ] . IncomingCodeflowSha = lastFlow . SourceCommitSha ;
132+ commitLookups . Remove ( lastFlow . TargetCommitSha ) ; // prevent the possibility of infinite loops
133+ lastCommitSha = lastFlow . TargetCommitSha ;
144134 }
145135 else
146136 {
147- var lastFlow = remote . GetRepoLastIncomingCodeflowAsync ( branch , latestCachedCommit ? . CommitSha ) ;
137+ break ;
148138 }
149-
150-
151- return null ;
139+ }
140+ return commits ;
152141 }
153142
154- private async Task CacheCommits ( List < CodeflowGraphCommit > commits )
143+ private async Task CacheCommitsAsync (
144+ string subscriptionId ,
145+ List < CodeflowGraphCommit > commits ,
146+ int latestCachedCommitScore )
155147 {
156- // Cache the commits as part of the subscription's redis ZSet of CodeflowGraphCommit objects
157148 if ( commits . Count == 0 )
158149 {
159150 return ;
160151 }
161- var cache = _redisCacheFactory . Create < CodeflowGraphCommit > ( subscription . Id . ToString ( ) ! ) ;
162- await cache . SetStateAsync ( new CodeflowHistory ( commits , codeflows ) ) ;
152+ var cache = _connection . GetDatabase ( ) ;
153+
154+ int i = latestCachedCommitScore ?? 0 ;
155+
156+ var sortedSetEntries = commits
157+ . Select ( c => new SortedSetEntry ( c . CommitSha , i ++ ) )
158+ . ToArray ( ) ;
159+
160+ await cache . SortedSetAddAsync ( subscriptionId , sortedSetEntries ) ;
161+
162+ // todo key must either be unique to mapping, or contain last flow info for all mappings
163+ // ..... or not! any one single commit is relevant only to one mapping
164+ var commitGraphEntries = commits
165+ . Select ( c => new KeyValuePair < string , CodeflowGraphCommit > ( "CodeflowGraphCommit_" + c . CommitSha , c ) )
166+ . ToArray ( ) ;
167+
168+ await cache . StringSetAsync ( commits ) ;
169+
170+ ClearCacheTail ( ) ; // remove any elements after the 3000th or so?
163171 }
164172}
165173
@@ -180,5 +188,5 @@ class GraphCodeflows
180188 return sourceCommit ;
181189 }
182190 return null ;
183- }
191+ }
184192}
0 commit comments