sqlx_mysql/
any.rs

1use crate::protocol::text::ColumnType;
2use crate::{
3    MySql, MySqlColumn, MySqlConnectOptions, MySqlConnection, MySqlQueryResult, MySqlRow,
4    MySqlTransactionManager, MySqlTypeInfo,
5};
6use either::Either;
7use futures_core::future::BoxFuture;
8use futures_core::stream::BoxStream;
9use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
10use sqlx_core::any::{
11    Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
12    AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
13};
14use sqlx_core::connection::Connection;
15use sqlx_core::database::Database;
16use sqlx_core::describe::Describe;
17use sqlx_core::executor::Executor;
18use sqlx_core::transaction::TransactionManager;
19use std::borrow::Cow;
20use std::{future, pin::pin};
21
// Invokes sqlx_core's driver-declaration macro with `MySql` bound to `DRIVER`.
// NOTE(review): the macro is defined in sqlx_core, outside this file. Judging by its name it
// declares this crate's `Any` driver, with migration support being optional -- confirm
// against the macro definition.
22sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
23
24impl AnyConnectionBackend for MySqlConnection {
25    fn name(&self) -> &str {
26        <MySql as Database>::NAME
27    }
28
29    fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
30        Connection::close(*self)
31    }
32
33    fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
34        Connection::close_hard(*self)
35    }
36
37    fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
38        Connection::ping(self)
39    }
40
41    fn begin(
42        &mut self,
43        statement: Option<Cow<'static, str>>,
44    ) -> BoxFuture<'_, sqlx_core::Result<()>> {
45        MySqlTransactionManager::begin(self, statement)
46    }
47
48    fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
49        MySqlTransactionManager::commit(self)
50    }
51
52    fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
53        MySqlTransactionManager::rollback(self)
54    }
55
56    fn start_rollback(&mut self) {
57        MySqlTransactionManager::start_rollback(self)
58    }
59
60    fn get_transaction_depth(&self) -> usize {
61        MySqlTransactionManager::get_transaction_depth(self)
62    }
63
64    fn shrink_buffers(&mut self) {
65        Connection::shrink_buffers(self);
66    }
67
68    fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
69        Connection::flush(self)
70    }
71
72    fn should_flush(&self) -> bool {
73        Connection::should_flush(self)
74    }
75
76    #[cfg(feature = "migrate")]
77    fn as_migrate(
78        &mut self,
79    ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
80        Ok(self)
81    }
82
83    fn fetch_many<'q>(
84        &'q mut self,
85        query: &'q str,
86        persistent: bool,
87        arguments: Option<AnyArguments<'q>>,
88    ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
89        let persistent = persistent && arguments.is_some();
90        let arguments = match arguments.as_ref().map(AnyArguments::convert_to).transpose() {
91            Ok(arguments) => arguments,
92            Err(error) => {
93                return stream::once(future::ready(Err(sqlx_core::Error::Encode(error)))).boxed()
94            }
95        };
96
97        Box::pin(
98            self.run(query, arguments, persistent)
99                .try_flatten_stream()
100                .map(|res| {
101                    Ok(match res? {
102                        Either::Left(result) => Either::Left(map_result(result)),
103                        Either::Right(row) => Either::Right(AnyRow::try_from(&row)?),
104                    })
105                }),
106        )
107    }
108
109    fn fetch_optional<'q>(
110        &'q mut self,
111        query: &'q str,
112        persistent: bool,
113        arguments: Option<AnyArguments<'q>>,
114    ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
115        let persistent = persistent && arguments.is_some();
116        let arguments = arguments
117            .as_ref()
118            .map(AnyArguments::convert_to)
119            .transpose()
120            .map_err(sqlx_core::Error::Encode);
121
122        Box::pin(async move {
123            let arguments = arguments?;
124            let mut stream = pin!(self.run(query, arguments, persistent).await?);
125
126            while let Some(result) = stream.try_next().await? {
127                if let Either::Right(row) = result {
128                    return Ok(Some(AnyRow::try_from(&row)?));
129                }
130            }
131
132            Ok(None)
133        })
134    }
135
136    fn prepare_with<'c, 'q: 'c>(
137        &'c mut self,
138        sql: &'q str,
139        _parameters: &[AnyTypeInfo],
140    ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
141        Box::pin(async move {
142            let statement = Executor::prepare_with(self, sql, &[]).await?;
143            AnyStatement::try_from_statement(
144                sql,
145                &statement,
146                statement.metadata.column_names.clone(),
147            )
148        })
149    }
150
151    fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
152        Box::pin(async move {
153            let describe = Executor::describe(self, sql).await?;
154            describe.try_into_any()
155        })
156    }
157}
158
159impl<'a> TryFrom<&'a MySqlTypeInfo> for AnyTypeInfo {
160    type Error = sqlx_core::Error;
161
162    fn try_from(type_info: &'a MySqlTypeInfo) -> Result<Self, Self::Error> {
163        Ok(AnyTypeInfo {
164            kind: match &type_info.r#type {
165                ColumnType::Null => AnyTypeInfoKind::Null,
166                ColumnType::Short => AnyTypeInfoKind::SmallInt,
167                ColumnType::Long => AnyTypeInfoKind::Integer,
168                ColumnType::LongLong => AnyTypeInfoKind::BigInt,
169                ColumnType::Float => AnyTypeInfoKind::Real,
170                ColumnType::Double => AnyTypeInfoKind::Double,
171                ColumnType::Blob
172                | ColumnType::TinyBlob
173                | ColumnType::MediumBlob
174                | ColumnType::LongBlob => AnyTypeInfoKind::Blob,
175                ColumnType::String | ColumnType::VarString | ColumnType::VarChar => {
176                    AnyTypeInfoKind::Text
177                }
178                _ => {
179                    return Err(sqlx_core::Error::AnyDriverError(
180                        format!("Any driver does not support MySql type {type_info:?}").into(),
181                    ))
182                }
183            },
184        })
185    }
186}
187
188impl<'a> TryFrom<&'a MySqlColumn> for AnyColumn {
189    type Error = sqlx_core::Error;
190
191    fn try_from(column: &'a MySqlColumn) -> Result<Self, Self::Error> {
192        let type_info = AnyTypeInfo::try_from(&column.type_info)?;
193
194        Ok(AnyColumn {
195            ordinal: column.ordinal,
196            name: column.name.clone(),
197            type_info,
198        })
199    }
200}
201
202impl<'a> TryFrom<&'a MySqlRow> for AnyRow {
203    type Error = sqlx_core::Error;
204
205    fn try_from(row: &'a MySqlRow) -> Result<Self, Self::Error> {
206        AnyRow::map_from(row, row.column_names.clone())
207    }
208}
209
210impl<'a> TryFrom<&'a AnyConnectOptions> for MySqlConnectOptions {
211    type Error = sqlx_core::Error;
212
213    fn try_from(any_opts: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
214        let mut opts = Self::parse_from_url(&any_opts.database_url)?;
215        opts.log_settings = any_opts.log_settings.clone();
216        Ok(opts)
217    }
218}
219
220fn map_result(result: MySqlQueryResult) -> AnyQueryResult {
221    AnyQueryResult {
222        rows_affected: result.rows_affected,
223        // Don't expect this to be a problem
224        #[allow(clippy::cast_possible_wrap)]
225        last_insert_id: Some(result.last_insert_id as i64),
226    }
227}