Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(services/redis): add support of list operation #5304

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ services-pcloud = []
services-persy = ["dep:persy", "internal-tokio-rt"]
services-postgresql = ["dep:sqlx", "sqlx?/postgres"]
services-redb = ["dep:redb", "internal-tokio-rt"]
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp"]
services-redis = ["dep:redis", "dep:bb8", "redis?/tokio-rustls-comp", "dep:ouroboros"]
services-redis-native-tls = ["services-redis", "redis?/tokio-native-tls-comp"]
services-rocksdb = ["dep:rocksdb", "internal-tokio-rt"]
services-s3 = [
Expand Down
92 changes: 82 additions & 10 deletions core/src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,13 @@
// under the License.

use bb8::RunError;
use futures::Stream;
use futures::StreamExt;
use http::Uri;
use ouroboros::self_referencing;
use redis::cluster::ClusterClient;
use redis::cluster::ClusterClientBuilder;
use redis::AsyncIter;
use redis::Client;
use redis::ConnectionAddr;
use redis::ConnectionInfo;
Expand All @@ -27,6 +31,9 @@ use redis::RedisConnectionInfo;
use std::fmt::Debug;
use std::fmt::Formatter;
use std::path::PathBuf;
use std::pin::Pin;
use std::task::Context;
use std::task::Poll;
use std::time::Duration;
use tokio::sync::OnceCell;

Expand Down Expand Up @@ -291,8 +298,23 @@ impl Debug for Adapter {

impl Adapter {
async fn conn(&self) -> Result<bb8::PooledConnection<'_, RedisConnectionManager>> {
let pool = self
.conn
let pool = self.pool().await?;
Adapter::conn_from_pool(pool).await
}

async fn conn_from_pool(
pool: &bb8::Pool<RedisConnectionManager>,
) -> Result<bb8::PooledConnection<RedisConnectionManager>> {
pool.get().await.map_err(|err| match err {
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
}
RunError::User(err) => err,
})
}

async fn pool(&self) -> Result<&bb8::Pool<RedisConnectionManager>> {
self.conn
.get_or_try_init(|| async {
bb8::Pool::builder()
.build(self.get_redis_connection_manager())
Expand All @@ -302,13 +324,7 @@ impl Adapter {
.set_source(err)
})
})
.await?;
pool.get().await.map_err(|err| match err {
RunError::TimedOut => {
Error::new(ErrorKind::Unexpected, "get connection from pool failed").set_temporary()
}
RunError::User(err) => err,
})
.await
}

fn get_redis_connection_manager(&self) -> RedisConnectionManager {
Expand All @@ -326,8 +342,43 @@ impl Adapter {
}
}

#[self_referencing]
struct RedisAsyncConnIter<'a> {
conn: bb8::PooledConnection<'a, RedisConnectionManager>,

#[borrows(mut conn)]
#[not_covariant]
iter: AsyncIter<'this, String>,
}

#[self_referencing]
pub struct RedisScanner {
pool: bb8::Pool<RedisConnectionManager>,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could extract RedisCore as other services do and use Arc<RedisCore> here to avoid duplicate calls like conn_from_pool.

path: String,

#[borrows(pool, path)]
#[not_covariant]
inner: RedisAsyncConnIter<'this>,
}

unsafe impl Sync for RedisScanner {}

impl Stream for RedisScanner {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, how about implement kv::Scan directly without go through Stream?

type Item = Result<String>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.with_inner_mut(|s| s.with_iter_mut(|v| v.poll_next_unpin(cx).map(|v| v.map(Ok))))
}
}

impl kv::Scan for RedisScanner {
async fn next(&mut self) -> Result<Option<String>> {
<Self as StreamExt>::next(self).await.transpose()
}
}

impl kv::Adapter for Adapter {
type Scanner = ();
type Scanner = RedisScanner;

fn info(&self) -> kv::Info {
kv::Info::new(
Expand All @@ -336,6 +387,12 @@ impl kv::Adapter for Adapter {
Capability {
read: true,
write: true,
// due to limitation of Redis itself,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you, this is the first time I've learned this. Would you like to add a note in the documents about this backend to make it clear to users?

// on cluster mode we cannot get full list of keys via SCAN,
// so here we disable it on cluster mode to avoid confusions.
// TODO: we can perform multiple SCAN on each cluster node
// and merge the result to simulate the behavior of list here.
list: self.cluster_client.is_none(),

..Default::default()
},
Expand Down Expand Up @@ -366,4 +423,19 @@ impl kv::Adapter for Adapter {
conn.append(key, value).await?;
Ok(())
}

async fn scan(&self, path: &str) -> Result<Self::Scanner> {
let pool = self.pool().await?.clone();

RedisScanner::try_new_async_send(pool, path.to_string(), |pool, path| {
Box::pin(async {
let conn = Adapter::conn_from_pool(pool).await?;
RedisAsyncConnIter::try_new_async_send(conn, |conn| {
Box::pin(async { conn.scan(path).await })
})
.await
})
})
.await
}
}
13 changes: 13 additions & 0 deletions core/src/services/redis/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use redis::cluster::ClusterClient;
use redis::cluster_async::ClusterConnection;
use redis::from_redis_value;
use redis::AsyncCommands;
use redis::AsyncIter;
use redis::Client;
use redis::RedisError;

Expand Down Expand Up @@ -105,6 +106,18 @@ impl RedisConnection {
}
Ok(())
}

pub async fn scan(&mut self, prefix: &str) -> crate::Result<AsyncIter<'_, String>> {
let pattern = format!("{}*", prefix);
Ok(match self {
RedisConnection::Normal(ref mut conn) => {
conn.scan_match(pattern).await.map_err(format_redis_error)?
}
RedisConnection::Cluster(ref mut conn) => {
conn.scan_match(pattern).await.map_err(format_redis_error)?
}
})
}
}

#[derive(Clone)]
Expand Down
2 changes: 1 addition & 1 deletion core/src/services/redis/docs.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ This service can be used to:
- [x] delete
- [x] copy
- [x] rename
- [ ] ~~list~~
- [x] list
- [ ] ~~presign~~
- [ ] blocking

Expand Down
2 changes: 1 addition & 1 deletion fixtures/redis/docker-compose-kvrocks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,6 @@ version: '3.8'

services:
redis:
image: apache/kvrocks:2.5.1
image: apache/kvrocks:2.10.1
ports:
- '6379:6666'
Loading