diff --git a/shotover-proxy/src/transforms/mod.rs b/shotover-proxy/src/transforms/mod.rs index 52aff0fca..2555f1027 100644 --- a/shotover-proxy/src/transforms/mod.rs +++ b/shotover-proxy/src/transforms/mod.rs @@ -250,8 +250,30 @@ impl Transforms { } fn add_pushed_messages_tx(&mut self, pushed_messages_tx: mpsc::UnboundedSender) { - if let Transforms::CassandraSinkSingle(c) = self { - c.add_pushed_messages_tx(pushed_messages_tx) + match self { + Transforms::CassandraSinkSingle(c) => c.add_pushed_messages_tx(pushed_messages_tx), + Transforms::CassandraPeersRewrite(c) => c.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RedisCache(r) => r.add_pushed_messages_tx(pushed_messages_tx), + Transforms::Tee(t) => t.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RedisSinkSingle(r) => r.add_pushed_messages_tx(pushed_messages_tx), + Transforms::ConsistentScatter(c) => c.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RedisTimestampTagger(r) => r.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RedisClusterPortsRewrite(r) => r.add_pushed_messages_tx(pushed_messages_tx), + Transforms::DebugPrinter(p) => p.add_pushed_messages_tx(pushed_messages_tx), + Transforms::DebugForceParse(p) => p.add_pushed_messages_tx(pushed_messages_tx), + Transforms::Null(n) => n.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RedisSinkCluster(r) => r.add_pushed_messages_tx(pushed_messages_tx), + Transforms::ParallelMap(s) => s.add_pushed_messages_tx(pushed_messages_tx), + Transforms::PoolConnections(s) => s.add_pushed_messages_tx(pushed_messages_tx), + Transforms::Coalesce(s) => s.add_pushed_messages_tx(pushed_messages_tx), + Transforms::QueryTypeFilter(s) => s.add_pushed_messages_tx(pushed_messages_tx), + Transforms::QueryCounter(s) => s.add_pushed_messages_tx(pushed_messages_tx), + #[cfg(test)] + Transforms::Loopback(l) => l.add_pushed_messages_tx(pushed_messages_tx), + Transforms::Protect(p) => p.add_pushed_messages_tx(pushed_messages_tx), + Transforms::DebugReturner(d) => d.add_pushed_messages_tx(pushed_messages_tx), + Transforms::DebugRandomDelay(d) => d.add_pushed_messages_tx(pushed_messages_tx), + Transforms::RequestThrottling(d) => d.add_pushed_messages_tx(pushed_messages_tx), } } }