23
23
import java .lang .reflect .Field ;
24
24
import java .nio .charset .Charset ;
25
25
import java .util .ArrayList ;
26
- import java .util .EnumSet ;
27
26
import java .util .HashMap ;
28
27
import java .util .List ;
29
28
import java .util .Map ;
43
42
import org .apache .bookkeeper .mledger .ManagedTrash ;
44
43
import org .apache .bookkeeper .mledger .proto .MLDataFormats .ManagedLedgerInfo .LedgerInfo ;
45
44
import org .apache .bookkeeper .test .MockedBookKeeperTestCase ;
46
- import org .apache .pulsar .common .util .FutureUtil ;
47
45
import org .apache .pulsar .metadata .api .MetadataStoreConfig ;
48
- import org .apache .pulsar .metadata .api .MetadataStoreException ;
49
- import org .apache .pulsar .metadata .api .Notification ;
50
- import org .apache .pulsar .metadata .api .NotificationType ;
51
46
import org .apache .pulsar .metadata .api .Stat ;
52
- import org .apache .pulsar .metadata .api .extended .CreateOption ;
53
47
import org .apache .pulsar .metadata .api .extended .MetadataStoreExtended ;
54
48
import org .apache .pulsar .metadata .impl .FaultInjectionMetadataStore ;
55
- import org .apache .pulsar .metadata .impl .LocalMemoryMetadataStore ;
56
49
import org .awaitility .Awaitility ;
57
50
import org .testng .Assert ;
58
51
import org .testng .annotations .Test ;
@@ -72,66 +65,20 @@ public class ManagedTrashTest extends MockedBookKeeperTestCase {
72
65
73
66
@ Override
74
67
protected void setUpTestCase () throws Exception {
75
- MetadataStoreExtended metadataStoreExtended =
76
- new LocalMemoryMetadataStore ("memory:local" , MetadataStoreConfig .builder ().build ()) {
77
- @ Override
78
- public CompletableFuture <Stat > storePut (String path , byte [] data , Optional <Long > optExpectedVersion ,
79
- EnumSet <CreateOption > options ) {
80
- if (!isValidPath (path )) {
81
- return FutureUtil .failedFuture (new MetadataStoreException .InvalidPathException (path ));
82
- }
83
- synchronized (map ) {
84
- boolean hasVersion = optExpectedVersion .isPresent ();
85
- int expectedVersion = optExpectedVersion .orElse (-1L ).intValue ();
86
-
87
- if (options .contains (CreateOption .Sequential )) {
88
- path += Long .toString (sequentialIdGenerator .getAndIncrement ());
89
- }
90
-
91
- long now = System .currentTimeMillis ();
92
-
93
- if (hasVersion && expectedVersion == -1 ) {
94
- Value newValue = new Value (0 , data , now , now , options .contains (CreateOption .Ephemeral ));
95
- Value existingValue = map .putIfAbsent (path , newValue );
96
- if (existingValue != null ) {
97
- return FutureUtils .exception (new MetadataStoreException .BadVersionException ("" ));
98
- } else {
99
- persistedData .put (path , data );
100
- receivedNotification (new Notification (NotificationType .Created , path ));
101
- notifyParentChildrenChanged (path );
102
- return FutureUtils .value (new Stat (path , 0 , now , now , newValue .isEphemeral (), true ));
103
- }
104
- } else {
105
- Value existingValue = map .get (path );
106
- long existingVersion = existingValue != null ? existingValue .getVersion () : -1 ;
107
- if (hasVersion && expectedVersion != existingVersion ) {
108
- return FutureUtils .exception (new MetadataStoreException .BadVersionException ("" ));
109
- } else {
110
- long newVersion = existingValue != null ? existingValue .getVersion () + 1 : 0 ;
111
- long createdTimestamp =
112
- existingValue != null ? existingValue .getCreatedTimestamp () : now ;
113
- Value newValue = new Value (newVersion , data , createdTimestamp , now ,
114
- options .contains (CreateOption .Ephemeral ));
115
- persistedData .put (path , data );
116
- map .put (path , newValue );
117
-
118
- NotificationType type =
119
- existingValue == null ? NotificationType .Created :
120
- NotificationType .Modified ;
121
- receivedNotification (new Notification (type , path ));
122
- if (type == NotificationType .Created ) {
123
- notifyParentChildrenChanged (path );
124
- }
125
- return FutureUtils
126
- .value (new Stat (path , newValue .getVersion (), newValue .getCreatedTimestamp (),
127
- newValue .getModifiedTimestamp (),
128
- false , true ));
129
- }
130
- }
131
- }
68
+ metadataStore = new FaultInjectionMetadataStore (
69
+ MetadataStoreExtended .create ("memory:local" , MetadataStoreConfig .builder ().build ())) {
70
+ @ Override
71
+ public CompletableFuture <Stat > put (String path , byte [] value , Optional <Long > expectedVersion ) {
72
+ CompletableFuture <Stat > future = super .put (path , value , expectedVersion );
73
+ future .whenComplete ((res , e ) -> {
74
+ if (e != null ) {
75
+ return ;
132
76
}
133
- };
134
- metadataStore = new FaultInjectionMetadataStore (metadataStoreExtended );
77
+ persistedData .put (path , value );
78
+ });
79
+ return future ;
80
+ }
81
+ };
135
82
bkc = new PulsarMockBookKeeper (executor ) {
136
83
@ Override
137
84
public void asyncDeleteLedger (long lId , AsyncCallback .DeleteCallback cb , Object ctx ) {
0 commit comments