sqlx_mysql/
migrate.rs

1use std::str::FromStr;
2use std::time::Duration;
3use std::time::Instant;
4
5use futures_core::future::BoxFuture;
6pub(crate) use sqlx_core::migrate::*;
7
8use crate::connection::{ConnectOptions, Connection};
9use crate::error::Error;
10use crate::executor::Executor;
11use crate::query::query;
12use crate::query_as::query_as;
13use crate::query_scalar::query_scalar;
14use crate::{MySql, MySqlConnectOptions, MySqlConnection};
15
16fn parse_for_maintenance(url: &str) -> Result<(MySqlConnectOptions, String), Error> {
17    let mut options = MySqlConnectOptions::from_str(url)?;
18
19    let database = if let Some(database) = &options.database {
20        database.to_owned()
21    } else {
22        return Err(Error::Configuration(
23            "DATABASE_URL does not specify a database".into(),
24        ));
25    };
26
27    // switch us to <no> database for create/drop commands
28    options.database = None;
29
30    Ok((options, database))
31}
32
33impl MigrateDatabase for MySql {
34    fn create_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
35        Box::pin(async move {
36            let (options, database) = parse_for_maintenance(url)?;
37            let mut conn = options.connect().await?;
38
39            let _ = conn
40                .execute(&*format!("CREATE DATABASE `{database}`"))
41                .await?;
42
43            Ok(())
44        })
45    }
46
47    fn database_exists(url: &str) -> BoxFuture<'_, Result<bool, Error>> {
48        Box::pin(async move {
49            let (options, database) = parse_for_maintenance(url)?;
50            let mut conn = options.connect().await?;
51
52            let exists: bool = query_scalar(
53                "select exists(SELECT 1 from INFORMATION_SCHEMA.SCHEMATA WHERE SCHEMA_NAME = ?)",
54            )
55            .bind(database)
56            .fetch_one(&mut conn)
57            .await?;
58
59            Ok(exists)
60        })
61    }
62
63    fn drop_database(url: &str) -> BoxFuture<'_, Result<(), Error>> {
64        Box::pin(async move {
65            let (options, database) = parse_for_maintenance(url)?;
66            let mut conn = options.connect().await?;
67
68            let _ = conn
69                .execute(&*format!("DROP DATABASE IF EXISTS `{database}`"))
70                .await?;
71
72            Ok(())
73        })
74    }
75}
76
77impl Migrate for MySqlConnection {
78    fn ensure_migrations_table(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
79        Box::pin(async move {
80            // language=MySQL
81            self.execute(
82                r#"
83CREATE TABLE IF NOT EXISTS _sqlx_migrations (
84    version BIGINT PRIMARY KEY,
85    description TEXT NOT NULL,
86    installed_on TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP,
87    success BOOLEAN NOT NULL,
88    checksum BLOB NOT NULL,
89    execution_time BIGINT NOT NULL
90);
91                "#,
92            )
93            .await?;
94
95            Ok(())
96        })
97    }
98
99    fn dirty_version(&mut self) -> BoxFuture<'_, Result<Option<i64>, MigrateError>> {
100        Box::pin(async move {
101            // language=SQL
102            let row: Option<(i64,)> = query_as(
103                "SELECT version FROM _sqlx_migrations WHERE success = false ORDER BY version LIMIT 1",
104            )
105            .fetch_optional(self)
106            .await?;
107
108            Ok(row.map(|r| r.0))
109        })
110    }
111
112    fn list_applied_migrations(
113        &mut self,
114    ) -> BoxFuture<'_, Result<Vec<AppliedMigration>, MigrateError>> {
115        Box::pin(async move {
116            // language=SQL
117            let rows: Vec<(i64, Vec<u8>)> =
118                query_as("SELECT version, checksum FROM _sqlx_migrations ORDER BY version")
119                    .fetch_all(self)
120                    .await?;
121
122            let migrations = rows
123                .into_iter()
124                .map(|(version, checksum)| AppliedMigration {
125                    version,
126                    checksum: checksum.into(),
127                })
128                .collect();
129
130            Ok(migrations)
131        })
132    }
133
134    fn lock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
135        Box::pin(async move {
136            let database_name = current_database(self).await?;
137            let lock_id = generate_lock_id(&database_name);
138
139            // create an application lock over the database
140            // this function will not return until the lock is acquired
141
142            // https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS
143            // https://www.postgresql.org/docs/current/functions-admin.html#FUNCTIONS-ADVISORY-LOCKS-TABLE
144
145            // language=MySQL
146            let _ = query("SELECT GET_LOCK(?, -1)")
147                .bind(lock_id)
148                .execute(self)
149                .await?;
150
151            Ok(())
152        })
153    }
154
155    fn unlock(&mut self) -> BoxFuture<'_, Result<(), MigrateError>> {
156        Box::pin(async move {
157            let database_name = current_database(self).await?;
158            let lock_id = generate_lock_id(&database_name);
159
160            // language=MySQL
161            let _ = query("SELECT RELEASE_LOCK(?)")
162                .bind(lock_id)
163                .execute(self)
164                .await?;
165
166            Ok(())
167        })
168    }
169
170    fn apply<'e: 'm, 'm>(
171        &'e mut self,
172        migration: &'m Migration,
173    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
174        Box::pin(async move {
175            // Use a single transaction for the actual migration script and the essential bookeeping so we never
176            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
177            // The `execution_time` however can only be measured for the whole transaction. This value _only_ exists for
178            // data lineage and debugging reasons, so it is not super important if it is lost. So we initialize it to -1
179            // and update it once the actual transaction completed.
180            let mut tx = self.begin().await?;
181            let start = Instant::now();
182
183            // For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
184            // https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
185            //
186            // To somewhat try to detect this, we first insert the migration into the migration table with
187            // `success=FALSE` and later modify the flag.
188            //
189            // language=MySQL
190            let _ = query(
191                r#"
192    INSERT INTO _sqlx_migrations ( version, description, success, checksum, execution_time )
193    VALUES ( ?, ?, FALSE, ?, -1 )
194                "#,
195            )
196            .bind(migration.version)
197            .bind(&*migration.description)
198            .bind(&*migration.checksum)
199            .execute(&mut *tx)
200            .await?;
201
202            let _ = tx
203                .execute(&*migration.sql)
204                .await
205                .map_err(|e| MigrateError::ExecuteMigration(e, migration.version))?;
206
207            // language=MySQL
208            let _ = query(
209                r#"
210    UPDATE _sqlx_migrations
211    SET success = TRUE
212    WHERE version = ?
213                "#,
214            )
215            .bind(migration.version)
216            .execute(&mut *tx)
217            .await?;
218
219            tx.commit().await?;
220
221            // Update `elapsed_time`.
222            // NOTE: The process may disconnect/die at this point, so the elapsed time value might be lost. We accept
223            //       this small risk since this value is not super important.
224
225            let elapsed = start.elapsed();
226
227            #[allow(clippy::cast_possible_truncation)]
228            let _ = query(
229                r#"
230    UPDATE _sqlx_migrations
231    SET execution_time = ?
232    WHERE version = ?
233                "#,
234            )
235            .bind(elapsed.as_nanos() as i64)
236            .bind(migration.version)
237            .execute(self)
238            .await?;
239
240            Ok(elapsed)
241        })
242    }
243
244    fn revert<'e: 'm, 'm>(
245        &'e mut self,
246        migration: &'m Migration,
247    ) -> BoxFuture<'m, Result<Duration, MigrateError>> {
248        Box::pin(async move {
249            // Use a single transaction for the actual migration script and the essential bookeeping so we never
250            // execute migrations twice. See https://github.com/launchbadge/sqlx/issues/1966.
251            let mut tx = self.begin().await?;
252            let start = Instant::now();
253
254            // For MySQL we cannot really isolate migrations due to implicit commits caused by table modification, see
255            // https://dev.mysql.com/doc/refman/8.0/en/implicit-commit.html
256            //
257            // To somewhat try to detect this, we first insert the migration into the migration table with
258            // `success=FALSE` and later remove the migration altogether.
259            //
260            // language=MySQL
261            let _ = query(
262                r#"
263    UPDATE _sqlx_migrations
264    SET success = FALSE
265    WHERE version = ?
266                "#,
267            )
268            .bind(migration.version)
269            .execute(&mut *tx)
270            .await?;
271
272            tx.execute(&*migration.sql).await?;
273
274            // language=SQL
275            let _ = query(r#"DELETE FROM _sqlx_migrations WHERE version = ?"#)
276                .bind(migration.version)
277                .execute(&mut *tx)
278                .await?;
279
280            tx.commit().await?;
281
282            let elapsed = start.elapsed();
283
284            Ok(elapsed)
285        })
286    }
287}
288
289async fn current_database(conn: &mut MySqlConnection) -> Result<String, MigrateError> {
290    // language=MySQL
291    Ok(query_scalar("SELECT DATABASE()").fetch_one(conn).await?)
292}
293
294// inspired from rails: https://github.com/rails/rails/blob/6e49cc77ab3d16c06e12f93158eaf3e507d4120e/activerecord/lib/active_record/migration.rb#L1308
295fn generate_lock_id(database_name: &str) -> String {
296    const CRC_IEEE: crc::Crc<u32> = crc::Crc::<u32>::new(&crc::CRC_32_ISO_HDLC);
297    // 0x3d32ad9e chosen by fair dice roll
298    format!(
299        "{:x}",
300        0x3d32ad9e * (CRC_IEEE.checksum(database_name.as_bytes()) as i64)
301    )
302}