Skip to content

Commit 09a63d6

Browse files
committed
Move to local runtimes per connection
This avoids a bunch of context switches and cross-thread synchronization, which ends up improving the performance of a simple query by ~20%, from 252us to 216us.
1 parent d6163c0 commit 09a63d6

File tree

9 files changed

+151
-179
lines changed

9 files changed

+151
-179
lines changed

postgres/Cargo.toml

+8-7
Original file line numberDiff line numberDiff line change
@@ -10,16 +10,17 @@ readme = "../README.md"
1010
keywords = ["database", "postgres", "postgresql", "sql"]
1111
categories = ["database"]
1212

13+
[[bench]]
14+
name = "bench"
15+
harness = false
16+
1317
[package.metadata.docs.rs]
1418
all-features = true
1519

1620
[badges]
1721
circle-ci = { repository = "sfackler/rust-postgres" }
1822

1923
[features]
20-
default = ["runtime"]
21-
runtime = ["tokio-postgres/runtime", "tokio", "lazy_static", "log"]
22-
2324
with-bit-vec-0_6 = ["tokio-postgres/with-bit-vec-0_6"]
2425
with-chrono-0_4 = ["tokio-postgres/with-chrono-0_4"]
2526
with-eui48-0_4 = ["tokio-postgres/with-eui48-0_4"]
@@ -32,11 +33,11 @@ with-uuid-0_8 = ["tokio-postgres/with-uuid-0_8"]
3233
bytes = "0.5"
3334
fallible-iterator = "0.2"
3435
futures = "0.3"
35-
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres", default-features = false }
36+
tokio-postgres = { version = "=0.5.0-alpha.2", path = "../tokio-postgres" }
3637

37-
tokio = { version = "0.2", optional = true, features = ["rt-threaded"] }
38-
lazy_static = { version = "1.0", optional = true }
39-
log = { version = "0.4", optional = true }
38+
tokio = { version = "0.2", features = ["rt-core"] }
39+
log = "0.4"
4040

4141
[dev-dependencies]
42+
criterion = "0.3"
4243
tokio = "0.2"

postgres/benches/bench.rs

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
use criterion::{criterion_group, criterion_main, Criterion};
2+
use postgres::{Client, NoTls};
3+
4+
// spawned: 249us 252us 255us
5+
// local: 214us 216us 219us
6+
fn query_prepared(c: &mut Criterion) {
7+
let mut client = Client::connect("host=localhost port=5433 user=postgres", NoTls).unwrap();
8+
9+
let stmt = client.prepare("SELECT $1::INT8").unwrap();
10+
11+
c.bench_function("query_prepared", move |b| {
12+
b.iter(|| client.query(&stmt, &[&1i64]).unwrap())
13+
});
14+
}
15+
16+
criterion_group!(group, query_prepared);
17+
criterion_main!(group);

postgres/src/client.rs

+31-51
Original file line numberDiff line numberDiff line change
@@ -1,27 +1,25 @@
1-
#[cfg(feature = "runtime")]
2-
use crate::Config;
3-
use crate::{CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction};
4-
use futures::executor;
1+
use crate::{Config, CopyInWriter, CopyOutReader, RowIter, Statement, ToStatement, Transaction};
2+
use tokio::runtime::Runtime;
53
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
64
use tokio_postgres::types::{ToSql, Type};
7-
#[cfg(feature = "runtime")]
8-
use tokio_postgres::Socket;
9-
use tokio_postgres::{Error, Row, SimpleQueryMessage};
5+
use tokio_postgres::{Error, Row, SimpleQueryMessage, Socket};
106

117
/// A synchronous PostgreSQL client.
12-
///
13-
/// This is a lightweight wrapper over the asynchronous tokio_postgres `Client`.
14-
pub struct Client(tokio_postgres::Client);
8+
pub struct Client {
9+
runtime: Runtime,
10+
client: tokio_postgres::Client,
11+
}
1512

1613
impl Client {
14+
pub(crate) fn new(runtime: Runtime, client: tokio_postgres::Client) -> Client {
15+
Client { runtime, client }
16+
}
17+
1718
/// A convenience function which parses a configuration string into a `Config` and then connects to the database.
1819
///
1920
/// See the documentation for [`Config`] for information about the connection syntax.
2021
///
21-
/// Requires the `runtime` Cargo feature (enabled by default).
22-
///
2322
/// [`Config`]: config/struct.Config.html
24-
#[cfg(feature = "runtime")]
2523
pub fn connect<T>(params: &str, tls_mode: T) -> Result<Client, Error>
2624
where
2725
T: MakeTlsConnect<Socket> + 'static + Send,
@@ -78,7 +76,7 @@ impl Client {
7876
where
7977
T: ?Sized + ToStatement,
8078
{
81-
executor::block_on(self.0.execute(query, params))
79+
self.runtime.block_on(self.client.execute(query, params))
8280
}
8381

8482
/// Executes a statement, returning the resulting rows.
@@ -114,7 +112,7 @@ impl Client {
114112
where
115113
T: ?Sized + ToStatement,
116114
{
117-
executor::block_on(self.0.query(query, params))
115+
self.runtime.block_on(self.client.query(query, params))
118116
}
119117

120118
/// Executes a statement which returns a single row, returning it.
@@ -151,7 +149,7 @@ impl Client {
151149
where
152150
T: ?Sized + ToStatement,
153151
{
154-
executor::block_on(self.0.query_one(query, params))
152+
self.runtime.block_on(self.client.query_one(query, params))
155153
}
156154

157155
/// Executes a statement which returns zero or one rows, returning it.
@@ -197,7 +195,7 @@ impl Client {
197195
where
198196
T: ?Sized + ToStatement,
199197
{
200-
executor::block_on(self.0.query_opt(query, params))
198+
self.runtime.block_on(self.client.query_opt(query, params))
201199
}
202200

203201
/// A maximally-flexible version of `query`.
@@ -235,8 +233,10 @@ impl Client {
235233
I: IntoIterator<Item = &'a dyn ToSql>,
236234
I::IntoIter: ExactSizeIterator,
237235
{
238-
let stream = executor::block_on(self.0.query_raw(query, params))?;
239-
Ok(RowIter::new(stream))
236+
let stream = self
237+
.runtime
238+
.block_on(self.client.query_raw(query, params))?;
239+
Ok(RowIter::new(&mut self.runtime, stream))
240240
}
241241

242242
/// Creates a new prepared statement.
@@ -263,7 +263,7 @@ impl Client {
263263
/// # }
264264
/// ```
265265
pub fn prepare(&mut self, query: &str) -> Result<Statement, Error> {
266-
executor::block_on(self.0.prepare(query))
266+
self.runtime.block_on(self.client.prepare(query))
267267
}
268268

269269
/// Like `prepare`, but allows the types of query parameters to be explicitly specified.
@@ -294,7 +294,8 @@ impl Client {
294294
/// # }
295295
/// ```
296296
pub fn prepare_typed(&mut self, query: &str, types: &[Type]) -> Result<Statement, Error> {
297-
executor::block_on(self.0.prepare_typed(query, types))
297+
self.runtime
298+
.block_on(self.client.prepare_typed(query, types))
298299
}
299300

300301
/// Executes a `COPY FROM STDIN` statement, returning the number of rows created.
@@ -327,8 +328,8 @@ impl Client {
327328
where
328329
T: ?Sized + ToStatement,
329330
{
330-
let sink = executor::block_on(self.0.copy_in(query, params))?;
331-
Ok(CopyInWriter::new(sink))
331+
let sink = self.runtime.block_on(self.client.copy_in(query, params))?;
332+
Ok(CopyInWriter::new(&mut self.runtime, sink))
332333
}
333334

334335
/// Executes a `COPY TO STDOUT` statement, returning a reader of the resulting data.
@@ -358,8 +359,8 @@ impl Client {
358359
where
359360
T: ?Sized + ToStatement,
360361
{
361-
let stream = executor::block_on(self.0.copy_out(query, params))?;
362-
CopyOutReader::new(stream)
362+
let stream = self.runtime.block_on(self.client.copy_out(query, params))?;
363+
CopyOutReader::new(&mut self.runtime, stream)
363364
}
364365

365366
/// Executes a sequence of SQL statements using the simple query protocol.
@@ -378,7 +379,7 @@ impl Client {
378379
/// functionality to safely imbed that data in the request. Do not form statements via string concatenation and pass
379380
/// them to this method!
380381
pub fn simple_query(&mut self, query: &str) -> Result<Vec<SimpleQueryMessage>, Error> {
381-
executor::block_on(self.0.simple_query(query))
382+
self.runtime.block_on(self.client.simple_query(query))
382383
}
383384

384385
/// Executes a sequence of SQL statements using the simple query protocol.
@@ -392,7 +393,7 @@ impl Client {
392393
/// functionality to safely embed that data in the request. Do not form statements via string concatenation and pass
393394
/// them to this method!
394395
pub fn batch_execute(&mut self, query: &str) -> Result<(), Error> {
395-
executor::block_on(self.0.batch_execute(query))
396+
self.runtime.block_on(self.client.batch_execute(query))
396397
}
397398

398399
/// Begins a new database transaction.
@@ -416,35 +417,14 @@ impl Client {
416417
/// # }
417418
/// ```
418419
pub fn transaction(&mut self) -> Result<Transaction<'_>, Error> {
419-
let transaction = executor::block_on(self.0.transaction())?;
420-
Ok(Transaction::new(transaction))
420+
let transaction = self.runtime.block_on(self.client.transaction())?;
421+
Ok(Transaction::new(&mut self.runtime, transaction))
421422
}
422423

423424
/// Determines if the client's connection has already closed.
424425
///
425426
/// If this returns `true`, the client is no longer usable.
426427
pub fn is_closed(&self) -> bool {
427-
self.0.is_closed()
428-
}
429-
430-
/// Returns a shared reference to the inner nonblocking client.
431-
pub fn get_ref(&self) -> &tokio_postgres::Client {
432-
&self.0
433-
}
434-
435-
/// Returns a mutable reference to the inner nonblocking client.
436-
pub fn get_mut(&mut self) -> &mut tokio_postgres::Client {
437-
&mut self.0
438-
}
439-
440-
/// Consumes the client, returning the inner nonblocking client.
441-
pub fn into_inner(self) -> tokio_postgres::Client {
442-
self.0
443-
}
444-
}
445-
446-
impl From<tokio_postgres::Client> for Client {
447-
fn from(c: tokio_postgres::Client) -> Client {
448-
Client(c)
428+
self.client.is_closed()
449429
}
450430
}

postgres/src/config.rs

+14-49
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,19 @@
22
//!
33
//! Requires the `runtime` Cargo feature (enabled by default).
44
5-
use crate::{Client, RUNTIME};
6-
use futures::{executor, FutureExt};
5+
use crate::Client;
6+
use futures::FutureExt;
77
use log::error;
88
use std::fmt;
9-
use std::future::Future;
109
use std::path::Path;
11-
use std::pin::Pin;
1210
use std::str::FromStr;
13-
use std::sync::{mpsc, Arc};
1411
use std::time::Duration;
12+
use tokio::runtime;
1513
#[doc(inline)]
1614
pub use tokio_postgres::config::{ChannelBinding, SslMode, TargetSessionAttrs};
1715
use tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
1816
use tokio_postgres::{Error, Socket};
1917

20-
type Spawn = dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Sync + Send;
21-
2218
/// Connection configuration.
2319
///
2420
/// Configuration can be parsed from libpq-style connection strings. These strings come in two formats:
@@ -95,7 +91,6 @@ type Spawn = dyn Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + Sync + Send;
9591
#[derive(Clone)]
9692
pub struct Config {
9793
config: tokio_postgres::Config,
98-
spawner: Option<Arc<Spawn>>,
9994
}
10095

10196
impl fmt::Debug for Config {
@@ -117,7 +112,6 @@ impl Config {
117112
pub fn new() -> Config {
118113
Config {
119114
config: tokio_postgres::Config::new(),
120-
spawner: None,
121115
}
122116
}
123117

@@ -242,17 +236,6 @@ impl Config {
242236
self
243237
}
244238

245-
/// Sets the spawner used to run the connection futures.
246-
///
247-
/// Defaults to a postgres-specific tokio `Runtime`.
248-
pub fn spawner<F>(&mut self, spawn: F) -> &mut Config
249-
where
250-
F: Fn(Pin<Box<dyn Future<Output = ()> + Send>>) + 'static + Sync + Send,
251-
{
252-
self.spawner = Some(Arc::new(spawn));
253-
self
254-
}
255-
256239
/// Opens a connection to a PostgreSQL database.
257240
pub fn connect<T>(&self, tls: T) -> Result<Client, Error>
258241
where
@@ -261,38 +244,23 @@ impl Config {
261244
T::Stream: Send,
262245
<T::TlsConnect as TlsConnect<Socket>>::Future: Send,
263246
{
264-
let (client, connection) = match &self.spawner {
265-
Some(spawn) => {
266-
let (tx, rx) = mpsc::channel();
267-
let config = self.config.clone();
268-
let connect = async move {
269-
let r = config.connect(tls).await;
270-
let _ = tx.send(r);
271-
};
272-
spawn(Box::pin(connect));
273-
rx.recv().unwrap()?
274-
}
275-
None => {
276-
let connect = self.config.connect(tls);
277-
RUNTIME.handle().enter(|| executor::block_on(connect))?
278-
}
279-
};
247+
let mut runtime = runtime::Builder::new()
248+
.enable_all()
249+
.basic_scheduler()
250+
.build()
251+
.unwrap(); // FIXME don't unwrap
252+
253+
let (client, connection) = runtime.block_on(self.config.connect(tls))?;
280254

255+
// FIXME don't spawn this so error reporting is less weird.
281256
let connection = connection.map(|r| {
282257
if let Err(e) = r {
283258
error!("postgres connection error: {}", e)
284259
}
285260
});
286-
match &self.spawner {
287-
Some(spawn) => {
288-
spawn(Box::pin(connection));
289-
}
290-
None => {
291-
RUNTIME.spawn(connection);
292-
}
293-
}
261+
runtime.spawn(connection);
294262

295-
Ok(Client::from(client))
263+
Ok(Client::new(runtime, client))
296264
}
297265
}
298266

@@ -306,9 +274,6 @@ impl FromStr for Config {
306274

307275
impl From<tokio_postgres::Config> for Config {
308276
fn from(config: tokio_postgres::Config) -> Config {
309-
Config {
310-
config,
311-
spawner: None,
312-
}
277+
Config { config }
313278
}
314279
}

0 commit comments

Comments
 (0)