Skip to content

Commit

Permalink
services/redis: Implement Write::append with native support (#1651)
Browse files Browse the repository at this point in the history
Signed-off-by: Wenyu Huang <huangwenyuu@outlook.com>
  • Loading branch information
uran0sH authored Mar 17, 2023
1 parent 8b55a4c commit f0a33f3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 3 deletions.
25 changes: 25 additions & 0 deletions src/raw/adapters/kv/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,31 @@ pub trait Adapter: Send + Sync + Debug + Unpin + 'static {
)
.with_operation("kv::Adapter::blocking_scan"))
}

/// Append a key into service
async fn append(&self, path: &str, value: &[u8]) -> Result<()> {
let _ = path;
let _ = value;

Err(Error::new(
ErrorKind::Unsupported,
"kv adapter doesn't support this operation",
)
.with_operation("kv::Adapter::append"))
}

/// Append a key into service
/// in blocking way.
fn blocking_append(&self, path: &str, value: &[u8]) -> Result<()> {
let _ = path;
let _ = value;

Err(Error::new(
ErrorKind::Unsupported,
"kv adapter doesn't support this operation",
)
.with_operation("kv::Adapter::blocking_append"))
}
}

/// Metadata for this key value accessor.
Expand Down
13 changes: 10 additions & 3 deletions src/raw/adapters/kv/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,13 +285,20 @@ impl<S: Adapter> oio::Write for KvWriter<S> {
}

async fn append(&mut self, bs: Bytes) -> Result<()> {
self.buf.extend(bs);

if let Err(e) = self.kv.append(&self.path, bs.to_vec().as_slice()).await {
if e.kind() == ErrorKind::Unsupported {
self.buf.extend(bs);
} else {
return Err(e);
}
}
Ok(())
}

async fn close(&mut self) -> Result<()> {
self.kv.set(&self.path, &self.buf).await?;
if !self.buf.is_empty() {
self.kv.set(&self.path, &self.buf).await?;
}

Ok(())
}
Expand Down
6 changes: 6 additions & 0 deletions src/services/redis/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,12 @@ impl kv::Adapter for Adapter {
let _: () = conn.del(key).await?;
Ok(())
}

async fn append(&self, key: &str, value: &[u8]) -> Result<()> {
let mut conn = self.conn().await?;
conn.append(key, value).await?;
Ok(())
}
}

impl From<RedisError> for Error {
Expand Down

0 comments on commit f0a33f3

Please sign in to comment.