Skip to content

Commit

Permalink
Run rustfmt on the codebase and enforce rustfmt via github actions (#132
Browse files Browse the repository at this point in the history
)
  • Loading branch information
rukai authored Aug 9, 2021
1 parent 87486c0 commit 9ded46b
Show file tree
Hide file tree
Showing 23 changed files with 142 additions and 120 deletions.
4 changes: 3 additions & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
run: |
curl -L "$LINK/$SCCACHE_VERSION/sccache-$SCCACHE_VERSION-x86_64-unknown-linux-musl.tar.gz" | tar xz
echo "$PWD/sccache-$SCCACHE_VERSION-x86_64-unknown-linux-musl/" >> $GITHUB_PATH
- name: Install libpacap
- name: Install libpcap
run: sudo apt-get install -y libpcap-dev
- name: Start sccache
env:
Expand All @@ -57,6 +57,8 @@ jobs:
AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
run: mkdir -p test_tmp && aws s3 sync s3://shotover-test-captures ./test_tmp
- name: Check `cargo fmt` was run
run: cargo fmt --all -- --check
- name: Run tests
run: cargo test -- --include-ignored
timeout-minutes: 30
Expand Down
3 changes: 3 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[toolchain]
channel = "stable"
components = [ "rustfmt" ]
Empty file added rustfmt.toml
Empty file.
4 changes: 2 additions & 2 deletions shotover-proxy/benches/helpers.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
use anyhow::Result;
use shotover_proxy::runner::{ConfigOpts, Runner};
use tokio::task::JoinHandle;
use tokio::runtime::Runtime;
use tokio::task::JoinHandle;

pub fn run_shotover_with_topology(topology_path: &str) -> (Runtime, JoinHandle<Result<()>>) {
let opts = ConfigOpts {
topology_file: topology_path.into(),
config_file: "config/config.yaml".into(),
.. ConfigOpts::default()
..ConfigOpts::default()
};
let spawn = Runner::new(opts).unwrap().run_spawn();

Expand Down
14 changes: 3 additions & 11 deletions shotover-proxy/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,11 @@
use std::{fmt, error};
use crate::message::Messages;
use std::{error, fmt};
use thiserror::Error;


#[derive(Error, Clone, Debug)]
pub enum RequestError {
#[error("Invalid header (expected {expected:?}, got {found:?})")]
InvalidHeader {
expected: String,
found: String,
},
InvalidHeader { expected: String, found: String },
#[error("Malform Request: {0}")]
MalformedRequest(String),

Expand All @@ -18,11 +14,8 @@ pub enum RequestError {

#[error("Could not process chain: {0}")]
ChainProcessingError(String),


}


#[derive(Error, Debug)]
pub struct ConfigError {
pub message: String,
Expand Down Expand Up @@ -51,5 +44,4 @@ impl fmt::Display for ConfigError {
}
}


pub type ChainResponse = anyhow::Result<Messages>;
pub type ChainResponse = anyhow::Result<Messages>;
2 changes: 1 addition & 1 deletion shotover-proxy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ pub mod config;
pub mod error;
pub mod message;
pub mod protocols;
pub mod runner;
pub mod runtimes;
pub mod server;
pub mod sources;
pub mod transforms;
pub mod runner;
29 changes: 14 additions & 15 deletions shotover-proxy/src/message/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -207,20 +207,18 @@ pub enum ASTHolder {
impl ASTHolder {
pub fn get_command(&self) -> String {
match self {
ASTHolder::SQL(statement) => {
match statement {
Statement::Query(_) => "SELECT",
Statement::Insert { .. } => "INSERT",
Statement::Update { .. } => "UPDATE",
Statement::Delete { .. } => "DELETE",
Statement::CreateView { .. } => "CREATE VIEW",
Statement::CreateTable { .. } => "CREATE TABLE",
Statement::AlterTable { .. } => "ALTER TABLE",
Statement::Drop { .. } => "DROP",
_ => "UNKNOWN",
}
.to_string()
ASTHolder::SQL(statement) => match statement {
Statement::Query(_) => "SELECT",
Statement::Insert { .. } => "INSERT",
Statement::Update { .. } => "UPDATE",
Statement::Delete { .. } => "DELETE",
Statement::CreateView { .. } => "CREATE VIEW",
Statement::CreateTable { .. } => "CREATE TABLE",
Statement::AlterTable { .. } => "ALTER TABLE",
Statement::Drop { .. } => "DROP",
_ => "UNKNOWN",
}
.to_string(),
ASTHolder::Commands(commands) => {
if let Value::List(coms) = commands {
if let Some(Value::Bytes(b)) = coms.get(0) {
Expand Down Expand Up @@ -486,8 +484,9 @@ impl Value {
ColType::Double => Value::Float(decode_double(actual_bytes).unwrap()),
ColType::Float => Value::Float(decode_float(actual_bytes).unwrap() as f64),
ColType::Int => Value::Integer(decode_int(actual_bytes).unwrap() as i64),
ColType::Timestamp =>
Value::Timestamp(Utc.timestamp_nanos(decode_timestamp(actual_bytes).unwrap())),
ColType::Timestamp => {
Value::Timestamp(Utc.timestamp_nanos(decode_timestamp(actual_bytes).unwrap()))
}
ColType::Uuid => Value::Bytes(Bytes::copy_from_slice(actual_bytes)),
ColType::Varchar => Value::Strings(decode_varchar(actual_bytes).unwrap()),
ColType::Varint => Value::Integer(decode_varint(actual_bytes).unwrap()),
Expand Down
2 changes: 1 addition & 1 deletion shotover-proxy/src/protocols/cassandra_protocol2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ mod cassandra_protocol_tests {
ast.to_string().replace(char::is_whitespace, ""),
);
}
details => panic!("Unexpected details: {:?}", details)
details => panic!("Unexpected details: {:?}", details),
}
}
}
Expand Down
12 changes: 2 additions & 10 deletions shotover-proxy/src/protocols/redis_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -582,17 +582,9 @@ impl RedisCodec {
Frame::Error(s) => self.handle_redis_error(s, frame),
Frame::Null => {
if self.decode_as_response {
Message::new_response(
QueryResponse::empty(),
false,
RawFrame::Redis(frame),
)
Message::new_response(QueryResponse::empty(), false, RawFrame::Redis(frame))
} else {
Message::new_query(
QueryMessage::empty(),
false,
RawFrame::Redis(frame),
)
Message::new_query(QueryMessage::empty(), false, RawFrame::Redis(frame))
}
}
})
Expand Down
30 changes: 18 additions & 12 deletions shotover-proxy/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ use metrics_runtime::Receiver;
use tokio::runtime::{self, Runtime};
use tokio::task::JoinHandle;
use tracing::{debug, info};
use tracing_appender::non_blocking::{WorkerGuard, NonBlocking};
use tracing_appender::non_blocking::{NonBlocking, WorkerGuard};
use tracing_subscriber::fmt::format::{DefaultFields, Format};
use tracing_subscriber::fmt::Layer;
use tracing_subscriber::fmt::format::{Format, DefaultFields};
use tracing_subscriber::layer::Layered;
use tracing_subscriber::reload::Handle;
use tracing_subscriber::{EnvFilter, Registry};

use crate::admin::httpserver::LogFilterHttpExporter;
use crate::config::topology::Topology;
use crate::config::Config;
use crate::transforms::Transforms;
use crate::transforms::Wrapper;
use crate::admin::httpserver::LogFilterHttpExporter;

#[derive(Clap, Clone)]
#[clap(version = crate_version!(), author = "Instaclustr")]
Expand All @@ -41,7 +41,7 @@ impl Default for ConfigOpts {
topology_file: "config/topology.yaml".into(),
config_file: "config/config.yaml".into(),
core_threads: 4,
stack_size: 2097152
stack_size: 2097152,
}
}
}
Expand All @@ -68,13 +68,21 @@ impl Runner {

let tracing = TracingState::new(config.main_log_level.as_str());

Ok(Runner { runtime, topology, config, tracing })
Ok(Runner {
runtime,
topology,
config,
tracing,
})
}

pub fn with_observability_interface(self) -> Result<Self> {
let receiver = Receiver::builder().build().expect("failed to create receiver");
let receiver = Receiver::builder()
.build()
.expect("failed to create receiver");
let socket: SocketAddr = self.config.observability_interface.parse()?;
let exporter = LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());
let exporter =
LogFilterHttpExporter::new(receiver.controller(), socket, self.tracing.handle.clone());

receiver.install();
self.runtime.spawn(exporter.async_run());
Expand All @@ -100,7 +108,8 @@ impl Runner {
struct TracingState {
/// Once this is dropped tracing logs are ignored
guard: WorkerGuard,
handle: Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
handle:
Handle<EnvFilter, Layered<Layer<Registry, DefaultFields, Format, NonBlocking>, Registry>>,
}

impl TracingState {
Expand All @@ -121,7 +130,6 @@ impl TracingState {
}
}


pub struct RunnerSpawned {
pub runtime: Runtime,
pub handle: JoinHandle<Result<()>>,
Expand Down Expand Up @@ -149,8 +157,6 @@ pub async fn run(topology: Topology, config: Config) -> Result<()> {
info!("Goodbye!");
Ok(())
}
Err(error) => {
Err(anyhow!("Failed to run chains: {}", error))
}
Err(error) => Err(anyhow!("Failed to run chains: {}", error)),
}
}
61 changes: 34 additions & 27 deletions shotover-proxy/src/sources/mod.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,43 @@
use crate::config::topology::TopicHolder;
use crate::sources::cassandra_source::{CassandraConfig, CassandraSource};
use crate::sources::mpsc_source::{AsyncMpsc, AsyncMpscConfig};
use crate::transforms::chain::{TransformChain};
use crate::sources::redis_source::{RedisConfig, RedisSource};
use crate::transforms::chain::TransformChain;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::task::JoinHandle;
use crate::sources::redis_source::{RedisSource, RedisConfig};
use tokio::sync::{broadcast, mpsc};
use tokio::task::JoinHandle;

use anyhow::{Result};
use anyhow::Result;

pub mod cassandra_source;
pub mod mpsc_source;
pub mod redis_source;

/*
notify_shutdown: broadcast::Sender<()>,
notify_shutdown: broadcast::Sender<()>,
/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
///
/// Tokio channels are closed once all `Sender` handles go out of scope.
/// When a channel is closed, the receiver receives `None`. This is
/// leveraged to detect all connection handlers completing. When a
/// connection handler is initialized, it is assigned a clone of
/// `shutdown_complete_tx`. When the listener shuts down, it drops the
/// sender held by this `shutdown_complete_tx` field. Once all handler tasks
/// complete, all clones of the `Sender` are also dropped. This results in
/// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
/// is safe to exit the server process.
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
*/
/// Used as part of the graceful shutdown process to wait for client
/// connections to complete processing.
///
/// Tokio channels are closed once all `Sender` handles go out of scope.
/// When a channel is closed, the receiver receives `None`. This is
/// leveraged to detect all connection handlers completing. When a
/// connection handler is initialized, it is assigned a clone of
/// `shutdown_complete_tx`. When the listener shuts down, it drops the
/// sender held by this `shutdown_complete_tx` field. Once all handler tasks
/// complete, all clones of the `Sender` are also dropped. This results in
/// `shutdown_complete_rx.recv()` completing with `None`. At this point, it
/// is safe to exit the server process.
shutdown_complete_rx: mpsc::Receiver<()>,
shutdown_complete_tx: mpsc::Sender<()>,
*/

#[derive(Debug)]
pub enum Sources {
Cassandra(CassandraSource),
Mpsc(AsyncMpsc),
Redis(RedisSource)
Redis(RedisSource),
}

impl Sources {
Expand All @@ -54,7 +54,7 @@ impl Sources {
pub enum SourcesConfig {
Cassandra(CassandraConfig),
Mpsc(AsyncMpscConfig),
Redis(RedisConfig)
Redis(RedisConfig),
}

impl SourcesConfig {
Expand All @@ -66,9 +66,18 @@ impl SourcesConfig {
shutdown_complete_tx: mpsc::Sender<()>,
) -> Result<Vec<Sources>> {
match self {
SourcesConfig::Cassandra(c) => c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await,
SourcesConfig::Mpsc(m) => m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await,
SourcesConfig::Redis(r) => r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx).await
SourcesConfig::Cassandra(c) => {
c.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
SourcesConfig::Mpsc(m) => {
m.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
SourcesConfig::Redis(r) => {
r.get_source(chain, topics, notify_shutdown, shutdown_complete_tx)
.await
}
}
}
}
Expand All @@ -82,6 +91,4 @@ pub trait SourcesFromConfig: Send {
notify_shutdown: broadcast::Sender<()>,
shutdown_complete_tx: mpsc::Sender<()>,
) -> Result<Vec<Sources>>;


}
7 changes: 6 additions & 1 deletion shotover-proxy/src/transforms/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,12 @@ impl TransformChain {
*count += 1;
}

let chain_response = chain.process_request(Wrapper::new_with_chain_name(messages, chain.name.clone()), name).await;
let chain_response = chain
.process_request(
Wrapper::new_with_chain_name(messages, chain.name.clone()),
name,
)
.await;

if let Err(e) = &chain_response {
warn!("Internal error in buffered chain: {:?} - resetting", e);
Expand Down
8 changes: 4 additions & 4 deletions shotover-proxy/src/transforms/lua.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,10 @@ mod lua_transform_tests {
// use crate::transforms::printer::Printer;
// use crate::transforms::{Transform, Transforms, TransformsFromConfig, Wrapper};

// const REQUEST_STRING: &str = r###"
//qm.namespace = {"aaaaaaaaaa", "bbbbb"}
//return call_next_transform(qm)
//"###;
// const REQUEST_STRING: &str = r###"
//qm.namespace = {"aaaaaaaaaa", "bbbbb"}
//return call_next_transform(qm)
//"###;

// #[tokio::test(flavor = "multi_thread")]
// async fn test_lua_script() -> Result<(), Box<dyn Error>> {
Expand Down
Loading

0 comments on commit 9ded46b

Please sign in to comment.