Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Master as a tcp server #358

Merged
merged 13 commits into from
May 4, 2024
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ members = [
]

[workspace.dependencies]
sfio-tokio-ffi = "0.8"
sfio-tokio-ffi = "0.9.0"
sfio-tracing-ffi = "0.9.0"
oo-bindgen = "0.8.6"
tracing = "0.1"
Expand Down
138 changes: 138 additions & 0 deletions dnp3/examples/master_tcp_server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
//! Example of running a master as a TCP server
use std::collections::HashMap;
use std::net::SocketAddr;

use tokio_stream::StreamExt;
use tokio_util::codec::{FramedRead, LinesCodec};

use dnp3::decode::*;
use dnp3::link::*;
use dnp3::master::*;
use dnp3::tcp::*;

/// read handler that does nothing
#[derive(Copy, Clone)]
pub struct NullReadHandler;

impl NullReadHandler {
/// create a boxed instance of the NullReadHandler
pub fn boxed() -> Box<dyn ReadHandler> {
Box::new(Self {})
}
}

impl ReadHandler for NullReadHandler {}

#[derive(Copy, Clone)]
struct NullAssociationHandler;

impl AssociationHandler for NullAssociationHandler {}

#[derive(Copy, Clone)]
struct NullAssociationInformation;

impl AssociationInformation for NullAssociationInformation {}

struct ConnectionHandler {
channels: HashMap<u16, MasterChannel>,
}

impl ConnectionHandler {
async fn setup_channel(
channel: &mut MasterChannel,
source: u16,
) -> Result<AssociationHandle, Box<dyn std::error::Error>> {
let assoc = channel
.add_association(
EndpointAddress::try_new(source)?,
AssociationConfig::new(
EventClasses::all(),
EventClasses::all(),
Classes::all(),
EventClasses::none(),
),
Box::new(NullReadHandler),
Box::new(NullAssociationHandler),
Box::new(NullAssociationInformation),
)
.await?;
channel.enable().await?;
Ok(assoc)
}
}

impl dnp3::tcp::ConnectionHandler for ConnectionHandler {
async fn accept(&mut self, _: SocketAddr) -> Result<AcceptAction, Reject> {
Ok(AcceptAction::GetLinkIdentity)
}

async fn start(&mut self, _: MasterChannel, _: SocketAddr) {
//
}

async fn accept_link_id(
&mut self,
addr: SocketAddr,
source: u16,
_destination: u16,
) -> Result<AcceptConfig, Reject> {
tracing::info!("accepted from {addr:?}:{source}");
let mut decode_level = DecodeLevel::nothing();
decode_level.application = AppDecodeLevel::ObjectValues;
let config = AcceptConfig {
error_mode: LinkErrorMode::Close,
config: MasterChannelConfig {
master_address: EndpointAddress::try_new(1).unwrap(),
decode_level,
tx_buffer_size: Default::default(),
rx_buffer_size: Default::default(),
},
};
Ok(config)
}

async fn start_with_link_id(
&mut self,
mut channel: MasterChannel,
_addr: SocketAddr,
source: u16,
destination: u16,
) {
tracing::info!("start with source = {source} dest = {destination}");

match Self::setup_channel(&mut channel, source).await {
Ok(_) => {
self.channels.insert(source, channel);
}
Err(err) => {
tracing::warn!("channel setup failed: {err}");
}
}
}
}

#[tokio::main(flavor = "multi_thread")]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::INFO)
.with_target(false)
.init();

let _server = spawn_master_tcp_server(
"127.0.0.1:20000".parse()?,
LinkIdConfig::default().decode_level(PhysDecodeLevel::Data),
ConnectionHandler {
channels: Default::default(),
},
)
.await?;

let mut reader = FramedRead::new(tokio::io::stdin(), LinesCodec::new());

loop {
let cmd = reader.next().await.unwrap()?;
if cmd == "x" {
return Ok(());
}
}
}
5 changes: 1 addition & 4 deletions dnp3/src/app/attr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -892,10 +892,7 @@ impl<'a> Iterator for VariationListIter<'a> {
let variation = *self.data.first()?;
let prop = *self.data.get(1)?;

self.data = match self.data.get(2..) {
Some(x) => x,
None => &[],
};
self.data = self.data.get(2..).unwrap_or_default();

Some(AttrItem {
variation,
Expand Down
22 changes: 20 additions & 2 deletions dnp3/src/app/timeout.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use std::time::Duration;

/// A wrapper around a std::time::Duration
/// that ensures values are in the range `[1ms .. 1hour]`
/// A wrapper around a std::time::Duration that ensures values are in the range `[1ms .. 1hour]`
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
#[cfg_attr(
feature = "serialization",
Expand All @@ -10,6 +9,12 @@ use std::time::Duration;
#[cfg_attr(feature = "serialization", serde(try_from = "Duration"))]
pub struct Timeout(pub(crate) Duration);

impl From<Timeout> for Duration {
fn from(value: Timeout) -> Self {
value.0
}
}

impl Default for Timeout {
fn default() -> Self {
Self(Duration::from_secs(5))
Expand Down Expand Up @@ -39,6 +44,19 @@ impl Timeout {
/// maximum allowed timeout value as a duration
pub const MAX: Duration = Duration::from_secs(60 * 60); // one hour

/// construct from a duration, saturating at the minimum and maximum
pub fn saturating(value: Duration) -> Self {
if value < Self::MIN {
return Self(Self::MIN);
}

if value > Self::MAX {
return Self(Self::MAX);
}

Self(value)
}

/// try to construct a `Timeout` from a count of seconds
///
/// returns a `RangeError` is < `Timeout::MIN` or > `Timeout::MAX`
Expand Down
4 changes: 4 additions & 0 deletions dnp3/src/link/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ impl Layer {
}
}

pub(crate) fn seed(&mut self, seed_data: &[u8]) -> Result<(), scursor::WriteError> {
self.reader.seed(seed_data)
}

pub(crate) fn reset(&mut self) {
self.secondary_state = SecondaryState::NotReset;
self.reader.reset();
Expand Down
15 changes: 8 additions & 7 deletions dnp3/src/link/reader.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::io::ErrorKind;

use crate::decode::DecodeLevel;
use crate::link::display::LinkDisplay;
use crate::link::error::LinkError;
Expand All @@ -9,7 +7,7 @@ use crate::link::{LinkErrorMode, LinkReadMode};
use crate::util::phys::{PhysAddr, PhysLayer};

use crate::link;
use scursor::ReadCursor;
use scursor::{ReadCursor, WriteCursor};

/// How many link frames might be required to transport this much application data?
const fn num_link_frames(fragment_size: usize) -> usize {
Expand Down Expand Up @@ -142,6 +140,13 @@ impl Reader {
}
}

pub(crate) fn seed(&mut self, seed_data: &[u8]) -> Result<(), scursor::WriteError> {
let mut cursor = WriteCursor::new(self.buffer.writable());
cursor.write_bytes(seed_data)?;
self.buffer.advance_write(seed_data.len());
Ok(())
}

pub(crate) fn reset(&mut self) {
self.buffer.reset();
self.parser.reset();
Expand Down Expand Up @@ -197,10 +202,6 @@ impl Reader {
// now we can read more data
let (count, addr) = io.read(self.buffer.writable(), level.physical).await?;

if count == 0 {
return Err(LinkError::Stdio(ErrorKind::UnexpectedEof));
}

self.buffer.advance_write(count);
Ok(addr)
}
Expand Down
4 changes: 4 additions & 0 deletions dnp3/src/master/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ impl MasterTask {
}
}

pub(crate) fn seed_link(&mut self, seed_data: &[u8]) -> Result<(), scursor::WriteError> {
self.reader.seed_link(seed_data)
}

#[cfg(test)]
pub(crate) fn set_rx_frame_info(&mut self, info: crate::link::header::FrameInfo) {
self.reader.get_inner().set_rx_frame_info(info);
Expand Down
2 changes: 1 addition & 1 deletion dnp3/src/outstation/control/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ impl SelectState {
return Err(CommandStatus::Timeout);
}
Some(elapsed) => {
if elapsed > timeout.0 {
if elapsed > timeout.into() {
tracing::warn!("received valid OPERATE after SELECT timeout");
return Err(CommandStatus::Timeout);
}
Expand Down
2 changes: 1 addition & 1 deletion dnp3/src/outstation/database/details/event/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -682,7 +682,7 @@ impl EventBuffer {
.events
.iter()
.filter(|(_, e)| e.state.get() == EventState::Unselected && selector(e))
.take(limit.unwrap_or(usize::max_value()))
.take(limit.unwrap_or(usize::MAX))
{
evt.state.set(EventState::Selected);
count += 1;
Expand Down
2 changes: 1 addition & 1 deletion dnp3/src/outstation/database/details/event/write_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ where
let difference: u64 = time.timestamp().raw_value() - cto.timestamp().raw_value();

// too big of a difference to encode
if difference > u16::max_value().into() {
if difference > u16::MAX.into() {
return Ok(Continue::NewHeader);
}

Expand Down
3 changes: 3 additions & 0 deletions dnp3/src/tcp/master/mod.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
mod client;
mod server;

pub use client::*;
pub use server::*;
Loading
Loading