Skip to content

Commit

Permalink
Merge pull request #4 from deplant/stable-upcall
Browse files Browse the repository at this point in the history
more stable upcalls from library
  • Loading branch information
cassandrus authored Mar 22, 2024
2 parents 630d24e + 42e9957 commit 843afb4
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 50 deletions.
6 changes: 3 additions & 3 deletions .github/workflows/gradle-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
name: Gradle Package

on:
push:
branches:
- master
#push:
# branches:
# - master
pull_request:
branches:
- master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -21,20 +22,18 @@ public class EverSdkContext {
private final static System.Logger logger = System.getLogger(EverSdkContext.class.getName());

private final int id;
private final AtomicInteger requestCount;
private final AtomicInteger requestCount = new AtomicInteger();
private final Client.ClientConfig clientConfig;

@JsonIgnore private final Map<Integer, NativeUpcallHandler> requests;
@JsonIgnore private final Map<Integer, CompletableFuture<String>> responses;
@JsonIgnore private final Map<Integer, SdkSubscription> subscriptions;
private final Queue<Integer> requestRemoveQueue = new ConcurrentLinkedDeque<>();

@JsonIgnore private final Map<Integer, NativeUpcallHandler> requests = new ConcurrentHashMap<>();
@JsonIgnore private final Map<Integer, CompletableFuture<String>> responses = new ConcurrentHashMap<>();
@JsonIgnore private final Map<Integer, SdkSubscription> subscriptions = new ConcurrentHashMap<>();

public EverSdkContext(int id, Client.ClientConfig clientConfig) {
this.id = id;
this.clientConfig = clientConfig;
this.requestCount = new AtomicInteger();
this.requests = new HashMap<>();
this.responses = new HashMap<>();
this.subscriptions = new HashMap<>();
}

// public <R> R sync(R calleeFunction) {
Expand Down Expand Up @@ -80,7 +79,7 @@ public <T, P> T callEvent(String functionName,
Class<T> resultClass) throws EverSdkException {
final int requestId = requestCountNextVal();
this.subscriptions.put(requestId, new SdkSubscription(consumer));
addRequest(requestId, functionName, params);
addRequest(requestId, functionName, params, true);
return awaitSyncResponse(requestId, resultClass);
}

Expand All @@ -97,7 +96,7 @@ public <T, P> T callEvent(String functionName,
*/
public <T, P> T call(String functionName, P params, Class<T> resultClass) throws EverSdkException {
final int requestId = requestCountNextVal();
addRequest(requestId, functionName, params);
addRequest(requestId, functionName, params, true);
return awaitSyncResponse(requestId, resultClass);
}

Expand All @@ -111,8 +110,7 @@ public <T, P> T call(String functionName, P params, Class<T> resultClass) throws
*/
public <P> void callVoid(String functionName, P params) throws EverSdkException {
final int requestId = requestCountNextVal();
addRequest(requestId, functionName, params);
finishResponse(requestId);
addRequest(requestId, functionName, params, false);
}

public int requestCountNextVal() {
Expand Down Expand Up @@ -159,8 +157,12 @@ public void addEvent(int requestId, String responseString) {
}

public void finishRequest(int requestId) {
this.requests.remove(requestId);
//this.responses.remove(requestId);
requestRemoveQueue.add(requestId);
if (requestRemoveQueue.size() > 10) {
this.requests.remove(requestRemoveQueue.poll());
}
logger.log(System.Logger.Level.DEBUG,
() -> "Requests current size: " + this.requests.size());
this.subscriptions.remove(requestId);
}

Expand Down Expand Up @@ -241,10 +243,12 @@ private <R> R awaitSyncResponse(int requestId, Class<R> resultClass) throws Ever
* @param <P>
* @throws EverSdkException
*/
private <P> void addRequest(int requestId, String functionName, P params) {
final NativeUpcallHandler handler = new NativeUpcallHandler(this.id);
private <P> void addRequest(int requestId, String functionName, P params, boolean hasResponse) {
final NativeUpcallHandler handler = new NativeUpcallHandler(this.id, hasResponse);
this.requests.put(requestId, handler);
this.responses.put(requestId, new CompletableFuture<>());
if (hasResponse) {
this.responses.put(requestId, new CompletableFuture<>());
}
var paramsJson = processParams(params);
NativeMethods.tcRequest(this.id, functionName, paramsJson, Arena.ofAuto(), requestId, handler);
logger.log(System.Logger.Level.TRACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ public class NativeMethods {

public static String tcCreateContext(String configJson) {
Arena offHeapMemory = Arena.ofAuto();
SegmentAllocator textAllocator = SegmentAllocator.prefixAllocator(offHeapMemory.allocate(constants$0.const$0));
//SegmentAllocator textAllocator = SegmentAllocator.prefixAllocator(offHeapMemory.allocate(constants$0.const$0));
MemorySegment handle = ton_client.tc_create_context(NativeStrings.toRust(configJson, offHeapMemory));
String s = NativeStrings.toJava(
ton_client.tc_read_string(
textAllocator,
RuntimeHelper.CONSTANT_ALLOCATOR,
handle
),
offHeapMemory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package tech.deplant.java4ever.binding.ffi;

import tech.deplant.commons.Strings;
import tech.deplant.java4ever.binding.EverSdk;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.Optional;

record NativeUpcallHandler(int contextId) implements tc_response_handler_t {
record NativeUpcallHandler(int contextId, boolean hasResponse) implements tc_response_handler_t {

private final static System.Logger logger = System.getLogger(NativeUpcallHandler.class.getName());

Expand All @@ -26,37 +24,47 @@ record NativeUpcallHandler(int contextId) implements tc_response_handler_t {
@Override
public void apply(int request_id, MemorySegment params_json, int response_type, boolean finished) {

final String responseString = Strings.notEmptyElse(NativeStrings.toJava(params_json, Arena.ofAuto()), "{}");
try {

final String responseString = NativeStrings.toJava(params_json, Arena.ofAuto());

logger.log(System.Logger.Level.TRACE,
() -> "CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
var optionalCtx = Optional.ofNullable(EverSdk.getContext(contextId()));
logger.log(System.Logger.Level.TRACE,
() -> "CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
var optionalCtx = Optional.ofNullable(EverSdk.getContext(contextId()));

if (optionalCtx.isEmpty()) {
if (optionalCtx.isEmpty()) {
logger.log(System.Logger.Level.ERROR,
() -> "Context not found! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
} else {
var ctx = optionalCtx.get();
if (response_type == ton_client.tc_response_success() && hasResponse()) {
ctx.addResponse(request_id, responseString);
} else if (response_type == ton_client.tc_response_error()) {
ctx.addError(request_id, responseString);
} else if (response_type >= 100) {
ctx.addEvent(request_id, responseString);
}

// if "final" flag received, let's remove everything
if (finished) {
EverSdk.getContext(contextId()).finishRequest(request_id);
}
}
} catch (Exception e) {
logger.log(System.Logger.Level.ERROR,
() -> "Context not found! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
() -> "Unexpected EVER-SDK interop error! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId(),
request_id,
response_type,
finished,
responseString));
} else {
var ctx = optionalCtx.get();
if (response_type == ton_client.tc_response_success()) {
ctx.addResponse(request_id, responseString);
} else if (response_type == ton_client.tc_response_error()) {
ctx.addError(request_id, responseString);
} else if (response_type >= 100) {
ctx.addEvent(request_id, responseString);
}

// if "final" flag received, let's remove everything
if (finished) {
EverSdk.getContext(contextId()).finishRequest(request_id);
}
params_json.toString()));
}
}
}

0 comments on commit 843afb4

Please sign in to comment.