Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] Correct transaction interfaces #90

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ os:
# - windows # TODO: https://github.com/pingcap/kvproto/issues/355
- osx
rust:
# Requires nightly for now, stable can be re-enabled when 1.36 is stable.
# Requires nightly for now, stable can be re-enabled when async/await is stable.
# - stable
- nightly
env:
Expand Down
6 changes: 2 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ This is an open source (Apache 2) project hosted by the Cloud Native Computing F

## Using the client

The TiKV client is a Rust library (crate). It requires version 1.36 of the compiler and standard libraries (which will be stable from the 4th July 2019, see below for ensuring compatibility).
The TiKV client is a Rust library (crate). It requires a nightly Rust compiler with async/await support.

To use this crate in your project, add it as a dependency in your `Cargo.toml`:

Expand All @@ -28,8 +28,6 @@ The client requires a Git dependency until we can [publish it](https://github.co

There are [examples](examples) which show how to use the client in a Rust program.

The examples and documentation use async/await syntax. This is a new feature in Rust and is currently unstable. To use async/await you'll need to add the feature flag `#![async_await]` to your crate and use a nightly compiler (see below).

## Access the documentation

We recommend using the cargo-generated documentation to browse and understand the API. We've done
Expand All @@ -52,7 +50,7 @@ To check what version of Rust you are using, run
rustc --version
```

You'll see something like `rustc 1.36.0-nightly (a784a8022 2019-05-09)` where the `1.36.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use
You'll see something like `rustc 1.38.0-nightly (4b65a86eb 2019-07-15)` where the `1.38.0` is the toolchain version, and `nightly` is the channel (stable/beta/nightly). To install another toolchain use

```bash
rustup toolchain install nightly
Expand Down
38 changes: 13 additions & 25 deletions examples/transaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,28 +7,19 @@ mod common;
use crate::common::parse_args;
use futures::prelude::*;
use std::ops::RangeBounds;
use tikv_client::{
transaction::{Client, IsolationLevel},
Config, Key, KvPair, Value,
};
use tikv_client::{transaction::Client, Config, Key, KvPair, Value};

async fn puts(client: &Client, pairs: impl IntoIterator<Item = impl Into<KvPair>>) {
let mut txn = client.begin();
future::join_all(
pairs
.into_iter()
.map(Into::into)
.map(|p| txn.set(p.key().clone(), p.value().clone())),
)
.await
.into_iter()
.collect::<Result<Vec<()>, _>>()
.expect("Could not set key value pairs");
let mut txn = client.begin().await.expect("Could not begin a transaction");
for pair in pairs {
let (key, value) = pair.into().into();
txn.set(key, value);
}
txn.commit().await.expect("Could not commit transaction");
}

async fn get(client: &Client, key: Key) -> Value {
let txn = client.begin();
let txn = client.begin().await.expect("Could not begin a transaction");
txn.get(key).await.expect("Could not get value")
}

Expand All @@ -37,6 +28,8 @@ async fn get(client: &Client, key: Key) -> Value {
async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
client
.begin()
.await
.expect("Could not begin a transaction")
.scan(range)
.into_stream()
.take_while(move |r| {
Expand All @@ -53,15 +46,10 @@ async fn scan(client: &Client, range: impl RangeBounds<Key>, mut limit: usize) {
}

async fn dels(client: &Client, keys: impl IntoIterator<Item = Key>) {
let mut txn = client.begin();
txn.set_isolation_level(IsolationLevel::ReadCommitted);
let _: Vec<()> = stream::iter(keys.into_iter())
.then(|p| {
txn.delete(p)
.unwrap_or_else(|e| panic!("error in delete: {:?}", e))
})
.collect()
.await;
let mut txn = client.begin().await.expect("Could not begin a transaction");
for key in keys {
txn.delete(key);
}
txn.commit().await.expect("Could not commit transaction");
}

Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// Long and nested future chains can quickly result in large generic types.
#![type_length_limit = "16777216"]
#![allow(clippy::redundant_closure)]
#![feature(async_await)]

//! This crate provides a clean, ready to use client for [TiKV](https://github.com/tikv/tikv), a
//! distributed transactional Key-Value database written in Rust.
Expand Down
6 changes: 5 additions & 1 deletion src/rpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::{
kv::BoundRange,
raw::ColumnFamily,
rpc::{
pd::{PdClient, Region, RegionId, RetryClient, StoreId},
pd::{PdClient, Region, RegionId, RetryClient, StoreId, Timestamp},
security::SecurityManager,
tikv::KvClient,
Address, RawContext, Store, TxnContext,
Expand Down Expand Up @@ -225,6 +225,10 @@ impl<PdC: PdClient> RpcClient<PdC> {
future::err(Error::unimplemented())
}

pub fn get_timestamp(self: Arc<Self>) -> impl Future<Output = Result<Timestamp>> {
Arc::clone(&self.pd).get_timestamp()
}

// Returns a Steam which iterates over the contexts for each region covered by range.
fn regions_for_range(
self: Arc<Self>,
Expand Down
2 changes: 1 addition & 1 deletion src/rpc/pd/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl Region {
}
}

#[derive(Eq, PartialEq, Debug)]
#[derive(Eq, PartialEq, Debug, Clone, Copy)]
pub struct Timestamp {
pub physical: i64,
pub logical: i64,
Expand Down
43 changes: 26 additions & 17 deletions src/transaction/client.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::{Snapshot, Timestamp, Transaction};
use crate::{Config, Error};
use crate::rpc::RpcClient;
use crate::{Config, Result};

use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
use std::sync::Arc;

/// The TiKV transactional `Client` is used to issue requests to the TiKV server and PD cluster.
pub struct Client;
pub struct Client {
rpc: Arc<RpcClient>,
}

impl Client {
/// Creates a new [`Client`](Client) once the [`Connect`](Connect) resolves.
Expand Down Expand Up @@ -38,14 +42,15 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let transaction = client.begin();
/// let mut transaction = client.begin().await.unwrap();
/// // ... Issue some commands.
/// let commit = transaction.commit();
/// let result: () = commit.await.unwrap();
/// # });
/// ```
pub fn begin(&self) -> Transaction {
unimplemented!()
pub async fn begin(&self) -> Result<Transaction> {
let snapshot = self.snapshot().await?;
Ok(Transaction::new(snapshot))
}

/// Gets the latest [`Snapshot`](Snapshot).
Expand All @@ -57,12 +62,13 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let snapshot = client.snapshot();
/// let snapshot = client.snapshot().await.unwrap();
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot(&self) -> Snapshot {
unimplemented!()
pub async fn snapshot(&self) -> Result<Snapshot> {
let timestamp = self.current_timestamp().await?;
self.snapshot_at(timestamp).await
}

/// Gets a [`Snapshot`](Snapshot) at the given point in time.
Expand All @@ -74,13 +80,13 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let timestamp = Timestamp { physical: 1564474902, logical: 1 };
/// let timestamp = Timestamp { physical: 1564481750172, logical: 1 };
/// let snapshot = client.snapshot_at(timestamp);
/// // ... Issue some commands.
/// # });
/// ```
pub fn snapshot_at(&self, _timestamp: Timestamp) -> Snapshot {
unimplemented!()
pub async fn snapshot_at(&self, timestamp: Timestamp) -> Result<Snapshot> {
Ok(Snapshot::new(timestamp))
}

/// Retrieves the current [`Timestamp`](Timestamp).
Expand All @@ -92,11 +98,11 @@ impl Client {
/// # futures::executor::block_on(async {
/// let connect = Client::connect(Config::default());
/// let client = connect.await.unwrap();
/// let timestamp = client.current_timestamp();
/// let timestamp = client.current_timestamp().await.unwrap();
/// # });
/// ```
pub fn current_timestamp(&self) -> Timestamp {
unimplemented!()
pub async fn current_timestamp(&self) -> Result<Timestamp> {
Arc::clone(&self.rpc).get_timestamp().await
}
}

Expand All @@ -120,10 +126,13 @@ pub struct Connect {
}

impl Future for Connect {
type Output = Result<Client, Error>;
type Output = Result<Client>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _config = &self.config;
unimplemented!()
let config = &self.config;
// TODO: RpcClient::connect currently uses a blocking implementation.
// Make it asynchronous later.
let rpc = Arc::new(RpcClient::connect(config)?);
Poll::Ready(Ok(Client { rpc }))
}
}
4 changes: 2 additions & 2 deletions src/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
//!

pub use self::client::{Client, Connect};
pub use self::requests::{BatchGet, Commit, Delete, Get, LockKeys, Rollback, Scanner, Set};
pub use self::transaction::{IsolationLevel, Snapshot, Transaction, TxnInfo};
pub use self::requests::Scanner;
pub use self::transaction::{Snapshot, Transaction, TxnInfo};
pub use super::rpc::Timestamp;

use crate::{Key, Value};
Expand Down
128 changes: 1 addition & 127 deletions src/transaction/requests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
// Copyright 2019 TiKV Project Authors. Licensed under Apache-2.0.

use super::Transaction;
use crate::{Error, Key, KvPair, Value};
use crate::{Error, KvPair};

use derive_new::new;
use futures::prelude::*;
use futures::task::{Context, Poll};
use std::pin::Pin;
Expand All @@ -20,127 +18,3 @@ impl Stream for Scanner {
unimplemented!()
}
}

/// An unresolved [`Transaction::get`](Transaction::get) request.
///
/// Once resolved this request will result in the fetching of the value associated with the given
/// key.
#[derive(new)]
pub struct Get {
key: Key,
}

impl Future for Get {
type Output = Result<Value, Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
unimplemented!()
}
}

/// An unresolved [`Transaction::batch_get`](Transaction::batch_get) request.
///
/// Once resolved this request will result in the fetching of the values associated with the given
/// keys.
#[derive(new)]
pub struct BatchGet {
keys: Vec<Key>,
}

impl Future for BatchGet {
type Output = Result<Vec<KvPair>, Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _keys = &self.keys;
unimplemented!()
}
}

/// An unresolved [`Transaction::commit`](Transaction::commit) request.
///
/// Once resolved this request will result in the committing of the transaction.
#[derive(new)]
pub struct Commit {
txn: Transaction,
}

impl Future for Commit {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _txn = &self.txn;
unimplemented!()
}
}

/// An unresolved [`Transaction::rollback`](Transaction::rollback) request.
///
/// Once resolved this request will result in the rolling back of the transaction.
#[derive(new)]
pub struct Rollback {
txn: Transaction,
}

impl Future for Rollback {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _txn = &self.txn;
unimplemented!()
}
}

/// An unresolved [`Transaction::lock_keys`](Transaction::lock_keys) request.
///
/// Once resolved this request will result in the locking of the given keys.
#[derive(new)]
pub struct LockKeys {
keys: Vec<Key>,
}

impl Future for LockKeys {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _keys = &self.keys;
unimplemented!()
}
}

/// An unresolved [`Transaction::set`](Transaction::set) request.
///
/// Once resolved this request will result in the setting of the value associated with the given
/// key.
#[derive(new)]
pub struct Set {
key: Key,
value: Value,
}

impl Future for Set {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
let _value = &self.value;
unimplemented!()
}
}

/// An unresolved [`Transaction::delete`](Transaction::delete) request.
///
/// Once resolved this request will result in the deletion of the given key.
#[derive(new)]
pub struct Delete {
key: Key,
}

impl Future for Delete {
type Output = Result<(), Error>;

fn poll(self: Pin<&mut Self>, _cx: &mut Context) -> Poll<Self::Output> {
let _key = &self.key;
unimplemented!()
}
}
Loading