Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

more stable upcalls from library #4

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/gradle-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
name: Gradle Package

on:
push:
branches:
- master
#push:
# branches:
# - master
pull_request:
branches:
- master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
Expand All @@ -21,20 +22,18 @@ public class EverSdkContext {
private final static System.Logger logger = System.getLogger(EverSdkContext.class.getName());

private final int id;
private final AtomicInteger requestCount;
private final AtomicInteger requestCount = new AtomicInteger();
private final Client.ClientConfig clientConfig;

@JsonIgnore private final Map<Integer, NativeUpcallHandler> requests;
@JsonIgnore private final Map<Integer, CompletableFuture<String>> responses;
@JsonIgnore private final Map<Integer, SdkSubscription> subscriptions;
private final Queue<Integer> requestRemoveQueue = new ConcurrentLinkedDeque<>();

@JsonIgnore private final Map<Integer, NativeUpcallHandler> requests = new ConcurrentHashMap<>();
@JsonIgnore private final Map<Integer, CompletableFuture<String>> responses = new ConcurrentHashMap<>();
@JsonIgnore private final Map<Integer, SdkSubscription> subscriptions = new ConcurrentHashMap<>();

public EverSdkContext(int id, Client.ClientConfig clientConfig) {
this.id = id;
this.clientConfig = clientConfig;
this.requestCount = new AtomicInteger();
this.requests = new HashMap<>();
this.responses = new HashMap<>();
this.subscriptions = new HashMap<>();
}

// public <R> R sync(R calleeFunction) {
Expand Down Expand Up @@ -80,7 +79,7 @@ public <T, P> T callEvent(String functionName,
Class<T> resultClass) throws EverSdkException {
final int requestId = requestCountNextVal();
this.subscriptions.put(requestId, new SdkSubscription(consumer));
addRequest(requestId, functionName, params);
addRequest(requestId, functionName, params, true);
return awaitSyncResponse(requestId, resultClass);
}

Expand All @@ -97,7 +96,7 @@ public <T, P> T callEvent(String functionName,
*/
public <T, P> T call(String functionName, P params, Class<T> resultClass) throws EverSdkException {
final int requestId = requestCountNextVal();
addRequest(requestId, functionName, params);
addRequest(requestId, functionName, params, true);
return awaitSyncResponse(requestId, resultClass);
}

Expand All @@ -111,8 +110,7 @@ public <T, P> T call(String functionName, P params, Class<T> resultClass) throws
*/
public <P> void callVoid(String functionName, P params) throws EverSdkException {
final int requestId = requestCountNextVal();
addRequest(requestId, functionName, params);
finishResponse(requestId);
addRequest(requestId, functionName, params, false);
}

public int requestCountNextVal() {
Expand Down Expand Up @@ -159,8 +157,12 @@ public void addEvent(int requestId, String responseString) {
}

public void finishRequest(int requestId) {
this.requests.remove(requestId);
//this.responses.remove(requestId);
requestRemoveQueue.add(requestId);
if (requestRemoveQueue.size() > 10) {
this.requests.remove(requestRemoveQueue.poll());
}
logger.log(System.Logger.Level.DEBUG,
() -> "Requests current size: " + this.requests.size());
this.subscriptions.remove(requestId);
}

Expand Down Expand Up @@ -241,10 +243,12 @@ private <R> R awaitSyncResponse(int requestId, Class<R> resultClass) throws Ever
* @param <P>
* @throws EverSdkException
*/
private <P> void addRequest(int requestId, String functionName, P params) {
final NativeUpcallHandler handler = new NativeUpcallHandler(this.id);
private <P> void addRequest(int requestId, String functionName, P params, boolean hasResponse) {
final NativeUpcallHandler handler = new NativeUpcallHandler(this.id, hasResponse);
this.requests.put(requestId, handler);
this.responses.put(requestId, new CompletableFuture<>());
if (hasResponse) {
this.responses.put(requestId, new CompletableFuture<>());
}
var paramsJson = processParams(params);
NativeMethods.tcRequest(this.id, functionName, paramsJson, Arena.ofAuto(), requestId, handler);
logger.log(System.Logger.Level.TRACE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ public class NativeMethods {

public static String tcCreateContext(String configJson) {
Arena offHeapMemory = Arena.ofAuto();
SegmentAllocator textAllocator = SegmentAllocator.prefixAllocator(offHeapMemory.allocate(constants$0.const$0));
//SegmentAllocator textAllocator = SegmentAllocator.prefixAllocator(offHeapMemory.allocate(constants$0.const$0));
MemorySegment handle = ton_client.tc_create_context(NativeStrings.toRust(configJson, offHeapMemory));
String s = NativeStrings.toJava(
ton_client.tc_read_string(
textAllocator,
RuntimeHelper.CONSTANT_ALLOCATOR,
handle
),
offHeapMemory
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
package tech.deplant.java4ever.binding.ffi;

import tech.deplant.commons.Strings;
import tech.deplant.java4ever.binding.EverSdk;

import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.util.Optional;

record NativeUpcallHandler(int contextId) implements tc_response_handler_t {
record NativeUpcallHandler(int contextId, boolean hasResponse) implements tc_response_handler_t {

private final static System.Logger logger = System.getLogger(NativeUpcallHandler.class.getName());

Expand All @@ -26,37 +24,47 @@ record NativeUpcallHandler(int contextId) implements tc_response_handler_t {
@Override
public void apply(int request_id, MemorySegment params_json, int response_type, boolean finished) {

final String responseString = Strings.notEmptyElse(NativeStrings.toJava(params_json, Arena.ofAuto()), "{}");
try {

final String responseString = NativeStrings.toJava(params_json, Arena.ofAuto());

logger.log(System.Logger.Level.TRACE,
() -> "CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
var optionalCtx = Optional.ofNullable(EverSdk.getContext(contextId()));
logger.log(System.Logger.Level.TRACE,
() -> "CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
var optionalCtx = Optional.ofNullable(EverSdk.getContext(contextId()));

if (optionalCtx.isEmpty()) {
if (optionalCtx.isEmpty()) {
logger.log(System.Logger.Level.ERROR,
() -> "Context not found! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
request_id,
response_type,
finished,
responseString));
} else {
var ctx = optionalCtx.get();
if (response_type == ton_client.tc_response_success() && hasResponse()) {
ctx.addResponse(request_id, responseString);
} else if (response_type == ton_client.tc_response_error()) {
ctx.addError(request_id, responseString);
} else if (response_type >= 100) {
ctx.addEvent(request_id, responseString);
}

// if "final" flag received, let's remove everything
if (finished) {
EverSdk.getContext(contextId()).finishRequest(request_id);
}
}
} catch (Exception e) {
logger.log(System.Logger.Level.ERROR,
() -> "Context not found! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId,
() -> "Unexpected EVER-SDK interop error! CTX:%d REQ:%d TYPE:%d FINISHED:%s JSON:%s".formatted(contextId(),
request_id,
response_type,
finished,
responseString));
} else {
var ctx = optionalCtx.get();
if (response_type == ton_client.tc_response_success()) {
ctx.addResponse(request_id, responseString);
} else if (response_type == ton_client.tc_response_error()) {
ctx.addError(request_id, responseString);
} else if (response_type >= 100) {
ctx.addEvent(request_id, responseString);
}

// if "final" flag received, let's remove everything
if (finished) {
EverSdk.getContext(contextId()).finishRequest(request_id);
}
params_json.toString()));
}
}
}
Loading