Skip to content

Commit

Permalink
fix: add router when build request context for mysql (apache#809)
Browse files Browse the repository at this point in the history
  • Loading branch information
jiacai2050 authored Apr 6, 2023
1 parent 9574d1a commit cedb0d3
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 3 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ jobs:
working-directory: integration_tests
run: |
make run-rust
- name: Run MySQL client tests
working-directory: integration_tests
run: |
make run-mysql
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v3
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ SHELL = /bin/bash

DIR=$(shell pwd)

.DEFAULT_GOAL := integration-test

init:
echo "init"
echo "Git branch: $GITBRANCH"
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ run-go:

run-rust:
cd sdk/rust && cargo run

run-mysql:
cd mysql && ./basic.sh
7 changes: 7 additions & 0 deletions integration_tests/mysql/basic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

# This only ensure query by mysql protocol is OK,
# Full SQL test in ensured by sqlness tests.
mysql -h 127.0.0.1 -P 3307 -e 'show tables'

mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()'
14 changes: 12 additions & 2 deletions server/src/mysql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};

use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::EngineRuntimes;

use crate::{
instance::InstanceRef,
mysql::{
error::{MissingInstance, MissingRuntimes, ParseIpAddr, Result},
error::{MissingInstance, MissingRouter, MissingRuntimes, ParseIpAddr, Result},
service::MysqlService,
},
};
Expand All @@ -18,6 +19,7 @@ pub struct Builder<Q> {
config: Config,
runtimes: Option<Arc<EngineRuntimes>>,
instance: Option<InstanceRef<Q>>,
router: Option<RouterRef>,
}

#[derive(Debug)]
Expand All @@ -33,6 +35,7 @@ impl<Q> Builder<Q> {
config,
runtimes: None,
instance: None,
router: None,
}
}

Expand All @@ -45,18 +48,25 @@ impl<Q> Builder<Q> {
self.instance = Some(instance);
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
pub fn build(self) -> Result<MysqlService<Q>> {
let runtimes = self.runtimes.context(MissingRuntimes)?;
let instance = self.instance.context(MissingInstance)?;
let router = self.router.context(MissingRouter)?;

let addr: SocketAddr = format!("{}:{}", self.config.ip, self.config.port)
.parse()
.context(ParseIpAddr { ip: self.config.ip })?;

let mysql_handler = MysqlService::new(instance, runtimes, addr, self.config.timeout);
let mysql_handler =
MysqlService::new(instance, runtimes, router, addr, self.config.timeout);
Ok(mysql_handler)
}
}
3 changes: 3 additions & 0 deletions server/src/mysql/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum Error {
#[snafu(display("Missing instance to build service.\nBacktrace:\n{}", backtrace))]
MissingInstance { backtrace: Backtrace },

#[snafu(display("Missing router to build service.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },

#[snafu(display(
"Failed to parse ip addr, ip:{}, err:{}.\nBacktrace:\n{}",
ip,
Expand Down
9 changes: 8 additions & 1 deletion server/src/mysql/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use common_util::runtime::JoinHandle;
use log::{error, info};
use opensrv_mysql::AsyncMysqlIntermediary;
use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Receiver, Sender};

Expand All @@ -17,6 +18,7 @@ use crate::{
pub struct MysqlService<Q> {
instance: InstanceRef<Q>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
join_handler: Option<JoinHandle<()>>,
tx: Option<Sender<()>>,
Expand All @@ -27,12 +29,14 @@ impl<Q> MysqlService<Q> {
pub fn new(
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
timeout: Option<Duration>,
) -> MysqlService<Q> {
Self {
instance,
runtimes,
router,
socket_addr,
join_handler: None,
tx: None,
Expand All @@ -53,6 +57,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept(
self.instance.clone(),
self.runtimes.clone(),
self.router.clone(),
self.socket_addr,
self.timeout,
rx,
Expand All @@ -69,6 +74,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
async fn loop_accept(
instance: InstanceRef<Q>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
timeout: Option<Duration>,
mut rx: Receiver<()>,
Expand All @@ -90,10 +96,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
};
let instance = instance.clone();
let runtimes = runtimes.clone();
let router = router.clone();

let rt = runtimes.read_runtime.clone();
rt.spawn(AsyncMysqlIntermediary::run_on(
MysqlWorker::new(instance, runtimes, timeout),
MysqlWorker::new(instance, runtimes, router, timeout),
stream,
));
},
Expand Down
5 changes: 5 additions & 0 deletions server/src/mysql/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use interpreters::interpreter::Output;
use log::{error, info};
use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter};
use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use snafu::ResultExt;
use table_engine::engine::EngineRuntimes;

Expand All @@ -26,6 +27,7 @@ pub struct MysqlWorker<W: std::io::Write + Send + Sync, Q> {
generic_hold: PhantomData<W>,
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
timeout: Option<Duration>,
}

Expand All @@ -37,12 +39,14 @@ where
pub fn new(
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
timeout: Option<Duration>,
) -> Self {
Self {
generic_hold: PhantomData::default(),
instance,
runtimes,
router,
timeout,
}
}
Expand Down Expand Up @@ -144,6 +148,7 @@ where
.runtime(runtime)
.enable_partition_table_access(false)
.timeout(self.timeout)
.router(self.router.clone())
.build()
.context(CreateContext)
}
Expand Down
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let mysql_service = mysql::Builder::new(mysql_config)
.runtimes(engine_runtimes.clone())
.instance(instance.clone())
.router(router.clone())
.build()
.context(BuildMysqlService)?;

Expand Down

0 comments on commit cedb0d3

Please sign in to comment.