sqlx_mysql/testing/
mod.rs1use std::ops::Deref;
2use std::str::FromStr;
3use std::time::Duration;
4
5use futures_core::future::BoxFuture;
6
7use crate::error::Error;
8use crate::executor::Executor;
9use crate::pool::{Pool, PoolOptions};
10use crate::query::query;
11use crate::{MySql, MySqlConnectOptions, MySqlConnection, MySqlDatabaseError};
12use once_cell::sync::OnceCell;
13use sqlx_core::connection::Connection;
14use sqlx_core::query_builder::QueryBuilder;
15use sqlx_core::query_scalar::query_scalar;
16use std::fmt::Write;
17
18pub(crate) use sqlx_core::testing::*;
19
/// Process-wide pool of connections to the server named by `DATABASE_URL`.
///
/// Populated by the first call to `test_context()` in this module and read
/// back by `cleanup_test()`; it is never replaced once set.
static MASTER_POOL: OnceCell<Pool<MySql>> = OnceCell::new();
22
23impl TestSupport for MySql {
24 fn test_context(args: &TestArgs) -> BoxFuture<'_, Result<TestContext<Self>, Error>> {
25 Box::pin(async move { test_context(args).await })
26 }
27
28 fn cleanup_test(db_name: &str) -> BoxFuture<'_, Result<(), Error>> {
29 Box::pin(async move {
30 let mut conn = MASTER_POOL
31 .get()
32 .expect("cleanup_test() invoked outside `#[sqlx::test]`")
33 .acquire()
34 .await?;
35
36 do_cleanup(&mut conn, db_name).await
37 })
38 }
39
40 fn cleanup_test_dbs() -> BoxFuture<'static, Result<Option<usize>, Error>> {
41 Box::pin(async move {
42 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
43
44 let mut conn = MySqlConnection::connect(&url).await?;
45
46 let delete_db_names: Vec<String> =
47 query_scalar("select db_name from _sqlx_test_databases")
48 .fetch_all(&mut conn)
49 .await?;
50
51 if delete_db_names.is_empty() {
52 return Ok(None);
53 }
54
55 let mut deleted_db_names = Vec::with_capacity(delete_db_names.len());
56
57 let mut command = String::new();
58
59 for db_name in &delete_db_names {
60 command.clear();
61
62 let db_name = format!("_sqlx_test_database_{db_name}");
63
64 writeln!(command, "drop database if exists {db_name};").ok();
65 match conn.execute(&*command).await {
66 Ok(_deleted) => {
67 deleted_db_names.push(db_name);
68 }
69 Err(Error::Database(dbe)) => {
71 eprintln!("could not clean test database {db_name:?}: {dbe}")
72 }
73 Err(e) => return Err(e),
75 }
76 }
77
78 if deleted_db_names.is_empty() {
79 return Ok(None);
80 }
81
82 let mut query =
83 QueryBuilder::new("delete from _sqlx_test_databases where db_name in (");
84
85 let mut separated = query.separated(",");
86
87 for db_name in &deleted_db_names {
88 separated.push_bind(db_name);
89 }
90
91 query.push(")").build().execute(&mut conn).await?;
92
93 let _ = conn.close().await;
94 Ok(Some(delete_db_names.len()))
95 })
96 }
97
98 fn snapshot(
99 _conn: &mut Self::Connection,
100 ) -> BoxFuture<'_, Result<FixtureSnapshot<Self>, Error>> {
101 todo!()
104 }
105}
106
107async fn test_context(args: &TestArgs) -> Result<TestContext<MySql>, Error> {
108 let url = dotenvy::var("DATABASE_URL").expect("DATABASE_URL must be set");
109
110 let master_opts = MySqlConnectOptions::from_str(&url).expect("failed to parse DATABASE_URL");
111
112 let pool = PoolOptions::new()
113 .max_connections(20)
117 .after_release(|_conn, _| Box::pin(async move { Ok(false) }))
119 .connect_lazy_with(master_opts);
120
121 let master_pool = match MASTER_POOL.try_insert(pool) {
122 Ok(inserted) => inserted,
123 Err((existing, pool)) => {
124 assert_eq!(
126 existing.connect_options().host,
127 pool.connect_options().host,
128 "DATABASE_URL changed at runtime, host differs"
129 );
130
131 assert_eq!(
132 existing.connect_options().database,
133 pool.connect_options().database,
134 "DATABASE_URL changed at runtime, database differs"
135 );
136
137 existing
138 }
139 };
140
141 let mut conn = master_pool.acquire().await?;
142
143 cleanup_old_dbs(&mut conn).await?;
144
145 conn.execute(
147 r#"
148 create table if not exists _sqlx_test_databases (
149 db_name text not null,
150 test_path text not null,
151 created_at timestamp not null default current_timestamp,
152 -- BLOB/TEXT columns can only be used as index keys with a prefix length:
153 -- https://dev.mysql.com/doc/refman/8.4/en/column-indexes.html#column-indexes-prefix
154 primary key(db_name(63))
155 );
156 "#,
157 )
158 .await?;
159
160 let db_name = MySql::db_name(args);
161 do_cleanup(&mut conn, &db_name).await?;
162
163 query("insert into _sqlx_test_databases(db_name, test_path) values (?, ?)")
164 .bind(&db_name)
165 .bind(args.test_path)
166 .execute(&mut *conn)
167 .await?;
168
169 conn.execute(&format!("create database {db_name}")[..])
170 .await?;
171
172 eprintln!("created database {db_name}");
173
174 Ok(TestContext {
175 pool_opts: PoolOptions::new()
176 .max_connections(5)
180 .idle_timeout(Some(Duration::from_secs(1)))
182 .parent(master_pool.clone()),
183 connect_opts: master_pool
184 .connect_options()
185 .deref()
186 .clone()
187 .database(&db_name),
188 db_name,
189 })
190}
191
192async fn do_cleanup(conn: &mut MySqlConnection, db_name: &str) -> Result<(), Error> {
193 let delete_db_command = format!("drop database if exists {db_name};");
194 conn.execute(&*delete_db_command).await?;
195 query("delete from _sqlx_test_databases where db_name = ?")
196 .bind(db_name)
197 .execute(&mut *conn)
198 .await?;
199
200 Ok(())
201}
202
203async fn cleanup_old_dbs(conn: &mut MySqlConnection) -> Result<(), Error> {
205 let res: Result<Vec<u64>, Error> = query_scalar("select db_id from _sqlx_test_databases")
206 .fetch_all(&mut *conn)
207 .await;
208
209 let db_ids = match res {
210 Ok(db_ids) => db_ids,
211 Err(e) => {
212 if let Some(dbe) = e.as_database_error() {
213 match dbe.downcast_ref::<MySqlDatabaseError>().number() {
214 1054 => return Ok(()),
219 1146 => return Ok(()),
223 _ => (),
224 }
225 }
226
227 return Err(e);
228 }
229 };
230
231 for id in db_ids {
233 match conn
234 .execute(&*format!(
235 "drop database if exists _sqlx_test_database_{id}"
236 ))
237 .await
238 {
239 Ok(_deleted) => (),
240 Err(Error::Database(dbe)) => {
242 eprintln!("could not clean old test database _sqlx_test_database_{id}: {dbe}");
243 }
244 Err(e) => return Err(e),
246 }
247 }
248
249 conn.execute("drop table if exists _sqlx_test_databases")
250 .await?;
251
252 Ok(())
253}