Skip to content

Commit

Permalink
refined worker and init logic (#79)
Browse files Browse the repository at this point in the history
* refined setup.py

* add worker id property, use default INFO

* make worker init first then enable http server
  • Loading branch information
lkevinzc authored Oct 23, 2021
1 parent 6f932c3 commit e912e3f
Show file tree
Hide file tree
Showing 10 changed files with 40 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Before contributing to this repository, please first discuss the change you wish

1. After you have forked this repository, run `make install` *the first time* to install the local development dependencies; afterward, use `make dev` to rebuild the library whenever you have made code changes.
2. Before committing your changes, you can use `make format && make lint` to ensure the code follows our style standards.
3. Increase the version number (discussed with the repository owner) and update docs or README.md when necessary. The versioning scheme we use is [SemVer](https://semver.org/).
3. Please add corresponding tests for your change if it relates to a new feature or API, and ensure that `make test` passes.
4. Submit your pull request.

### Contacts
Expand Down
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 2 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[package]
name = "mosec"
version = "0.2.0"
authors = ["Keming <kemingy94@gmail.com>"]
version = "0.2.1"
authors = ["Keming <kemingy94@gmail.com>", "Zichen <lkevinzc@gmail.com>"]
edition = "2018"
license = "Apache-2.0"
readme = "README.md"
Expand All @@ -11,7 +11,6 @@ documentation = "https://docs.rs/mosec"
exclude = ["target", "examples", "tests", "scripts"]

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
hyper = { version = "0.14", features = ["http1", "server", "runtime"] }
bytes = "1"
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ Mosec is a high-performance and flexible model serving framework for building ML

## Installation

Mosec requires Python 3.6 or above. Install the latest PyPI package with:
Mosec requires Python 3.6 or above. Install the latest [PyPI package](https://pypi.org/project/mosec/) with:

pip install -U mosec

Expand Down
2 changes: 1 addition & 1 deletion mosec/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.2.0"
__version__ = "0.2.1"
3 changes: 2 additions & 1 deletion mosec/coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ def __init__(
self.worker = worker()
self.worker._set_mbs(max_batch_size)
self.worker._set_stage(stage)
self.worker._set_id(worker_id)

self.name = f"<{stage_id}|{worker.__name__}|{worker_id}>"

Expand All @@ -72,8 +73,8 @@ def __init__(
self.shutdown = shutdown
self.shutdown_notify = shutdown_notify

self.init_protocol()
self.init_worker()
self.init_protocol()
self.run()

def exit(self):
Expand Down
17 changes: 15 additions & 2 deletions mosec/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,12 @@ class Worker:
will follow the "_same type_" constraint.
"""

example: Any = None

def __init__(self):
self._stage = None
self._max_batch_size = 1

self.example = None
self._id = 0

def _serialize_ipc(self, data):
"""Define IPC serialize method"""
Expand All @@ -56,6 +57,18 @@ def _set_stage(self, stage):
def _set_mbs(self, mbs):
self._max_batch_size = mbs

# Store the worker id on the instance; the coordinator calls this with its
# `worker_id` after constructing the worker. The value is exposed read-only
# through the `id` property.
def _set_id(self, id):
self._id = id

@property
def id(self) -> int:
"""
Worker id, used to differentiate workers in the same stage.

Once assigned, the id is in the range of [1, ... ,`num`]
(`num` as defined [here][mosec.server.Server--multiprocess]).
It keeps the constructor default of 0 until `_set_id` is called.
"""
return self._id

def serialize(self, data: Any) -> bytes:
"""
This method defines serialization of the last stage (egress).
Expand Down
7 changes: 6 additions & 1 deletion src/coordinator.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
use std::fs;
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use async_channel::{bounded, Receiver, Sender};
use tokio::sync::Barrier;
use tracing::{error, info};

use crate::args::Opts;
Expand Down Expand Up @@ -53,7 +55,8 @@ impl Coordinator {
}
}

pub(crate) async fn run(&self) {
pub(crate) fn run(&self) -> Arc<Barrier> {
let barrier = Arc::new(Barrier::new(self.batches.len() + 1));
let mut last_receiver = self.receiver.clone();
let mut last_sender = self.sender.clone();
let wait_time = self.wait_time;
Expand All @@ -77,11 +80,13 @@ impl Coordinator {
last_receiver.clone(),
sender.clone(),
last_sender.clone(),
barrier.clone(),
));
last_receiver = receiver;
last_sender = sender;
}
tokio::spawn(finish_task(last_receiver));
barrier.clone()
}
}

Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,13 @@ async fn main() {
info!(?opts, "parse arguments");

let coordinator = Coordinator::init_from_opts(&opts);
tokio::spawn(async move {
coordinator.run().await;
});
let barrier = coordinator.run();
barrier.wait().await;

let service = make_service_fn(|_| async { Ok::<_, hyper::Error>(service_fn(service_func)) });
let addr: SocketAddr = format!("{}:{}", opts.address, opts.port).parse().unwrap();
let server = hyper::Server::bind(&addr).serve(service);
info!(?addr, "http server is running at");
let graceful = server.with_graceful_shutdown(shutdown_signal());
if let Err(err) = graceful.await {
tracing::error!(%err, "server error");
Expand Down
8 changes: 8 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::{Duration, Instant};

use async_channel::{Receiver, Sender};
use bytes::{BufMut, Bytes, BytesMut};
use tokio::io::{self, AsyncReadExt, AsyncWriteExt};
use tokio::net::{UnixListener, UnixStream};
use tokio::sync::Barrier;
use tracing::{debug, error, info};

use crate::metrics::Metrics;
Expand All @@ -20,6 +22,7 @@ const BIT_STATUS_BAD_REQ: u16 = 0b10;
const BIT_STATUS_VALIDATION_ERR: u16 = 0b100;
// Others are treated as Internal Error

#[allow(clippy::too_many_arguments)]
pub(crate) async fn communicate(
path: PathBuf,
batch_size: usize,
Expand All @@ -28,6 +31,7 @@ pub(crate) async fn communicate(
receiver: Receiver<u32>,
sender: Sender<u32>,
last_sender: Sender<u32>,
barrier: Arc<Barrier>,
) {
let listener = UnixListener::bind(&path).expect("failed to bind to the socket");
let mut connection_id: u32 = 0;
Expand Down Expand Up @@ -105,6 +109,10 @@ pub(crate) async fn communicate(
}
}
});
// ensure every stage is properly initialized (including warmup)
if connection_id == 1 {
barrier.wait().await;
}
}
Err(err) => {
error!(error=%err, "accept connection error");
Expand Down

0 comments on commit e912e3f

Please sign in to comment.