Skip to content

Commit

Permalink
Ensure jsonl endpoint is localhost only, limit connections
Browse files Browse the repository at this point in the history
  • Loading branch information
dcadenas committed Sep 27, 2024
1 parent 0348524 commit 264de68
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 10 deletions.
6 changes: 3 additions & 3 deletions src/google_pubsub_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use gcloud_sdk::{
};
use tracing::debug;

const ALLOWED_PUBKEYS: &[&str] = &[
const _ALLOWED_PUBKEYS: &[&str] = &[
"07ecf9838136fe430fac43fa0860dbc62a0aac0729c5a33df1192ce75e330c9f", // Bryan
"89ef92b9ebe6dc1e4ea398f6477f227e95429627b0a33dc89b640e137b256be5", // Daniel
"e8ad7c13ba55ba0a04c23fc09edce74ad7a8dddc059dc2e274ff63bc2e047782", // Daphne
Expand Down Expand Up @@ -50,9 +50,9 @@ impl GooglePubSubClient {
impl PublishEvents for GooglePubSubClient {
async fn publish_events(
&mut self,
follow_changes: Vec<NotificationMessage>,
notification_messages: Vec<NotificationMessage>,
) -> Result<(), PublisherError> {
let pubsub_messages: Result<Vec<PubsubMessage>, PublisherError> = follow_changes
let pubsub_messages: Result<Vec<PubsubMessage>, PublisherError> = notification_messages
.iter()
// .filter(|message| {
// // TODO: Temporary filter while developing this service
Expand Down
1 change: 1 addition & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ async fn start(settings: Settings) -> Result<()> {
settings.tcp_importer_port,
event_sender,
cancellation_token.clone(),
5,
)
.await?;

Expand Down
58 changes: 51 additions & 7 deletions src/tcp_importer.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
use anyhow::{Context, Result};
use nostr_sdk::prelude::*;
use std::net::IpAddr;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::net::TcpListener;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::broadcast::Sender;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio_util::{sync::CancellationToken, task::TaskTracker};
use tracing::{error, info};

Expand All @@ -13,13 +16,17 @@ pub async fn start_tcp_importer(
tcp_port: u16,
event_tx: Sender<Box<Event>>,
cancellation_token: CancellationToken,
max_connections: usize,
) -> Result<()> {
let address = SocketAddr::from(([0, 0, 0, 0], tcp_port));
let listener = TcpListener::bind(&address)
.await
.context(format!("Error opening TCP listener on port {tcp_port}"))?;

info!("Listening for tcp connections on {}", address);
info!("Listening for TCP connections on {}", address);

// Semaphore with the maximum number of allowed concurrent connections
let semaphore = Arc::new(Semaphore::new(max_connections));

task_tracker.spawn(async move {
loop {
Expand All @@ -29,11 +36,31 @@ pub async fn start_tcp_importer(
break;
}

Ok((stream, _)) = listener.accept() => {
let tx = event_tx.clone();
let cancel_token = cancellation_token.clone();
result = listener.accept() => {
match result {
Ok((stream, addr)) => {
if is_local_address(&addr) {
let tx = event_tx.clone();
let cancel_token = cancellation_token.clone();
let semaphore_clone = semaphore.clone();
let permit = semaphore_clone.acquire_owned().await;

tokio::spawn(handle_connection(stream, tx, cancel_token));
match permit {
Ok(permit) => {
tokio::spawn(handle_connection_with_permit(stream, tx, cancel_token, permit));
}
Err(e) => {
error!("Failed to acquire semaphore permit: {}", e);
}
}
} else {
info!("Ignoring connection from non-local address: {}", addr);
}
}
Err(e) => {
error!("Failed to accept connection: {}", e);
}
}
}
}
}
Expand All @@ -44,9 +71,26 @@ pub async fn start_tcp_importer(
Ok(())
}

fn is_local_address(addr: &SocketAddr) -> bool {
match addr.ip() {
IpAddr::V4(ipv4) => ipv4.is_loopback(), // Checks if in 127.0.0.0/8
IpAddr::V6(ipv6) => ipv6.is_loopback(), // Checks if ::1
}
}

async fn handle_connection_with_permit(
stream: TcpStream,
tx: Sender<Box<Event>>,
cancellation_token: CancellationToken,
_permit: OwnedSemaphorePermit,
) {
handle_connection(stream, tx, cancellation_token).await;
// Here the _permit is dropped
}

// Handle the incoming connection and read jsonl contact events
async fn handle_connection(
stream: tokio::net::TcpStream,
stream: TcpStream,
event_tx: Sender<Box<Event>>,
cancellation_token: CancellationToken,
) {
Expand Down

0 comments on commit 264de68

Please sign in to comment.