1+ //-----------------------------------------------------------------------
2+ // <copyright file="RememberEntitiesFailureSpec.cs" company="Akka.NET Project">
3+ // Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
4+ // Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
5+ // </copyright>
6+ //-----------------------------------------------------------------------
7+
8+ using System ;
9+ using System . Collections . Generic ;
10+ using System . Collections . Immutable ;
11+ using System . Linq ;
12+ using System . Threading ;
13+ using System . Threading . Tasks ;
14+ using Akka . Actor ;
15+ using Akka . Actor . Internal ;
16+ using Akka . Cluster . Sharding . Internal ;
17+ using Akka . Cluster . Tools . Singleton ;
18+ using Akka . Configuration ;
19+ using Akka . Event ;
20+ using Akka . TestKit ;
21+ using Akka . Util ;
22+ using Akka . Util . Internal ;
23+ using FluentAssertions ;
24+ using FluentAssertions . Extensions ;
25+ using Xunit ;
26+ using Xunit . Abstractions ;
27+
28+ namespace Akka . Cluster . Sharding . Tests ;
29+
30+ public class RememberEntitiesSupervisionStrategyDecisionSpec : AkkaSpec
31+ {
32+ private sealed record EntityEnvelope ( long Id , object Payload ) ;
33+
34+ private class ConstructorFailActor : ActorBase
35+ {
36+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
37+
38+ public ConstructorFailActor ( )
39+ {
40+ throw new Exception ( "EXPLODING CONSTRUCTOR!" ) ;
41+ }
42+
43+ protected override bool Receive ( object message )
44+ {
45+ _log . Info ( "Msg {0}" , message ) ;
46+ Sender . Tell ( $ "ack { message } ") ;
47+ return true ;
48+ }
49+ }
50+
51+ private class PreStartFailActor : ActorBase
52+ {
53+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
54+
55+ protected override void PreStart ( )
56+ {
57+ base . PreStart ( ) ;
58+ throw new Exception ( "EXPLODING PRE-START!" ) ;
59+ }
60+
61+ protected override bool Receive ( object message )
62+ {
63+ _log . Info ( "Msg {0}" , message ) ;
64+ Sender . Tell ( $ "ack { message } ") ;
65+ return true ;
66+ }
67+ }
68+
69+ private sealed class TestMessageExtractor : IMessageExtractor
70+ {
71+ public string EntityId ( object message )
72+ => message switch
73+ {
74+ EntityEnvelope env => env . Id . ToString ( ) ,
75+ _ => null
76+ } ;
77+
78+ public object EntityMessage ( object message )
79+ => message switch
80+ {
81+ EntityEnvelope env => env . Payload ,
82+ _ => message
83+ } ;
84+
85+ public string ShardId ( object message )
86+ => message switch
87+ {
88+ EntityEnvelope msg => msg . Id . ToString ( ) ,
89+ _ => null
90+ } ;
91+
92+ public string ShardId ( string entityId , object messageHint = null )
93+ => entityId ;
94+ }
95+
96+ private class FakeShardRegion : ReceiveActor
97+ {
98+ private readonly ClusterShardingSettings _settings ;
99+ private readonly Props _entityProps ;
100+ private IActorRef ? _shard ;
101+
102+ public FakeShardRegion ( ClusterShardingSettings settings , Props entityProps )
103+ {
104+ _settings = settings ;
105+ _entityProps = entityProps ;
106+
107+ Receive < ShardInitialized > ( _ =>
108+ {
109+ // no-op
110+ } ) ;
111+ Receive < ShardRegion . StartEntity > ( msg =>
112+ {
113+ _shard . Forward ( msg ) ;
114+ } ) ;
115+ }
116+
117+ protected override void PreStart ( )
118+ {
119+ base . PreStart ( ) ;
120+ var provider = new FakeStore ( _settings , "cats" ) ;
121+
122+ var props = Props . Create ( ( ) => new Shard (
123+ "cats" ,
124+ "shard-1" ,
125+ _ => _entityProps ,
126+ _settings ,
127+ new TestMessageExtractor ( ) ,
128+ PoisonPill . Instance ,
129+ provider ,
130+ null
131+ ) ) ;
132+ _shard = Context . ActorOf ( props ) ;
133+ }
134+ }
135+
136+ private class ShardStoreCreated
137+ {
138+ public ShardStoreCreated ( IActorRef store , string shardId )
139+ {
140+ Store = store ;
141+ ShardId = shardId ;
142+ }
143+
144+ public IActorRef Store { get ; }
145+ public string ShardId { get ; }
146+ }
147+
148+ private class CoordinatorStoreCreated
149+ {
150+ public CoordinatorStoreCreated ( IActorRef store )
151+ {
152+ Store = store ;
153+ }
154+
155+ public IActorRef Store { get ; }
156+ }
157+
158+ private class FakeStore : IRememberEntitiesProvider
159+ {
160+ public FakeStore ( ClusterShardingSettings settings , string typeName )
161+ {
162+ }
163+
164+ public Props ShardStoreProps ( string shardId )
165+ {
166+ return FakeShardStoreActor . Props ( shardId ) ;
167+ }
168+
169+ public Props CoordinatorStoreProps ( )
170+ {
171+ return FakeCoordinatorStoreActor . Props ( ) ;
172+ }
173+ }
174+
175+ private class FakeShardStoreActor : ActorBase , IWithTimers
176+ {
177+ public static Props Props ( string shardId ) => Actor . Props . Create ( ( ) => new FakeShardStoreActor ( shardId ) ) ;
178+
179+ private readonly string _shardId ;
180+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
181+
182+ public FakeShardStoreActor ( string shardId )
183+ {
184+ _shardId = shardId ;
185+ Context . System . EventStream . Publish ( new ShardStoreCreated ( Self , shardId ) ) ;
186+ }
187+
188+ public ITimerScheduler Timers { get ; set ; }
189+
190+ protected override bool Receive ( object message )
191+ {
192+ switch ( message )
193+ {
194+ case RememberEntitiesShardStore . GetEntities _:
195+ Sender . Tell ( new RememberEntitiesShardStore . RememberedEntities ( ImmutableHashSet < string > . Empty . Add ( "1" ) ) ) ;
196+ return true ;
197+ case RememberEntitiesShardStore . Update m :
198+ Sender . Tell ( new RememberEntitiesShardStore . UpdateDone ( m . Started , m . Stopped ) ) ;
199+ return true ;
200+ }
201+ return false ;
202+ }
203+ }
204+
205+ private class FakeCoordinatorStoreActor : ActorBase , IWithTimers
206+ {
207+ public static Props Props ( ) => Actor . Props . Create ( ( ) => new FakeCoordinatorStoreActor ( ) ) ;
208+
209+ private readonly ILoggingAdapter _log = Context . GetLogger ( ) ;
210+
211+ public ITimerScheduler Timers { get ; set ; }
212+
213+ public FakeCoordinatorStoreActor ( )
214+ {
215+ Context . System . EventStream . Publish ( new CoordinatorStoreCreated ( Context . Self ) ) ;
216+ }
217+
218+ protected override bool Receive ( object message )
219+ {
220+ switch ( message )
221+ {
222+ case RememberEntitiesCoordinatorStore . GetShards _:
223+ Sender . Tell ( new RememberEntitiesCoordinatorStore . RememberedShards ( ImmutableHashSet < string > . Empty . Add ( "1" ) ) ) ;
224+ return true ;
225+ case RememberEntitiesCoordinatorStore . AddShard m :
226+ Sender . Tell ( new RememberEntitiesCoordinatorStore . UpdateDone ( m . ShardId ) ) ;
227+ return true ;
228+ }
229+ return false ;
230+ }
231+ }
232+
233+ private class TestSupervisionStrategy : ShardSupervisionStrategy
234+ {
235+ private readonly AtomicCounter _counter ;
236+
237+ public TestSupervisionStrategy ( AtomicCounter counter , int maxRetry , int window , Func < Exception , Directive > localOnlyDecider )
238+ : base ( maxRetry , window , localOnlyDecider )
239+ {
240+ _counter = counter ;
241+ }
242+
243+ public override void ProcessFailure ( IActorContext context , bool restart , IActorRef child , Exception cause , ChildRestartStats stats ,
244+ IReadOnlyCollection < ChildRestartStats > children )
245+ {
246+ _counter . GetAndIncrement ( ) ;
247+ base . ProcessFailure ( context , restart , child , cause , stats , children ) ;
248+ }
249+ }
250+
251+ private static Config SpecConfig =>
252+ ConfigurationFactory . ParseString (
253+ """
254+ akka {
255+ loglevel = DEBUG
256+ actor.provider = cluster
257+ remote.dot-netty.tcp.port = 0
258+
259+ cluster.sharding {
260+ distributed-data.durable.keys = []
261+ state-store-mode = ddata
262+ remember-entities = on
263+ remember-entities-store = custom
264+ remember-entities-custom-store = "Akka.Cluster.Sharding.Tests.RememberEntitiesSupervisionStrategyDecisionSpec+FakeStore, Akka.Cluster.Sharding.Tests"
265+ verbose-debug-logging = on
266+ }
267+ }
268+ """ )
269+ . WithFallback ( ClusterSingleton . DefaultConfig ( ) )
270+ . WithFallback ( ClusterSharding . DefaultConfig ( ) ) ;
271+
272+ public RememberEntitiesSupervisionStrategyDecisionSpec ( ITestOutputHelper helper ) : base ( SpecConfig , helper )
273+ {
274+ }
275+
276+ protected override void AtStartup ( )
277+ {
278+ // Form a one node cluster
279+ var cluster = Cluster . Get ( Sys ) ;
280+ cluster . Join ( cluster . SelfAddress ) ;
281+ AwaitAssert ( ( ) =>
282+ {
283+ cluster . ReadView . Members . Count ( m => m . Status == MemberStatus . Up ) . Should ( ) . Be ( 1 ) ;
284+ } ) ;
285+ }
286+
287+ public Directive TestDecider ( Exception cause )
288+ {
289+ return Directive . Restart ;
290+ }
291+
292+ [ Fact ( DisplayName = "Persistent shard must stop remembered entity with excessive failures" ) ]
293+ public async Task Persistent_Shard_must_stop_remembered_entity_with_excessive_restart_attempt ( )
294+ {
295+ var strategyCounter = new AtomicCounter ( 0 ) ;
296+
297+ var settings = ClusterShardingSettings . Create ( Sys ) ;
298+ settings = settings
299+ . WithTuningParameters ( settings . TuningParameters . WithEntityRestartBackoff ( 0.1 . Seconds ( ) ) )
300+ . WithRememberEntities ( true )
301+ . WithSupervisorStrategy ( new TestSupervisionStrategy ( strategyCounter , 3 , 1000 , TestDecider ) ) ;
302+
303+ var storeProbe = CreateTestProbe ( ) ;
304+ Sys . EventStream . Subscribe < ShardStoreCreated > ( storeProbe ) ;
305+ Sys . EventStream . Subscribe < Error > ( TestActor ) ;
306+
307+ var entityProps = Props . Create ( ( ) => new PreStartFailActor ( ) ) ;
308+ await EventFilter . Error ( contains : "cats: Remembered entity 1 was stopped: entity failed repeatedly" )
309+ . ExpectOneAsync ( async ( ) =>
310+ {
311+ _ = Sys . ActorOf ( Props . Create ( ( ) => new FakeShardRegion ( settings , entityProps ) ) ) ;
312+ storeProbe . ExpectMsg < ShardStoreCreated > ( ) ;
313+ await Task . Yield ( ) ;
314+ } ) ;
315+
316+ // Failed on the 4th call
317+ strategyCounter . Current . Should ( ) . Be ( 4 ) ;
318+ }
319+
320+ [ Fact ( DisplayName = "Persistent shard must stop remembered entity when stopped using Directive.Stop decision" ) ]
321+ public async Task Persistent_Shard_must_stop_remembered_entity_with_stop_directive_on_constructor_failure ( )
322+ {
323+ var strategyCounter = new AtomicCounter ( 0 ) ;
324+
325+ var settings = ClusterShardingSettings . Create ( Sys ) ;
326+ settings = settings
327+ . WithTuningParameters ( settings . TuningParameters . WithEntityRestartBackoff ( 0.1 . Seconds ( ) ) )
328+ . WithRememberEntities ( true )
329+ . WithSupervisorStrategy ( new TestSupervisionStrategy ( strategyCounter , 3 , 1000 , SupervisorStrategy . DefaultDecider . Decide ) ) ;
330+
331+ var storeProbe = CreateTestProbe ( ) ;
332+ Sys . EventStream . Subscribe < ShardStoreCreated > ( storeProbe ) ;
333+ Sys . EventStream . Subscribe < Error > ( TestActor ) ;
334+
335+ var entityProps = Props . Create ( ( ) => new ConstructorFailActor ( ) ) ;
336+ await EventFilter . Error ( contains : "cats: Remembered entity 1 was stopped: entity stopped by Directive.Stop decision" )
337+ . ExpectOneAsync ( async ( ) =>
338+ {
339+ _ = Sys . ActorOf ( Props . Create ( ( ) => new FakeShardRegion ( settings , entityProps ) ) ) ;
340+ storeProbe . ExpectMsg < ShardStoreCreated > ( ) ;
341+ await Task . Yield ( ) ;
342+ } ) ;
343+
344+ // Failed on the 1st call
345+ strategyCounter . Current . Should ( ) . Be ( 1 ) ;
346+ }
347+
348+ }
0 commit comments