Skip to content

Commit

Permalink
[DE-771] transaction aware deserializer (#554)
Browse files Browse the repository at this point in the history
* TransactionalOptions

* deserialization context

* SerdeContext tests

* fixed arch tests

* tests fixes

* tests fixes

* tests fixes

* fix native tests

* fixed integration tests

* fix ssl tests

* upd javadoc
  • Loading branch information
rashtao authored Apr 4, 2024
1 parent 779d7ed commit 54c680d
Show file tree
Hide file tree
Showing 55 changed files with 661 additions and 464 deletions.
7 changes: 4 additions & 3 deletions core/src/main/java/com/arangodb/internal/ArangoExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.serde.InternalSerde;
import com.arangodb.serde.SerdeContext;

import java.io.IOException;
import java.lang.reflect.Type;
Expand Down Expand Up @@ -58,8 +59,8 @@ public void setJwt(String jwt) {
protocol.setJwt(jwt);
}

protected <T> T createResult(final Type type, final InternalResponse response) {
return serde.deserialize(response.getBody(), type);
protected <T> T createResult(final Type type, final InternalResponse response, final SerdeContext ctx) {
return serde.deserialize(response.getBody(), type, ctx);
}

protected final void interceptResponse(InternalResponse response) {
Expand All @@ -79,6 +80,6 @@ public QueueTimeMetrics getQueueTimeMetrics() {
}

public interface ResponseDeserializer<T> {
T deserialize(InternalResponse response);
T deserialize(InternalResponse response, SerdeContext ctx);
}
}
24 changes: 20 additions & 4 deletions core/src/main/java/com/arangodb/internal/ArangoExecutorAsync.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.serde.SerdeUtils;
import com.arangodb.serde.SerdeContext;

import java.lang.reflect.Type;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,7 +50,7 @@ public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestS
}

public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestSupplier, final Type type, final HostHandle hostHandle) {
return execute(requestSupplier, response -> createResult(type, response), hostHandle);
return execute(requestSupplier, (response, ctx) -> createResult(type, response, ctx), hostHandle);
}

public <T> CompletableFuture<T> execute(final Supplier<InternalRequest> requestSupplier, final ResponseDeserializer<T> responseDeserializer) {
Expand All @@ -62,13 +64,16 @@ public <T> CompletableFuture<T> execute(

CompletableFuture<T> cf = CompletableFuture.completedFuture(requestSupplier)
.thenApply(Supplier::get)
.thenCompose(request -> protocol.executeAsync(interceptRequest(request), hostHandle))
.thenCompose(request -> protocol
.executeAsync(interceptRequest(request), hostHandle)
.thenApply(resp -> new ResponseWithContext(resp, SerdeUtils.createSerdeContext(request)))
)
.handle((r, e) -> {
if (e != null) {
throw ArangoDBException.of(e);
} else {
interceptResponse(r);
return responseDeserializer.deserialize(r);
interceptResponse(r.response);
return responseDeserializer.deserialize(r.response, r.context);
}
});

Expand All @@ -79,4 +84,15 @@ public <T> CompletableFuture<T> execute(
}
}

private static class ResponseWithContext {
final InternalResponse response;
final SerdeContext context;

ResponseWithContext(InternalResponse response, SerdeContext context) {
this.response = response;
this.context = context;
}
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.arangodb.internal.config.ArangoConfig;
import com.arangodb.internal.net.CommunicationProtocol;
import com.arangodb.internal.net.HostHandle;
import com.arangodb.internal.serde.SerdeUtils;

import java.lang.reflect.Type;

Expand All @@ -40,7 +41,7 @@ public <T> T execute(final InternalRequest request, final Type type) {
}

public <T> T execute(final InternalRequest request, final Type type, final HostHandle hostHandle) {
return execute(request, response -> createResult(type, response), hostHandle);
return execute(request, (response, ctx) -> createResult(type, response, ctx), hostHandle);
}

public <T> T execute(final InternalRequest request, final ResponseDeserializer<T> responseDeserializer) {
Expand All @@ -54,7 +55,7 @@ public <T> T execute(

final InternalResponse response = protocol.execute(interceptRequest(request), hostHandle);
interceptResponse(response);
return responseDeserializer.deserialize(response);
return responseDeserializer.deserialize(response, SerdeUtils.createSerdeContext(request));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ private InternalRequest createInsertDocumentRequest(final DocumentCreateOptions
}

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentCreateEntity<T>>> insertDocumentsResponseDeserializer(Class<T> userDataClass) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentCreateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentCreateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -119,12 +119,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentCreateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentCreateEntity.class, userDataClass);
final DocumentCreateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentCreateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -168,7 +168,7 @@ protected InternalRequest getDocumentRequest(final String key, final DocumentRea
}

protected <T> ResponseDeserializer<T> getDocumentResponseDeserializer(final Class<T> type) {
return response -> getSerde().deserializeUserData(response.getBody(), type);
return (response, ctx) -> getSerde().deserializeUserData(response.getBody(), type, ctx);
}

protected InternalRequest getDocumentsRequest(final Iterable<String> keys, final DocumentReadOptions options) {
Expand All @@ -186,7 +186,7 @@ protected InternalRequest getDocumentsRequest(final Iterable<String> keys, final

protected <T> ResponseDeserializer<MultiDocumentEntity<T>> getDocumentsResponseDeserializer(
final Class<T> type) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<T> multiDocument = new MultiDocumentEntity<>();
boolean potentialDirtyRead = Boolean.parseBoolean(response.getMeta("X-Arango-Potential-Dirty-Read"));
multiDocument.setPotentialDirtyRead(potentialDirtyRead);
Expand All @@ -197,11 +197,11 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<T>> getDocumentsResponseD
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
final T doc = getSerde().deserializeUserData(getSerde().serialize(next), type);
final T doc = getSerde().deserializeUserData(getSerde().serialize(next), type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -249,7 +249,7 @@ private InternalRequest createReplaceDocumentRequest(final DocumentReplaceOption

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>> replaceDocumentsResponseDeserializer(
final Class<T> returnType) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentUpdateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentUpdateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -258,12 +258,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentUpdateEntity.class, returnType);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -312,7 +312,7 @@ private InternalRequest createUpdateDocumentRequest(final DocumentUpdateOptions

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>> updateDocumentsResponseDeserializer(
final Class<T> returnType) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentUpdateEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentUpdateEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -321,12 +321,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentUpdateEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentUpdateEntity.class, returnType);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type);
final DocumentUpdateEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -368,7 +368,7 @@ private InternalRequest createDeleteDocumentRequest(final DocumentDeleteOptions

protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentDeleteEntity<T>>> deleteDocumentsResponseDeserializer(
final Class<T> userDataClass) {
return response -> {
return (response, ctx) -> {
final MultiDocumentEntity<DocumentDeleteEntity<T>> multiDocument = new MultiDocumentEntity<>();
final List<DocumentDeleteEntity<T>> docs = new ArrayList<>();
final List<ErrorEntity> errors = new ArrayList<>();
Expand All @@ -377,12 +377,12 @@ protected <T> ResponseDeserializer<MultiDocumentEntity<DocumentDeleteEntity<T>>>
for (final JsonNode next : body) {
JsonNode isError = next.get(ArangoResponseField.ERROR_FIELD_NAME);
if (isError != null && isError.booleanValue()) {
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class);
final ErrorEntity error = getSerde().deserialize(next, ErrorEntity.class, ctx);
errors.add(error);
documentsAndErrors.add(error);
} else {
Type type = constructParametricType(DocumentDeleteEntity.class, userDataClass);
final DocumentDeleteEntity<T> doc = getSerde().deserialize(next, type);
final DocumentDeleteEntity<T> doc = getSerde().deserialize(next, type, ctx);
docs.add(doc);
documentsAndErrors.add(doc);
}
Expand Down Expand Up @@ -413,7 +413,7 @@ protected InternalRequest deleteIndexRequest(final String id) {
}

protected ResponseDeserializer<String> deleteIndexResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/id", String.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/id", String.class, ctx);
}

private String createIndexId(final String id) {
Expand Down Expand Up @@ -495,23 +495,23 @@ protected InternalRequest getIndexesRequest() {
}

protected ResponseDeserializer<Collection<IndexEntity>> getIndexesResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Collection<IndexEntity> indexes = new ArrayList<>();
for (JsonNode idx : getSerde().parse(response.getBody(), "/indexes")) {
if (!"inverted".equals(idx.get("type").textValue())) {
indexes.add(getSerde().deserialize(idx, IndexEntity.class));
indexes.add(getSerde().deserialize(idx, IndexEntity.class, ctx));
}
}
return indexes;
};
}

protected ResponseDeserializer<Collection<InvertedIndexEntity>> getInvertedIndexesResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Collection<InvertedIndexEntity> indexes = new ArrayList<>();
for (JsonNode idx : getSerde().parse(response.getBody(), "/indexes")) {
if ("inverted".equals(idx.get("type").textValue())) {
indexes.add(getSerde().deserialize(idx, InvertedIndexEntity.class));
indexes.add(getSerde().deserialize(idx, InvertedIndexEntity.class, ctx));
}
}
return indexes;
Expand Down Expand Up @@ -583,8 +583,8 @@ protected InternalRequest getPermissionsRequest(final String user) {
}

protected ResponseDeserializer<Permissions> getPermissionsResponseDeserialzer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Permissions.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Permissions.class, ctx);
}

}
22 changes: 11 additions & 11 deletions core/src/main/java/com/arangodb/internal/InternalArangoDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,11 @@ protected InternalRequest getServerIdRequest() {
}

protected ResponseDeserializer<ServerRole> getRoleResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/role", ServerRole.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/role", ServerRole.class, ctx);
}

protected ResponseDeserializer<String> getServerIdResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), "/id", String.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), "/id", String.class, ctx);
}

protected InternalRequest createDatabaseRequest(final DBCreateOptions options) {
Expand All @@ -81,25 +81,25 @@ protected InternalRequest createDatabaseRequest(final DBCreateOptions options) {
}

protected ResponseDeserializer<Boolean> createDatabaseResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Boolean.class);
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
Boolean.class, ctx);
}

protected InternalRequest getDatabasesRequest(final String dbName) {
return request(dbName, RequestType.GET, InternalArangoDatabase.PATH_API_DATABASE);
}

protected ResponseDeserializer<Collection<String>> getDatabaseResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(String.class));
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(String.class), ctx);
}

protected InternalRequest getAccessibleDatabasesForRequest(final String dbName, final String user) {
return request(dbName, RequestType.GET, PATH_API_USER, user, ArangoRequestParam.DATABASE);
}

protected ResponseDeserializer<Collection<String>> getAccessibleDatabasesForResponseDeserializer() {
return response -> {
return (response, ctx) -> {
Iterator<String> names =
getSerde().parse(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER).fieldNames();
final Collection<String> dbs = new ArrayList<>();
Expand Down Expand Up @@ -136,8 +136,8 @@ protected InternalRequest getUserRequest(final String dbName, final String user)
}

protected ResponseDeserializer<Collection<UserEntity>> getUsersResponseDeserializer() {
return response -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(UserEntity.class));
return (response, ctx) -> getSerde().deserialize(response.getBody(), ArangoResponseField.RESULT_JSON_POINTER,
constructListType(UserEntity.class), ctx);
}

protected InternalRequest updateUserRequest(final String dbName, final String user, final UserUpdateOptions options) {
Expand Down Expand Up @@ -173,10 +173,10 @@ protected InternalRequest executeRequest(final Request<?> request) {
}

protected <T> ResponseDeserializer<Response<T>> responseDeserializer(Class<T> type) {
return response -> new Response<>(
return (response, ctx) -> new Response<>(
response.getResponseCode(),
response.getMeta(),
getSerde().deserializeUserData(response.getBody(), type)
getSerde().deserializeUserData(response.getBody(), type, ctx)
);
}

Expand Down
Loading

0 comments on commit 54c680d

Please sign in to comment.