Skip to content

[OPIK-457] Fix Redis lock keys leak #730

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ health:

distributedLock:
lockTimeoutMS: ${DISTRIBUTED_LOCK_TIME_OUT:-500}
ttlInSeconds: ${DISTRIBUTED_LOCK_TTL_IN_SEC:-5}

redis:
singleNodeUrl: ${REDIS_URL:-}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,20 @@ public class DistributedLockConfig {

@Valid
@JsonProperty
@NotNull private int lockTimeoutMS;
@NotNull private int lockTimeoutMS; // lease time in milliseconds

/**
*
* @param ttlInSeconds
*
* This value has to be considerably higher than the lockTimeoutMS value, as it has to guarantee that the last
* thread to join the queue to acquire the lock will have enough time to execute the action. Then, the lock will be deleted from redis after the @ttlInSeconds.
* <br>
* This is needed as redisson by default doesn't delete the lock from redis after the lease time expires, it just releases the lock. The expiration time will be reset every time the lock is acquired.
*
* */
@Valid
@JsonProperty
@NotNull private int ttlInSeconds; // time to live in seconds

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,14 @@
import org.redisson.api.RPermitExpirableSemaphoreReactive;
import org.redisson.api.RedissonReactiveClient;
import org.redisson.api.options.CommonOptions;
import org.redisson.client.RedisException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

@RequiredArgsConstructor
@Slf4j
Expand All @@ -21,31 +24,54 @@ class RedissonLockService implements LockService {
private final @NonNull RedissonReactiveClient redisClient;
private final @NonNull DistributedLockConfig distributedLockConfig;

private record LockInstance(RPermitExpirableSemaphoreReactive semaphore, String locked) {

public void release() {
semaphore.release(locked)
.subscribe(
__ -> log.debug("Lock {} released successfully", locked),
__ -> log.warn("Lock {} already released", locked));
Comment on lines +32 to +33
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: wrap escaped values in the logs between single quotes.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding both in a next review

}

}

@Override
public <T> Mono<T> executeWithLock(@NonNull Lock lock, @NonNull Mono<T> action) {

RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS());
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);

return semaphore
.trySetPermits(1)
.then(Mono.defer(semaphore::acquire))
.flatMap(locked -> runAction(lock, action, locked)
return acquireLock(semaphore)
.flatMap(lockInstance -> runAction(lock, action, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
semaphore.release(locked).subscribe();
lockInstance.release();
log.debug("Lock {} released", lock);
}));
}

private RPermitExpirableSemaphoreReactive getSemaphore(Lock lock, int lockTimeoutMS) {
private RPermitExpirableSemaphoreReactive getSemaphore(Lock lock) {
return redisClient.getPermitExpirableSemaphore(
CommonOptions
.name(lock.key())
.timeout(Duration.ofMillis(lockTimeoutMS))
.timeout(Duration.ofMillis(distributedLockConfig.getLockTimeoutMS()))
.retryInterval(Duration.ofMillis(10))
.retryAttempts(lockTimeoutMS / 10));
.retryAttempts(distributedLockConfig.getLockTimeoutMS() / 10));
}

private Mono<LockInstance> acquireLock(RPermitExpirableSemaphoreReactive semaphore) {
return Mono.defer(() -> acquire(semaphore))
.retryWhen(Retry.max(3).filter(RedisException.class::isInstance));
}

private Mono<LockInstance> acquire(RPermitExpirableSemaphoreReactive semaphore) {
return semaphore
.setPermits(1)
.then(Mono.defer(
() -> semaphore.acquire(distributedLockConfig.getLockTimeoutMS(), TimeUnit.MILLISECONDS)))
.flatMap(locked -> semaphore.expire(Duration.ofSeconds(distributedLockConfig.getTtlInSeconds()))
.thenReturn(new LockInstance(semaphore, locked)));
}

private <T> Mono<T> runAction(Lock lock, Mono<T> action, String locked) {
Expand All @@ -59,15 +85,16 @@ private <T> Mono<T> runAction(Lock lock, Mono<T> action, String locked) {

@Override
public <T> Flux<T> executeWithLock(@NonNull Lock lock, @NonNull Flux<T> stream) {
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock, distributedLockConfig.getLockTimeoutMS());

return semaphore
.trySetPermits(1)
.then(Mono.defer(semaphore::acquire))
.flatMapMany(locked -> stream(lock, stream, locked)
RPermitExpirableSemaphoreReactive semaphore = getSemaphore(lock);

log.debug("Trying to lock with {}", lock);

return acquireLock(semaphore)
.flatMapMany(lockInstance -> stream(lock, stream, lockInstance.locked())
.subscribeOn(Schedulers.boundedElastic())
.doFinally(signalType -> {
semaphore.release(locked).subscribe();
lockInstance.release();
log.debug("Lock {} released", lock);
}));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
@UtilityClass
public class TestDropwizardAppExtensionUtils {

public record CustomConfig(String key, String value) {
}

@Builder
public record AppContextConfig(
String jdbcUrl,
Expand All @@ -42,7 +45,8 @@ public record AppContextConfig(
String usageReportUrl,
String metadataVersion,
EventBus mockEventBus,
boolean corsEnabled) {
boolean corsEnabled,
List<CustomConfig> customConfigs) {
}

public static TestDropwizardAppExtension newTestDropwizardAppExtension(String jdbcUrl,
Expand Down Expand Up @@ -86,36 +90,36 @@ public static TestDropwizardAppExtension newTestDropwizardAppExtension(

public static TestDropwizardAppExtension newTestDropwizardAppExtension(AppContextConfig appContextConfig) {

var list = new ArrayList<String>();
list.add("database.url: " + appContextConfig.jdbcUrl());
var configs = new ArrayList<String>();
configs.add("database.url: " + appContextConfig.jdbcUrl());

if (appContextConfig.jdbcUserName() != null) {
list.add("database.user: " + appContextConfig.jdbcUserName());
configs.add("database.user: " + appContextConfig.jdbcUserName());
}

if (appContextConfig.jdbcDriverClass() != null) {
list.add("database.driverClass: " + appContextConfig.jdbcDriverClass());
configs.add("database.driverClass: " + appContextConfig.jdbcDriverClass());
}

if (appContextConfig.awsJdbcDriverPlugins() != null) {
list.add("database.properties.wrapperPlugins: " + appContextConfig.awsJdbcDriverPlugins());
configs.add("database.properties.wrapperPlugins: " + appContextConfig.awsJdbcDriverPlugins());
}

if (appContextConfig.databaseAnalyticsFactory() != null) {
list.add("databaseAnalytics.port: " + appContextConfig.databaseAnalyticsFactory().getPort());
list.add("databaseAnalytics.username: " + appContextConfig.databaseAnalyticsFactory().getUsername());
list.add("databaseAnalytics.password: " + appContextConfig.databaseAnalyticsFactory().getPassword());
configs.add("databaseAnalytics.port: " + appContextConfig.databaseAnalyticsFactory().getPort());
configs.add("databaseAnalytics.username: " + appContextConfig.databaseAnalyticsFactory().getUsername());
configs.add("databaseAnalytics.password: " + appContextConfig.databaseAnalyticsFactory().getPassword());
}

if (appContextConfig.runtimeInfo() != null) {
list.add("authentication.enabled: true");
list.add("authentication.sdk.url: "
configs.add("authentication.enabled: true");
configs.add("authentication.sdk.url: "
+ "%s/opik/auth".formatted(appContextConfig.runtimeInfo().getHttpsBaseUrl()));
list.add("authentication.ui.url: "
configs.add("authentication.ui.url: "
+ "%s/opik/auth-session".formatted(appContextConfig.runtimeInfo().getHttpsBaseUrl()));

if (appContextConfig.cacheTtlInSeconds() != null) {
list.add("authentication.apiKeyResolutionCacheTTLInSec: " + appContextConfig.cacheTtlInSeconds());
configs.add("authentication.apiKeyResolutionCacheTTLInSec: " + appContextConfig.cacheTtlInSeconds());
}
}

Expand Down Expand Up @@ -146,46 +150,56 @@ public void run(GuiceyEnvironment environment) {
};

if (appContextConfig.redisUrl() != null) {
list.add("redis.singleNodeUrl: %s".formatted(appContextConfig.redisUrl()));
list.add("redis.sentinelMode: false");
list.add("redis.lockTimeout: 500");
configs.add("redis.singleNodeUrl: %s".formatted(appContextConfig.redisUrl()));
configs.add("redis.sentinelMode: false");
configs.add("redis.lockTimeout: 500");
}

if (appContextConfig.rateLimitEnabled()) {
list.add("rateLimit.enabled: true");
list.add("rateLimit.generalLimit.limit: %d".formatted(appContextConfig.limit()));
list.add("rateLimit.generalLimit.durationInSeconds: %d"
configs.add("rateLimit.enabled: true");
configs.add("rateLimit.generalLimit.limit: %d".formatted(appContextConfig.limit()));
configs.add("rateLimit.generalLimit.durationInSeconds: %d"
.formatted(appContextConfig.limitDurationInSeconds()));

if (appContextConfig.customLimits() != null) {
appContextConfig.customLimits()
.forEach((bucket, limitConfig) -> {
list.add("rateLimit.customLimits.%s.limit: %d".formatted(bucket, limitConfig.limit()));
list.add("rateLimit.customLimits.%s.durationInSeconds: %d".formatted(bucket,
configs.add("rateLimit.customLimits.%s.limit: %d".formatted(bucket, limitConfig.limit()));
configs.add("rateLimit.customLimits.%s.durationInSeconds: %d".formatted(bucket,
limitConfig.durationInSeconds()));
});
}
}

if (appContextConfig.metadataVersion() != null) {
list.add("metadata.version: %s".formatted(appContextConfig.metadataVersion()));
configs.add("metadata.version: %s".formatted(appContextConfig.metadataVersion()));
}

if (appContextConfig.usageReportEnabled()) {
list.add("usageReport.enabled: %s".formatted(true));
configs.add("usageReport.enabled: %s".formatted(true));

if (appContextConfig.usageReportUrl() != null) {
list.add("usageReport.url: %s".formatted(appContextConfig.usageReportUrl()));
configs.add("usageReport.url: %s".formatted(appContextConfig.usageReportUrl()));
}
}

if (appContextConfig.corsEnabled) {
list.add("cors.enabled: true");
configs.add("cors.enabled: true");
}

if (CollectionUtils.isNotEmpty(appContextConfig.customConfigs())) {
appContextConfig
.customConfigs()
.stream()
.filter(customConfig -> configs.stream().noneMatch(s -> s.contains(customConfig.key())))
.forEach(customConfig -> {
configs.add("%s: %s".formatted(customConfig.key(), customConfig.value()));
});
}

return TestDropwizardAppExtension.forApp(OpikApplication.class)
.config("src/test/resources/config-test.yml")
.configOverrides(list.toArray(new String[0]))
.configOverrides(configs.toArray(new String[0]))
.randomPorts()
.hooks(hook)
.create();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,35 @@
import com.comet.opik.api.resources.utils.ClickHouseContainerUtils;
import com.comet.opik.api.resources.utils.MySQLContainerUtils;
import com.comet.opik.api.resources.utils.RedisContainerUtils;
import com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils;
import com.comet.opik.infrastructure.lock.LockService;
import com.redis.testcontainers.RedisContainer;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.redisson.api.RedissonReactiveClient;
import org.testcontainers.clickhouse.ClickHouseContainer;
import org.testcontainers.containers.MySQLContainer;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.lifecycle.Startables;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;
import ru.vyarus.dropwizard.guice.test.jupiter.ext.TestDropwizardAppExtension;

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.AppContextConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.CustomConfig;
import static com.comet.opik.api.resources.utils.TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: please clean up all these JUnit assertions and move to assertJ.

import static org.junit.jupiter.api.Assertions.assertTrue;

@Testcontainers(parallel = true)
@Slf4j
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class RedissonLockServiceIntegrationTest {

Expand All @@ -39,15 +45,18 @@ class RedissonLockServiceIntegrationTest {
private static final TestDropwizardAppExtension app;

static {
MYSQL.start();
CLICKHOUSE.start();
REDIS.start();

Startables.deepStart(REDIS, MYSQL, CLICKHOUSE).join();
var databaseAnalyticsFactory = ClickHouseContainerUtils.newDatabaseAnalyticsFactory(CLICKHOUSE,
ClickHouseContainerUtils.DATABASE_NAME);

app = TestDropwizardAppExtensionUtils.newTestDropwizardAppExtension(MYSQL.getJdbcUrl(),
databaseAnalyticsFactory, null, REDIS.getRedisURI());
app = newTestDropwizardAppExtension(
AppContextConfig.builder()
.jdbcUrl(MYSQL.getJdbcUrl())
.databaseAnalyticsFactory(databaseAnalyticsFactory)
.redisUrl(REDIS.getRedisURI())
.customConfigs(List.of(new CustomConfig("distributedLock.lockTimeoutMS", "100"),
new CustomConfig("distributedLock.ttlInSeconds", "1")))
.build());
}

@Test
Expand Down Expand Up @@ -107,4 +116,31 @@ void testExecuteWithLock_AddIfAbsent_Flux(LockService lockService) {
assertTrue(sharedList.contains("C"));
}

@Test
void testExecuteWithLock_LockShouldHaveBeenEvicted(LockService lockService, RedissonReactiveClient redisClient) {
LockService.Lock lock = new LockService.Lock(UUID.randomUUID(), "test-lock");
List<String> sharedList = new ArrayList<>();

lockService.executeWithLock(lock, Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> {
sharedList.add("A");
return true;
}))).block();

Mono.delay(Duration.ofMillis(1500)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());

lockService.executeWithLock(lock, Mono.delay(Duration.ofMillis(100)).then(Mono.fromCallable(() -> {
sharedList.add("B");
return true;
}))).block();

assertTrue(sharedList.contains("A"));
assertTrue(sharedList.contains("B"));

Mono.delay(Duration.ofSeconds(1)).block();

assertFalse(redisClient.getBucket(lock.key()).isExists().block());
}

}
1 change: 1 addition & 0 deletions apps/opik-backend/src/test/resources/config-test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ health:

distributedLock:
lockTimeout: 500
ttlInSeconds: 1

redis:
singleNodeUrl:
Expand Down
Loading