Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Run rustfmt on the codebase and enforce rustfmt via github actions #132

Merged
merged 1 commit into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: |
curl -L "$LINK/$SCCACHE_VERSION/sccache-$SCCACHE_VERSION-x86_64-unknown-linux-musl.tar.gz" | tar xz
echo "$PWD/sccache-$SCCACHE_VERSION-x86_64-unknown-linux-musl/" >> $GITHUB_PATH
- name: Install libpacap
- name: Install libpcap
run: sudo apt-get install -y libpcap-dev
- name: Start sccache
env:
Expand All @@ -57,6 +57,8 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: mkdir -p test_tmp && aws s3 sync s3://shotover-test-captures ./test_tmp
- name: Check `cargo fmt` was run
run: cargo fmt --all -- --check
- name: Run tests
run: cargo test -- --include-ignored
timeout-minutes: 30
Expand Down
3 changes: 3 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = [ "rustfmt" ]
Empty file added rustfmt.toml
Empty file.
4 changes: 2 additions & 2 deletions shotover-proxy/benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use anyhow::Result;
use shotover_proxy::runner::{ConfigOpts, Runner};
use tokio::task::JoinHandle;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

pub fn run_shotover_with_topology(topology_path: &str) -> (Runtime, JoinHandle<Result<()>>) {
let opts = ConfigOpts {
topology_file: topology_path.into(),
config_file: "config/config.yaml".into(),
.. ConfigOpts::default()
..ConfigOpts::default()
};
let spawn = Runner::new(opts).unwrap().run_spawn();

Expand Down
14 changes: 3 additions & 11 deletions shotover-proxy/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::{fmt, error};
use crate::message::Messages;
use std::{error, fmt};
use thiserror::Error;


#[derive(Error, Clone, Debug)]
pub enum RequestError {
#[error("Invalid header (expected {expected:?}, got {found:?})")]
InvalidHeader {
expected: String,
found: String,
},
InvalidHeader { expected: String, found: String },
#[error("Malform Request: {0}")]
MalformedRequest(String),

Expand All @@ -18,11 +14,8 @@ pub enum RequestError {

#[error("Could not process chain: {0}")]
ChainProcessingError(String),


}


#[derive(Error, Debug)]
pub struct ConfigError {
pub message: String,
Expand Down Expand Up @@ -51,5 +44,4 @@ impl fmt::Display for ConfigError {
}
}


pub type ChainResponse = anyhow::Result<Messages>;
pub type ChainResponse = anyhow::Result<Messages>;
2 changes: 1 addition & 1 deletion shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pub mod config;
pub mod error;
pub mod message;
pub mod protocols;
pub mod runner;
pub mod runtimes;
pub mod server;
pub mod sources;
pub mod transforms;
pub mod runner;
29 changes: 14 additions & 15 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,18 @@ pub enum ASTHolder {
impl ASTHolder {
pub fn get_command(&self) -> String {
match self {
ASTHolder::SQL(statement) => {
match statement {
Statement::Query(_) => "SELECT",
Statement::Insert { .. } => "INSERT",
Statement::Update { .. } => "UPDATE",
Statement::Delete { .. } => "DELETE",
Statement::CreateView { .. } => "CREATE VIEW",
Statement::CreateTable { .. } => "CREATE TABLE",
Statement::AlterTable { .. } => "ALTER TABLE",
Statement::Drop { .. } => "DROP",
_ => "UNKNOWN",
}
.to_string()
ASTHolder::SQL(statement) => match statement {
Statement::Query(_) => "SELECT",
Statement::Insert { .. } => "INSERT",
Statement::Update { .. } => "UPDATE",
Statement::Delete { .. } => "DELETE",
Statement::CreateView { .. } => "CREATE VIEW",
Statement::CreateTable { .. } => "CREATE TABLE",
Statement::AlterTable { .. } => "ALTER TABLE",
Statement::Drop { .. } => "DROP",
_ => "UNKNOWN",
}
.to_string(),
ASTHolder::Commands(commands) => {
if let Value::List(coms) = commands {
if let Some(Value::Bytes(b)) = coms.get(0) {
Expand Down Expand Up @@ -486,8 +484,9 @@ impl Value {
ColType::Double => Value::Float(decode_double(actual_bytes).unwrap()),
ColType::Float => Value::Float(decode_float(actual_bytes).unwrap() as f64),
ColType::Int => Value::Integer(decode_int(actual_bytes).unwrap() as i64),
ColType::Timestamp =>
Value::Timestamp(Utc.timestamp_nanos(decode_timestamp(actual_bytes).unwrap())),
ColType::Timestamp => {
Value::Timestamp(Utc.timestamp_nanos(decode_timestamp(actual_bytes).unwrap()))
}
ColType::Uuid => Value::Bytes(Bytes::copy_from_slice(actual_bytes)),
ColType::Varchar => Value::Strings(decode_varchar(actual_bytes).unwrap()),
ColType::Varint => Value::Integer(decode_varint(actual_bytes).unwrap()),
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/protocols/cassandra_protocol2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ mod cassandra_protocol_tests {
ast.to_string().replace(char::is_whitespace, ""),
);
}
details => panic!("Unexpected details: {:?}", details)
details => panic!("Unexpected details: {:?}", details),
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions shotover-proxy/src/protocols/redis_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,17 +582,9 @@ impl RedisCodec {
Frame::Error(s) => self.handle_redis_error(s, frame),
Frame::Null => {
if self.decode_as_response {
Message::new_response(
QueryResponse::empty(),
false,
RawFrame::Redis(frame),
)
Message::new_response(QueryResponse::empty(), false, RawFrame::Redis(frame))
} else {
Message::new_query(
QueryMessage::empty(),
false,
RawFrame::Redis(frame),
)
Message::new_query(QueryMessage::empty(), false, RawFrame::Redis(frame))
}
}
})
Expand Down
30 changes: 18 additions & 12 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ use metrics_runtime::Receiver;
use tokio::runtime::{self, Runtime};
use tokio::task::JoinHandle;
use tracing::{debug, info};
use tracing_appender::non_blocking::{WorkerGuard, NonBlocking};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::fmt::format::{Format, DefaultFields};
use tracing_subscriber::layer::Layered;
use tracing_subscriber::reload::Handle;
use tracing_subscriber::{EnvFilter, Registry};

use crate::admin::httpserver::LogFilterHttpExporter;
use crate::config::topology::Topology;
use crate::config::Config;
use crate::transforms::Transforms;
use crate::transforms::Wrapper;
use crate::admin::httpserver::LogFilterHttpExporter;

#[derive(Clap, Clone)]
#[clap(version = crate_version!(), author = "Instaclustr")]
Expand All @@ -41,7 +41,7 @@ impl Default for ConfigOpts {
topology_file: "config/topology.yaml".into(),
config_file: "config/config.yaml".into(),
core_threads: 4,
stack_size: 2097152
stack_size: 2097152,
}
}
}
Expand All @@ -68,13 +68,21 @@ impl Runner {

let tracing = TracingState::new(config.main_log_level.as_str());

Ok(Runner { runtime, topology, config, tracing })
Ok(Runner {
runtime,
topology,
config,
tracing,
})
}

pub fn with_observability_interface(self) -> Result<Self> {
let receiver = Receiver::builder().build().expect("failed to create receiver");
let receiver = Receiver::builder()
.build()
.expect("failed to create receiver");
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());
let exporter =
LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());

receiver.install();
self.runtime.spawn(exporter.async_run());
Expand All @@ -100,7 +108,8 @@ impl Runner {
struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle: Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
handle:
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
}

impl TracingState {
Expand All @@ -121,7 +130,6 @@ impl TracingState {
}
}


pub struct RunnerSpawned {
pub runtime: Runtime,
pub handle: JoinHandle<Result<()>>,
Expand Down Expand Up @@ -149,8 +157,6 @@ pub async fn run(topology: Topology, config: Config) -> Result<()> {
info!("Goodbye!");
Ok(())
}
Err(error) => {
Err(anyhow!("Failed to run chains: {}", error))
}
Err(error) => Err(anyhow!("Failed to run chains: {}", error)),
}
}
61 changes: 34 additions & 27 deletions shotover-proxy/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
use crate::config::topology::TopicHolder;
use crate::sources::cassandra_source::{CassandraConfig, CassandraSource};
use crate::sources::mpsc_source::{AsyncMpsc, AsyncMpscConfig};
use crate::transforms::chain::{TransformChain};
use crate::sources::redis_source::{RedisConfig, RedisSource};
use crate::transforms::chain::TransformChain;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use crate::sources::redis_source::{RedisSource, RedisConfig};
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;

use anyhow::{Result};
use anyhow::Result;

pub mod cassandra_source;
pub mod mpsc_source;
pub mod redis_source;

/*
notify_shutdown: broadcast::Sender<()>,
notify_shutdown: broadcast::Sender<()>,

/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
///
/// Tokio channels are closed once all `Sender` handles go out of scope.
/// When a channel is closed, the receiver receives `None`. This is
/// leveraged to detect all connection handlers completing. When a
/// connection handler is initialized, it is assigned a clone of
/// `shutdown_complete_tx`. When the listener shuts down, it drops the
/// sender held by this `shutdown_complete_tx` field. Once all handler tasks
/// complete, all clones of the `Sender` are also dropped. This results in
/// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
/// is safe to exit the server process.
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
*/
/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
///
/// Tokio channels are closed once all `Sender` handles go out of scope.
/// When a channel is closed, the receiver receives `None`. This is
/// leveraged to detect all connection handlers completing. When a
/// connection handler is initialized, it is assigned a clone of
/// `shutdown_complete_tx`. When the listener shuts down, it drops the
/// sender held by this `shutdown_complete_tx` field. Once all handler tasks
/// complete, all clones of the `Sender` are also dropped. This results in
/// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
/// is safe to exit the server process.
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
*/

#[derive(Debug)]
pub enum Sources {
Cassandra(CassandraSource),
Mpsc(AsyncMpsc),
Redis(RedisSource)
Redis(RedisSource),
}

impl Sources {
Expand All @@ -54,7 +54,7 @@ impl Sources {
pub enum SourcesConfig {
Cassandra(CassandraConfig),
Mpsc(AsyncMpscConfig),
Redis(RedisConfig)
Redis(RedisConfig),
}

impl SourcesConfig {
Expand All @@ -66,9 +66,18 @@ impl SourcesConfig {
shutdown_complete_tx: mpsc::Sender<()>,
) -> Result<Vec<Sources>> {
match self {
SourcesConfig::Cassandra(c) => c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await,
SourcesConfig::Mpsc(m) => m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await,
SourcesConfig::Redis(r) => r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await
SourcesConfig::Cassandra(c) => {
c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
SourcesConfig::Mpsc(m) => {
m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
SourcesConfig::Redis(r) => {
r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
}
}
}
Expand All @@ -82,6 +91,4 @@ pub trait SourcesFromConfig: Send {
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>,
) -> Result<Vec<Sources>>;


}
7 changes: 6 additions & 1 deletion shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ impl TransformChain {
*count += 1;
}

let chain_response = chain.process_request(Wrapper::new_with_chain_name(messages, chain.name.clone()), name).await;
let chain_response = chain
.process_request(
Wrapper::new_with_chain_name(messages, chain.name.clone()),
name,
)
.await;

if let Err(e) = &chain_response {
warn!("Internal error in buffered chain: {:?} - resetting", e);
Expand Down
8 changes: 4 additions & 4 deletions shotover-proxy/src/transforms/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ mod lua_transform_tests {
// use crate::transforms::printer::Printer;
// use crate::transforms::{Transform, Transforms, TransformsFromConfig, Wrapper};

// const REQUEST_STRING: &str = r###"
//qm.namespace = {"aaaaaaaaaa", "bbbbb"}
//return call_next_transform(qm)
//"###;
// const REQUEST_STRING: &str = r###"
//qm.namespace = {"aaaaaaaaaa", "bbbbb"}
//return call_next_transform(qm)
//"###;

// #[tokio::test(flavor = "multi_thread")]
// async fn test_lua_script() -> Result<(), Box<dyn Error>> {
Expand Down
Loading