Skip to content

Commit

Permalink
feat(bindings/java): support ConcurrentLimitLayer (#5168)
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun authored Oct 8, 2024
1 parent ccf6801 commit a0e93b5
Show file tree
Hide file tree
Showing 4 changed files with 82 additions and 0 deletions.
13 changes: 13 additions & 0 deletions bindings/java/src/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ use jni::sys::jboolean;
use jni::sys::jfloat;
use jni::sys::jlong;
use jni::JNIEnv;
use opendal::layers::ConcurrentLimitLayer;
use opendal::layers::RetryLayer;
use opendal::Operator;

Expand Down Expand Up @@ -49,3 +50,15 @@ pub extern "system" fn Java_org_apache_opendal_layer_RetryLayer_doLayer(
}
Box::into_raw(Box::new(op.clone().layer(retry))) as jlong
}

#[no_mangle]
pub extern "system" fn Java_org_apache_opendal_layer_ConcurrentLimitLayer_doLayer(
_: JNIEnv,
_: JClass,
op: *mut Operator,
permits: jlong,
) -> jlong {
let op = unsafe { &*op };
let concurrent_limit = ConcurrentLimitLayer::new(permits as usize);
Box::into_raw(Box::new(op.clone().layer(concurrent_limit))) as jlong
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.opendal.layer;

import org.apache.opendal.Layer;

/**
* Users can control how many concurrent connections could be established between
* OpenDAL and underlying storage services.
*
* @see <a href="https://docs.rs/opendal/latest/opendal/layers/struct.ConcurrentLimitLayer.html">ConcurrentLimitLayer's rustdoc</a>
*/
public class ConcurrentLimitLayer extends Layer {
private final long permits;

/**
* Create a new ConcurrentLimitLayer will specify permits.
*
* @param permits concurrent connections could be established
*/
public ConcurrentLimitLayer(long permits) {
if (permits <= 0) {
throw new IllegalArgumentException("permits must be positive");
}

this.permits = permits;
}

@Override
protected long layer(long nativeOp) {
return doLayer(nativeOp, permits);
}

private static native long doLayer(long nativeHandle, long permits);
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@
import lombok.Builder;
import org.apache.opendal.Layer;

/**
* This layer will retry failed operations when {@code Error::is_temporary} returns {@code true}.
* If operation still failed, this layer will set error to Persistent which means error has been retried.
*
* @see <a href="https://docs.rs/opendal/latest/opendal/layers/struct.RetryLayer.html">RetryLayer's rustdoc</a>
*/
@Builder
public class RetryLayer extends Layer {

Expand Down
11 changes: 11 additions & 0 deletions bindings/java/src/test/java/org/apache/opendal/test/LayerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import lombok.Cleanup;
import org.apache.opendal.AsyncOperator;
import org.apache.opendal.Layer;
import org.apache.opendal.layer.ConcurrentLimitLayer;
import org.apache.opendal.layer.RetryLayer;
import org.junit.jupiter.api.Test;

Expand All @@ -38,4 +39,14 @@ void testOperatorWithRetryLayer() {
@Cleanup final AsyncOperator layeredOp = op.layer(retryLayer);
assertThat(layeredOp.info).isNotNull();
}

@Test
void testOperatorWithConcurrentLimitLayer() {
final Map<String, String> conf = new HashMap<>();
conf.put("root", "/opendal/");
final Layer concurrentLimitLayer = new ConcurrentLimitLayer(1024);
@Cleanup final AsyncOperator op = AsyncOperator.of("memory", conf);
@Cleanup final AsyncOperator layeredOp = op.layer(concurrentLimitLayer);
assertThat(layeredOp.info).isNotNull();
}
}

0 comments on commit a0e93b5

Please sign in to comment.