diff --git a/substrate-archive-backend/src/read_only_backend.rs b/substrate-archive-backend/src/read_only_backend.rs index 38841d15..063aa4fe 100644 --- a/substrate-archive-backend/src/read_only_backend.rs +++ b/substrate-archive-backend/src/read_only_backend.rs @@ -140,16 +140,13 @@ where if key.len() == 4 && fun(num) { let head: Option = readable_db .get(super::util::columns::HEADER, &value) - .map(|bytes| Decode::decode(&mut &bytes[..]).ok()) - .flatten(); + .and_then(|bytes| Decode::decode(&mut &bytes[..]).ok()); let body: Option> = readable_db .get(super::util::columns::BODY, &value) - .map(|bytes| Decode::decode(&mut &bytes[..]).ok()) - .flatten(); + .and_then(|bytes| Decode::decode(&mut &bytes[..]).ok()); let justif: Option = readable_db .get(super::util::columns::JUSTIFICATION, &value) - .map(|bytes| Decode::decode(&mut &bytes[..]).ok()) - .flatten(); + .and_then(|bytes| Decode::decode(&mut &bytes[..]).ok()); construct_block(head, body, justif) } else { None diff --git a/substrate-archive-backend/src/runtime_version_cache.rs b/substrate-archive-backend/src/runtime_version_cache.rs index 5cc8f256..60310c87 100644 --- a/substrate-archive-backend/src/runtime_version_cache.rs +++ b/substrate-archive-backend/src/runtime_version_cache.rs @@ -26,7 +26,7 @@ use arc_swap::ArcSwap; use codec::Decode; use hashbrown::HashMap; -use sc_executor::{WasmExecutionMethod, WasmExecutor}; +use sc_executor::WasmExecutor; use sp_core::traits::ReadRuntimeVersion; use sp_runtime::{ generic::SignedBlock, @@ -39,6 +39,7 @@ use sp_version::RuntimeVersion; use crate::{ database::ReadOnlyDb, error::{BackendError, Result}, + frontend::RuntimeConfig, read_only_backend::ReadOnlyBackend, }; @@ -50,12 +51,12 @@ pub struct RuntimeVersionCache { } impl RuntimeVersionCache { - pub fn new(backend: Arc>) -> Self { + pub fn new(backend: Arc>, config: RuntimeConfig) -> Self { // TODO: https://github.com/paritytech/substrate-archive/issues/247 let exec = WasmExecutor::::new( - WasmExecutionMethod::Interpreted, - Some(128), - 1, + config.exec_method.into(), + config.wasm_pages, + config.block_workers, None, 128, ); diff --git a/substrate-archive/sqlx-data.json b/substrate-archive/sqlx-data.json index 6dadbe24..536223db 100644 --- a/substrate-archive/sqlx-data.json +++ b/substrate-archive/sqlx-data.json @@ -1,239 +1,215 @@ { "db": "PostgreSQL", "05b056299b2e00d9d96eb6357329e8fdb77aede4485e045bb71d75c1e6feba1e": { - "query": "\n\t\t\tSELECT DISTINCT ON (spec) spec, block_num\n\t\t\tFROM blocks\n\t\t\tWHERE spec != 0\n\t\t\tORDER BY spec, block_num ASC\n\t\t", "describe": { "columns": [ { - "ordinal": 0, "name": "spec", + "ordinal": 0, "type_info": "Int4" }, { - "ordinal": 1, "name": "block_num", + "ordinal": 1, "type_info": "Int4" } ], - "parameters": { - "Left": [] - }, "nullable": [ false, false - ] - } + ], + "parameters": { + "Left": [] + } + }, + "query": "\n\t\t\tSELECT DISTINCT ON (spec) spec, block_num\n\t\t\tFROM blocks\n\t\t\tWHERE spec != 0\n\t\t\tORDER BY spec, block_num ASC\n\t\t" }, "2dd53db31924b333e8f3e096ae72a109336dc80bcb85dbb7c93dad3329069a8e": { - "query": "\n\t\tSELECT missing_num\n\t\tFROM (SELECT MAX(block_num) AS max_num FROM blocks) max,\n\t\t\tGENERATE_SERIES($1, max_num) AS missing_num\n\t\tWHERE\n\t\tNOT EXISTS (SELECT id FROM blocks WHERE block_num = missing_num)\n\t\tORDER BY missing_num ASC\n\t\tLIMIT $2", "describe": { "columns": [ { - "ordinal": 0, "name": "missing_num", + "ordinal": 0, "type_info": "Int4" } ], + "nullable": [ + null + ], "parameters": { "Left": [ "Int4", "Int8" ] - }, - "nullable": [ - null - ] - } + } + }, + "query": "\n\t\tSELECT missing_num\n\t\tFROM (SELECT MAX(block_num) AS max_num FROM blocks) max,\n\t\t\tGENERATE_SERIES($1, max_num) AS missing_num\n\t\tWHERE\n\t\tNOT EXISTS (SELECT id FROM blocks WHERE block_num = missing_num)\n\t\tORDER BY missing_num ASC\n\t\tLIMIT $2" }, "4d0f81228d72971606b7e21c677150b541d3f6575bcfe5b2bc7b078f407eab99": { - "query": "SELECT EXISTS(SELECT 1 FROM blocks WHERE hash = $1)", "describe": { "columns": [ { - "ordinal": 0, "name": "exists", + "ordinal": 0, "type_info": "Bool" } ], + "nullable": [ + null + ], "parameters": { "Left": [ "Bytea" ] - }, - "nullable": [ - null - ] - } + } + }, + "query": "SELECT EXISTS(SELECT 1 FROM blocks WHERE hash = $1)" }, "51240d7ed289ce201ceef2b58a98859bdac3c74c588c0cf8c03731be3fe04519": { - "query": "SELECT version FROM metadata", "describe": { "columns": [ { - "ordinal": 0, "name": "version", + "ordinal": 0, "type_info": "Int4" } ], - "parameters": { - "Left": [] - }, "nullable": [ false - ] - } + ], + "parameters": { + "Left": [] + } + }, + "query": "SELECT version FROM metadata" }, "664d3547283b0758cf5b608f969707abcba6b904b08cda98f62d69d31d045aea": { - "query": "SELECT EXISTS(SELECT version FROM metadata WHERE version = $1)", "describe": { "columns": [ { - "ordinal": 0, "name": "exists", + "ordinal": 0, "type_info": "Bool" } ], + "nullable": [ + null + ], "parameters": { "Left": [ "Int4" ] - }, - "nullable": [ - null - ] - } + } + }, + "query": "SELECT EXISTS(SELECT version FROM metadata WHERE version = $1)" }, "6c0b6b13ae86421b99380dbcf75a37a9edc8357c4c4b0b3db43d203837bf9612": { - "query": "SELECT block_num FROM blocks WHERE block_num = ANY ($1)", "describe": { "columns": [ { - "ordinal": 0, "name": "block_num", + "ordinal": 0, "type_info": "Int4" } ], + "nullable": [ + false + ], "parameters": { "Left": [ "Int4Array" ] - }, - "nullable": [ - false - ] - } + } + }, + "query": "SELECT block_num FROM blocks WHERE block_num = ANY ($1)" }, "9e6316290793ef9ca02c1a917d4bd3412497336b1928222381591f2a7a00e3cb": { - "query": "SELECT meta FROM metadata WHERE version = $1", "describe": { "columns": [ { - "ordinal": 0, "name": "meta", + "ordinal": 0, "type_info": "Bytea" } ], + "nullable": [ + false + ], "parameters": { "Left": [ "Int4" ] - }, - "nullable": [ - false - ] - } + } + }, + "query": "SELECT meta FROM metadata WHERE version = $1" }, - "b32766abc83ba8a342a39dca90f2fbf63317f80b46377695d2cf85325c08c907": { - "query": "\n SELECT block_num FROM blocks\n WHERE NOT EXISTS\n (SELECT block_num FROM storage WHERE storage.block_num = blocks.block_num)\n ORDER BY block_num;\n ", + "bc7850f77a9f06fd5ed526757ca7b4330359499b2cd0502cc7c0c58d18a0cf02": { "describe": { "columns": [ { + "name": "max", "ordinal": 0, - "name": "block_num", "type_info": "Int4" } ], - "parameters": { - "Left": [] - }, "nullable": [ - false - ] - } - }, - "bc7850f77a9f06fd5ed526757ca7b4330359499b2cd0502cc7c0c58d18a0cf02": { - "query": "SELECT MAX(block_num) FROM blocks", - "describe": { - "columns": [ - { - "ordinal": 0, - "name": "max", - "type_info": "Int4" - } + null ], "parameters": { "Left": [] - }, - "nullable": [ - null - ] - } + } + }, + "query": "SELECT MAX(block_num) FROM blocks" }, "cb4c39a4eaa048f4f1acf779ebcdac6a11e4f6f66fbfe5071dc4d8cbf0ef1bb0": { - "query": "\n SELECT id, parent_hash, hash, block_num, state_root, extrinsics_root, digest, ext, spec\n FROM blocks\n WHERE block_num = $1\n ", "describe": { "columns": [ { - "ordinal": 0, "name": "id", + "ordinal": 0, "type_info": "Int4" }, { - "ordinal": 1, "name": "parent_hash", + "ordinal": 1, "type_info": "Bytea" }, { - "ordinal": 2, "name": "hash", + "ordinal": 2, "type_info": "Bytea" }, { - "ordinal": 3, "name": "block_num", + "ordinal": 3, "type_info": "Int4" }, { - "ordinal": 4, "name": "state_root", + "ordinal": 4, "type_info": "Bytea" }, { - "ordinal": 5, "name": "extrinsics_root", + "ordinal": 5, "type_info": "Bytea" }, { - "ordinal": 6, "name": "digest", + "ordinal": 6, "type_info": "Bytea" }, { - "ordinal": 7, "name": "ext", + "ordinal": 7, "type_info": "Bytea" }, { - "ordinal": 8, "name": "spec", + "ordinal": 8, "type_info": "Int4" } ], - "parameters": { - "Left": [ - "Int4" - ] - }, "nullable": [ false, false, @@ -244,83 +220,107 @@ false, false, false - ] - } + ], + "parameters": { + "Left": [ + "Int4" + ] + } + }, + "query": "\n SELECT id, parent_hash, hash, block_num, state_root, extrinsics_root, digest, ext, spec\n FROM blocks\n WHERE block_num = $1\n " }, "da3d043ca1c56f587af1b165bb3573d76b1fe568a52168f863b6edb5d0813f19": { - "query": "\n\tSELECT version as present, past_version as past, meta as metadata, past_metadata FROM (\n\t\tSELECT\n\t\t\tversion, meta,\n\t\t\tLAG(version, 1) OVER (ORDER BY version) as past_version,\n\t\t\tLAG(meta, 1) OVER (ORDER BY version) as past_metadata\n\t\tFROM metadata\n\t) as z WHERE version = $1;\n\t", "describe": { "columns": [ { - "ordinal": 0, "name": "present", + "ordinal": 0, "type_info": "Int4" }, { - "ordinal": 1, "name": "past", + "ordinal": 1, "type_info": "Int4" }, { - "ordinal": 2, "name": "metadata", + "ordinal": 2, "type_info": "Bytea" }, { - "ordinal": 3, "name": "past_metadata", + "ordinal": 3, "type_info": "Bytea" } ], - "parameters": { - "Left": [ - "Int4" - ] - }, "nullable": [ false, null, false, null - ] - } + ], + "parameters": { + "Left": [ + "Int4" + ] + } + }, + "query": "\n\tSELECT version as present, past_version as past, meta as metadata, past_metadata FROM (\n\t\tSELECT\n\t\t\tversion, meta,\n\t\t\tLAG(version, 1) OVER (ORDER BY version) as past_version,\n\t\t\tLAG(meta, 1) OVER (ORDER BY version) as past_metadata\n\t\tFROM metadata\n\t) as z WHERE version = $1;\n\t" }, - "eafd9a045fa56835703150bd9ad24801207e0326287cee132f45a5273d3f3200": { - "query": "\n\t\tSELECT block_num, hash, ext, spec FROM blocks\n\t\tWHERE NOT EXISTS\n\t\t\t(SELECT number FROM extrinsics WHERE extrinsics.number = blocks.block_num)\n\t\tORDER BY block_num ASC\n\t\tLIMIT $1\n\t\t", + "dd0f2e1ecd3bea735a825fc36e65ea644ca6870a6b61a40eae92bc3724ac0313": { "describe": { "columns": [ { + "name": "block_num", "ordinal": 0, + "type_info": "Int4" + } + ], + "nullable": [ + false + ], + "parameters": { + "Left": [] + } + }, + "query": "\n SELECT block_num FROM blocks\n WHERE NOT EXISTS\n (SELECT block_num FROM storage WHERE storage.block_num = blocks.block_num)\n ORDER BY block_num ASC\n\t\tLIMIT 1000;\n " + }, + "eafd9a045fa56835703150bd9ad24801207e0326287cee132f45a5273d3f3200": { + "describe": { + "columns": [ + { "name": "block_num", + "ordinal": 0, "type_info": "Int4" }, { - "ordinal": 1, "name": "hash", + "ordinal": 1, "type_info": "Bytea" }, { - "ordinal": 2, "name": "ext", + "ordinal": 2, "type_info": "Bytea" }, { - "ordinal": 3, "name": "spec", + "ordinal": 3, "type_info": "Int4" } ], - "parameters": { - "Left": [ - "Int8" - ] - }, "nullable": [ false, false, false, false - ] - } + ], + "parameters": { + "Left": [ + "Int8" + ] + } + }, + "query": "\n\t\tSELECT block_num, hash, ext, spec FROM blocks\n\t\tWHERE NOT EXISTS\n\t\t\t(SELECT number FROM extrinsics WHERE extrinsics.number = blocks.block_num)\n\t\tORDER BY block_num ASC\n\t\tLIMIT $1\n\t\t" } } \ No newline at end of file diff --git a/substrate-archive/src/actors/workers/blocks.rs b/substrate-archive/src/actors/workers/blocks.rs index 1a5a1e5f..052ff66e 100644 --- a/substrate-archive/src/actors/workers/blocks.rs +++ b/substrate-archive/src/actors/workers/blocks.rs @@ -60,7 +60,7 @@ where { pub fn new(conf: &SystemConfig, db: DatabaseAct, meta: MetadataAct) -> Self { Self { - rt_cache: Arc::new(RuntimeVersionCache::new(conf.backend.clone())), + rt_cache: Arc::new(RuntimeVersionCache::new(conf.backend.clone(), conf.runtime.clone())), last_max: 0, backend: conf.backend().clone(), db, diff --git a/substrate-archive/src/actors/workers/extrinsics_decoder.rs b/substrate-archive/src/actors/workers/extrinsics_decoder.rs index 6f29259b..c071c088 100644 --- a/substrate-archive/src/actors/workers/extrinsics_decoder.rs +++ b/substrate-archive/src/actors/workers/extrinsics_decoder.rs @@ -131,11 +131,32 @@ impl ExtrinsicsDecoder { .find(|(_curr, next)| *next >= version) .map(|(c, _)| c) .ok_or(ArchiveError::PrevSpecNotFound(*version))?; - let ext = decoder.decode_extrinsics(*previous, ext.as_slice())?; - extrinsics.push(ExtrinsicsModel::new(hash, number, ext)?); + match decoder.decode_extrinsics(*previous, ext.as_slice()) { + Ok(exts) => { + if let Ok(exts_model) = ExtrinsicsModel::new(hash, number, exts) { + extrinsics.push(exts_model); + } + } + Err(err) => { + log::warn!( + "decode extrinsic upgrade failed, block: {}, spec: {}, reason: {:?}", + number, + spec, + err + ); + } + } } else { - let ext = decoder.decode_extrinsics(spec, ext.as_slice())?; - extrinsics.push(ExtrinsicsModel::new(hash, number, ext)?); + match decoder.decode_extrinsics(spec, ext.as_slice()) { + Ok(exts) => { + if let Ok(exts_model) = ExtrinsicsModel::new(hash, number, exts) { + extrinsics.push(exts_model); + } + } + Err(err) => { + log::warn!("decode extrinsic failed, block: {}, spec: {}, reason: {:?}", number, spec, err); + } + } } } Ok(extrinsics) diff --git a/substrate-archive/src/archive.rs b/substrate-archive/src/archive.rs index 979cba36..643d8f59 100644 --- a/substrate-archive/src/archive.rs +++ b/substrate-archive/src/archive.rs @@ -162,6 +162,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to storing metadata in a temporary directory. + #[must_use] pub fn chain_spec(mut self, spec: Box) -> Self { self.config.chain.spec = Some(spec); self @@ -171,6 +172,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to the environment variable CHAIN_DATA_DB. + #[must_use] pub fn chain_data_path>(mut self, path: S) -> Self { self.config.chain.data_path = Some(path.into()); self @@ -180,6 +182,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to 128MB. + #[must_use] pub fn cache_size(mut self, cache_size: usize) -> Self { self.config.chain.cache_size = cache_size; self @@ -191,6 +194,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to storing metadata in a temporary directory. + #[must_use] pub fn rocksdb_secondary_path>(mut self, path: S) -> Self { self.config.chain.rocksdb_secondary_path = Some(path.into()); self @@ -200,6 +204,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to value of the environment variable DATABASE_URL. + #[must_use] pub fn pg_url>(mut self, url: S) -> Self { self.config.database = Some(DatabaseConfig { url: url.into() }); self @@ -209,6 +214,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to the interpreted method. + #[must_use] pub fn execution_method(mut self, method: ExecutionMethod) -> Self { self.config.runtime.exec_method = method; self @@ -218,6 +224,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to the number of logical cpus in the system. + #[must_use] pub fn block_workers(mut self, workers: usize) -> Self { self.config.runtime.block_workers = workers; self @@ -227,6 +234,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to 64 * (number of logic cpu's). + #[must_use] pub fn wasm_pages(mut self, pages: u64) -> Self { self.config.runtime.wasm_pages = Some(pages); self @@ -236,6 +244,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to 20 seconds. + #[must_use] pub fn task_timeout(mut self, timeout: u64) -> Self { self.config.control.task_timeout = timeout; self @@ -245,6 +254,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to 100_000. + #[must_use] pub fn max_block_load(mut self, max_block_load: u32) -> Self { self.config.control.max_block_load = max_block_load; self @@ -254,6 +264,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to `DEBUG`. + #[must_use] pub fn log_std_level(mut self, level: log::LevelFilter) -> Self { self.config.log.std = level; self @@ -263,6 +274,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to `DEBUG`. + #[must_use] pub fn log_file_level(mut self, level: log::LevelFilter) -> Self { if let Some(file) = &mut self.config.log.file { file.level = level; @@ -276,6 +288,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to `//substrate-archive`. + #[must_use] pub fn log_file_dir>(mut self, dir: P) -> Self { if let Some(file) = &mut self.config.log.file { file.dir = Some(dir.into()); @@ -289,6 +302,7 @@ impl ArchiveBuilder { /// /// # Default /// Defaults to `substrate-archive.log`. + #[must_use] pub fn log_file_name>(mut self, name: S) -> Self { if let Some(file) = &mut self.config.log.file { file.name = name.into(); @@ -307,12 +321,14 @@ impl ArchiveBuilder { /// /// # Default /// Wasm Tracing is disabled by default. + #[must_use] pub fn wasm_tracing(mut self, wasm_tracing: Option) -> Self { self.config.wasm_tracing = wasm_tracing; self } /// Set the host functions to use for runtime being indexed + #[must_use] pub fn host_functions(mut self, host_functions: Vec<&'static dyn Function>) -> Self { self.host_functions = Some(host_functions); self diff --git a/substrate-archive/src/database/listener.rs b/substrate-archive/src/database/listener.rs index cc596014..193c086f 100644 --- a/substrate-archive/src/database/listener.rs +++ b/substrate-archive/src/database/listener.rs @@ -116,6 +116,7 @@ where Self { task: f, channels: Vec::new(), pg_url: url.to_string(), queue_handle } } + #[must_use] pub fn listen_on(mut self, channel: Channel) -> Self { self.channels.push(channel); self diff --git a/substrate-archive/src/database/queries.rs b/substrate-archive/src/database/queries.rs index b354851d..65ef383f 100644 --- a/substrate-archive/src/database/queries.rs +++ b/substrate-archive/src/database/queries.rs @@ -196,7 +196,8 @@ pub(crate) async fn missing_storage_blocks(conn: &mut sqlx::PgConnection) -> Res SELECT block_num FROM blocks WHERE NOT EXISTS (SELECT block_num FROM storage WHERE storage.block_num = blocks.block_num) - ORDER BY block_num; + ORDER BY block_num ASC + LIMIT 1000; "# ) .fetch_all(conn) @@ -343,13 +344,13 @@ mod tests { let database = Database::new(&test_common::DATABASE_URL.to_string()).await?; // insert some dummy data to satisfy the foreign key constraint sqlx::query("INSERT INTO metadata (version, meta) VALUES ($1, $2)") - .bind(26) + .bind(26_i32) .bind(mock_bytes.as_slice()) .execute(&mut database.conn().await?) .await?; database.insert(BatchBlock::new(blocks.clone())).await?; - let mock_storage = blocks[200..] + let mock_storage = blocks[..800] .iter() .map(|b| { StorageModel::new( @@ -374,8 +375,8 @@ mod tests { let items = task::block_on(missing_storage_blocks(&mut conn))?; assert_eq!(items.len(), 200); - assert_eq!(items.iter().min(), Some(&3_000_001u32)); - assert_eq!(items.iter().max(), Some(&3_000_200u32)); + assert_eq!(items.iter().min(), Some(&3_000_801u32)); + assert_eq!(items.iter().max(), Some(&3_001_000u32)); Ok(()) } diff --git a/substrate-archive/src/wasm_tracing.rs b/substrate-archive/src/wasm_tracing.rs index 8f10c98b..6a41a3da 100644 --- a/substrate-archive/src/wasm_tracing.rs +++ b/substrate-archive/src/wasm_tracing.rs @@ -334,7 +334,7 @@ mod tests { let (spans, events, _) = handler.scoped_trace(|| { executor .uncached_call( - RuntimeBlob::uncompress_if_needed(&wasm_binary_unwrap()[..]).unwrap(), + RuntimeBlob::uncompress_if_needed(wasm_binary_unwrap()).unwrap(), &mut ext, true, "test_trace_handler", diff --git a/work-queue/integration_tests/tests/codegen.rs b/work-queue/integration_tests/tests/codegen.rs index 23727b4f..6c355a87 100644 --- a/work-queue/integration_tests/tests/codegen.rs +++ b/work-queue/integration_tests/tests/codegen.rs @@ -100,8 +100,8 @@ fn test_imports_only_used_in_job_body_are_not_warned_as_unused() { let runner = TestGuard::dummy_runner(); smol::block_on(async { - let mut conn = runner.handle(); - uses_trait_import().enqueue(&mut conn).await.unwrap(); + let conn = runner.handle(); + uses_trait_import().enqueue(conn).await.unwrap(); runner.run_pending_tasks().unwrap(); runner.wait_for_all_tasks().unwrap(); @@ -123,8 +123,8 @@ fn proc_macro_accepts_arbitrary_where_clauses() { let runner = TestGuard::builder(()).register_job::>().build(); smol::block_on(async { - let mut conn = runner.handle(); - can_specify_where_clause("hello".to_string()).enqueue(&mut conn).await.unwrap(); + let conn = runner.handle(); + can_specify_where_clause("hello".to_string()).enqueue(conn).await.unwrap(); runner.run_pending_tasks().unwrap(); runner.wait_for_all_tasks().unwrap(); diff --git a/work-queue/sa-work-queue/src/lib.rs b/work-queue/sa-work-queue/src/lib.rs index f8b64e9f..9fd76665 100644 --- a/work-queue/sa-work-queue/src/lib.rs +++ b/work-queue/sa-work-queue/src/lib.rs @@ -52,13 +52,13 @@ //! The URL along with RabbitMQ prefetch and queue name may be configured. //! function //! ```no_run -//!Runner::builder((), "amqp://localhost:5672") +//!sa_work_queue::Runner::builder((), "amqp://localhost:5672") //! .queue_name("my-work-queue") //! .num_threads(2) //! .prefetch(200) -//! .timeout(Duration::from_secs(5)) +//! .timeout(std::time::Duration::from_secs(5)) //! .build() -//! .unwrap()?; +//! .unwrap(); //! ``` //! //!