Skip to content

Commit

Permalink
INTERNAL: read while END/PIPE_ERROR received in the pipe operation
Browse files Browse the repository at this point in the history
  • Loading branch information
oliviarla committed Dec 16, 2024
1 parent 59e1374 commit 9be687e
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
Expand Down Expand Up @@ -90,16 +91,21 @@ public Map<String, T> get(long duration,
MemcachedConnection.opsSucceeded(ops);
}

List<Exception> exceptions = new ArrayList<>();
for (Operation op : ops) {
if (op != null && op.hasErrored()) {
throw new ExecutionException(op.getException());
exceptions.add(op.getException());
}

if (op != null && op.isCancelled()) {
throw new ExecutionException(new RuntimeException(op.getCancelCause()));
exceptions.add(new RuntimeException(op.getCancelCause()));
}
}

if (!exceptions.isEmpty()) {
throw new CompositeException(exceptions);
}

return failedResult;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,8 +260,11 @@ protected void handleError(OperationErrorType eType, String line)
default:
assert false;
}
transitionState(OperationState.COMPLETE);
throw exception;

if (!isPipeOperation()) {
transitionState(OperationState.COMPLETE);
throw exception;
}
}

public void handleRead(ByteBuffer data) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,9 @@ public void readFromBuffer(ByteBuffer data) throws IOException {
handleError(eType, line);
} else {
handleLine(line);
if (hasErrored() && isPipeOperation() && getState() == OperationState.COMPLETE) {
throw getException();
}
}
} else { // OperationReadType.DATA
handleRead(data);
Expand Down
55 changes: 55 additions & 0 deletions src/test/java/net/spy/memcached/protocol/ascii/BaseOpTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,24 @@

package net.spy.memcached.protocol.ascii;

import java.net.InetSocketAddress;
import java.nio.Buffer;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

import net.spy.memcached.collection.CollectionPipedInsert;
import net.spy.memcached.ops.CollectionPipedInsertOperation;
import net.spy.memcached.ops.Operation;
import net.spy.memcached.ops.OperationCallback;
import net.spy.memcached.ops.OperationException;
import net.spy.memcached.ops.OperationStatus;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
Expand Down Expand Up @@ -99,6 +109,51 @@ void testPartialLine() throws Exception {
assertEquals("this is a test", op.getCurrentLine());
}

@Test
void throwExceptionAfterReadingEndOrPipeError() throws Exception {
String key = "testPipeLine";
CollectionPipedInsert.ListPipedInsert<String> insert =
new CollectionPipedInsert.ListPipedInsert<>(key, 0,
Arrays.asList("a", "b"), null, null);
OperationCallback cb = new CollectionPipedInsertOperation.Callback() {
@Override
public void receivedStatus(OperationStatus status) {
}

@Override
public void complete() {
}

@Override
public void gotStatus(Integer index, OperationStatus status) {
}
};
CollectionPipedInsertOperationImpl op =
new CollectionPipedInsertOperationImpl("test", insert, cb);
LinkedBlockingQueue<Operation> queue = new LinkedBlockingQueue<>();
op.setHandlingNode(new AsciiMemcachedNodeImpl("testnode", new InetSocketAddress(11211),
60, queue, queue, queue, 0L));

ByteBuffer b = ByteBuffer.allocate(40);
String line1 = "RESPONSE 2\r\n";
op.writeComplete();
b.put(line1.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line2 = "SERVER_ERROR out of memory\r\n";
b.put(line2.getBytes());
b.flip();
assertDoesNotThrow(() -> op.readFromBuffer(b));
b.clear();

String line4 = "PIPE_ERROR failed\r\n";
b.put(line4.getBytes());
b.flip();
assertThrows(OperationException.class, () -> op.readFromBuffer(b));
}

private static class SimpleOp extends OperationImpl {

private final LinkedList<String> lines = new LinkedList<>();
Expand Down

0 comments on commit 9be687e

Please sign in to comment.