Skip to content

Commit

Permalink
Make daemon loop over coordinator connection to make it possible to c…
Browse files Browse the repository at this point in the history
…reate a system service awaiting coordinator connection (#689)

Loop over trying to connect to the coordinator on first connection to
make it possible to await the coordinator to be ready.

This notably will make it possible to make dora daemon a system service
which can
```bash
sudo tee /etc/systemd/system/dora-daemon.service << EOF
[Unit]
Description=Dora Daemon in Conda Environment
After=network.target

[Service]
Environment="PATH=$PATH"
User=HwHiAiUser
WorkingDirectory=/home/HwHiAiUser
ExecStart=/bin/bash --login  -c 'source /home/HwHiAiUser/.bashrc && source $CONDA_PATH/bin/activate base && dora daemon'
Restart=always
RestartSec=3
StartLimitInterval=60
StartLimitBurst=3

[Install]
WantedBy=multi-user.target
EOF

sudo systemctl daemon-reload
sudo systemctl enable dora-daemon.service
sudo systemctl start dora-daemon.service
```
  • Loading branch information
haixuanTao authored Oct 13, 2024
2 parents a90221d + 9dbaa95 commit 4f70121
Showing 1 changed file with 19 additions and 4 deletions.
23 changes: 19 additions & 4 deletions binaries/daemon/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,16 @@ use dora_message::{
daemon_to_coordinator::{CoordinatorRequest, DaemonCoordinatorReply, DaemonRegisterRequest},
};
use eyre::{eyre, Context};
use std::{io::ErrorKind, net::SocketAddr};
use std::{io::ErrorKind, net::SocketAddr, time::Duration};
use tokio::{
net::TcpStream,
sync::{mpsc, oneshot},
time::sleep,
};
use tokio_stream::{wrappers::ReceiverStream, Stream};
use tracing::warn;

const DAEMON_COORDINATOR_RETRY_INTERVAL: std::time::Duration = Duration::from_secs(1);

#[derive(Debug)]
pub struct CoordinatorEvent {
Expand All @@ -28,9 +32,20 @@ pub async fn register(
listen_port: u16,
clock: &HLC,
) -> eyre::Result<impl Stream<Item = Timestamped<CoordinatorEvent>>> {
let mut stream = TcpStream::connect(addr)
.await
.wrap_err("failed to connect to dora-coordinator")?;
let mut stream = loop {
match TcpStream::connect(addr)
.await
.wrap_err("failed to connect to dora-coordinator")
{
Err(err) => {
warn!("Could not connect to: {addr}, with error: {err}. Retring in {DAEMON_COORDINATOR_RETRY_INTERVAL:#?}..");
sleep(DAEMON_COORDINATOR_RETRY_INTERVAL).await;
}
Ok(stream) => {
break stream;
}
};
};
stream
.set_nodelay(true)
.wrap_err("failed to set TCP_NODELAY")?;
Expand Down

0 comments on commit 4f70121

Please sign in to comment.