Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add router when build request context for mysql #809

Merged
merged 1 commit into from
Apr 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ jobs:
working-directory: integration_tests
run: |
make run-rust
- name: Run MySQL client tests
working-directory: integration_tests
run: |
make run-mysql
- name: Upload Logs
if: always()
uses: actions/upload-artifact@v3
Expand Down
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ SHELL = /bin/bash

DIR=$(shell pwd)

.DEFAULT_GOAL := integration-test

init:
echo "init"
echo "Git branch: $GITBRANCH"
Expand Down
3 changes: 3 additions & 0 deletions integration_tests/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,3 +55,6 @@ run-go:

run-rust:
cd sdk/rust && cargo run

run-mysql:
cd mysql && ./basic.sh
7 changes: 7 additions & 0 deletions integration_tests/mysql/basic.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
#!/usr/bin/env bash

# This only ensure query by mysql protocol is OK,
# Full SQL test in ensured by sqlness tests.
mysql -h 127.0.0.1 -P 3307 -e 'show tables'

mysql -h 127.0.0.1 -P 3307 -e 'select 1, now()'
14 changes: 12 additions & 2 deletions server/src/mysql/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,14 @@
use std::{net::SocketAddr, sync::Arc, time::Duration};

use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use snafu::{OptionExt, ResultExt};
use table_engine::engine::EngineRuntimes;

use crate::{
instance::InstanceRef,
mysql::{
error::{MissingInstance, MissingRuntimes, ParseIpAddr, Result},
error::{MissingInstance, MissingRouter, MissingRuntimes, ParseIpAddr, Result},
service::MysqlService,
},
};
Expand All @@ -18,6 +19,7 @@ pub struct Builder<Q> {
config: Config,
runtimes: Option<Arc<EngineRuntimes>>,
instance: Option<InstanceRef<Q>>,
router: Option<RouterRef>,
}

#[derive(Debug)]
Expand All @@ -33,6 +35,7 @@ impl<Q> Builder<Q> {
config,
runtimes: None,
instance: None,
router: None,
}
}

Expand All @@ -45,18 +48,25 @@ impl<Q> Builder<Q> {
self.instance = Some(instance);
self
}

pub fn router(mut self, router: RouterRef) -> Self {
self.router = Some(router);
self
}
}

impl<Q: QueryExecutor + 'static> Builder<Q> {
pub fn build(self) -> Result<MysqlService<Q>> {
let runtimes = self.runtimes.context(MissingRuntimes)?;
let instance = self.instance.context(MissingInstance)?;
let router = self.router.context(MissingRouter)?;

let addr: SocketAddr = format!("{}:{}", self.config.ip, self.config.port)
.parse()
.context(ParseIpAddr { ip: self.config.ip })?;

let mysql_handler = MysqlService::new(instance, runtimes, addr, self.config.timeout);
let mysql_handler =
MysqlService::new(instance, runtimes, router, addr, self.config.timeout);
Ok(mysql_handler)
}
}
3 changes: 3 additions & 0 deletions server/src/mysql/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ pub enum Error {
#[snafu(display("Missing instance to build service.\nBacktrace:\n{}", backtrace))]
MissingInstance { backtrace: Backtrace },

#[snafu(display("Missing router to build service.\nBacktrace:\n{}", backtrace))]
MissingRouter { backtrace: Backtrace },

#[snafu(display(
"Failed to parse ip addr, ip:{}, err:{}.\nBacktrace:\n{}",
ip,
Expand Down
9 changes: 8 additions & 1 deletion server/src/mysql/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use common_util::runtime::JoinHandle;
use log::{error, info};
use opensrv_mysql::AsyncMysqlIntermediary;
use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use table_engine::engine::EngineRuntimes;
use tokio::sync::oneshot::{self, Receiver, Sender};

Expand All @@ -17,6 +18,7 @@ use crate::{
pub struct MysqlService<Q> {
instance: InstanceRef<Q>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
join_handler: Option<JoinHandle<()>>,
tx: Option<Sender<()>>,
Expand All @@ -27,12 +29,14 @@ impl<Q> MysqlService<Q> {
pub fn new(
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
timeout: Option<Duration>,
) -> MysqlService<Q> {
Self {
instance,
runtimes,
router,
socket_addr,
join_handler: None,
tx: None,
Expand All @@ -53,6 +57,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
self.join_handler = Some(rt.bg_runtime.spawn(Self::loop_accept(
self.instance.clone(),
self.runtimes.clone(),
self.router.clone(),
self.socket_addr,
self.timeout,
rx,
Expand All @@ -69,6 +74,7 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
async fn loop_accept(
instance: InstanceRef<Q>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
socket_addr: SocketAddr,
timeout: Option<Duration>,
mut rx: Receiver<()>,
Expand All @@ -90,10 +96,11 @@ impl<Q: QueryExecutor + 'static> MysqlService<Q> {
};
let instance = instance.clone();
let runtimes = runtimes.clone();
let router = router.clone();

let rt = runtimes.read_runtime.clone();
rt.spawn(AsyncMysqlIntermediary::run_on(
MysqlWorker::new(instance, runtimes, timeout),
MysqlWorker::new(instance, runtimes, router, timeout),
stream,
));
},
Expand Down
5 changes: 5 additions & 0 deletions server/src/mysql/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use interpreters::interpreter::Output;
use log::{error, info};
use opensrv_mysql::{AsyncMysqlShim, ErrorKind, QueryResultWriter, StatementMetaWriter};
use query_engine::executor::Executor as QueryExecutor;
use router::RouterRef;
use snafu::ResultExt;
use table_engine::engine::EngineRuntimes;

Expand All @@ -26,6 +27,7 @@ pub struct MysqlWorker<W: std::io::Write + Send + Sync, Q> {
generic_hold: PhantomData<W>,
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
timeout: Option<Duration>,
}

Expand All @@ -37,12 +39,14 @@ where
pub fn new(
instance: Arc<Instance<Q>>,
runtimes: Arc<EngineRuntimes>,
router: RouterRef,
timeout: Option<Duration>,
) -> Self {
Self {
generic_hold: PhantomData::default(),
instance,
runtimes,
router,
timeout,
}
}
Expand Down Expand Up @@ -144,6 +148,7 @@ where
.runtime(runtime)
.enable_partition_table_access(false)
.timeout(self.timeout)
.router(self.router.clone())
.build()
.context(CreateContext)
}
Expand Down
1 change: 1 addition & 0 deletions server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ impl<Q: QueryExecutor + 'static> Builder<Q> {
let mysql_service = mysql::Builder::new(mysql_config)
.runtimes(engine_runtimes.clone())
.instance(instance.clone())
.router(router.clone())
.build()
.context(BuildMysqlService)?;

Expand Down