Skip to content

Commit

Permalink
feat: support proxy plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vicanso committed Apr 14, 2024
1 parent c5f9eb3 commit 7291903
Show file tree
Hide file tree
Showing 10 changed files with 114 additions and 32 deletions.
1 change: 1 addition & 0 deletions src/config/load.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ pub struct LocationConf {
pub br_level: Option<u32>,
pub zstd_level: Option<u32>,
pub remark: Option<String>,
pub proxy_plugins: Option<Vec<String>>,
}

impl LocationConf {
Expand Down
3 changes: 3 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ fn run() -> Result<(), Box<dyn Error>> {
let webhook_type = conf.webhook_type.clone().unwrap_or_default();
let mut builder = env_logger::Builder::from_env(env_logger::Env::default());

// TODO load from config
let _ = plugin::init_proxy_plguins(vec![]);

if let Some(log_level) = &conf.log_level {
match log_level.to_lowercase().as_str() {
"error" => builder.filter_level(log::LevelFilter::Error),
Expand Down
18 changes: 4 additions & 14 deletions src/plugin/limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,26 +13,14 @@
// limitations under the License.

use super::ProxyPlugin;
use super::{Error, Result};
use crate::state::State;
use crate::util;
use async_trait::async_trait;
use pingora::proxy::Session;
use pingora_limits::inflight::Inflight;
use snafu::{ResultExt, Snafu};
use std::num::ParseIntError;
use substring::Substring;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid {message}"))]
Invalid { message: String },
#[snafu(display("Parse int {source}"))]
ParseInt { source: ParseIntError },
#[snafu(display("Exceed limit {value}/{max}"))]
Exceed { max: isize, value: isize },
}
type Result<T, E = Error> = std::result::Result<T, E>;

#[derive(PartialEq, Debug)]
pub enum LimitTag {
Ip,
Expand All @@ -53,7 +41,9 @@ impl Limiter {
let (key, max) = value.split_once(' ').ok_or(Error::Invalid {
message: value.to_string(),
})?;
let max = max.parse::<u32>().context(ParseIntSnafu)?;
let max = max
.parse::<u32>()
.map_err(|e| Error::ParseInt { source: e })?;
if key.len() < 2 {
return Err(Error::Invalid {
message: key.to_string(),
Expand Down
62 changes: 61 additions & 1 deletion src/plugin/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,77 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use crate::state::State;
use async_trait::async_trait;
use once_cell::sync::OnceCell;
use pingora::proxy::Session;
use snafu::Snafu;
use std::num::ParseIntError;

mod limit;

pub use limit::Limiter;

#[derive(Debug, Snafu)]
pub enum Error {
#[snafu(display("Invalid {message}"))]
Invalid { message: String },
#[snafu(display("Parse int {source}"))]
ParseInt { source: ParseIntError },
#[snafu(display("Exceed limit {value}/{max}"))]
Exceed { max: isize, value: isize },
}
type Result<T, E = Error> = std::result::Result<T, E>;

#[async_trait]
pub trait ProxyPlugin {
pub trait ProxyPlugin: Sync + Send {
async fn handle(&self, _session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
Ok(false)
}
}

#[derive(PartialEq, Debug)]
pub enum ProxyPluginCategory {
Limit,
}

pub struct ProxyPluginConf {
pub name: String,
pub value: String,
pub category: ProxyPluginCategory,
}

static PROXY_PLUGINS: OnceCell<HashMap<String, Box<dyn ProxyPlugin>>> = OnceCell::new();

pub fn init_proxy_plguins(confs: Vec<ProxyPluginConf>) -> Result<()> {
PROXY_PLUGINS.get_or_try_init(|| {
let mut plguins: HashMap<String, Box<dyn ProxyPlugin>> = HashMap::new();
for conf in confs {
match conf.category {
ProxyPluginCategory::Limit => {
let l = Limiter::new(&conf.value)?;
plguins.insert(conf.name, Box::new(l));
}
_ => {
return Err(Error::Invalid {
message: format!("Invalid cateogry({:?})", conf.category),
})
}
};
}

Ok(plguins)
})?;
Ok(())
}

pub fn get_proxy_plugin(name: &str) -> Option<&dyn ProxyPlugin> {
if let Some(plugins) = PROXY_PLUGINS.get() {
if let Some(plugin) = plugins.get(name) {
return Some(plugin.as_ref());
}
}
None
}
23 changes: 23 additions & 0 deletions src/proxy/location.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
use super::Upstream;
use crate::config::LocationConf;
use crate::http_extra::{convert_headers, HttpHeader};
use crate::plugin::get_proxy_plugin;
use crate::state::State;
use http::header::HeaderValue;
use once_cell::sync::Lazy;
use pingora::http::{RequestHeader, ResponseHeader};
use pingora::proxy::Session;
use regex::Regex;
use snafu::{ResultExt, Snafu};
use std::sync::Arc;
Expand Down Expand Up @@ -88,6 +91,7 @@ pub struct Location {
gzip_level: u32,
br_level: u32,
zstd_level: u32,
proxy_plugins: Option<Vec<String>>,
pub support_compression: bool,
pub upstream: Arc<Upstream>,
pub upstream_name: String,
Expand Down Expand Up @@ -149,6 +153,7 @@ impl Location {
headers: format_headers(&conf.headers)?,
proxy_headers: format_headers(&conf.proxy_headers)?,
gzip_level,
proxy_plugins: conf.proxy_plugins.clone(),
br_level,
zstd_level,
support_compression,
Expand Down Expand Up @@ -234,6 +239,24 @@ impl Location {
}
None
}
#[inline]
pub async fn exec_proxy_plugins(
&self,
session: &mut Session,
ctx: &mut State,
) -> pingora::Result<bool> {
if let Some(plugins) = &self.proxy_plugins {
for name in plugins.iter() {
if let Some(plugin) = get_proxy_plugin(name) {
let done = plugin.handle(session, ctx).await?;
if done {
return Ok(true);
}
}
}
}
Ok(false)
}
}

#[cfg(test)]
Expand Down
7 changes: 6 additions & 1 deletion src/proxy/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use super::logger::Parser;
use super::{Location, Upstream};
use crate::config::{LocationConf, PingapConf, UpstreamConf};
use crate::http_extra::{HttpResponse, HTTP_HEADER_CONTENT_JSON, HTTP_HEADER_WWW_AUTHENTICATE};
use crate::serve::Serve;
use crate::plugin::ProxyPlugin;
use crate::serve::ADMIN_SERVE;
use crate::state::{get_hostname, State};
use crate::util;
Expand Down Expand Up @@ -442,6 +442,11 @@ impl ProxyHttp for Server {
session.downstream_compression.adjust_level(level);
}

let done = lo.exec_proxy_plugins(session, ctx).await?;
if done {
return Ok(true);
}

ctx.location_index = Some(location_index);

// TODO get response from cache
Expand Down
4 changes: 2 additions & 2 deletions src/serve/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
// limitations under the License.

use super::embedded_file::EmbeddedStaticFile;
use super::Serve;
use crate::config::{self, save_config, LocationConf, ServerConf, UpstreamConf};
use crate::config::{PingapConf, CATEGORY_LOCATION, CATEGORY_SERVER, CATEGORY_UPSTREAM};
use crate::http_extra::HttpResponse;
use crate::plugin::ProxyPlugin;
use crate::state::State;
use crate::state::{get_start_time, restart};
use crate::util::{self, get_pkg_version};
Expand Down Expand Up @@ -191,7 +191,7 @@ fn get_method_path(session: &Session) -> (Method, String) {
}

#[async_trait]
impl Serve for AdminServe {
impl ProxyPlugin for AdminServe {
async fn handle(&self, session: &mut Session, ctx: &mut State) -> pingora::Result<bool> {
let (method, mut path) = get_method_path(session);
let api_prefix = "/api";
Expand Down
9 changes: 7 additions & 2 deletions src/serve/directory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::plugin::ProxyPlugin;
use crate::state::State;
use crate::util;
use async_trait::async_trait;
use bytes::Bytes;
use http::{header, HeaderValue, StatusCode};
use log::error;
Expand Down Expand Up @@ -129,8 +131,11 @@ impl Directory {
cache_private,
}
}
/// Gets the file match request path, then sends the data as chunk.
pub async fn handle(&self, session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
}

#[async_trait]
impl ProxyPlugin for Directory {
async fn handle(&self, session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
let mut filename = session.req_header().uri.path().to_string();
if filename.len() <= 1 {
filename = self.index.clone();
Expand Down
8 changes: 7 additions & 1 deletion src/serve/mock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
// limitations under the License.

use crate::http_extra::{convert_headers, HttpResponse};
use crate::plugin::ProxyPlugin;
use crate::state::State;
use async_trait::async_trait;
use bytes::Bytes;
use http::StatusCode;
use pingora::proxy::Session;
Expand Down Expand Up @@ -64,8 +66,12 @@ impl MockResponse {

Ok(MockResponse { resp })
}
}

#[async_trait]
impl ProxyPlugin for MockResponse {
/// Sends the mock data to client.
pub async fn handle(&self, session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
async fn handle(&self, session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
let _ = self.resp.clone().send(session).await?;
Ok(true)
}
Expand Down
11 changes: 0 additions & 11 deletions src/serve/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,22 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::state::State;
use async_trait::async_trait;
use pingora::proxy::Session;

mod admin;
mod directory;
mod embedded_file;
mod mock;

#[async_trait]
pub trait Serve {
async fn handle(&self, _session: &mut Session, _ctx: &mut State) -> pingora::Result<bool> {
Ok(true)
}
}

pub use admin::ADMIN_SERVE;
pub use directory::{Directory, PROTOCOL_FILE};
pub use mock::{MockResponse, PROTOCOL_MOCK};

0 comments on commit 7291903

Please sign in to comment.