Skip to content

Commit

Permalink
Attempts fixing logic for retryAllFailed() (#46)
Browse files Browse the repository at this point in the history
* Attempts fixing logic for retryAllFailed()

* workQueue to TreeSet

* Revert some changes

* Readd retryAllFailed but better
  • Loading branch information
StrongestNumber9 authored Aug 22, 2023
1 parent a050231 commit 35b8b37
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 8 deletions.
11 changes: 4 additions & 7 deletions src/main/java/com/teragrep/rlp_01/RelpBatch.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,7 @@

package com.teragrep.rlp_01;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Set;
import java.util.TreeMap;
import java.util.*;

/**
* A class that is used to send RELP messages to the server.
Expand All @@ -37,13 +34,13 @@ public class RelpBatch {
private TreeMap<Long, RelpFrameRX> responses;

// Not processed queue, for asynchronous use.
private LinkedList<Long> workQueue;
private TreeSet<Long> workQueue;

public RelpBatch() {
this.reqID = new requestID();
this.requests = new TreeMap<Long, RelpFrameTX>();
this.responses = new TreeMap<Long, RelpFrameRX>();
this.workQueue = new LinkedList<Long>();
this.workQueue = new TreeSet<>();
}

/**
Expand Down Expand Up @@ -153,7 +150,7 @@ public int getWorkQueueLength() {
}

public Long popWorkQueue() {
return this.workQueue.pop();
return this.workQueue.pollFirst();
}
}

25 changes: 24 additions & 1 deletion src/test/java/com/teragrep/rlp_01/RelpBatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -126,16 +126,17 @@ public void testVerifyTransactionAll() {
Assertions.assertTrue(batch.verifyTransactionAll(), "Did not verify all transactions");
}

// FIXME: https://github.com/teragrep/rlp_01/issues/12
@Test
public void testRetryAllFailed() {
RelpBatch batch = new RelpBatch();
int messages = 5;
Long[] ids = new Long[messages];
// Fill 5 messages
for(int i=0; i<messages; i++) {
ids[i] = batch.insert(message.getBytes(StandardCharsets.UTF_8));
}
Assertions.assertEquals(messages, batch.getWorkQueueLength(), "Worker queue did not match");
// Resolve 3 of them
int resolve = 3;
for(int i=0; i<resolve; i++) {
String response = "200 OK";
Expand All @@ -144,8 +145,30 @@ public void testRetryAllFailed() {
buffer.flip();
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), buffer));
}
// Clean work queue to pretend we have sent all
int len = batch.getWorkQueueLength();
for(int i=0; i<len; i++) {
batch.popWorkQueue();
}
// Refill the work queue as some wasn't sent
batch.retryAllFailed();
Assertions.assertEquals(messages-resolve, batch.getWorkQueueLength(), "Worker queue count did not match");
// Resolve last messages
for(int i=resolve; i<messages; i++) {
String response = "200 OK";
ByteBuffer buffer = ByteBuffer.allocateDirect(response.length());
buffer.put(response.getBytes(StandardCharsets.UTF_8));
buffer.flip();
batch.putResponse(ids[i], new RelpFrameRX(ids[i].intValue(), RelpCommand.SYSLOG, response.length(), buffer));
}
// Resolve work queue again
len = batch.getWorkQueueLength();
for(int i=0; i<len; i++) {
batch.popWorkQueue();
}
// Refill the work queue
batch.retryAllFailed();
Assertions.assertEquals(0, batch.getWorkQueueLength(), "Worker queue count did not match");
}

@Test
Expand Down

0 comments on commit 35b8b37

Please sign in to comment.