Skip to content

Commit

Permalink
PDK with a single value (#11)
Browse files Browse the repository at this point in the history
* Back to attempt with single value

* Go back to known_key stub approach

* Update to new `RCow`

* Lots of fixes and cleaning up

See rodrimati1992/abi_stable_crates#75
  • Loading branch information
Mario authored and marioortizmanero committed Mar 3, 2022
1 parent 1be8ef6 commit 7c7f931
Show file tree
Hide file tree
Showing 67 changed files with 1,194 additions and 1,251 deletions.
663 changes: 369 additions & 294 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 4 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -206,9 +206,8 @@ reqwest = { version = "0.11.9", default-features = false, features = [
qwal = { git = "https://github.com/tremor-rs/qwal" }

# plugin system
abi_stable = { version = "0.10.3", default_features = false, features = ["rust_latest_stable"] }
# FIXME: clean up after new release
async-ffi = { version = "0.3", features = ["abi_stable"], git = "https://github.com/oxalica/async-ffi", branch = "master" }
abi_stable = { version = "0.10", default_features = false, features = ["rust_latest_stable"] }
async-ffi = { version = "0.4", features = ["abi_stable"] }
walkdir = "2.3.2"

[dev-dependencies]
Expand Down Expand Up @@ -238,9 +237,8 @@ es-integration = []
[patch.crates-io]
rust-bert = { git = 'https://github.com/mfelsche/rust-bert.git', rev = '1140989' }
rust_tokenizers = { git = 'https://github.com/mfelsche/rust-tokenizers.git', rev = '5a7860d' }
# FIXME: update to crates.io when this is added to a new version:
# https://github.com/rodrimati1992/abi_stable_crates/pull/70
abi_stable = { git = "https://github.com/marioortizmanero/abi_stable_crates.git", branch = "rvec-append" }
# FIXME: update to v0.11 when it's released
abi_stable = { git = "https://github.com/rodrimati1992/abi_stable_crates.git", branch = "0_11" }
# FIXME: update to crates.io when this is added to a new version:
# https://github.com/simd-lite/simd-json-derive/pull/9
simd-json-derive = { git = "https://github.com/marioortizmanero/simd-json-derive.git", branch = "abi-stable" }
Expand Down
2 changes: 0 additions & 2 deletions plugins/connectors/metronome/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions plugins/connectors/metronome/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ tremor-value = { version = "0.3.2", path = "../../../tremor-value" }
tremor-runtime = { version = "0.11.4", path = "../../../" }

abi_stable = { version = "0.10", default-features = false }
# FIXME: clean up after new release
async-ffi = { version = "0.3", features = ["abi_stable"], git = "https://github.com/oxalica/async-ffi", branch = "master" }
async-ffi = { version = "0.4", features = ["abi_stable"] }
serde = { version = "1.0", features = ["derive"] }

[patch.crates-io]
# FIXME: update to crates.io when this is added to a new version:
# https://github.com/rodrimati1992/abi_stable_crates/pull/70
abi_stable = { git = "https://github.com/marioortizmanero/abi_stable_crates.git", branch = "rvec-append" }
abi_stable = { git = "https://github.com/marioortizmanero/abi_stable_crates.git", branch = "tremor-patches" }
# FIXME: update to crates.io when this is added to a new version:
# https://github.com/simd-lite/simd-json-derive/pull/9
simd-json-derive = { git = "https://github.com/marioortizmanero/simd-json-derive.git", branch = "abi-stable" }
4 changes: 2 additions & 2 deletions plugins/connectors/metronome/src/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use tremor_common::time::nanotime;
use tremor_pipeline::DEFAULT_STREAM_ID;
use tremor_runtime::{connectors::prelude::*, pdk::RResult, ttry, utils::hostname};
use tremor_script::{EventOriginUri, EventPayload};
use tremor_value::{literal, pdk::PdkValue};
use tremor_value::literal;

use std::{
future,
Expand Down Expand Up @@ -107,7 +107,7 @@ impl RawSource for Metronome {
#[sabi_extern_fn]
pub fn from_config(
_id: RString,
raw_config: ROption<PdkValue<'static>>,
raw_config: ROption<Value<'static>>,
) -> FfiFuture<RResult<BoxedRawConnector>> {
async move {
if let RSome(raw_config) = raw_config {
Expand Down
8 changes: 5 additions & 3 deletions src/codec/binflux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ use std::io::{Cursor, Write};
use std::str;
use tremor_value::{literal, Object, Value};

use abi_stable::std_types::{RCowStr, Tuple2};

const TYPE_I64: u8 = 0;
const TYPE_F64: u8 = 1;
const TYPE_STRING: u8 = 2;
Expand Down Expand Up @@ -59,7 +61,7 @@ impl BInflux {
.chain_err(|| ErrorKind::InvalidBInfluxData("too many tags".into()))?,
)?;

for (k, v) in tags {
for Tuple2(k, v) in tags {
if let Some(v) = v.as_str() {
write_str(&mut res, k)?;
write_str(&mut res, v)?;
Expand All @@ -74,7 +76,7 @@ impl BInflux {
u16::try_from(fields.len())
.chain_err(|| ErrorKind::InvalidBInfluxData("too many fields".into()))?,
)?;
for (k, v) in fields {
for Tuple2(k, v) in fields {
write_str(&mut res, k)?;
if let Some(v) = v.as_i64() {
res.write_u8(TYPE_I64)?;
Expand Down Expand Up @@ -106,7 +108,7 @@ impl BInflux {
}

pub fn decode(data: &[u8]) -> Result<Value> {
fn read_string<'event>(c: &mut Cursor<&'event [u8]>) -> Result<Cow<'event, str>> {
fn read_string<'event>(c: &mut Cursor<&'event [u8]>) -> Result<RCowStr<'event>> {
let l = c.read_u16::<BigEndian>()? as usize;
#[allow(clippy::cast_possible_truncation)]
let p = c.position() as usize;
Expand Down
10 changes: 7 additions & 3 deletions src/codec/csv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::codec::prelude::*;
use beef::Cow;

use abi_stable::{
rvec,
std_types::{RCowStr, RString},
};

#[derive(Clone)]
pub struct Csv {}
Expand All @@ -37,9 +41,9 @@ impl Codec for Csv {
None => return Ok(None),
}?;

let mut fields = vec![];
let mut fields = rvec![];
for field in record.iter() {
fields.push(Value::String(Cow::from(field.to_string())));
fields.push(Value::String(RCowStr::Owned(RString::from(field))));
}

Ok(Some(Value::Array(fields)))
Expand Down
6 changes: 4 additions & 2 deletions src/codec/syslog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use chrono::{DateTime, Datelike, Offset, TimeZone, Utc};
use syslog_loose::{IncompleteDate, ProcId, Protocol, SyslogFacility, SyslogSeverity};
use tremor_value::Value;

use abi_stable::std_types::Tuple2;

const DEFAULT_PRI: i32 = 13;

pub trait Now: Send + Sync + Clone {
Expand Down Expand Up @@ -59,7 +61,7 @@ where
))
})?;
let mut elem = String::with_capacity(16);
for (id, params) in sd.iter() {
for Tuple2(id, params) in sd.iter() {
elem.push('[');
elem.push_str(&id.to_string());
let params = params.as_array().ok_or_else(|| {
Expand All @@ -73,7 +75,7 @@ where
"Invalid structured data: param's key value pair not an object",
))
})?;
for (k, v) in kv_map {
for Tuple2(k, v) in kv_map {
let value = v.as_str().ok_or_else(|| {
Error::from(ErrorKind::InvalidSyslogData(
"Invalid structured data: param's key value pair not an object",
Expand Down
4 changes: 3 additions & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use tremor_script::{
};
use tremor_value::prelude::*;

use abi_stable::std_types::Tuple2;

pub(crate) type Id = String;

/// possible reconnect strategies for controlling if and how to reconnect
Expand Down Expand Up @@ -205,7 +207,7 @@ impl Connector {
.get_object("codec_map")
.map(|o| {
o.iter()
.map(|(k, v)| Ok((k.to_string(), Codec::from_value(v)?)))
.map(|Tuple2(k, v)| Ok((k.to_string(), Codec::from_value(v)?)))
.collect::<Result<HashMap<_, _>>>()
})
.transpose()?,
Expand Down
45 changes: 13 additions & 32 deletions src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ use crate::{
};
use abi_stable::{
std_types::{
RBox, RCow,
RBox, RCowStr,
ROption::{self, RNone, RSome},
RResult::{RErr, ROk},
RStr, RString, RVec,
Expand Down Expand Up @@ -273,9 +273,9 @@ pub struct ConnectorContext {
/// type of the connector
connector_type: ConnectorType,
/// The Quiescence Beacon
pub quiescence_beacon: BoxedQuiescenceBeacon,
quiescence_beacon: BoxedQuiescenceBeacon,
/// Notifier
pub notifier: reconnect::BoxedConnectionLostNotifier,
notifier: reconnect::BoxedConnectionLostNotifier,
}

impl Display for ConnectorContext {
Expand Down Expand Up @@ -371,7 +371,7 @@ pub async fn spawn(
let builder = known_connectors
.get(&config.connector_type)
.ok_or_else(|| ErrorKind::UnknownConnectorType(config.connector_type.to_string()))?;
let connector_config = config.config.clone().map(Into::into).into();
let connector_config = config.config.clone().into();
let connector = builder.from_config()(alias.clone().into(), connector_config).await;
let connector = Result::from(connector.map_err(Error::from))?;
let connector = Connector(connector);
Expand Down Expand Up @@ -1005,14 +1005,14 @@ const OUT_PORTS_REF: &'static [Cow<'static, str>; 2] = &OUT_PORTS;
#[abi_stable::sabi_trait]
pub trait RawConnector: Send {
/// Valid input ports for the connector, by default this is `in`
fn input_ports(&self) -> RVec<RCow<'static, str>> {
fn input_ports(&self) -> RVec<RCowStr<'static>> {
IN_PORTS_REF
.into_iter()
.map(|port| conv_cow_str_inv(port.clone()))
.collect()
}
/// Valid output ports for the connector, by default this is `out` and `err`
fn output_ports(&self) -> RVec<RCow<'static, str>> {
fn output_ports(&self) -> RVec<RCowStr<'static>> {
OUT_PORTS_REF
.into_iter()
.map(|port| conv_cow_str_inv(port.clone()))
Expand Down Expand Up @@ -1141,14 +1141,12 @@ pub type BoxedRawConnector = RawConnector_TO<'static, RBox<()>>;
///
/// Note that it may hurt performance in some parts of the connector interface,
/// so some of the functionality may not be fully wrapped.
pub struct Connector(pub BoxedRawConnector);
pub(crate) struct Connector(pub BoxedRawConnector);
impl Connector {
/// Wrapper for [`BoxedRawConnector::input_ports`]
#[inline]
pub fn input_ports(&self) -> Vec<Cow<'static, str>> {
self.0.input_ports().into_iter().map(conv_cow_str).collect()
}
/// Wrapper for [`BoxedRawConnector::output_ports`]
#[inline]
pub fn output_ports(&self) -> Vec<Cow<'static, str>> {
self.0
Expand All @@ -1158,19 +1156,16 @@ impl Connector {
.collect()
}

/// Wrapper for [`BoxedRawConnector::is_valid_input_port`]
#[inline]
pub fn is_valid_input_port(&self, port: &str) -> bool {
self.0.is_valid_input_port(port.into())
}

/// Wrapper for [`BoxedRawConnector::is_valid_output_port`]
#[inline]
pub fn is_valid_output_port(&self, port: &str) -> bool {
self.0.is_valid_output_port(port.into())
}

/// Wrapper for [`BoxedRawConnector::create_source`]
#[inline]
pub async fn create_source(
&mut self,
Expand All @@ -1182,16 +1177,12 @@ impl Connector {
.create_source(source_context.clone(), builder.qsize())
.await
{
ROk(RSome(raw_source)) => {
let wrapper = Source(raw_source);
builder.spawn(wrapper, source_context).map(Some)
}
ROk(RSome(source)) => builder.spawn(source, source_context).map(Some),
ROk(RNone) => Ok(None),
RErr(err) => Err(err.into()),
}
}

/// Wrapper for [`BoxedRawConnector::create_sink`]
#[inline]
pub async fn create_sink(
&mut self,
Expand All @@ -1206,16 +1197,12 @@ impl Connector {
.create_sink(sink_context.clone(), builder.qsize(), reply_tx)
.await
{
ROk(RSome(raw_sink)) => {
let wrapper = Sink(raw_sink);
builder.spawn(wrapper, sink_context).map(Some)
}
ROk(RSome(sink)) => builder.spawn(sink, sink_context).map(Some),
ROk(RNone) => Ok(None),
RErr(err) => Err(err.into()),
}
}

/// Wrapper for [`BoxedRawConnector::connect`]
#[inline]
pub async fn connect(&mut self, ctx: &ConnectorContext, attempt: &Attempt) -> Result<bool> {
self.0
Expand All @@ -1225,7 +1212,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::on_start`]
#[inline]
pub async fn on_start(&mut self, ctx: &ConnectorContext) -> Result<()> {
self.0
Expand All @@ -1235,7 +1221,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::on_pause`]
#[inline]
pub async fn on_pause(&mut self, ctx: &ConnectorContext) -> Result<()> {
self.0
Expand All @@ -1245,7 +1230,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::on_resume`]
#[inline]
pub async fn on_resume(&mut self, ctx: &ConnectorContext) -> Result<()> {
self.0
Expand All @@ -1255,7 +1239,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::on_drain`]
#[inline]
pub async fn on_drain(&mut self, ctx: &ConnectorContext) -> Result<()> {
self.0
Expand All @@ -1265,7 +1248,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::on_stop`]
#[inline]
pub async fn on_stop(&mut self, ctx: &ConnectorContext) -> Result<()> {
self.0
Expand All @@ -1275,7 +1257,6 @@ impl Connector {
.into() // RResult -> Result
}

/// Wrapper for [`BoxedRawConnector::codec_requirements`]
#[inline]
pub fn codec_requirements(&self) -> CodecReq {
self.0.codec_requirements()
Expand All @@ -1287,15 +1268,15 @@ impl Connector {
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, Default, StableAbi)]
pub struct ConnectorType(RString);

impl From<ConnectorType> for RString {
impl From<ConnectorType> for String {
fn from(ct: ConnectorType) -> Self {
ct.0
ct.0.into()
}
}

impl From<ConnectorType> for String {
impl From<ConnectorType> for RString {
fn from(ct: ConnectorType) -> Self {
ct.0.into()
ct.0
}
}

Expand Down
Loading

0 comments on commit 7c7f931

Please sign in to comment.