diff --git a/src/coprocessor/endpoint.rs b/src/coprocessor/endpoint.rs index d91736e1176..0eac1b79768 100644 --- a/src/coprocessor/endpoint.rs +++ b/src/coprocessor/endpoint.rs @@ -74,12 +74,16 @@ impl Endpoint { /// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`. /// Returns `Err` if fails. - fn try_parse_request( + fn parse_request( &self, mut req: coppb::Request, peer: Option, is_streaming: bool, ) -> Result<(RequestHandlerBuilder, ReqContext)> { + fail_point!("coprocessor_parse_request", |_| Err(box_err!( + "unsupported tp (failpoint)" + ))); + let (context, data, ranges) = ( req.take_context(), req.take_data(), @@ -163,34 +167,6 @@ impl Endpoint { Ok((builder, req_ctx)) } - /// Parse the raw `Request` to create `RequestHandlerBuilder` and `ReqContext`. - #[inline] - fn parse_request( - &self, - req: coppb::Request, - peer: Option, - is_streaming: bool, - ) -> (RequestHandlerBuilder, ReqContext) { - match self.try_parse_request(req, peer, is_streaming) { - Ok(v) => v, - Err(err) => { - // If there are errors when parsing requests, create a dummy request handler. - let builder = - box |_, _: &_| Ok(cop_util::ErrorRequestHandler::new(err).into_boxed()); - let req_ctx = ReqContext::new( - "invalid", - kvrpcpb::Context::new(), - &[], - Duration::from_secs(60), // Large enough to avoid becoming outdated error - None, - None, - None, - ); - (builder, req_ctx) - } - } - } - /// Get the batch row limit configuration. #[inline] fn get_batch_row_limit(&self, is_streaming: bool) -> usize { @@ -269,39 +245,44 @@ impl Endpoint { }) } - /// Handle a unary request and run on the read pool. Returns a future producing the - /// result, which must be a `Response` and will never fail. If there are errors during - /// handling, they will be embedded in the `Response`. + /// Handle a unary request and run on the read pool. + /// + /// Returns `Err(err)` if the read pool is full. Returns `Ok(future)` in other cases. + /// The future inside may be an error however. fn handle_unary_request( &self, req_ctx: ReqContext, handler_builder: RequestHandlerBuilder, - ) -> impl Future { + ) -> Result> { let engine = self.engine.clone(); let priority = readpool::Priority::from(req_ctx.context.get_priority()); + // box the tracker so that moving it is cheap. let mut tracker = box Tracker::new(req_ctx); - let result = self.read_pool.future_execute(priority, move |ctxd| { - tracker.attach_ctxd(ctxd); - - Self::handle_unary_request_impl(engine, tracker, handler_builder) - }); - - future::result(result) - // If the read pool is full, an error response will be returned directly. + self.read_pool + .future_execute(priority, move |ctxd| { + tracker.attach_ctxd(ctxd); + Self::handle_unary_request_impl(engine, tracker, handler_builder) + }) .map_err(|_| Error::Full) - .flatten() - .or_else(|e| Ok(make_error_response(e))) } + /// Parses and handles a unary request. Returns a future that will never fail. If there are + /// errors during parsing or handling, they will be converted into a `Response` as the success + /// result of the future. #[inline] pub fn parse_and_handle_unary_request( &self, req: coppb::Request, peer: Option, ) -> impl Future { - let (handler_builder, req_ctx) = self.parse_request(req, peer, false); - self.handle_unary_request(req_ctx, handler_builder) + let result_of_future = self.parse_request(req, peer, false).and_then( + |(handler_builder, req_ctx)| self.handle_unary_request(req_ctx, handler_builder), + ); + + future::result(result_of_future) + .flatten() + .or_else(|e| Ok(make_error_response(e))) } /// The real implementation of handling a stream request. @@ -397,62 +378,55 @@ impl Endpoint { .flatten_stream() } - /// Handle a stream request and run on the read pool. Returns a stream producing each - /// result, which must be a `Response` and will never fail. If there are errors during - /// handling, they will be embedded in the `Response`. + /// Handle a stream request and run on the read pool. + /// + /// Returns `Err(err)` if the read pool is full. Returns `Ok(stream)` in other cases. + /// The stream inside may produce errors however. fn handle_stream_request( &self, req_ctx: ReqContext, handler_builder: RequestHandlerBuilder, - ) -> impl Stream { - let (tx, rx) = mpsc::channel::(self.stream_channel_size); + ) -> Result> { + let (tx, rx) = mpsc::channel::>(self.stream_channel_size); let engine = self.engine.clone(); let priority = readpool::Priority::from(req_ctx.context.get_priority()); // Must be created befure `future_execute`, otherwise wait time is not tracked. let mut tracker = box Tracker::new(req_ctx); - let tx1 = tx.clone(); - let result = self.read_pool.future_execute(priority, move |ctxd| { - tracker.attach_ctxd(ctxd); - - Self::handle_stream_request_impl(engine, tracker, handler_builder) - .or_else(|e| Ok::<_, mpsc::SendError<_>>(make_error_response(e))) - // Although returning `Ok()` from `or_else` will continue the stream, - // our stream has already ended when error is returned. - // Thus the stream will not continue any more even after we converting errors - // into a response. - .forward(tx1) - }); + self.read_pool + .future_execute(priority, move |ctxd| { + tracker.attach_ctxd(ctxd); - match result { - Err(_) => { - stream::once::<_, mpsc::SendError<_>>(Ok(make_error_response(Error::Full))) + Self::handle_stream_request_impl(engine, tracker, handler_builder) // Stream + .then(Ok::<_, mpsc::SendError<_>>) // Stream, MpscError> .forward(tx) - .then(|_| { - // ignore sink send failures - Ok::<_, ()>(()) - }) - // Should not be blocked, since the channel is large enough to hold 1 value. - .wait() - .unwrap(); - } - Ok(cpu_future) => { + }) + .map_err(|_| Error::Full) + .and_then(move |cpu_future| { // Keep running stream producer cpu_future.forget(); - } - } - rx + // Returns the stream instead of a future + Ok(rx.then(|r| r.unwrap())) + }) } + /// Parses and handles a stream request. Returns a stream that produce each result in a + /// `Response` and will never fail. If there are errors during parsing or handling, they will + /// be converted into a `Response` as the only stream item. #[inline] pub fn parse_and_handle_stream_request( &self, req: coppb::Request, peer: Option, ) -> impl Stream { - let (handler_builder, req_ctx) = self.parse_request(req, peer, true); - self.handle_stream_request(req_ctx, handler_builder) + let result_of_stream = self.parse_request(req, peer, true).and_then( + |(handler_builder, req_ctx)| self.handle_stream_request(req_ctx, handler_builder), + ); // Result, Error> + + stream::once(result_of_stream) // Stream, Error> + .flatten() // Stream + .or_else(|e| Ok(make_error_response(e))) // Stream } } @@ -648,6 +622,7 @@ mod tests { box |_, _: &_| Ok(UnaryFixture::new(Ok(coppb::Response::new())).into_boxed()); let resp = cop .handle_unary_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .wait() .unwrap(); assert!(resp.get_other_error().is_empty()); @@ -664,11 +639,12 @@ mod tests { None, None, ); - let resp = cop - .handle_unary_request(outdated_req_ctx, handler_builder) - .wait() - .unwrap(); - assert_eq!(resp.get_other_error(), OUTDATED_ERROR_MSG); + assert!( + cop.handle_unary_request(outdated_req_ctx, handler_builder) + .unwrap() + .wait() + .is_err() + ); } #[test] @@ -777,25 +753,28 @@ mod tests { let handler_builder = box |_, _: &_| Ok(UnaryFixture::new_with_duration(Ok(response), 1000).into_boxed()); - let future = cop.handle_unary_request(ReqContext::default_for_test(), handler_builder); - let tx = tx.clone(); - thread::spawn(move || tx.send(future.wait().unwrap())); + let result_of_future = + cop.handle_unary_request(ReqContext::default_for_test(), handler_builder); + match result_of_future { + Err(full_error) => { + tx.send(Err(full_error)).unwrap(); + } + Ok(future) => { + let tx = tx.clone(); + thread::spawn(move || { + tx.send(future.wait()).unwrap(); + }); + } + } thread::sleep(Duration::from_millis(100)); } // verify for _ in 2..5 { - let resp: coppb::Response = rx.recv().unwrap(); - assert_eq!(resp.get_data().len(), 0); - assert!(resp.has_region_error()); - assert!(resp.get_region_error().has_server_is_busy()); - assert_eq!( - resp.get_region_error().get_server_is_busy().get_reason(), - BUSY_ERROR_MSG - ); + assert!(rx.recv().unwrap().is_err()); } for i in 0..2 { - let resp = rx.recv().unwrap(); + let resp = rx.recv().unwrap().unwrap(); assert_eq!(resp.get_data(), [1, 2, i]); assert!(!resp.has_region_error()); } @@ -814,6 +793,7 @@ mod tests { box |_, _: &_| Ok(UnaryFixture::new(Err(Error::Other(box_err!("foo")))).into_boxed()); let resp = cop .handle_unary_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .wait() .unwrap(); assert_eq!(resp.get_data().len(), 0); @@ -836,6 +816,7 @@ mod tests { }; let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -855,6 +836,7 @@ mod tests { let handler_builder = box |_, _: &_| Ok(StreamFixture::new(responses).into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -878,6 +860,7 @@ mod tests { let handler_builder = box |_, _: &_| Ok(StreamFixture::new(vec![]).into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -913,6 +896,7 @@ mod tests { let handler_builder = box move |_, _: &_| Ok(handler.into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -938,6 +922,7 @@ mod tests { let handler_builder = box move |_, _: &_| Ok(handler.into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -963,6 +948,7 @@ mod tests { let handler_builder = box move |_, _: &_| Ok(handler.into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .collect() .wait() .unwrap(); @@ -1000,6 +986,7 @@ mod tests { let handler_builder = box move |_, _: &_| Ok(handler.into_boxed()); let resp_vec = cop .handle_stream_request(ReqContext::default_for_test(), handler_builder) + .unwrap() .take(7) .collect() .wait() @@ -1056,8 +1043,9 @@ mod tests { PAYLOAD_SMALL as u64, ).into_boxed()) }; - let resp_future_1 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_1 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap()); // Sleep a while to make sure that thread is spawn and snapshot is taken. @@ -1070,8 +1058,9 @@ mod tests { .into_boxed(), ) }; - let resp_future_2 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_2 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_2.wait().unwrap()]).unwrap()); thread::sleep(Duration::from_millis(SNAPSHOT_DURATION_MS as u64)); @@ -1128,8 +1117,9 @@ mod tests { PAYLOAD_LARGE as u64, ).into_boxed()) }; - let resp_future_1 = - cop.handle_unary_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_1 = cop + .handle_unary_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || sender.send(vec![resp_future_1.wait().unwrap()]).unwrap()); // Sleep a while to make sure that thread is spawn and snapshot is taken. @@ -1150,8 +1140,9 @@ mod tests { ], ).into_boxed()) }; - let resp_future_3 = - cop.handle_stream_request(req_with_exec_detail.clone(), handler_builder); + let resp_future_3 = cop + .handle_stream_request(req_with_exec_detail.clone(), handler_builder) + .unwrap(); let sender = tx.clone(); thread::spawn(move || { sender diff --git a/src/coprocessor/mod.rs b/src/coprocessor/mod.rs index 86fc9deaec8..0c76b04d11f 100644 --- a/src/coprocessor/mod.rs +++ b/src/coprocessor/mod.rs @@ -85,6 +85,10 @@ impl Deadline { /// Returns error if the deadline is exceeded. pub fn check_if_exceeded(&self) -> Result<()> { + fail_point!("coprocessor_deadline_check_exceeded", |_| Err( + Error::Outdated(Duration::from_secs(60), self.tag) + )); + let now = Instant::now_coarse(); if self.deadline <= now { let elapsed = now.duration_since(self.start_time); diff --git a/src/coprocessor/util.rs b/src/coprocessor/util.rs index 4c27f26cec5..d51c83e02a0 100644 --- a/src/coprocessor/util.rs +++ b/src/coprocessor/util.rs @@ -16,26 +16,6 @@ use tipb::schema::ColumnInfo; use coprocessor::codec::datum::Datum; use coprocessor::codec::mysql; -use coprocessor::*; - -pub struct ErrorRequestHandler { - error: Option, -} - -impl ErrorRequestHandler { - pub fn new(error: Error) -> ErrorRequestHandler { - ErrorRequestHandler { error: Some(error) } - } -} - -impl RequestHandler for ErrorRequestHandler { - fn handle_request(&mut self) -> Result { - Err(self.error.take().unwrap()) - } - fn handle_streaming_request(&mut self) -> Result<(Option, bool)> { - Err(self.error.take().unwrap()) - } -} /// Convert the key to the smallest key which is larger than the key given. pub fn convert_to_prefix_next(key: &mut Vec) { diff --git a/src/server/readpool/mod.rs b/src/server/readpool/mod.rs index 896fe637a84..bc8eae39a63 100644 --- a/src/server/readpool/mod.rs +++ b/src/server/readpool/mod.rs @@ -122,6 +122,11 @@ impl ReadPool { F::Item: Send + 'static, F::Error: Send + 'static, { + fail_point!("read_pool_execute_full", |_| Err(Full { + current_tasks: 100, + max_tasks: 100, + })); + let pool = self.get_pool_by_priority(priority); let max_tasks = self.get_max_tasks_by_priority(priority); let current_tasks = pool.get_running_task_count(); diff --git a/src/storage/engine/rocksdb_engine.rs b/src/storage/engine/rocksdb_engine.rs new file mode 100644 index 00000000000..fc5c2b7614b --- /dev/null +++ b/src/storage/engine/rocksdb_engine.rs @@ -0,0 +1,445 @@ +// Copyright 2016 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::{self, Debug, Display, Formatter}; +use std::ops::Deref; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use kvproto::errorpb::Error as ErrorHeader; +use kvproto::kvrpcpb::Context; +use tempdir::TempDir; + +use crate::raftstore::store::engine::{IterOption, Peekable}; +use crate::storage::{CfName, Key, Value, CF_DEFAULT, CF_LOCK, CF_RAFT, CF_WRITE}; +use rocksdb::{DBIterator, SeekKey, Writable, WriteBatch, DB}; + +use crate::util::escape; +use crate::util::rocksdb_util::{self, CFOptions}; +use crate::util::worker::{Runnable, Scheduler, Worker}; + +use super::{ + Callback, CbContext, Cursor, Engine, Error, Iterator as EngineIterator, Modify, Result, + ScanMode, Snapshot, +}; + +pub use crate::raftstore::store::engine::SyncSnapshot as RocksSnapshot; + +const TEMP_DIR: &str = ""; + +enum Task { + Write(Vec, Callback<()>), + Snapshot(Callback), +} + +impl Display for Task { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + match *self { + Task::Write(..) => write!(f, "write task"), + Task::Snapshot(_) => write!(f, "snapshot task"), + } + } +} + +struct Runner(Arc); + +impl Runnable for Runner { + fn run(&mut self, t: Task) { + match t { + Task::Write(modifies, cb) => cb((CbContext::new(), write_modifies(&self.0, modifies))), + Task::Snapshot(cb) => cb(( + CbContext::new(), + Ok(RocksSnapshot::new(Arc::clone(&self.0))), + )), + } + } +} + +struct RocksEngineCore { + // only use for memory mode + temp_dir: Option, + worker: Worker, +} + +impl Drop for RocksEngineCore { + fn drop(&mut self) { + if let Some(h) = self.worker.stop() { + h.join().unwrap(); + } + } +} + +#[derive(Clone)] +pub struct RocksEngine { + core: Arc>, + sched: Scheduler, + db: Arc, +} + +impl RocksEngine { + pub fn new( + path: &str, + cfs: &[CfName], + cfs_opts: Option>>, + ) -> Result { + info!("RocksEngine: creating for path"; "path" => path); + let (path, temp_dir) = match path { + TEMP_DIR => { + let td = TempDir::new("temp-rocksdb").unwrap(); + (td.path().to_str().unwrap().to_owned(), Some(td)) + } + _ => (path.to_owned(), None), + }; + let mut worker = Worker::new("engine-rocksdb"); + let db = Arc::new(rocksdb_util::new_engine(&path, None, cfs, cfs_opts)?); + box_try!(worker.start(Runner(Arc::clone(&db)))); + Ok(RocksEngine { + sched: worker.scheduler(), + core: Arc::new(Mutex::new(RocksEngineCore { temp_dir, worker })), + db, + }) + } + + pub fn get_rocksdb(&self) -> Arc { + Arc::clone(&self.db) + } + + pub fn stop(&self) { + let mut core = self.core.lock().unwrap(); + if let Some(h) = core.worker.stop() { + h.join().unwrap(); + } + } +} + +impl Display for RocksEngine { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!(f, "RocksDB") + } +} + +impl Debug for RocksEngine { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + write!( + f, + "RocksDB [is_temp: {}]", + self.core.lock().unwrap().temp_dir.is_some() + ) + } +} + +/// A builder to build a temporary `RocksEngine`. +/// +/// Only used for test purpose. +#[must_use] +pub struct TestEngineBuilder { + path: Option, + cfs: Option>, +} + +impl TestEngineBuilder { + pub fn new() -> Self { + Self { + path: None, + cfs: None, + } + } + + /// Customize the data directory of the temporary engine. + /// + /// By default, TEMP_DIR will be used. + pub fn path(mut self, path: impl AsRef) -> Self { + self.path = Some(path.as_ref().to_path_buf()); + self + } + + /// Customize the CFs that engine will have. + /// + /// By default, engine will have all CFs. + pub fn cfs(mut self, cfs: impl AsRef<[CfName]>) -> Self { + self.cfs = Some(cfs.as_ref().to_vec()); + self + } + + /// Build a `RocksEngine`. + pub fn build(self) -> Result { + let path = match self.path { + None => TEMP_DIR.to_owned(), + Some(p) => p.to_str().unwrap().to_owned(), + }; + let cfs = self.cfs.unwrap_or_else(|| crate::storage::ALL_CFS.to_vec()); + let cfg_rocksdb = crate::config::DbConfig::default(); + let cfs_opts = cfs + .iter() + .map(|cf| match *cf { + CF_DEFAULT => CFOptions::new(CF_DEFAULT, cfg_rocksdb.defaultcf.build_opt()), + CF_LOCK => CFOptions::new(CF_LOCK, cfg_rocksdb.lockcf.build_opt()), + CF_WRITE => CFOptions::new(CF_WRITE, cfg_rocksdb.writecf.build_opt()), + CF_RAFT => CFOptions::new(CF_RAFT, cfg_rocksdb.raftcf.build_opt()), + _ => CFOptions::new(*cf, rocksdb::ColumnFamilyOptions::new()), + }) + .collect(); + RocksEngine::new(&path, &cfs, Some(cfs_opts)) + } +} + +fn write_modifies(db: &DB, modifies: Vec) -> Result<()> { + let wb = WriteBatch::new(); + for rev in modifies { + let res = match rev { + Modify::Delete(cf, k) => { + if cf == CF_DEFAULT { + trace!("RocksEngine: delete"; "key" => %k); + wb.delete(k.as_encoded()) + } else { + trace!("RocksEngine: delete_cf"; "cf" => cf, "key" => %k); + let handle = rocksdb_util::get_cf_handle(db, cf)?; + wb.delete_cf(handle, k.as_encoded()) + } + } + Modify::Put(cf, k, v) => { + if cf == CF_DEFAULT { + trace!("RocksEngine: put"; "key" => %k, "value" => escape(&v)); + wb.put(k.as_encoded(), &v) + } else { + trace!("RocksEngine: put_cf"; "cf" => cf, "key" => %k, "value" => escape(&v)); + let handle = rocksdb_util::get_cf_handle(db, cf)?; + wb.put_cf(handle, k.as_encoded(), &v) + } + } + Modify::DeleteRange(cf, start_key, end_key) => { + trace!( + "RocksEngine: delete_range_cf"; + "cf" => cf, + "start_key" => %start_key, + "end_key" => %end_key + ); + let handle = rocksdb_util::get_cf_handle(db, cf)?; + wb.delete_range_cf(handle, start_key.as_encoded(), end_key.as_encoded()) + } + }; + if let Err(msg) = res { + return Err(Error::RocksDb(msg)); + } + } + if let Err(msg) = db.write(wb) { + return Err(Error::RocksDb(msg)); + } + Ok(()) +} + +impl Engine for RocksEngine { + type Snap = RocksSnapshot; + + fn async_write(&self, _: &Context, modifies: Vec, cb: Callback<()>) -> Result<()> { + if modifies.is_empty() { + return Err(Error::EmptyRequest); + } + box_try!(self.sched.schedule(Task::Write(modifies, cb))); + Ok(()) + } + + fn async_snapshot(&self, _: &Context, cb: Callback) -> Result<()> { + fail_point!("rockskv_async_snapshot", |_| Err(box_err!( + "snapshot failed" + ))); + fail_point!("rockskv_async_snapshot_not_leader", |_| { + let mut header = ErrorHeader::new(); + header.mut_not_leader().set_region_id(100); + Err(Error::Request(header)) + }); + box_try!(self.sched.schedule(Task::Snapshot(cb))); + Ok(()) + } +} + +impl Snapshot for RocksSnapshot { + type Iter = DBIterator>; + + fn get(&self, key: &Key) -> Result> { + trace!("RocksSnapshot: get"; "key" => %key); + let v = box_try!(self.get_value(key.as_encoded())); + Ok(v.map(|v| v.to_vec())) + } + + fn get_cf(&self, cf: CfName, key: &Key) -> Result> { + trace!("RocksSnapshot: get_cf"; "cf" => cf, "key" => %key); + let v = box_try!(self.get_value_cf(cf, key.as_encoded())); + Ok(v.map(|v| v.to_vec())) + } + + fn iter(&self, iter_opt: IterOption, mode: ScanMode) -> Result> { + trace!("RocksSnapshot: create iterator"); + let iter = self.db_iterator(iter_opt); + Ok(Cursor::new(iter, mode)) + } + + fn iter_cf( + &self, + cf: CfName, + iter_opt: IterOption, + mode: ScanMode, + ) -> Result> { + trace!("RocksSnapshot: create cf iterator"); + let iter = self.db_iterator_cf(cf, iter_opt)?; + Ok(Cursor::new(iter, mode)) + } +} + +impl + Send> EngineIterator for DBIterator { + fn next(&mut self) -> bool { + DBIterator::next(self) + } + + fn prev(&mut self) -> bool { + DBIterator::prev(self) + } + + fn seek(&mut self, key: &Key) -> Result { + Ok(DBIterator::seek(self, key.as_encoded().as_slice().into())) + } + + fn seek_for_prev(&mut self, key: &Key) -> Result { + Ok(DBIterator::seek_for_prev( + self, + key.as_encoded().as_slice().into(), + )) + } + + fn seek_to_first(&mut self) -> bool { + DBIterator::seek(self, SeekKey::Start) + } + + fn seek_to_last(&mut self) -> bool { + DBIterator::seek(self, SeekKey::End) + } + + fn valid(&self) -> bool { + DBIterator::valid(self) + } + + fn key(&self) -> &[u8] { + DBIterator::key(self) + } + + fn value(&self) -> &[u8] { + DBIterator::value(self) + } +} + +#[cfg(test)] +mod tests { + pub use super::super::perf_context::{PerfStatisticsDelta, PerfStatisticsInstant}; + use super::super::tests::*; + use super::super::CFStatistics; + use super::*; + use tempdir::TempDir; + + #[test] + fn test_rocksdb() { + let engine = TestEngineBuilder::new() + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + test_base_curd_options(&engine) + } + + #[test] + fn test_rocksdb_linear() { + let engine = TestEngineBuilder::new() + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + test_linear(&engine); + } + + #[test] + fn test_rocksdb_statistic() { + let engine = TestEngineBuilder::new() + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + test_cfs_statistics(&engine); + } + + #[test] + fn rocksdb_reopen() { + let dir = TempDir::new("rocksdb_test").unwrap(); + { + let engine = TestEngineBuilder::new() + .path(dir.path()) + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + must_put_cf(&engine, "cf", b"k", b"v1"); + } + { + let engine = TestEngineBuilder::new() + .path(dir.path()) + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + assert_has_cf(&engine, "cf", b"k", b"v1"); + } + } + + #[test] + fn test_rocksdb_perf_statistics() { + let engine = TestEngineBuilder::new() + .cfs(TEST_ENGINE_CFS) + .build() + .unwrap(); + test_perf_statistics(&engine); + } + + pub fn test_perf_statistics(engine: &E) { + must_put(engine, b"foo", b"bar1"); + must_put(engine, b"foo2", b"bar2"); + must_put(engine, b"foo3", b"bar3"); // deleted + must_put(engine, b"foo4", b"bar4"); + must_put(engine, b"foo42", b"bar42"); // deleted + must_put(engine, b"foo5", b"bar5"); // deleted + must_put(engine, b"foo6", b"bar6"); + must_delete(engine, b"foo3"); + must_delete(engine, b"foo42"); + must_delete(engine, b"foo5"); + + let snapshot = engine.snapshot(&Context::new()).unwrap(); + let mut iter = snapshot + .iter(IterOption::default(), ScanMode::Forward) + .unwrap(); + + let mut statistics = CFStatistics::default(); + + let perf_statistics = PerfStatisticsInstant::new(); + iter.seek(&Key::from_raw(b"foo30"), &mut statistics) + .unwrap(); + assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 0); + + let perf_statistics = PerfStatisticsInstant::new(); + iter.near_seek(&Key::from_raw(b"foo55"), &mut statistics) + .unwrap(); + assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 2); + + let perf_statistics = PerfStatisticsInstant::new(); + iter.prev(&mut statistics); + assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 2); + + iter.prev(&mut statistics); + assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 3); + + iter.prev(&mut statistics); + assert_eq!(perf_statistics.delta().internal_delete_skipped_count, 3); + } + +} diff --git a/tests/failpoints_cases/mod.rs b/tests/failpoints_cases/mod.rs index 2d6b8dd0d69..d4248a59adb 100644 --- a/tests/failpoints_cases/mod.rs +++ b/tests/failpoints_cases/mod.rs @@ -12,6 +12,7 @@ // limitations under the License. mod test_conf_change; +mod test_coprocessor; mod test_merge; mod test_pending_peers; mod test_snap; diff --git a/tests/failpoints_cases/test_coprocessor.rs b/tests/failpoints_cases/test_coprocessor.rs new file mode 100644 index 00000000000..35065fc3a99 --- /dev/null +++ b/tests/failpoints_cases/test_coprocessor.rs @@ -0,0 +1,118 @@ +// Copyright 2019 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +use test_coprocessor::*; + +#[test] +fn test_outdated() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("coprocessor_deadline_check_exceeded", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("request outdated")); +} + +#[test] +fn test_outdated_2() { + // It should not even take any snapshots when request is outdated from the beginning. + + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "panic").unwrap(); + fail::cfg("coprocessor_deadline_check_exceeded", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("request outdated")); +} + +#[test] +fn test_parse_request_failed() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("coprocessor_parse_request", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("unsupported tp")); +} + +#[test] +fn test_parse_request_failed_2() { + // It should not even take any snapshots when parse failed. + + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "panic").unwrap(); + fail::cfg("coprocessor_parse_request", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("unsupported tp")); +} + +#[test] +fn test_readpool_full() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("read_pool_execute_full", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_region_error().has_server_is_busy()); +} + +#[test] +fn test_snapshot_failed() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_other_error().contains("snapshot failed")); +} + +#[test] +fn test_snapshot_failed_2() { + let _guard = crate::setup(); + + let product = ProductTable::new(); + let (_, endpoint) = init_with_data(&product, &[]); + let req = DAGSelect::from(&product).build(); + + fail::cfg("rockskv_async_snapshot_not_leader", "return()").unwrap(); + let resp = handle_request(&endpoint, req); + + assert!(resp.get_region_error().has_not_leader()); +}