Skip to content

[DE-371] cluster dirty reads #455

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Sep 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions src/main/java/com/arangodb/ArangoCursor.java
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,10 @@ public interface ArangoCursor<T> extends ArangoIterable<T>, ArangoIterator<T>, C
*/
List<T> asListRemaining();

/**
* @return true if the result is a potential dirty read
* @since ArangoDB 3.10
*/
boolean isPotentialDirtyRead();

}
1 change: 1 addition & 0 deletions src/main/java/com/arangodb/entity/CursorEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public VPackSlice getResult() {
}

public Map<String, String> getMeta() {
if (meta == null) return Collections.emptyMap();
return meta;
}

Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/arangodb/entity/MultiDocumentEntity.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class MultiDocumentEntity<E> implements Entity {
private Collection<E> documents;
private Collection<ErrorEntity> errors;
private Collection<Object> documentsAndErrors;
private boolean isPotentialDirtyRead = false;

public MultiDocumentEntity() {
super();
Expand Down Expand Up @@ -68,4 +69,15 @@ public void setDocumentsAndErrors(final Collection<Object> documentsAndErrors) {
this.documentsAndErrors = documentsAndErrors;
}

/**
* @return true if the result is a potential dirty read
* @since ArangoDB 3.10
*/
public Boolean isPotentialDirtyRead() {
return isPotentialDirtyRead;
}

public void setPotentialDirtyRead(final Boolean isPotentialDirtyRead) {
this.isPotentialDirtyRead = isPotentialDirtyRead;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<T>> getDocumentsResponseD
final Class<T> type, final DocumentReadOptions options) {
return response -> {
final MultiDocumentEntity<T> multiDocument = new MultiDocumentEntity<>();
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta().get("X-Arango-Potential-Dirty-Read"));
multiDocument.setPotentialDirtyRead(potentialDirtyRead);
final Collection<T> docs = new ArrayList<>();
final Collection<ErrorEntity> errors = new ArrayList<>();
final Collection<Object> documentsAndErrors = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,13 @@ protected <T> ResponseDeserializer<T> transactionResponseDeserializer(final Clas
}

protected Request beginStreamTransactionRequest(final StreamTransactionOptions options) {
return request(dbName, RequestType.POST, PATH_API_BEGIN_STREAM_TRANSACTION)
.setBody(util().serialize(options != null ? options : new StreamTransactionOptions()));
StreamTransactionOptions opts = options != null ? options : new StreamTransactionOptions();
Request r = request(dbName, RequestType.POST, PATH_API_BEGIN_STREAM_TRANSACTION)
.setBody(util().serialize(opts));
if(Boolean.TRUE.equals(opts.getAllowDirtyRead())) {
RequestUtils.allowDirtyRead(r);
}
return r;
}

protected Request abortStreamTransactionRequest(String id) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public class ArangoCursorImpl<T> extends AbstractArangoIterable<T> implements Ar
protected final ArangoCursorIterator<T> iterator;
private final String id;
private final ArangoCursorExecute execute;
private final boolean isPontentialDirtyRead;

public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCursorExecute execute,
final Class<T> type, final CursorEntity result) {
Expand All @@ -51,6 +52,7 @@ public ArangoCursorImpl(final InternalArangoDatabase<?, ?> db, final ArangoCurso
this.type = type;
iterator = createIterator(this, db, execute, result);
id = result.getId();
this.isPontentialDirtyRead = Boolean.parseBoolean(result.getMeta().get("X-Arango-Potential-Dirty-Read"));
}

protected ArangoCursorIterator<T> createIterator(
Expand Down Expand Up @@ -120,6 +122,11 @@ public List<T> asListRemaining() {
return remaining;
}

@Override
public boolean isPotentialDirtyRead() {
return isPontentialDirtyRead;
}

@Override
public void remove() {
throw new UnsupportedOperationException();
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/com/arangodb/model/StreamTransactionOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

package com.arangodb.model;

import com.arangodb.velocypack.annotations.Expose;

/**
* @author Mark Vollmary
* @author Michele Rastelli
Expand All @@ -33,6 +35,8 @@ public class StreamTransactionOptions {
private Boolean waitForSync;
private Long maxTransactionSize;
private Boolean allowImplicit;
@Expose(serialize = false)
private Boolean allowDirtyRead;

public StreamTransactionOptions() {
super();
Expand Down Expand Up @@ -123,4 +127,20 @@ public StreamTransactionOptions maxTransactionSize(final Long maxTransactionSize
return this;
}

public Boolean getAllowDirtyRead() {
return allowDirtyRead;
}

/**
* @param allowDirtyRead Set to {@code true} allows reading from followers in an active-failover setup.
* @return options
* @see <a href="https://www.arangodb.com/docs/stable/administration-active-failover.html#reading-from-follower">API
* Documentation</a>
* @since ArangoDB 3.4.0
*/
public StreamTransactionOptions allowDirtyRead(final Boolean allowDirtyRead) {
this.allowDirtyRead = allowDirtyRead;
return this;
}

}
3 changes: 3 additions & 0 deletions src/test/java/com/arangodb/ArangoCollectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -524,6 +524,9 @@ void getDocumentsDirtyRead(ArangoCollection collection) {
.getDocuments(Arrays.asList("1", "2", "3"), BaseDocument.class,
new DocumentReadOptions().allowDirtyRead(true));
assertThat(documents).isNotNull();
if (isAtLeastVersion(3, 10)) {
assertThat(documents.isPotentialDirtyRead()).isTrue();
}
assertThat(documents.getDocuments()).hasSize(3);
for (final BaseDocument document : documents.getDocuments()) {
assertThat(document.getId()).isIn(COLLECTION_NAME + "/" + "1", COLLECTION_NAME + "/" + "2", COLLECTION_NAME + "/" + "3");
Expand Down
3 changes: 3 additions & 0 deletions src/test/java/com/arangodb/ArangoDatabaseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -959,6 +959,9 @@ void queryAllowDirtyRead(ArangoDatabase db) throws IOException {
final ArangoCursor<BaseDocument> cursor = db.query("FOR i IN @@col FILTER i.test == @test RETURN i",
new MapBuilder().put("@col", CNAME1).put("test", null).get(),
new AqlQueryOptions().allowDirtyRead(true), BaseDocument.class);
if (isAtLeastVersion(3, 10)) {
assertThat(cursor.isPotentialDirtyRead()).isTrue();
}
cursor.close();
}

Expand Down
32 changes: 32 additions & 0 deletions src/test/java/com/arangodb/StreamTransactionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.util.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -788,4 +789,35 @@ void transactionAllowImplicitFalse(ArangoDatabase db) {

db.abortStreamTransaction(tx.getId());
}

@ParameterizedTest(name = "{index}")
@MethodSource("dbs")
void transactionDirtyRead(ArangoDatabase db) throws IOException {
assumeTrue(isCluster());
assumeTrue(isAtLeastVersion(3, 10));

ArangoCollection collection = db.collection(COLLECTION_NAME);
DocumentCreateEntity<BaseDocument> doc = collection.insertDocument(new BaseDocument());

StreamTransactionEntity tx = db
.beginStreamTransaction(new StreamTransactionOptions()
.readCollections(COLLECTION_NAME)
.allowDirtyRead(true));

MultiDocumentEntity<BaseDocument> readDocs = collection.getDocuments(Collections.singletonList(doc.getKey()),
BaseDocument.class,
new DocumentReadOptions().streamTransactionId(tx.getId()));

assertThat(readDocs.isPotentialDirtyRead()).isTrue();
assertThat(readDocs.getDocuments()).hasSize(1);

final ArangoCursor<BaseDocument> cursor = db.query("FOR i IN @@col RETURN i",
Collections.singletonMap("@col", COLLECTION_NAME),
new AqlQueryOptions().streamTransactionId(tx.getId()), BaseDocument.class);
assertThat(cursor.isPotentialDirtyRead()).isTrue();
cursor.close();

db.abortStreamTransaction(tx.getId());
}

}