tokio_postgres/
transaction_builder.rs1use postgres_protocol::message::frontend;
2
3use crate::{codec::FrontendMessage, connection::RequestMessages, Client, Error, Transaction};
4
5#[derive(Debug, Copy, Clone)]
7#[non_exhaustive]
8pub enum IsolationLevel {
9 ReadUncommitted,
11
12 ReadCommitted,
14
15 RepeatableRead,
18
19 Serializable,
22}
23
24pub struct TransactionBuilder<'a> {
26 client: &'a mut Client,
27 isolation_level: Option<IsolationLevel>,
28 read_only: Option<bool>,
29 deferrable: Option<bool>,
30}
31
32impl<'a> TransactionBuilder<'a> {
33 pub(crate) fn new(client: &'a mut Client) -> TransactionBuilder<'a> {
34 TransactionBuilder {
35 client,
36 isolation_level: None,
37 read_only: None,
38 deferrable: None,
39 }
40 }
41
42 pub fn isolation_level(mut self, isolation_level: IsolationLevel) -> Self {
44 self.isolation_level = Some(isolation_level);
45 self
46 }
47
48 pub fn read_only(mut self, read_only: bool) -> Self {
50 self.read_only = Some(read_only);
51 self
52 }
53
54 pub fn deferrable(mut self, deferrable: bool) -> Self {
60 self.deferrable = Some(deferrable);
61 self
62 }
63
64 pub async fn start(self) -> Result<Transaction<'a>, Error> {
68 let mut query = "START TRANSACTION".to_string();
69 let mut first = true;
70
71 if let Some(level) = self.isolation_level {
72 first = false;
73
74 query.push_str(" ISOLATION LEVEL ");
75 let level = match level {
76 IsolationLevel::ReadUncommitted => "READ UNCOMMITTED",
77 IsolationLevel::ReadCommitted => "READ COMMITTED",
78 IsolationLevel::RepeatableRead => "REPEATABLE READ",
79 IsolationLevel::Serializable => "SERIALIZABLE",
80 };
81 query.push_str(level);
82 }
83
84 if let Some(read_only) = self.read_only {
85 if !first {
86 query.push(',');
87 }
88 first = false;
89
90 let s = if read_only {
91 " READ ONLY"
92 } else {
93 " READ WRITE"
94 };
95 query.push_str(s);
96 }
97
98 if let Some(deferrable) = self.deferrable {
99 if !first {
100 query.push(',');
101 }
102
103 let s = if deferrable {
104 " DEFERRABLE"
105 } else {
106 " NOT DEFERRABLE"
107 };
108 query.push_str(s);
109 }
110
111 struct RollbackIfNotDone<'me> {
112 client: &'me Client,
113 done: bool,
114 }
115
116 impl Drop for RollbackIfNotDone<'_> {
117 fn drop(&mut self) {
118 if self.done {
119 return;
120 }
121
122 let buf = self.client.inner().with_buf(|buf| {
123 frontend::query("ROLLBACK", buf).unwrap();
124 buf.split().freeze()
125 });
126 let _ = self
127 .client
128 .inner()
129 .send(RequestMessages::Single(FrontendMessage::Raw(buf)));
130 }
131 }
132
133 {
139 let mut cleaner = RollbackIfNotDone {
140 client: self.client,
141 done: false,
142 };
143 self.client.batch_execute(&query).await?;
144 cleaner.done = true;
145 }
146
147 Ok(Transaction::new(self.client))
148 }
149}