Skip to content

Commit

Permalink
Bump mysql_common version
Browse files Browse the repository at this point in the history
  • Loading branch information
blackbeam committed Mar 18, 2024
1 parent 97b3e3b commit c40205f
Show file tree
Hide file tree
Showing 7 changed files with 94 additions and 67 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ crossbeam = "0.8.1"
io-enum = "1.0.0"
flate2 = { version = "1.0", default-features = false }
lru = "0.10"
mysql_common = { version = "0.31", default-features = false }
mysql_common = { version = "0.32", default-features = false }
socket2 = "0.5.2"
once_cell = "1.7.2"
pem = "2.0.1"
Expand Down
61 changes: 25 additions & 36 deletions src/conn/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,10 @@ impl Conn {
Some(self.0.auth_plugin.clone()),
self.0.capability_flags,
self.connect_attrs(),
self.0
.opts
.get_max_allowed_packet()
.unwrap_or(DEFAULT_MAX_ALLOWED_PACKET) as u32,
);

let mut buf = get_buffer();
Expand Down Expand Up @@ -864,24 +868,14 @@ impl Conn {
}
0x04 => {
if !self.is_insecure() || self.is_socket() {
let mut pass = self
.0
.opts
.get_pass()
.map(Vec::from)
.unwrap_or_else(Vec::new);
let mut pass = self.0.opts.get_pass().map(Vec::from).unwrap_or_default();
pass.push(0);
self.write_packet(&mut pass.as_slice())?;
} else {
self.write_packet(&mut &[0x02][..])?;
let payload = self.read_packet()?;
let key = &payload[1..];
let mut pass = self
.0
.opts
.get_pass()
.map(Vec::from)
.unwrap_or_else(Vec::new);
let mut pass = self.0.opts.get_pass().map(Vec::from).unwrap_or_default();
pass.push(0);
for (i, c) in pass.iter_mut().enumerate() {
*(c) ^= self.0.nonce[i % self.0.nonce.len()];
Expand Down Expand Up @@ -1157,11 +1151,12 @@ impl Conn {
return Ok(());
}
self.do_handshake()
.and_then(|_| {
Ok(from_value_opt::<usize>(
.and_then(|_| match self.0.opts.get_max_allowed_packet() {
Some(x) => Ok(x),
None => Ok(from_value_opt::<usize>(
self.get_system_var("max_allowed_packet")?.unwrap_or(NULL),
)
.unwrap_or(0))
.unwrap_or(0)),
})
.and_then(|max_allowed_packet| {
if max_allowed_packet == 0 {
Expand Down Expand Up @@ -1240,7 +1235,7 @@ impl Conn {

/// Turns this connection into a binlog stream.
///
/// You can use `SHOW BINARY LOGS` to get the current logfile and position from the master.
/// You can use `SHOW BINARY LOGS` to get the current log file and position from the master.
/// If the request's `filename` is empty, the server will send the binlog-stream
/// of the first known binlog.
pub fn get_binlog_stream(mut self, request: BinlogRequest<'_>) -> Result<BinlogStream> {
Expand Down Expand Up @@ -1278,7 +1273,7 @@ impl Queryable for Conn {
let parsed = ParsedNamedParams::parse(query.as_bytes())?;
let named_params: Vec<Vec<u8>> =
parsed.params().iter().map(|param| param.to_vec()).collect();
let named_params = if named_params.len() == 0 {
let named_params = if named_params.is_empty() {
None
} else {
Some(named_params)
Expand Down Expand Up @@ -1684,7 +1679,7 @@ mod test {

#[test]
fn manually_closed_stmt() {
let opts = OptsBuilder::from(get_opts()).stmt_cache_size(1);
let opts = get_opts().stmt_cache_size(1);
let mut conn = Conn::new(opts).unwrap();
let stmt = conn.prep("SELECT 1").unwrap();
conn.exec_drop(&stmt, ()).unwrap();
Expand All @@ -1703,15 +1698,13 @@ mod test {
"CREATE TEMPORARY TABLE mysql.tbl(id INT NOT NULL PRIMARY KEY AUTO_INCREMENT, a INT)",
)
.unwrap();
let _ = conn
.start_transaction(TxOpts::default())
.and_then(|mut t| {
conn.start_transaction(TxOpts::default())
.map(|mut t| {
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
assert_eq!(t.last_insert_id(), Some(1));
assert_eq!(t.affected_rows(), 1);
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
t.commit().unwrap();
Ok(())
})
.unwrap();
assert_eq!(
Expand All @@ -1723,11 +1716,9 @@ mod test {
.unwrap(),
vec![Bytes(b"2".to_vec())]
);
let _ = conn
.start_transaction(TxOpts::default())
.and_then(|mut t| {
conn.start_transaction(TxOpts::default())
.map(|mut t| {
t.query_drop("INSERT INTO tbl2(a) VALUES(1)").unwrap_err();
Ok(())
// implicit rollback
})
.unwrap();
Expand All @@ -1740,13 +1731,11 @@ mod test {
.unwrap(),
vec![Bytes(b"2".to_vec())]
);
let _ = conn
.start_transaction(TxOpts::default())
.and_then(|mut t| {
conn.start_transaction(TxOpts::default())
.map(|mut t| {
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
t.rollback().unwrap();
Ok(())
})
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -1793,7 +1782,7 @@ mod test {
let mut cell_data = vec![b'Z'; 65535];
cell_data.push(b'\n');
for _ in 0..1536 {
stream.write_all(&*cell_data)?;
stream.write_all(&cell_data)?;
}
Ok(())
})));
Expand Down Expand Up @@ -2166,7 +2155,7 @@ mod test {
conn.exec_first::<crate::Row, _, _>(&stmt, params! {"a" => 1, "b" => 2, "c" => 3,});
match result {
Err(DriverError(MissingNamedParameter(ref x))) if x == "d" => (),
_ => assert!(false),
_ => panic!("MissingNamedParameter error expected"),
}
}

Expand All @@ -2177,7 +2166,7 @@ mod test {
let result = conn.exec_drop(&stmt, params! {"a" => 1, "b" => 2, "c" => 3,});
match result {
Err(DriverError(NamedParamsForPositionalQuery)) => (),
_ => assert!(false),
_ => panic!("NamedParamsForPositionalQuery error expected"),
}
}

Expand Down Expand Up @@ -2380,7 +2369,7 @@ mod test {
}
let attrs_size: i32 =
get_system_variable(&mut conn, "performance_schema_session_connect_attrs_size");
if attrs_size >= 0 && attrs_size <= 128 {
if (0..=128).contains(&attrs_size) {
panic!("The system variable `performance_schema_session_connect_attrs_size` is {}. Restart the MySQL server with `--performance_schema_session_connect_attrs_size=-1` to pass the test.", attrs_size);
}

Expand Down Expand Up @@ -2561,7 +2550,7 @@ mod test {
// iterate using COM_BINLOG_DUMP with BINLOG_DUMP_NON_BLOCK flag
let (conn, filename, pos) = get_conn().unwrap();

let mut binlog_stream = conn
let binlog_stream = conn
.get_binlog_stream(
BinlogRequest::new(14)
.with_filename(filename)
Expand All @@ -2571,7 +2560,7 @@ mod test {
.unwrap();

events_num = 0;
while let Some(event) = binlog_stream.next() {
for event in binlog_stream {
let event = event.unwrap();
events_num += 1;
event.header().event_type().unwrap();
Expand Down
44 changes: 39 additions & 5 deletions src/conn/opts/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl SslOpts {
/// Options structure is quite large so we'll store it separately.
#[derive(Debug, Clone, Eq, PartialEq)]
pub(crate) struct InnerOpts {
/// Address of mysql server (defaults to `127.0.0.1`). Hostnames should also work.
/// Address of mysql server (defaults to `127.0.0.1`). Host names should also work.
ip_or_hostname: url::Host,
/// TCP port of mysql server (defaults to `3306`).
tcp_port: u16,
Expand Down Expand Up @@ -232,6 +232,12 @@ pub(crate) struct InnerOpts {
/// consider using TLS or encrypted tunnels for server connection.
enable_cleartext_plugin: bool,

/// Client side `max_allowed_packet` value (defaults to `None`).
///
/// By default `Conn` will query this value from the server. One can avoid this step
/// by explicitly specifying it.
max_allowed_packet: Option<usize>,

/// For tests only
#[cfg(test)]
pub injected_socket: Option<String>,
Expand All @@ -243,6 +249,7 @@ impl Default for InnerOpts {
ip_or_hostname: url::Host::Domain(String::from("localhost")),
tcp_port: 3306,
socket: None,
max_allowed_packet: None,
user: None,
pass: None,
db_name: None,
Expand Down Expand Up @@ -307,7 +314,7 @@ impl Opts {
self.0.ip_or_hostname.clone()
}

/// Address of mysql server (defaults to `127.0.0.1`). Hostnames should also work.
/// Address of mysql server (defaults to `127.0.0.1`). Host names should also work.
pub fn get_ip_or_hostname(&self) -> Cow<str> {
self.0.ip_or_hostname.to_string().into()
}
Expand All @@ -319,6 +326,15 @@ impl Opts {
pub fn get_socket(&self) -> Option<&str> {
self.0.socket.as_deref()
}
/// Client side `max_allowed_packet` value (defaults to `None`).
///
/// By default `Conn` will query this value from the server. One can avoid this step
/// by explicitly specifying it. Server side default is 4MB.
///
/// Available in connection URL via `max_allowed_packet` parameter.
pub fn get_max_allowed_packet(&self) -> Option<usize> {
self.0.max_allowed_packet
}
/// User (defaults to `None`).
pub fn get_user(&self) -> Option<&str> {
self.0.user.as_deref()
Expand Down Expand Up @@ -736,6 +752,12 @@ impl OptsBuilder {
return Err(UrlError::InvalidValue(key.to_string(), value.to_string()))
}
},
"max_allowed_packet" => match value.parse::<usize>() {
Ok(parsed) => self.opts.0.max_allowed_packet = Some(parsed),
Err(_) => {
return Err(UrlError::InvalidValue(key.to_string(), value.to_string()))
}
},
_ => {
//throw an error if there is an unrecognized param
return Err(UrlError::UnknownParameter(key.to_string()));
Expand All @@ -755,7 +777,7 @@ impl OptsBuilder {
Ok(self)
}

/// Address of mysql server (defaults to `127.0.0.1`). Hostnames should also work.
/// Address of mysql server (defaults to `127.0.0.1`). Host names should also work.
///
/// **Note:** IPv6 addresses must be given in square brackets, e.g. `[::1]`.
pub fn ip_or_hostname<T: Into<String>>(mut self, ip_or_hostname: Option<T>) -> Self {
Expand All @@ -781,6 +803,16 @@ impl OptsBuilder {
self
}

/// Defines `max_allowed_packet` option. See [`Opts::max_allowed_packet`].
///
/// Note that it'll saturate to proper minimum and maximum values
/// for this parameter (see MySql documentation).
pub fn max_allowed_packet(mut self, max_allowed_packet: Option<usize>) -> Self {
self.opts.0.max_allowed_packet =
max_allowed_packet.map(|x| std::cmp::max(1024, std::cmp::min(1073741824, x)));
self
}

/// User (defaults to `None`).
pub fn user<T: Into<String>>(mut self, user: Option<T>) -> Self {
self.opts.0.user = user.map(Into::into);
Expand Down Expand Up @@ -1397,7 +1429,8 @@ mod test {
"tcp_keepalive_time_ms".to_string() => "5000".to_string(),
"compress".to_string() => "best".to_string(),
"tcp_connect_timeout_ms".to_string() => "1000".to_string(),
"stmt_cache_size".to_string() => "33".to_string()
"stmt_cache_size".to_string() => "33".to_string(),
"max_allowed_packet".to_string() => "65536".to_string()
};
#[cfg(any(target_os = "linux", target_os = "macos",))]
cnf_map.insert(
Expand All @@ -1414,7 +1447,8 @@ mod test {
assert_eq!(parsed_opts.opts.get_ip_or_hostname(), "127.0.0.1");
assert_eq!(parsed_opts.opts.get_tcp_port(), 8080);
assert_eq!(parsed_opts.opts.get_db_name(), Some("test_db"));
assert_eq!(parsed_opts.opts.get_prefer_socket(), false);
assert_eq!(parsed_opts.opts.get_max_allowed_packet(), Some(65536));
assert!(!parsed_opts.opts.get_prefer_socket());
assert_eq!(parsed_opts.opts.get_tcp_keepalive_time_ms(), Some(5000));
#[cfg(any(target_os = "linux", target_os = "macos",))]
assert_eq!(
Expand Down
34 changes: 16 additions & 18 deletions src/conn/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -266,18 +266,6 @@ impl PooledConn {
self.conn.take().unwrap().get_binlog_stream(request)
}

/// Gives mutable reference to the wrapped
/// [`Conn`](struct.Conn.html).
pub fn as_mut(&mut self) -> &mut Conn {
self.conn.as_mut().unwrap()
}

/// Gives reference to the wrapped
/// [`Conn`](struct.Conn.html).
pub fn as_ref(&self) -> &Conn {
self.conn.as_ref().unwrap()
}

/// Unwraps wrapped [`Conn`](struct.Conn.html).
pub fn unwrap(mut self) -> Conn {
self.conn.take().unwrap()
Expand Down Expand Up @@ -316,6 +304,18 @@ impl PooledConn {
}
}

impl AsRef<Conn> for PooledConn {
fn as_ref(&self) -> &Conn {
self.conn.as_ref().unwrap()
}
}

impl AsMut<Conn> for PooledConn {
fn as_mut(&mut self) -> &mut Conn {
self.conn.as_mut().unwrap()
}
}

impl Queryable for PooledConn {
fn query_iter<T: AsRef<str>>(&mut self, query: T) -> Result<QueryResult<'_, '_, '_, Text>> {
self.conn.as_mut().unwrap().query_iter(query)
Expand Down Expand Up @@ -489,8 +489,8 @@ mod test {
let conn2 = pool.try_get_conn(Duration::from_millis(357));
assert!(conn2.is_err());
match conn2 {
Err(Error::DriverError(DriverError::Timeout)) => assert!(true),
_ => assert!(false),
Err(Error::DriverError(DriverError::Timeout)) => (),
_ => panic!("Timeout error expected"),
}
drop(conn1);
assert!(pool.try_get_conn(Duration::from_millis(357)).is_ok());
Expand Down Expand Up @@ -572,10 +572,9 @@ mod test {
2_u8
);
pool.start_transaction(TxOpts::default())
.and_then(|mut t| {
.map(|mut t| {
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
Ok(())
})
.unwrap();
assert_eq!(
Expand Down Expand Up @@ -640,10 +639,9 @@ mod test {
assert_eq!(from_value::<u8>(x.take(0).unwrap()), 2u8);
}
conn.start_transaction(TxOpts::default())
.and_then(|mut t| {
.map(|mut t| {
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(1)").unwrap();
t.query_drop("INSERT INTO mysql.tbl(a) VALUES(2)").unwrap();
Ok(())
})
.unwrap();
for x in conn.query_iter("SELECT COUNT(a) FROM mysql.tbl").unwrap() {
Expand Down
4 changes: 3 additions & 1 deletion src/conn/query_result.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,8 +391,10 @@ impl<'a> SetColumns<'a> {
.as_ref()
.and_then(|cols| cols.iter().position(|col| col.name_ref() == name))
}
}

pub fn as_ref(&self) -> &[Column] {
impl AsRef<[Column]> for SetColumns<'_> {
fn as_ref(&self) -> &[Column] {
self.inner
.as_ref()
.map(|cols| &(*cols)[..])
Expand Down
Loading

0 comments on commit c40205f

Please sign in to comment.