1use crate::protocol::text::ColumnType;
2use crate::{
3 MySql, MySqlColumn, MySqlConnectOptions, MySqlConnection, MySqlQueryResult, MySqlRow,
4 MySqlTransactionManager, MySqlTypeInfo,
5};
6use either::Either;
7use futures_core::future::BoxFuture;
8use futures_core::stream::BoxStream;
9use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
10use sqlx_core::any::{
11 Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
12 AnyStatement, AnyTypeInfo, AnyTypeInfoKind,
13};
14use sqlx_core::connection::Connection;
15use sqlx_core::database::Database;
16use sqlx_core::describe::Describe;
17use sqlx_core::executor::Executor;
18use sqlx_core::transaction::TransactionManager;
19use std::borrow::Cow;
20use std::{future, pin::pin};
21
22sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
23
24impl AnyConnectionBackend for MySqlConnection {
25 fn name(&self) -> &str {
26 <MySql as Database>::NAME
27 }
28
29 fn close(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
30 Connection::close(*self)
31 }
32
33 fn close_hard(self: Box<Self>) -> BoxFuture<'static, sqlx_core::Result<()>> {
34 Connection::close_hard(*self)
35 }
36
37 fn ping(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
38 Connection::ping(self)
39 }
40
41 fn begin(
42 &mut self,
43 statement: Option<Cow<'static, str>>,
44 ) -> BoxFuture<'_, sqlx_core::Result<()>> {
45 MySqlTransactionManager::begin(self, statement)
46 }
47
48 fn commit(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
49 MySqlTransactionManager::commit(self)
50 }
51
52 fn rollback(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
53 MySqlTransactionManager::rollback(self)
54 }
55
56 fn start_rollback(&mut self) {
57 MySqlTransactionManager::start_rollback(self)
58 }
59
60 fn get_transaction_depth(&self) -> usize {
61 MySqlTransactionManager::get_transaction_depth(self)
62 }
63
64 fn shrink_buffers(&mut self) {
65 Connection::shrink_buffers(self);
66 }
67
68 fn flush(&mut self) -> BoxFuture<'_, sqlx_core::Result<()>> {
69 Connection::flush(self)
70 }
71
72 fn should_flush(&self) -> bool {
73 Connection::should_flush(self)
74 }
75
76 #[cfg(feature = "migrate")]
77 fn as_migrate(
78 &mut self,
79 ) -> sqlx_core::Result<&mut (dyn sqlx_core::migrate::Migrate + Send + 'static)> {
80 Ok(self)
81 }
82
83 fn fetch_many<'q>(
84 &'q mut self,
85 query: &'q str,
86 persistent: bool,
87 arguments: Option<AnyArguments<'q>>,
88 ) -> BoxStream<'q, sqlx_core::Result<Either<AnyQueryResult, AnyRow>>> {
89 let persistent = persistent && arguments.is_some();
90 let arguments = match arguments.as_ref().map(AnyArguments::convert_to).transpose() {
91 Ok(arguments) => arguments,
92 Err(error) => {
93 return stream::once(future::ready(Err(sqlx_core::Error::Encode(error)))).boxed()
94 }
95 };
96
97 Box::pin(
98 self.run(query, arguments, persistent)
99 .try_flatten_stream()
100 .map(|res| {
101 Ok(match res? {
102 Either::Left(result) => Either::Left(map_result(result)),
103 Either::Right(row) => Either::Right(AnyRow::try_from(&row)?),
104 })
105 }),
106 )
107 }
108
109 fn fetch_optional<'q>(
110 &'q mut self,
111 query: &'q str,
112 persistent: bool,
113 arguments: Option<AnyArguments<'q>>,
114 ) -> BoxFuture<'q, sqlx_core::Result<Option<AnyRow>>> {
115 let persistent = persistent && arguments.is_some();
116 let arguments = arguments
117 .as_ref()
118 .map(AnyArguments::convert_to)
119 .transpose()
120 .map_err(sqlx_core::Error::Encode);
121
122 Box::pin(async move {
123 let arguments = arguments?;
124 let mut stream = pin!(self.run(query, arguments, persistent).await?);
125
126 while let Some(result) = stream.try_next().await? {
127 if let Either::Right(row) = result {
128 return Ok(Some(AnyRow::try_from(&row)?));
129 }
130 }
131
132 Ok(None)
133 })
134 }
135
136 fn prepare_with<'c, 'q: 'c>(
137 &'c mut self,
138 sql: &'q str,
139 _parameters: &[AnyTypeInfo],
140 ) -> BoxFuture<'c, sqlx_core::Result<AnyStatement<'q>>> {
141 Box::pin(async move {
142 let statement = Executor::prepare_with(self, sql, &[]).await?;
143 AnyStatement::try_from_statement(
144 sql,
145 &statement,
146 statement.metadata.column_names.clone(),
147 )
148 })
149 }
150
151 fn describe<'q>(&'q mut self, sql: &'q str) -> BoxFuture<'q, sqlx_core::Result<Describe<Any>>> {
152 Box::pin(async move {
153 let describe = Executor::describe(self, sql).await?;
154 describe.try_into_any()
155 })
156 }
157}
158
159impl<'a> TryFrom<&'a MySqlTypeInfo> for AnyTypeInfo {
160 type Error = sqlx_core::Error;
161
162 fn try_from(type_info: &'a MySqlTypeInfo) -> Result<Self, Self::Error> {
163 Ok(AnyTypeInfo {
164 kind: match &type_info.r#type {
165 ColumnType::Null => AnyTypeInfoKind::Null,
166 ColumnType::Short => AnyTypeInfoKind::SmallInt,
167 ColumnType::Long => AnyTypeInfoKind::Integer,
168 ColumnType::LongLong => AnyTypeInfoKind::BigInt,
169 ColumnType::Float => AnyTypeInfoKind::Real,
170 ColumnType::Double => AnyTypeInfoKind::Double,
171 ColumnType::Blob
172 | ColumnType::TinyBlob
173 | ColumnType::MediumBlob
174 | ColumnType::LongBlob => AnyTypeInfoKind::Blob,
175 ColumnType::String | ColumnType::VarString | ColumnType::VarChar => {
176 AnyTypeInfoKind::Text
177 }
178 _ => {
179 return Err(sqlx_core::Error::AnyDriverError(
180 format!("Any driver does not support MySql type {type_info:?}").into(),
181 ))
182 }
183 },
184 })
185 }
186}
187
188impl<'a> TryFrom<&'a MySqlColumn> for AnyColumn {
189 type Error = sqlx_core::Error;
190
191 fn try_from(column: &'a MySqlColumn) -> Result<Self, Self::Error> {
192 let type_info = AnyTypeInfo::try_from(&column.type_info)?;
193
194 Ok(AnyColumn {
195 ordinal: column.ordinal,
196 name: column.name.clone(),
197 type_info,
198 })
199 }
200}
201
202impl<'a> TryFrom<&'a MySqlRow> for AnyRow {
203 type Error = sqlx_core::Error;
204
205 fn try_from(row: &'a MySqlRow) -> Result<Self, Self::Error> {
206 AnyRow::map_from(row, row.column_names.clone())
207 }
208}
209
210impl<'a> TryFrom<&'a AnyConnectOptions> for MySqlConnectOptions {
211 type Error = sqlx_core::Error;
212
213 fn try_from(any_opts: &'a AnyConnectOptions) -> Result<Self, Self::Error> {
214 let mut opts = Self::parse_from_url(&any_opts.database_url)?;
215 opts.log_settings = any_opts.log_settings.clone();
216 Ok(opts)
217 }
218}
219
220fn map_result(result: MySqlQueryResult) -> AnyQueryResult {
221 AnyQueryResult {
222 rows_affected: result.rows_affected,
223 #[allow(clippy::cast_possible_wrap)]
225 last_insert_id: Some(result.last_insert_id as i64),
226 }
227}