1use std::str::FromStr;
2use std::time::Duration;
3use std::time::Instant;
4
5use futures_core::future::BoxFuture;
6pub(crate) use sqlx_core::migrate::*;
7
8use crate::connection::{ConnectOptions, Connection};
9use crate::error::Error;
10use crate::executor::Executor;
11use crate::query::query;
12use crate::query_as::query_as;
13use crate::query_scalar::query_scalar;
14use crate::{MySql, MySqlConnectOptions, MySqlConnection};
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> {
17 let mut options = MySqlConnectOptions::from_str(url)?;
18
19 let database = if let Some(database) = &options.database {
20 database.to_owned()
21 } else {
22 return Err(Error::Configuration(
23 "DATABASE_URL does not specify a database".into(),
24 ));
25 };
26
27 options.database = None;
29
30 Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34 fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
35 Box::pin(async move {
36 let (options, database) = parse_for_maintenance(url)?;
37 let mut conn = options.connect().await?;
38
39 let _ = conn
40 .execute(&*format!("CREATE DATABASE `{database}`"))
41 .await?;
42
43 Ok(())
44 })
45 }
46
47 fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
48 Box::pin(async move {
49 let (options, database) = parse_for_maintenance(url)?;
50 let mut conn = options.connect().await?;
51
52 let exists: bool = query_scalar(
53 "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54 )
55 .bind(database)
56 .fetch_one(&mut conn)
57 .await?;
58
59 Ok(exists)
60 })
61 }
62
63 fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
64 Box::pin(async move {
65 let (options, database) = parse_for_maintenance(url)?;
66 let mut conn = options.connect().await?;
67
68 let _ = conn
69 .execute(&*format!("DROP DATABASE IF EXISTS `{database}`"))
70 .await?;
71
72 Ok(())
73 })
74 }
75}
76
77impl Migrate for MySqlConnection {
78 fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
79 Box::pin(async move {
80 self.execute(
82 r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84 version BIGINT PRIMARY KEY,
85 description TEXT NOT NULL,
86 installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87 success BOOLEAN NOT NULL,
88 checksum BLOB NOT NULL,
89 execution_time BIGINT NOT NULL
90);
91 "#,
92 )
93 .await?;
94
95 Ok(())
96 })
97 }
98
99 fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
100 Box::pin(async move {
101 let row: Option<(i64,)> = query_as(
103 "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
104 )
105 .fetch_optional(self)
106 .await?;
107
108 Ok(row.map(|r| r.0))
109 })
110 }
111
112 fn list_applied_migrations(
113 &mut self,
114 ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
115 Box::pin(async move {
116 let rows: Vec<(i64, Vec<u8>)> =
118 query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
119 .fetch_all(self)
120 .await?;
121
122 let migrations = rows
123 .into_iter()
124 .map(|(version, checksum)| AppliedMigration {
125 version,
126 checksum: checksum.into(),
127 })
128 .collect();
129
130 Ok(migrations)
131 })
132 }
133
134 fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
135 Box::pin(async move {
136 let database_name = current_database(self).await?;
137 let lock_id = generate_lock_id(&database_name);
138
139 let _ = query("SELECT GET_LOCK(?, -1)")
147 .bind(lock_id)
148 .execute(self)
149 .await?;
150
151 Ok(())
152 })
153 }
154
155 fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
156 Box::pin(async move {
157 let database_name = current_database(self).await?;
158 let lock_id = generate_lock_id(&database_name);
159
160 let _ = query("SELECT RELEASE_LOCK(?)")
162 .bind(lock_id)
163 .execute(self)
164 .await?;
165
166 Ok(())
167 })
168 }
169
170 fn apply<'e: 'm, 'm>(
171 &'e mut self,
172 migration: &'m Migration,
173 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
174 Box::pin(async move {
175 let mut tx = self.begin().await?;
181 let start = Instant::now();
182
183 let _ = query(
191 r#"
192 INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
193 VALUES ( ?, ?, FALSE, ?, -1 )
194 "#,
195 )
196 .bind(migration.version)
197 .bind(&*migration.description)
198 .bind(&*migration.checksum)
199 .execute(&mut *tx)
200 .await?;
201
202 let _ = tx
203 .execute(&*migration.sql)
204 .await
205 .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
206
207 let _ = query(
209 r#"
210 UPDATE _sqlx_migrations
211 SET success = TRUE
212 WHERE version = ?
213 "#,
214 )
215 .bind(migration.version)
216 .execute(&mut *tx)
217 .await?;
218
219 tx.commit().await?;
220
221 let elapsed = start.elapsed();
226
227 #[allow(clippy::cast_possible_truncation)]
228 let _ = query(
229 r#"
230 UPDATE _sqlx_migrations
231 SET execution_time = ?
232 WHERE version = ?
233 "#,
234 )
235 .bind(elapsed.as_nanos() as i64)
236 .bind(migration.version)
237 .execute(self)
238 .await?;
239
240 Ok(elapsed)
241 })
242 }
243
244 fn revert<'e: 'm, 'm>(
245 &'e mut self,
246 migration: &'m Migration,
247 ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
248 Box::pin(async move {
249 let mut tx = self.begin().await?;
252 let start = Instant::now();
253
254 let _ = query(
262 r#"
263 UPDATE _sqlx_migrations
264 SET success = FALSE
265 WHERE version = ?
266 "#,
267 )
268 .bind(migration.version)
269 .execute(&mut *tx)
270 .await?;
271
272 tx.execute(&*migration.sql).await?;
273
274 let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
276 .bind(migration.version)
277 .execute(&mut *tx)
278 .await?;
279
280 tx.commit().await?;
281
282 let elapsed = start.elapsed();
283
284 Ok(elapsed)
285 })
286 }
287}
288
289async fn current_database(conn: &mut MySqlConnection) -> Result<String, MigrateError> {
290 Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
292}
293
294fn generate_lock_id(database_name: &str) -> String {
296 const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
297 format!(
299 "{:x}",
300 0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
301 )
302}