Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion sqlx-core/src/net/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ mod socket;
pub mod tls;

pub use socket::{
connect_tcp, connect_uds, BufferedSocket, Socket, SocketIntoBox, WithSocket, WriteBuffer,
connect_tcp, connect_uds, connect_with, BufferedSocket, Socket, SocketIntoBox, WithSocket,
WriteBuffer,
};
10 changes: 10 additions & 0 deletions sqlx-core/src/net/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,16 @@ async fn connect_tcp_async_io(host: &str, port: u16) -> crate::Result<impl Socke
.into())
}

/// Use a pre-connected socket, passing it to the given [`WithSocket`] handler.
///
/// This is the analog of [`connect_tcp`] and [`connect_uds`] for sockets
/// that have already been established by the caller. This enables custom
/// transports such as in-memory pipes, simulation frameworks (e.g. turmoil),
/// SSH tunnels, or SOCKS proxies.
pub async fn connect_with<S: Socket, Ws: WithSocket>(socket: S, with_socket: Ws) -> Ws::Output {
with_socket.with_socket(socket).await
}

/// Connect a Unix Domain Socket at the given path.
///
/// Returns an error if Unix Domain Sockets are not supported on this platform.
Expand Down
25 changes: 24 additions & 1 deletion sqlx-mysql/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,31 @@ impl MySqlConnection {
None => crate::net::connect_tcp(&options.host, options.port, do_handshake).await?,
};

let stream = handshake?;
Self::from_stream(options, handshake?)
}

/// Establish a connection over a pre-connected socket.
///
/// The provided socket must already be connected to a MySQL-compatible
/// server. The MySQL handshake and authentication will be performed
/// using the credentials from `options`.
///
/// This enables custom transports such as in-memory pipes, simulation
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
///
/// Note: this only performs the low-level handshake and authentication.
/// Use [`MySqlConnectOptions::connect_with_socket()`] for a fully
/// initialized connection (with `SET NAMES`, `sql_mode`, etc.).
pub async fn connect_with_socket<S: Socket>(
options: &MySqlConnectOptions,
socket: S,
) -> Result<Self, Error> {
let do_handshake = DoHandshake::new(options)?;
let stream = crate::net::connect_with(socket, do_handshake).await?;
Self::from_stream(options, stream)
}

fn from_stream(options: &MySqlConnectOptions, stream: MySqlStream) -> Result<Self, Error> {
Ok(Self {
inner: Box::new(MySqlConnectionInner {
stream,
Expand Down
117 changes: 75 additions & 42 deletions sqlx-mysql/src/options/connect.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,68 @@
use crate::connection::ConnectOptions;
use crate::error::Error;
use crate::executor::Executor;
use crate::net::Socket;
use crate::{MySqlConnectOptions, MySqlConnection};
use log::LevelFilter;
use sqlx_core::sql_str::AssertSqlSafe;
use sqlx_core::Url;
use std::time::Duration;

impl ConnectOptions for MySqlConnectOptions {
type Connection = MySqlConnection;

fn from_url(url: &Url) -> Result<Self, Error> {
Self::parse_from_url(url)
}

fn to_url_lossy(&self) -> Url {
self.build_url()
impl MySqlConnectOptions {
/// Establish a fully initialized connection over a pre-connected socket.
///
/// This performs the MySQL handshake, authentication, and post-connect
/// initialization (`SET NAMES`, `sql_mode`, `time_zone`) over the
/// provided socket.
///
/// The socket must already be connected to a MySQL-compatible server.
/// This enables custom transports such as in-memory pipes, simulation
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
///
/// # Example
///
/// ```rust,ignore
/// use sqlx::mysql::MySqlConnectOptions;
///
/// let options = MySqlConnectOptions::new()
/// .username("root")
/// .database("mydb");
///
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:3306").await?;
/// let conn = options.connect_with_socket(stream).await?;
/// ```
pub async fn connect_with_socket<S: Socket>(
&self,
socket: S,
) -> Result<MySqlConnection, Error> {
let mut conn = MySqlConnection::connect_with_socket(self, socket).await?;
self.after_connect(&mut conn).await?;
Ok(conn)
}

async fn connect(&self) -> Result<Self::Connection, Error>
where
Self::Connection: Sized,
{
let mut conn = MySqlConnection::establish(self).await?;

// After the connection is established, we initialize by configuring a few
// connection parameters

// https://mariadb.com/kb/en/sql-mode/

// PIPES_AS_CONCAT - Allows using the pipe character (ASCII 124) as string concatenation operator.
// This means that "A" || "B" can be used in place of CONCAT("A", "B").

// NO_ENGINE_SUBSTITUTION - If not set, if the available storage engine specified by a CREATE TABLE is
// not available, a warning is given and the default storage
// engine is used instead.

// NO_ZERO_DATE - Don't allow '0000-00-00'. This is invalid in Rust.

// NO_ZERO_IN_DATE - Don't allow 'YYYY-00-00'. This is invalid in Rust.

// --

// Setting the time zone allows us to assume that the output
// from a TIMESTAMP field is UTC

// --

// https://mathiasbynens.be/notes/mysql-utf8mb4

/// Post-connection initialization shared between `connect()` and
/// `connect_with_socket()`.
///
/// After the connection is established, we initialize by configuring a few
/// connection parameters:
///
/// - <https://mariadb.com/kb/en/sql-mode/>
///
/// - `PIPES_AS_CONCAT` - Allows using the pipe character (ASCII 124) as string concatenation
/// operator. This means that "A" || "B" can be used in place of CONCAT("A", "B").
///
/// - `NO_ENGINE_SUBSTITUTION` - If not set, if the available storage engine specified by a
/// CREATE TABLE is not available, a warning is given and the default storage engine is used
/// instead.
///
/// - `NO_ZERO_DATE` - Don't allow '0000-00-00'. This is invalid in Rust.
///
/// - `NO_ZERO_IN_DATE` - Don't allow 'YYYY-00-00'. This is invalid in Rust.
///
/// Setting the time zone allows us to assume that the output from a TIMESTAMP field is UTC.
///
/// - <https://mathiasbynens.be/notes/mysql-utf8mb4>
async fn after_connect(&self, conn: &mut MySqlConnection) -> Result<(), Error> {
let mut sql_mode = Vec::new();
if self.pipes_as_concat {
sql_mode.push(r#"PIPES_AS_CONCAT"#);
Expand All @@ -64,11 +78,9 @@ impl ConnectOptions for MySqlConnectOptions {
sql_mode.join(",")
));
}

if let Some(timezone) = &self.timezone {
options.push(format!(r#"time_zone='{}'"#, timezone));
}

if self.set_names {
// As it turns out, we don't _have_ to set a collation if we don't want to.
// We can let the server choose the default collation for the charset.
Expand All @@ -88,6 +100,27 @@ impl ConnectOptions for MySqlConnectOptions {
.await?;
}

Ok(())
}
}

impl ConnectOptions for MySqlConnectOptions {
type Connection = MySqlConnection;

fn from_url(url: &Url) -> Result<Self, Error> {
Self::parse_from_url(url)
}

fn to_url_lossy(&self) -> Url {
self.build_url()
}

async fn connect(&self) -> Result<Self::Connection, Error>
where
Self::Connection: Sized,
{
let mut conn = MySqlConnection::establish(self).await?;
self.after_connect(&mut conn).await?;
Ok(conn)
}

Expand Down
31 changes: 29 additions & 2 deletions sqlx-postgres/src/connection/establish.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use crate::io::StatementId;
use crate::message::{
Authentication, BackendKeyData, BackendMessageFormat, Password, ReadyForQuery, Startup,
};
use crate::net::Socket;
use crate::{PgConnectOptions, PgConnection};

use super::PgConnectionInner;
Expand All @@ -16,9 +17,35 @@ use super::PgConnectionInner;

impl PgConnection {
pub(crate) async fn establish(options: &PgConnectOptions) -> Result<Self, Error> {
// Upgrade to TLS if we were asked to and the server supports it
let mut stream = PgStream::connect(options).await?;
let stream = PgStream::connect(options).await?;
Self::establish_with_stream(options, stream).await
}

/// Establish a connection over a pre-connected socket.
///
/// The provided socket must already be connected to a
/// PostgreSQL-compatible server. The startup handshake, TLS upgrade
/// (if configured), and authentication will be performed over this
/// socket.
///
/// This enables custom transports such as in-memory pipes, simulation
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
///
/// Note: this only performs the low-level handshake and authentication.
/// Use [`PgConnectOptions::connect_with_socket()`] for a fully
/// initialized connection.
pub async fn connect_with_socket<S: Socket>(
options: &PgConnectOptions,
socket: S,
) -> Result<Self, Error> {
let stream = PgStream::connect_with_socket(options, socket).await?;
Self::establish_with_stream(options, stream).await
}

async fn establish_with_stream(
options: &PgConnectOptions,
mut stream: PgStream,
) -> Result<Self, Error> {
// To begin a session, a frontend opens a connection to the server
// and sends a startup message.

Expand Down
18 changes: 18 additions & 0 deletions sqlx-postgres/src/connection/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,24 @@ impl PgStream {
})
}

/// Create a stream from a pre-connected socket.
///
/// The socket must already be connected to a PostgreSQL server.
/// TLS upgrade will be attempted if configured in `options`.
pub(super) async fn connect_with_socket<S: Socket>(
options: &PgConnectOptions,
socket: S,
) -> Result<Self, Error> {
let socket = net::connect_with(socket, MaybeUpgradeTls(options)).await?;

Ok(Self {
inner: BufferedSocket::new(socket),
notifications: None,
parameter_statuses: BTreeMap::default(),
server_version_num: None,
})
}

#[inline(always)]
pub(crate) fn write_msg(&mut self, message: impl FrontendMessage) -> Result<(), Error> {
self.write(EncodeMessage(message))
Expand Down
28 changes: 28 additions & 0 deletions sqlx-postgres/src/options/connect.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,39 @@
use crate::connection::ConnectOptions;
use crate::error::Error;
use crate::net::Socket;
use crate::{PgConnectOptions, PgConnection};
use log::LevelFilter;
use sqlx_core::Url;
use std::future::Future;
use std::time::Duration;

impl PgConnectOptions {
/// Establish a connection over a pre-connected socket.
///
/// This performs the PostgreSQL startup handshake, TLS upgrade
/// (if configured), and authentication over the provided socket.
///
/// The socket must already be connected to a PostgreSQL-compatible server.
/// This enables custom transports such as in-memory pipes, simulation
/// frameworks (e.g. turmoil), SSH tunnels, or SOCKS proxies.
///
/// # Example
///
/// ```rust,ignore
/// use sqlx::postgres::PgConnectOptions;
///
/// let options = PgConnectOptions::new()
/// .username("postgres")
/// .database("mydb");
///
/// let stream = tokio::net::TcpStream::connect("127.0.0.1:5432").await?;
/// let conn = options.connect_with_socket(stream).await?;
/// ```
pub async fn connect_with_socket<S: Socket>(&self, socket: S) -> Result<PgConnection, Error> {
PgConnection::connect_with_socket(self, socket).await
}
}

impl ConnectOptions for PgConnectOptions {
type Connection = PgConnection;

Expand Down
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ pub use sqlx_core::types::Type;
pub use sqlx_core::value::{Value, ValueRef};
pub use sqlx_core::Either;

/// Networking primitives used by SQLx database drivers.
///
/// The [`Socket`][net::Socket] trait allows implementing custom transports
/// for database connections (e.g. in-memory pipes, simulation frameworks,
/// SSH tunnels, SOCKS proxies). Use with
/// [`MySqlConnectOptions::connect_with_socket()`][crate::mysql::MySqlConnectOptions::connect_with_socket]
/// or [`PgConnectOptions::connect_with_socket()`][crate::postgres::PgConnectOptions::connect_with_socket].
pub mod net {
pub use sqlx_core::io::ReadBuf;
pub use sqlx_core::net::{connect_with, Socket};
}

#[doc(inline)]
pub use sqlx_core::error::{self, Error, Result};

Expand Down
Loading