Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 53 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

44 changes: 44 additions & 0 deletions crates/engineioxide-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
[package]
name = "engineioxide-client"
description = "Engine IO client implementation in rust"
version = "0.17.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
repository.workspace = true
homepage.workspace = true
keywords.workspace = true
categories.workspace = true
license.workspace = true
readme = "README.md"

[dependencies]
engineioxide-core = { path = "../engineioxide-core", version = "0.2" }
bytes.workspace = true
futures-core.workspace = true
futures-util.workspace = true
http.workspace = true
http-body.workspace = true
serde.workspace = true
serde_json.workspace = true
thiserror.workspace = true
tokio = { workspace = true, features = ["rt", "time"] }
hyper = { workspace = true, features = ["client", "http1"] }
tokio-tungstenite.workspace = true
http-body-util.workspace = true
pin-project-lite.workspace = true
smallvec.workspace = true
hyper-util = { workspace = true, features = ["tokio"] }

# Tracing
tracing = { workspace = true, optional = true }

[dev-dependencies]
tokio = { workspace = true, features = ["macros", "parking_lot"] }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
engineioxide = { path = "../engineioxide", features = ["tracing", "v3"] }

[features]
v3 = ["engineioxide-core/v3"]
tracing = ["dep:tracing", "engineioxide-core/tracing"]
__test_harness = []
Empty file.
112 changes: 112 additions & 0 deletions crates/engineioxide-client/src/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
use std::{
fmt,
pin::Pin,
sync::Mutex,
task::{Context, Poll},
};

use engineioxide_core::{Packet, PacketBuf, PacketParseError, Sid};
use futures_core::Stream;
use futures_util::{
Sink, SinkExt, StreamExt,
stream::{SplitSink, SplitStream},
};
use tokio::sync::mpsc::{self, error::TrySendError};

use crate::{
HttpClient, poll,
transport::{Transport, polling::PollingSvc},
};

type SendPongFut<S> = Pin<
Box<
dyn Future<Output = Result<(), <SplitSink<Transport<S>, Packet> as Sink<Packet>>::Error>>
+ 'static,
>,
>;

pin_project_lite::pin_project! {
pub struct Client<S: PollingSvc> {
#[pin]
pub transport_rx: SplitStream<Transport<S>>,
// TODO: is this the right implementation? We need something that can be driven itself.
// Otherwise we need a way to drive the transport_tx. Normally it should be driven by the user.
// But what if we need to send a PONG packet from the inner lib?
#[pin]
pub transport_tx: SplitSink<Transport<S>, Packet>,

pub sid: Sid,
// pub tx: mpsc::Sender<PacketBuf>,
// pub(crate) rx: Mutex<mpsc::Receiver<PacketBuf>>,
}
}

impl<S: PollingSvc> Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
pub async fn connect(svc: S) -> Result<Self, ()> {
let mut inner = HttpClient::new(svc);
let packet = inner.handshake().await.unwrap();

let transport = Transport::Polling { inner };
let (transport_tx, transport_rx) = transport.split();
let client = Client {
transport_tx,
transport_rx,
sid: packet.sid,
};

Ok(client)
}
}

impl<S: PollingSvc> Stream for Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Item = Result<Packet, PacketParseError>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
match poll!(this.transport_rx.poll_next(cx)) {
Some(Ok(Packet::Ping)) => {
cx.waker().wake_by_ref();
// let mut tx = self.transport_tx.clone();
// let fut = async move {
// tx.send(Packet::Pong).await?;
// tx.flush().await
// };
// this.pending_pong.set(Some(Box::pin(fut)));

Poll::Pending
}
packet => Poll::Ready(packet),
}
}
}

impl<S: PollingSvc> Sink<Packet> for Client<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_ready(cx)
}

fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
self.project().transport_tx.start_send(item)
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_flush(cx)
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.project().transport_tx.poll_close(cx)
}
}
1 change: 1 addition & 0 deletions crates/engineioxide-client/src/io.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

19 changes: 19 additions & 0 deletions crates/engineioxide-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// #![warn(clippy::pedantic)]
#![allow(clippy::similar_names)]
//! Engine.IO client library for Rust.

mod client;
mod io;
mod transport;
pub use crate::client::Client;
pub use crate::transport::polling::HttpClient;

#[macro_export]
macro_rules! poll {
($expr:expr) => {
match $expr {
std::task::Poll::Pending => return std::task::Poll::Pending,
std::task::Poll::Ready(value) => value,
}
};
}
81 changes: 81 additions & 0 deletions crates/engineioxide-client/src/transport/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::{
fmt,
pin::Pin,
task::{Context, Poll},
};

use engineioxide_core::{Packet, PacketParseError, TransportType};
use futures_core::Stream;
use futures_util::Sink;

use crate::{HttpClient, transport::polling::PollingSvc};

pub mod polling;

pin_project_lite::pin_project! {
#[project = TransportProj]
pub enum Transport<S: PollingSvc> {
Polling {
#[pin]
inner: HttpClient<S>
},
Websocket {
#[pin]
inner: HttpClient<S>
}
}
}

impl<S: PollingSvc> Transport<S> {
pub fn transport_type(&self) -> TransportType {
match self {
Transport::Polling { .. } => TransportType::Polling,
Transport::Websocket { .. } => TransportType::Websocket,
}
}
}

impl<S: PollingSvc> Stream for Transport<S>
where
S::Error: fmt::Debug,
<S::Body as http_body::Body>::Error: fmt::Debug,
{
type Item = Result<Packet, PacketParseError>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match self.as_mut().project() {
TransportProj::Polling { inner } => inner.poll_next(cx),
TransportProj::Websocket { inner } => inner.poll_next(cx),
}
}
}
impl<S: PollingSvc> Sink<Packet> for Transport<S> {
type Error = ();

fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_ready(cx),
TransportProj::Websocket { inner } => inner.poll_ready(cx),
}
}

fn start_send(self: Pin<&mut Self>, item: Packet) -> Result<(), Self::Error> {
match self.project() {
TransportProj::Polling { inner } => inner.start_send(item),
TransportProj::Websocket { inner } => inner.start_send(item),
}
}

fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_flush(cx),
TransportProj::Websocket { inner } => inner.poll_flush(cx),
}
}

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
match self.project() {
TransportProj::Polling { inner } => inner.poll_close(cx),
TransportProj::Websocket { inner } => inner.poll_close(cx),
}
}
}
Loading