Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP threaded poll fix #98

Closed
wants to merge 4 commits into from
Closed

WIP threaded poll fix #98

wants to merge 4 commits into from

Conversation

clux
Copy link
Member

@clux clux commented Dec 3, 2019

Trying to fix #97. ⚠️ : Not compiling yet.

Tried propagating BoxStream and the corresponding Send requirement on K it forced (this is reasonable because a kube object is always just raw data, no refcounts or other magic - but we might have to propagate it to k8s-openapi).

I think it got almost all the way there, but now it complains at informer in poll(). It wants K to implement Sync as well, and that certainly is unreasonable. But it seems to come from Send being required on &K not being satisfied, looks like in a closure?


error[E0277]: `K` cannot be shared between threads safely
   --> src/api/informer.rs:204:20
    |
204 |                 }).boxed())
    |                    ^^^^^ `K` cannot be shared between threads safely
    |
    = help: the trait `std::marker::Sync` is not implemented for `K`
    = help: consider adding a `where K: std::marker::Sync` bound
    = note: required because of the requirements on the impl of `std::marker::Send` for `&K`
    = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7> {std::result::Result<api::resource::WatchEvent<K>, Error>, K, K, K, &'r K, K, api::metadata::ObjectMeta, &'s api::metadata::ObjectMeta, &'t0 std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::string::String, &'t1 std::string::String, std::string::String, std::string::String, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), ErrorResponse, bool, bool, bool, &'t5 futures_util::lock::mutex::Mutex<bool>, std::sync::Arc<futures_util::lock::mutex::Mutex<bool>>, futures_util::lock::mutex::MutexLockFuture<'t6, bool>, futures_util::lock::mutex::MutexLockFuture<'t7, bool>, ()}`
    = note: required because it appears within the type `[static generator@src/api/informer.rs:179:32: 203:22 event:std::result::Result<api::resource::WatchEvent<K>, Error>, version:std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, needs_resync:std::sync::Arc<futures_util::lock::mutex::Mutex<bool>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7> {std::result::Result<api::resource::WatchEvent<K>, Error>, K, K, K, &'r K, K, api::metadata::ObjectMeta, &'s api::metadata::ObjectMeta, &'t0 std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::string::String, &'t1 std::string::String, std::string::String, std::string::String, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), ErrorResponse, bool, bool, bool, &'t5 futures_util::lock::mutex::Mutex<bool>, std::sync::Arc<futures_util::lock::mutex::Mutex<bool>>, futures_util::lock::mutex::MutexLockFuture<'t6, bool>, futures_util::lock::mutex::MutexLockFuture<'t7, bool>, ()}]`
    = note: required because it appears within the type `std::future::GenFuture<[static generator@src/api/informer.rs:179:32: 203:22 event:std::result::Result<api::resource::WatchEvent<K>, Error>, version:std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, needs_resync:std::sync::Arc<futures_util::lock::mutex::Mutex<bool>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7> {std::result::Result<api::resource::WatchEvent<K>, Error>, K, K, K, &'r K, K, api::metadata::ObjectMeta, &'s api::metadata::ObjectMeta, &'t0 std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::option::Option<std::string::String>, std::string::String, &'t1 std::string::String, std::string::String, std::string::String, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), ErrorResponse, bool, bool, bool, &'t5 futures_util::lock::mutex::Mutex<bool>, std::sync::Arc<futures_util::lock::mutex::Mutex<bool>>, futures_util::lock::mutex::MutexLockFuture<'t6, bool>, futures_util::lock::mutex::MutexLockFuture<'t7, bool>, ()}]>`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `std::option::Option<impl core::future::future::Future>`
    = note: required because it appears within the type `futures_util::stream::stream::then::Then<std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = std::result::Result<api::resource::WatchEvent<K>, Error>> + std::marker::Send>>, impl core::future::future::Future, [closure@src/api/informer.rs:175:32: 204:18 needs_resync:std::sync::Arc<futures_util::lock::mutex::Mutex<bool>>, version:std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>]>`

error: aborting due to previous error

@clux
Copy link
Member Author

clux commented Dec 3, 2019

Ok. Stuck again.

   Compiling kube v0.22.1 (/home/clux/babylon/kube-rs)
warning: unused import: `Stream`
  --> examples/crd_reflector.rs:11:15
   |
11 | use futures::{Stream, StreamExt};
   |               ^^^^^^
   |
   = note: `#[warn(unused_imports)]` on by default

error[E0277]: `dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>` cannot be sent between threads safely
   --> examples/crd_reflector.rs:39:5
    |
39  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ `dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>` cannot be sent between threads safely
    | 
   ::: /home/clux/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.6/src/executor.rs:100:40
    |
100 |     F: Future<Output = ()> + 'static + Send,
    |                                        ---- required by this bound in `tokio::executor::spawn`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>`
    = note: required because of the requirements on the impl of `std::marker::Send` for `std::ptr::Unique<dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>>`
    = note: required because it appears within the type `std::boxed::Box<dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>>`
    = note: required because it appears within the type `std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>>>`
    = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15, 't16, 't17, 't18, 't19, 't20, 't21, 't22, 't23, 't24, 't25, 't26, 't27, 't28, 't29, 't30, 't31> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::raw::RawApi, &'t0 kube::api::raw::RawApi, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t1 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), std::string::String, http::request::Request<std::vec::Vec<u8>>, fn(std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error>) -> std::result::Result<<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Ok, <std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Error> {<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::into_result}, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t6 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t7 kube::client::APIClient, kube::client::APIClient, http::request::Request<std::vec::Vec<u8>>, impl core::future::future::Future, impl core::future::future::Future, (), std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't14)>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t15 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t16 futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>>, futures_util::lock::mutex::MutexLockFuture<'t17, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, futures_util::lock::mutex::MutexLockFuture<'t18, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, (), futures_util::lock::mutex::MutexGuard<'t19, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t20 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t21 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t22, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t23, std::string::String>, (), futures_util::lock::mutex::MutexGuard<'t24, std::string::String>, &'t25 mut std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't26)>>, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't27)>>, futures_util::stream::stream::next::Next<'t28, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't29)>>>, futures_util::stream::stream::next::Next<'t30, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't31)>>>, ()}`
    = note: required because it appears within the type `[static generator@DefId(18:75 ~ kube[9a0b]::api[0]::reflector[0]::{{impl}}[1]::single_watch[0]::{{closure}}[0]) 0:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15, 't16, 't17, 't18, 't19, 't20, 't21, 't22, 't23, 't24, 't25, 't26, 't27, 't28, 't29, 't30, 't31> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::raw::RawApi, &'t0 kube::api::raw::RawApi, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t1 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), std::string::String, http::request::Request<std::vec::Vec<u8>>, fn(std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error>) -> std::result::Result<<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Ok, <std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Error> {<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::into_result}, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t6 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t7 kube::client::APIClient, kube::client::APIClient, http::request::Request<std::vec::Vec<u8>>, impl core::future::future::Future, impl core::future::future::Future, (), std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't14)>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t15 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t16 futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>>, futures_util::lock::mutex::MutexLockFuture<'t17, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, futures_util::lock::mutex::MutexLockFuture<'t18, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, (), futures_util::lock::mutex::MutexGuard<'t19, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t20 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t21 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t22, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t23, std::string::String>, (), futures_util::lock::mutex::MutexGuard<'t24, std::string::String>, &'t25 mut std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't26)>>, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't27)>>, futures_util::stream::stream::next::Next<'t28, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't29)>>>, futures_util::stream::stream::next::Next<'t30, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't31)>>>, ()}]`
    = note: required because it appears within the type `std::future::GenFuture<[static generator@DefId(18:75 ~ kube[9a0b]::api[0]::reflector[0]::{{impl}}[1]::single_watch[0]::{{closure}}[0]) 0:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6, 't7, 't8, 't9, 't10, 't11, 't12, 't13, 't14, 't15, 't16, 't17, 't18, 't19, 't20, 't21, 't22, 't23, 't24, 't25, 't26, 't27, 't28, 't29, 't30, 't31> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::raw::RawApi, &'t0 kube::api::raw::RawApi, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t1 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t2 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t3, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t4, std::string::String>, (), std::string::String, http::request::Request<std::vec::Vec<u8>>, fn(std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error>) -> std::result::Result<<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Ok, <std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::Error> {<std::result::Result<std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + std::marker::Send + 't5)>>, kube::Error> as std::ops::Try>::into_result}, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t6 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t7 kube::client::APIClient, kube::client::APIClient, http::request::Request<std::vec::Vec<u8>>, impl core::future::future::Future, impl core::future::future::Future, (), std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't14)>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t15 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t16 futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>>, futures_util::lock::mutex::MutexLockFuture<'t17, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, futures_util::lock::mutex::MutexLockFuture<'t18, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, (), futures_util::lock::mutex::MutexGuard<'t19, std::collections::BTreeMap<kube::api::reflector::ObjectId, kube::api::resource::Object<FooSpec, kube::api::Void>>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t20 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t21 futures_util::lock::mutex::Mutex<std::string::String>, std::sync::Arc<futures_util::lock::mutex::Mutex<std::string::String>>, futures_util::lock::mutex::MutexLockFuture<'t22, std::string::String>, futures_util::lock::mutex::MutexLockFuture<'t23, std::string::String>, (), futures_util::lock::mutex::MutexGuard<'t24, std::string::String>, &'t25 mut std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't26)>>, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't27)>>, futures_util::stream::stream::next::Next<'t28, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't29)>>>, futures_util::stream::stream::next::Next<'t30, std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>> + 't31)>>>, ()}]>`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t0 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, (), std::result::Result<(), kube::Error>, kube::Error, std::time::Duration, fn(std::time::Duration) -> futures_timer::delay::Delay {futures_timer::delay::Delay::new}, std::time::Duration, futures_timer::delay::Delay, futures_timer::delay::Delay, (), fn(std::result::Result<(), kube::Error>) -> std::result::Result<<std::result::Result<(), kube::Error> as std::ops::Try>::Ok, <std::result::Result<(), kube::Error> as std::ops::Try>::Error> {<std::result::Result<(), kube::Error> as std::ops::Try>::into_result}, &'t3 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t4 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}`
    = note: required because it appears within the type `[static generator@DefId(18:57 ~ kube[9a0b]::api[0]::reflector[0]::{{impl}}[1]::poll[0]::{{closure}}[0]) 0:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t0 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, (), std::result::Result<(), kube::Error>, kube::Error, std::time::Duration, fn(std::time::Duration) -> futures_timer::delay::Delay {futures_timer::delay::Delay::new}, std::time::Duration, futures_timer::delay::Delay, futures_timer::delay::Delay, (), fn(std::result::Result<(), kube::Error>) -> std::result::Result<<std::result::Result<(), kube::Error> as std::ops::Try>::Ok, <std::result::Result<(), kube::Error> as std::ops::Try>::Error> {<std::result::Result<(), kube::Error> as std::ops::Try>::into_result}, &'t3 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t4 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}]`
    = note: required because it appears within the type `std::future::GenFuture<[static generator@DefId(18:57 ~ kube[9a0b]::api[0]::reflector[0]::{{impl}}[1]::poll[0]::{{closure}}[0]) 0:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0, 't1, 't2, 't3, 't4, 't5, 't6> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'s kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t0 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, (), std::result::Result<(), kube::Error>, kube::Error, std::time::Duration, fn(std::time::Duration) -> futures_timer::delay::Delay {futures_timer::delay::Delay::new}, std::time::Duration, futures_timer::delay::Delay, futures_timer::delay::Delay, (), fn(std::result::Result<(), kube::Error>) -> std::result::Result<<std::result::Result<(), kube::Error> as std::ops::Try>::Ok, <std::result::Result<(), kube::Error> as std::ops::Try>::Error> {<std::result::Result<(), kube::Error> as std::ops::Try>::into_result}, &'t3 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, &'t4 kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `impl core::future::future::Future`
    = note: required because it appears within the type `for<'r, 's, 't0> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}`
    = note: required because it appears within the type `[static generator@examples/crd_reflector.rs:39:24: 44:6 rf:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}]`
    = note: required because it appears within the type `std::future::GenFuture<[static generator@examples/crd_reflector.rs:39:24: 44:6 rf:&kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>> for<'r, 's, 't0> {&'r kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::api::reflector::Reflector<kube::api::resource::Object<FooSpec, kube::api::Void>>, impl core::future::future::Future, impl core::future::future::Future, ()}]>`
    = note: required because it appears within the type `impl core::future::future::Future`

error: aborting due to previous error

For more information about this error, try `rustc --explain E0277`.
error: could not compile `kube`.

To learn more, run the command again with --verbose.

the key line:

39  |     tokio::spawn(async {
    |     ^^^^^^^^^^^^ `dyn futures_core::stream::Stream<Item = std::result::Result<kube::api::resource::WatchEvent<kube::api::resource::Object<FooSpec, kube::api::Void>>, kube::Error>>` cannot be sent between threads safely

is what i don't understand. I even tried explicit unsafe impl Send for WatchEvent<K> just to check, but even that does not seem to mark the dyn Stream<Item = Result<WatchEvent<K>>> as Send, is is not supposed to?

Is BoxStream the right thing to use here? The error looks suspiciously like rust-lang/rust#53259

Can't look more today. If anyone wants to jump in, please.

@clux
Copy link
Member Author

clux commented Dec 4, 2019

Glad I don't have to continue this rabbit hole 😅

@clux clux closed this Dec 4, 2019
@clux clux deleted the threaded-poll branch February 26, 2020 00:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Calling Reflector::poll in separate thread/task
1 participant