8585import java .util .concurrent .atomic .AtomicInteger ;
8686import java .util .concurrent .atomic .AtomicLong ;
8787import java .util .concurrent .atomic .AtomicReference ;
88+ import java .util .stream .Collectors ;
8889
8990import static org .hamcrest .Matchers .equalTo ;
9091import static org .hamcrest .Matchers .greaterThan ;
@@ -206,53 +207,6 @@ private String randomNonTranslogPatternString(int min, int max) {
206207 return string ;
207208 }
208209
209- public void testRead () throws IOException {
210- Location loc0 = translog .getLastWriteLocation ();
211- assertNotNull (loc0 );
212-
213- Translog .Location loc1 = translog .add (new Translog .Index ("test" , "1" , new byte []{1 }));
214- assertThat (loc1 , greaterThan (loc0 ));
215- assertThat (translog .getLastWriteLocation (), greaterThan (loc1 ));
216- Translog .Location loc2 = translog .add (new Translog .Index ("test" , "2" , new byte []{2 }));
217- assertThat (loc2 , greaterThan (loc1 ));
218- assertThat (translog .getLastWriteLocation (), greaterThan (loc2 ));
219- assertThat (translog .read (loc1 ).getSource ().source , equalTo (new BytesArray (new byte []{1 })));
220- assertThat (translog .read (loc2 ).getSource ().source , equalTo (new BytesArray (new byte []{2 })));
221-
222- Translog .Location lastLocBeforeSync = translog .getLastWriteLocation ();
223- translog .sync ();
224- assertEquals (lastLocBeforeSync , translog .getLastWriteLocation ());
225- assertThat (translog .read (loc1 ).getSource ().source , equalTo (new BytesArray (new byte []{1 })));
226- assertThat (translog .read (loc2 ).getSource ().source , equalTo (new BytesArray (new byte []{2 })));
227-
228- Translog .Location loc3 = translog .add (new Translog .Index ("test" , "2" , new byte []{3 }));
229- assertThat (loc3 , greaterThan (loc2 ));
230- assertThat (translog .getLastWriteLocation (), greaterThan (loc3 ));
231- assertThat (translog .read (loc3 ).getSource ().source , equalTo (new BytesArray (new byte []{3 })));
232-
233- lastLocBeforeSync = translog .getLastWriteLocation ();
234- translog .sync ();
235- assertEquals (lastLocBeforeSync , translog .getLastWriteLocation ());
236- assertThat (translog .read (loc3 ).getSource ().source , equalTo (new BytesArray (new byte []{3 })));
237- translog .prepareCommit ();
238- /*
239- * The commit adds to the lastWriteLocation even though is isn't really a write. This is just an implementation artifact but it can
240- * safely be ignored because the lastWriteLocation continues to be greater than the Location returned from the last write operation
241- * and less than the location of the next write operation.
242- */
243- assertThat (translog .getLastWriteLocation (), greaterThan (lastLocBeforeSync ));
244- assertThat (translog .read (loc3 ).getSource ().source , equalTo (new BytesArray (new byte []{3 })));
245- translog .commit ();
246- assertNull (translog .read (loc1 ));
247- assertNull (translog .read (loc2 ));
248- assertNull (translog .read (loc3 ));
249- try {
250- translog .read (new Translog .Location (translog .currentFileGeneration () + 1 , 17 , 35 ));
251- fail ("generation is greater than the current" );
252- } catch (IllegalStateException ex ) {
253- // expected
254- }
255- }
256210
257211 public void testSimpleOperations () throws IOException {
258212 ArrayList <Translog .Operation > ops = new ArrayList <>();
@@ -441,7 +395,7 @@ public void assertFileDeleted(Translog translog, long id) {
441395 assertFalse ("translog [" + id + "] still exists" , Files .exists (translog .location ().resolve (Translog .getFilename (id ))));
442396 }
443397
444- static class LocationOperation {
398+ static class LocationOperation implements Comparable < LocationOperation > {
445399 final Translog .Operation operation ;
446400 final Translog .Location location ;
447401
@@ -450,6 +404,10 @@ public LocationOperation(Translog.Operation operation, Translog.Location locatio
450404 this .location = location ;
451405 }
452406
407+ @ Override
408+ public int compareTo (LocationOperation o ) {
409+ return location .compareTo (o .location );
410+ }
453411 }
454412
455413 public void testConcurrentWritesWithVaryingSize () throws Throwable {
@@ -478,8 +436,12 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
478436 threads [i ].join (60 * 1000 );
479437 }
480438
481- for (LocationOperation locationOperation : writtenOperations ) {
482- Translog .Operation op = translog .read (locationOperation .location );
439+ List <LocationOperation > collect = writtenOperations .stream ().collect (Collectors .toList ());
440+ Collections .sort (collect );
441+ Translog .Snapshot snapshot = translog .newSnapshot ();
442+ for (LocationOperation locationOperation : collect ) {
443+ Translog .Operation op = snapshot .next ();
444+ assertNotNull (op );
483445 Translog .Operation expectedOp = locationOperation .operation ;
484446 assertEquals (expectedOp .opType (), op .opType ());
485447 switch (op .opType ()) {
@@ -505,6 +467,7 @@ public void testConcurrentWritesWithVaryingSize() throws Throwable {
505467 }
506468
507469 }
470+ assertNull (snapshot .next ());
508471
509472 }
510473
@@ -521,13 +484,16 @@ public void testTranslogChecksums() throws Exception {
521484 corruptTranslogs (translogDir );
522485
523486 AtomicInteger corruptionsCaught = new AtomicInteger (0 );
487+ Translog .Snapshot snapshot = translog .newSnapshot ();
524488 for (Translog .Location location : locations ) {
525489 try {
526- translog .read (location );
490+ Translog .Operation next = snapshot .next ();
491+ assertNotNull (next );
527492 } catch (TranslogCorruptedException e ) {
528493 corruptionsCaught .incrementAndGet ();
529494 }
530495 }
496+ expectThrows (TranslogCorruptedException .class , () -> snapshot .next ());
531497 assertThat ("at least one corruption was caused and caught" , corruptionsCaught .get (), greaterThanOrEqualTo (1 ));
532498 }
533499
@@ -544,15 +510,12 @@ public void testTruncatedTranslogs() throws Exception {
544510 truncateTranslogs (translogDir );
545511
546512 AtomicInteger truncations = new AtomicInteger (0 );
513+ Translog .Snapshot snap = translog .newSnapshot ();
547514 for (Translog .Location location : locations ) {
548515 try {
549- translog .read (location );
550- } catch (ElasticsearchException e ) {
551- if (e .getCause () instanceof EOFException ) {
552- truncations .incrementAndGet ();
553- } else {
554- throw e ;
555- }
516+ assertNotNull (snap .next ());
517+ } catch (EOFException e ) {
518+ truncations .incrementAndGet ();
556519 }
557520 }
558521 assertThat ("at least one truncation was caused and caught" , truncations .get (), greaterThanOrEqualTo (1 ));
@@ -860,8 +823,14 @@ public void testLocationComparison() throws IOException {
860823 }
861824
862825 assertEquals (max .generation , translog .currentFileGeneration ());
863- final Translog .Operation read = translog .read (max );
864- assertEquals (read .getSource ().source .utf8ToString (), Integer .toString (count ));
826+ Translog .Snapshot snap = translog .newSnapshot ();
827+ Translog .Operation next ;
828+ Translog .Operation maxOp = null ;
829+ while ((next = snap .next ()) != null ) {
830+ maxOp = next ;
831+ }
832+ assertNotNull (maxOp );
833+ assertEquals (maxOp .getSource ().source .utf8ToString (), Integer .toString (count ));
865834 }
866835
867836 public static Translog .Location max (Translog .Location a , Translog .Location b ) {
@@ -884,30 +853,24 @@ public void testBasicCheckpoint() throws IOException {
884853 }
885854 }
886855 assertEquals (translogOperations , translog .totalOperations ());
887- final Translog . Location lastLocation = translog .add (new Translog .Index ("test" , "" + translogOperations , Integer .toString (translogOperations ).getBytes (Charset .forName ("UTF-8" ))));
856+ translog .add (new Translog .Index ("test" , "" + translogOperations , Integer .toString (translogOperations ).getBytes (Charset .forName ("UTF-8" ))));
888857
889858 final Checkpoint checkpoint = Checkpoint .read (translog .location ().resolve (Translog .CHECKPOINT_FILE_NAME ));
890859 try (final TranslogReader reader = translog .openReader (translog .location ().resolve (Translog .getFilename (translog .currentFileGeneration ())), checkpoint )) {
891860 assertEquals (lastSynced + 1 , reader .totalOperations ());
861+ Translog .Snapshot snapshot = reader .newSnapshot ();
862+
892863 for (int op = 0 ; op < translogOperations ; op ++) {
893- Translog .Location location = locations .get (op );
894864 if (op <= lastSynced ) {
895- final Translog .Operation read = reader . read ( location );
865+ final Translog .Operation read = snapshot . next ( );
896866 assertEquals (Integer .toString (op ), read .getSource ().source .utf8ToString ());
897867 } else {
898- try {
899- reader .read (location );
900- fail ("read past checkpoint" );
901- } catch (EOFException ex ) {
902-
903- }
868+ Translog .Operation next = snapshot .next ();
869+ assertNull (next );
904870 }
905871 }
906- try {
907- reader .read (lastLocation );
908- fail ("read past checkpoint" );
909- } catch (EOFException ex ) {
910- }
872+ Translog .Operation next = snapshot .next ();
873+ assertNull (next );
911874 }
912875 assertEquals (translogOperations + 1 , translog .totalOperations ());
913876 translog .close ();
@@ -1618,11 +1581,6 @@ ChannelFactory getChannelFactory() {
16181581 }
16191582 };
16201583 }
1621-
1622- @ Override
1623- protected boolean assertBytesAtLocation (Location location , BytesReference expectedBytes ) throws IOException {
1624- return true ; // we don't wanna fail in the assert
1625- }
16261584 };
16271585 }
16281586
0 commit comments