Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

11章のコードをActix-Web 2.0.0に移植する #2

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,831 changes: 1,113 additions & 718 deletions ch11/log-collector/Cargo.lock

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions ch11/log-collector/api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,10 @@ authors = ["Rust Bicycle Book <bicycle-book@example.com>"]
edition = "2018"

[dependencies]
serde = "1.0.8"
serde_derive = "1.0.8"
serde = "1"
serde_derive = "1"

[dependencies.chrono]
features = ["serde"]
version = "0.4.0"
version = "0.4"

14 changes: 8 additions & 6 deletions ch11/log-collector/server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,19 @@ authors = ["Rust Bicycle Book <bicycle-book@example.com>"]
edition = "2018"

[dependencies]
env_logger = "0.6"
log = "0.4"
actix-web = "0.7"
failure = "0.1"
actix-web = "2"
api = {path = "../api"}
dotenv = "0.13"
chrono = "0.4"
csv = "1"
actix-web-multipart-file = "0.1"
futures = "0.1"
futures = "0.3"
env_logger = "0.7"
itertools = "0.8"
log = "0.4"
actix-rt = "1"
actix-multipart = "0.2"
tempfile = "3"
actix-service = "1.0.1"

[dependencies.diesel]
features = ["postgres", "chrono", "r2d2"]
Expand Down
157 changes: 106 additions & 51 deletions ch11/log-collector/server/src/handlers.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,51 @@
use crate::db;
use crate::Server;
use actix_web::{FutureResponse, HttpResponse, State};
use actix_web::{Json, Query};
use actix_web_multipart_file::{FormData, Multiparts};
use actix_multipart::Multipart;
use actix_web::error::{ErrorBadRequest, ErrorInternalServerError};
use actix_web::{web, HttpResponse, Result};
use diesel::pg::PgConnection;
use failure::Error;
use futures::prelude::*;
use itertools::Itertools;
use log::debug;
use std::io::{BufReader, Read};

fn load_file(conn: &PgConnection, file: impl Read) -> Result<usize, Error> {
use std::io::{self, prelude::*};

// ハンドラで大きく変わったのは3箇所。
// 1. ハンドラが `async` に
// 2. エラー型がFailureの `Error` から独自の `Error` に
// + 各所に `.map_err` を挟む必要が出た
// 3. マルチパートの処理をactix-multipartで書き直した
//
// マルチパートはActix-Web 0.7の頃とあまりAPIに変更はないが、`async` / `await` が入ったことにより
// 泥臭い処理をしててもある程度読みやすくなったのでラッパを作らずにそのままのAPIを使うことにした。
//
// 全体的な注意としては `async` ハンドラの中でファイルIOやDB接続などのブロッキング処理をしている点。
// ブロッキング処理をするとスレッドを止めてしまうので、
// 非同期処理のメリットである「1スレッドでいくつもの処理をこなせる」を完全に殺してしまう。
// とはいえCSVリーダーやテンポラリファイル、DB接続など外部のクレートはどうしようもない。
// このあたりは非同期のエコシステムが成熟するのを待つことになる。
//
// どうしても非同期処理の中で同期処理を扱いときは別スレッドを立てて同期処理を逃がすのが定石。
// Actixも `actix_rt::Arbiter::exec` というAPIを用意している。
// Arbiter (=決定者) はスレッド+イベントループを表わすエンティティで、Arbiterを作ればスレッドが作られるし、
// その上でFutureを走らせることができる。
//
// 今回はActix-Web 0.7の頃から非同期処理の中で同期処理を書いてしまっていたのもあってArbiterは使わずに同期処理をしてしまった。
// 他には扱っているデータがスレッドアンセーフなので別スレッドに送ろうとすると面倒事が多いなどの理由もある。
// 現実的に問題を解決したいならActixのアクターシステムを使うなどの手段もある。
// 1. DBはDB用のActorを立てて、SQLを発行する関数は全てそこで実行してしまう。
// 2. マルチパートやCSVの扱いは一旦ファイルに保存するまでをリクエストハンドラのArbiter内でやり、CSVファイルをデコードしてDBに保存するまでを別Actorにする
//

async fn load_file(conn: &PgConnection, file: impl Read) -> Result<usize> {
use crate::model::NewLog;

let mut ret = 0;

// CSVファイルが渡される`csv::Reader`を用いてapi::Logにデコードしていく
let in_csv = BufReader::new(file);
let in_csv = io::BufReader::new(file);
let in_log = csv::Reader::from_reader(in_csv).into_deserialize::<::api::Log>();
// Itertoolsのchunksを用いて1000件ずつ処理する
// blocking
for logs in &in_log.chunks(1000) {
let logs = logs
// Logとしてパースできた行だけ集める
Expand All @@ -30,45 +57,69 @@ fn load_file(conn: &PgConnection, file: impl Read) -> Result<usize, Error> {
})
.collect_vec();

let inserted = db::insert_logs(conn, &logs)?;
// blocking
let inserted = db::insert_logs(conn, &logs).map_err(ErrorInternalServerError)?;
ret += inserted.len();
}

Ok(ret)
}

/// `POST /csv`のハンドラ
pub fn handle_post_csv(
server: State<Server>,
// actix_web_multipart_fileを使ってマルチパートのリクエストをファイルに保存する
multiparts: Multiparts,
) -> FutureResponse<HttpResponse> {
// multipartsは`Stream`になっているのでそのままメソッドを繋げる
// `Stream` は非同期版イテレータのような存在
let fut = multiparts
.from_err()
pub async fn handle_post_csv(
server: web::Data<Server>,
// actix-multipartを使ってマルチパートのリクエストを受け取る
mut multipart: Multipart,
) -> Result<HttpResponse> {
let conn = server.pool.get().map_err(ErrorInternalServerError)?;
let mut total_size = 0;

// multipartsは `Stream` になっている。
// `Future` は1度待つと1つの値が得られるが、 `Stream` は何度も待って何度も値が得られる。
// 非同期版のイテレータのような存在。
//
// 生の `Stream` をまともに扱う手段はないが、futuresのおかげで `.next()` メソッドが生えてきている。
// これでイテレータのように扱える。
while let Some(field) = multipart.next().await {
// データ(= ここではファイル)を取り出す度に `await` しているので並列性はない。
// ただし他のリクエストハンドラが動けるのでサーバ全体のパフォーマンスは上がる。

let mut field = field.map_err(ErrorBadRequest)?;
// text/csvでなければスキップ
.filter(|field| field.content_type == "text/csv")
if field.content_type().as_ref() != "text/csv" {
continue;
}
// ファイルでなければスキップ
.filter_map(|field| match field.form_data {
FormData::File { file, .. } => Some(file),
FormData::Data { .. } => None,
})
// 1ファイルずつ処理する
.and_then(move |file| load_file(&*server.pool.get()?, file))
// usize の Stream(イテレータのようなもの) -> それらの和
.fold(0, |acc, x| Ok::<_, Error>(acc + x))
.map(|sum| HttpResponse::Ok().json(api::csv::post::Response(sum)))
.from_err();

Box::new(fut)
if !field
.content_disposition()
.map(|c| c.is_attachment())
.unwrap_or(false)
{
continue;
}
// 一旦データをファイルに書き出す
// blocking
let mut file = io::BufWriter::new(tempfile::tempfile().map_err(ErrorInternalServerError)?);
// データが分割されて送られて来るのでチマチマ受け取ってファイルに書く。
while let Some(data) = field.next().await {
let data = data.map_err(ErrorInternalServerError)?;
// blocking
file.write(&data).map_err(ErrorInternalServerError)?;
}
// 書き出したデータをDBにロードする
// blocking
let file = file.into_inner().map_err(ErrorInternalServerError)?;
// blocking
total_size = load_file(&conn, file).await?;
}
Ok(HttpResponse::Ok().json(api::csv::post::Response(total_size)))
}

/// `POST /logs`のハンドラ
pub fn handle_post_logs(
server: State<Server>,
log: Json<api::logs::post::Request>,
) -> Result<HttpResponse, Error> {
pub async fn handle_post_logs(
server: web::Data<Server>,
log: web::Json<api::logs::post::Request>,
) -> Result<HttpResponse> {
use crate::model::NewLog;
use chrono::Utc;

Expand All @@ -77,23 +128,25 @@ pub fn handle_post_logs(
response_time: log.response_time,
timestamp: log.timestamp.unwrap_or_else(|| Utc::now()).naive_utc(),
};
let conn = server.pool.get()?;
db::insert_log(&conn, &log)?;
let conn = server.pool.get().map_err(ErrorInternalServerError)?;
// blocking
db::insert_log(&conn, &log).map_err(ErrorInternalServerError)?;

debug!("received log: {:?}", log);

Ok(HttpResponse::Accepted().finish())
}

/// `GET /logs`のハンドラ
pub fn handle_get_logs(
server: State<Server>,
range: Query<api::logs::get::Query>,
) -> Result<HttpResponse, Error> {
pub async fn handle_get_logs(
server: web::Data<Server>,
range: web::Query<api::logs::get::Query>,
) -> Result<HttpResponse> {
use chrono::{DateTime, Utc};

let conn = server.pool.get()?;
let logs = db::logs(&conn, range.from, range.until)?;
let conn = server.pool.get().map_err(ErrorInternalServerError)?;
// blocking
let logs = db::logs(&conn, range.from, range.until).map_err(ErrorInternalServerError)?;
let logs = logs
.into_iter()
.map(|log| api::Log {
Expand All @@ -107,14 +160,15 @@ pub fn handle_get_logs(
}

/// `GET /csv`のハンドラ
pub fn handle_get_csv(
server: State<Server>,
range: Query<api::csv::get::Query>,
) -> Result<HttpResponse, Error> {
pub async fn handle_get_csv(
server: web::Data<Server>,
range: web::Query<api::csv::get::Query>,
) -> Result<HttpResponse> {
use chrono::{DateTime, Utc};

let conn = server.pool.get()?;
let logs = db::logs(&conn, range.from, range.until)?;
let conn = server.pool.get().map_err(ErrorInternalServerError)?;
// blocking
let logs = db::logs(&conn, range.from, range.until).map_err(ErrorInternalServerError)?;
let v = Vec::new();
let mut w = csv::Writer::from_writer(v);

Expand All @@ -123,10 +177,11 @@ pub fn handle_get_csv(
response_time: log.response_time,
timestamp: DateTime::from_utc(log.timestamp, Utc),
}) {
w.serialize(log)?;
// ブロッキングに見えるが実体は `Vec` なのでブロックしない
w.serialize(log).map_err(ErrorInternalServerError)?;
}

let csv = w.into_inner()?;
let csv = w.into_inner().map_err(ErrorInternalServerError)?;
Ok(HttpResponse::Ok()
.header("Content-Type", "text/csv")
.body(csv))
Expand Down
55 changes: 39 additions & 16 deletions ch11/log-collector/server/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
#[macro_use]
extern crate diesel;
use actix_web::http::Method;
use actix_web::App;
use actix_service::ServiceFactory;
use actix_web::body::Body;
use actix_web::dev::{ServiceRequest, ServiceResponse};
use actix_web::error::Error;
use actix_web::{web, App, HttpServer};
use diesel::pg::PgConnection;
use diesel::r2d2::{ConnectionManager, Pool};
use dotenv::dotenv;
Expand Down Expand Up @@ -30,26 +33,46 @@ impl Server {
}
}

// ルーティングなどを書く
pub fn app(server: Server) -> App<Server> {
// ルーティングなどを書く。
// ちょっと返り値の型がゴツい。
pub fn app(
server: Server,
) -> App<
impl ServiceFactory<
Config = (),
Request = ServiceRequest,
Response = ServiceResponse<Body>,
Error = Error,
InitError = (),
>,
Body,
> {
use crate::handlers::*;

let app: App<Server> = App::with_state(server)
.route("/logs", Method::POST, handle_post_logs)
.route("/csv", Method::POST, handle_post_csv)
.route("/csv", Method::GET, handle_get_csv)
.route("/logs", Method::GET, handle_get_logs);

app
App::new()
.data(server)
.service(
web::resource("/logs")
.route(web::post().to(handle_post_logs))
.route(web::get().to(handle_get_logs)),
)
.service(
web::resource("/csv")
.route(web::post().to(handle_post_csv))
.route(web::get().to(handle_get_csv)),
)
}

fn main() {
#[actix_rt::main]
async fn main() -> std::io::Result<()> {
// 環境変数でログレベルを設定する
env_logger::init();

let server = Server::new();
::actix_web::server::new(move || app(server.clone()))
.bind("localhost:3000")
.expect("Can not bind to port 3000")
.run();
// サーバデータ(= コネクションプール)をcloneしているので
// スレッド間で共通のコネクションプールを使うことになる。
HttpServer::new(move || app(server.clone()))
.bind("localhost:3000")?
.run()
.await
}
Loading