1use futures::StreamExt;
4use serde::{Deserialize, Serialize};
5use serde_json::Value;
6use std::{collections::HashMap, fmt};
7use tokio::{
8 net::UnixStream,
9 sync::{broadcast, mpsc, oneshot},
10};
11
12use crate::{
13 ipc::{MpvIpc, MpvIpcCommand, MpvIpcEvent, MpvIpcResponse},
14 message_parser::TypeHandler,
15 Event, MpvError,
16};
17
18#[derive(Debug, Clone, Serialize, Deserialize)]
29pub enum MpvCommand {
30 LoadFile {
32 file: String,
33 option: PlaylistAddOptions,
34 },
35
36 LoadList {
38 file: String,
39 option: PlaylistAddOptions,
40 },
41
42 PlaylistClear,
44
45 PlaylistMove { from: usize, to: usize },
50
51 Observe { id: u64, property: String },
55
56 PlaylistNext,
58
59 PlaylistPrev,
61
62 PlaylistRemove(usize),
64
65 PlaylistShuffle,
67
68 Quit,
70
71 ScriptMessage(Vec<String>),
75
76 ScriptMessageTo { target: String, args: Vec<String> },
78
79 Seek { seconds: f64, option: SeekOptions },
81
82 Stop,
85
86 Unobserve(u64),
89}
90
/// Conversion of a typed command option into the textual argument that is
/// placed in a raw mpv command.
pub(crate) trait IntoRawCommandPart {
    /// Consumes `self` and returns the argument string to send to mpv.
    fn into_raw_command_part(self) -> String;
}
95
96#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
98#[serde(untagged)]
99pub enum MpvDataType {
100 Array(Vec<MpvDataType>),
101 Bool(bool),
102 Double(f64),
103 HashMap(HashMap<String, MpvDataType>),
104 Null,
105 MinusOne,
106 Playlist(Playlist),
107 String(String),
108 Usize(usize),
109}
110
111#[derive(Debug, Clone, PartialEq, Eq, Default, Serialize, Deserialize)]
113pub struct Playlist(pub Vec<PlaylistEntry>);
114
115#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
117pub struct PlaylistEntry {
118 pub id: usize,
119 pub filename: String,
120 pub title: Option<String>,
121 pub current: bool,
122}
123
124#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
126pub enum PlaylistAddOptions {
127 Replace,
128 Append,
129}
130
131impl IntoRawCommandPart for PlaylistAddOptions {
132 fn into_raw_command_part(self) -> String {
133 match self {
134 PlaylistAddOptions::Replace => "replace".to_string(),
135 PlaylistAddOptions::Append => "append".to_string(),
136 }
137 }
138}
139
140#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
142pub enum SeekOptions {
143 Relative,
144 Absolute,
145 RelativePercent,
146 AbsolutePercent,
147}
148
149impl IntoRawCommandPart for SeekOptions {
150 fn into_raw_command_part(self) -> String {
151 match self {
152 SeekOptions::Relative => "relative".to_string(),
153 SeekOptions::Absolute => "absolute".to_string(),
154 SeekOptions::RelativePercent => "relative-percent".to_string(),
155 SeekOptions::AbsolutePercent => "absolute-percent".to_string(),
156 }
157 }
158}
159
160pub trait GetPropertyTypeHandler: Sized {
162 #[allow(async_fn_in_trait)]
164 async fn get_property_generic(instance: &Mpv, property: &str)
165 -> Result<Option<Self>, MpvError>;
166}
167
168impl<T> GetPropertyTypeHandler for T
169where
170 T: TypeHandler,
171{
172 async fn get_property_generic(instance: &Mpv, property: &str) -> Result<Option<T>, MpvError> {
173 instance
174 .get_property_value(property)
175 .await
176 .and_then(|value| match value {
177 Some(v) => T::get_value(v).map(|v| Some(v)),
178 None => Ok(None),
179 })
180 }
181}
182
183pub trait SetPropertyTypeHandler<T> {
185 #[allow(async_fn_in_trait)]
187 async fn set_property_generic(instance: &Mpv, property: &str, value: T)
188 -> Result<(), MpvError>;
189}
190
191impl<T> SetPropertyTypeHandler<T> for T
192where
193 T: Serialize,
194{
195 async fn set_property_generic(
196 instance: &Mpv,
197 property: &str,
198 value: T,
199 ) -> Result<(), MpvError> {
200 let (res_tx, res_rx) = oneshot::channel();
201 let value = serde_json::to_value(value).map_err(MpvError::JsonParseError)?;
202
203 instance
204 .command_sender
205 .send((
206 MpvIpcCommand::SetProperty(property.to_owned(), value.to_owned()),
207 res_tx,
208 ))
209 .await
210 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
211
212 match res_rx.await {
213 Ok(MpvIpcResponse(response)) => response.map(|_| ()),
214 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
215 }
216 }
217}
218
219#[derive(Clone)]
228pub struct Mpv {
229 command_sender: mpsc::Sender<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
230 broadcast_channel: broadcast::Sender<MpvIpcEvent>,
231}
232
233impl fmt::Debug for Mpv {
235 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
236 fmt.debug_struct("Mpv").finish()
237 }
238}
239
240impl Mpv {
241 pub async fn connect(socket_path: &str) -> Result<Mpv, MpvError> {
244 log::debug!("Connecting to mpv socket at {}", socket_path);
245
246 let socket = match UnixStream::connect(socket_path).await {
247 Ok(stream) => Ok(stream),
248 Err(err) => Err(MpvError::MpvSocketConnectionError(err.to_string())),
249 }?;
250
251 Self::connect_socket(socket).await
252 }
253
254 pub async fn connect_socket(socket: UnixStream) -> Result<Mpv, MpvError> {
259 let (com_tx, com_rx) = mpsc::channel(100);
260 let (ev_tx, _) = broadcast::channel(100);
261 let ipc = MpvIpc::new(socket, com_rx, ev_tx.clone());
262
263 log::debug!("Starting IPC handler");
264 tokio::spawn(ipc.run());
265
266 Ok(Mpv {
267 command_sender: com_tx,
268 broadcast_channel: ev_tx,
269 })
270 }
271
272 pub async fn disconnect(&self) -> Result<(), MpvError> {
278 let (res_tx, res_rx) = oneshot::channel();
279 self.command_sender
280 .send((MpvIpcCommand::Exit, res_tx))
281 .await
282 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
283
284 match res_rx.await {
285 Ok(MpvIpcResponse(response)) => response.map(|_| ()),
286 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
287 }
288 }
289
290 pub async fn get_event_stream(&self) -> impl futures::Stream<Item = Result<Event, MpvError>> {
295 tokio_stream::wrappers::BroadcastStream::new(self.broadcast_channel.subscribe()).map(
296 |event| match event {
297 Ok(event) => crate::event_parser::parse_event(event),
298 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
299 },
300 )
301 }
302
303 pub async fn run_command_raw(
307 &self,
308 command: &str,
309 args: &[&str],
310 ) -> Result<Option<Value>, MpvError> {
311 let command_vec = Vec::from(
312 [command]
313 .iter()
314 .chain(args.iter())
315 .map(|s| s.to_string())
316 .collect::<Vec<String>>()
317 .as_slice(),
318 );
319 let (res_tx, res_rx) = oneshot::channel();
320 self.command_sender
321 .send((MpvIpcCommand::Command(command_vec.clone()), res_tx))
322 .await
323 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
324
325 match res_rx.await {
326 Ok(MpvIpcResponse(response)) => response,
327 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
328 }
329 }
330
331 async fn run_command_raw_ignore_value(
333 &self,
334 command: &str,
335 args: &[&str],
336 ) -> Result<(), MpvError> {
337 self.run_command_raw(command, args).await.map(|_| ())
338 }
339
340 pub async fn run_command(&self, command: MpvCommand) -> Result<(), MpvError> {
369 log::trace!("Running command: {:?}", command);
370 let result = match command {
371 MpvCommand::LoadFile { file, option } => {
372 self.run_command_raw_ignore_value(
373 "loadfile",
374 &[file.as_ref(), option.into_raw_command_part().as_str()],
375 )
376 .await
377 }
378 MpvCommand::LoadList { file, option } => {
379 self.run_command_raw_ignore_value(
380 "loadlist",
381 &[file.as_ref(), option.into_raw_command_part().as_str()],
382 )
383 .await
384 }
385 MpvCommand::Observe { id, property } => {
386 let (res_tx, res_rx) = oneshot::channel();
387 self.command_sender
388 .send((MpvIpcCommand::ObserveProperty(id, property), res_tx))
389 .await
390 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
391
392 match res_rx.await {
393 Ok(MpvIpcResponse(response)) => response.map(|_| ()),
394 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
395 }
396 }
397 MpvCommand::PlaylistClear => {
398 self.run_command_raw_ignore_value("playlist-clear", &[])
399 .await
400 }
401 MpvCommand::PlaylistMove { from, to } => {
402 self.run_command_raw_ignore_value(
403 "playlist-move",
404 &[&from.to_string(), &to.to_string()],
405 )
406 .await
407 }
408 MpvCommand::PlaylistNext => {
409 self.run_command_raw_ignore_value("playlist-next", &[])
410 .await
411 }
412 MpvCommand::PlaylistPrev => {
413 self.run_command_raw_ignore_value("playlist-prev", &[])
414 .await
415 }
416 MpvCommand::PlaylistRemove(id) => {
417 self.run_command_raw_ignore_value("playlist-remove", &[&id.to_string()])
418 .await
419 }
420 MpvCommand::PlaylistShuffle => {
421 self.run_command_raw_ignore_value("playlist-shuffle", &[])
422 .await
423 }
424 MpvCommand::Quit => self.run_command_raw_ignore_value("quit", &[]).await,
425 MpvCommand::ScriptMessage(args) => {
426 let str_args: Vec<_> = args.iter().map(String::as_str).collect();
427 self.run_command_raw_ignore_value("script-message", &str_args)
428 .await
429 }
430 MpvCommand::ScriptMessageTo { target, args } => {
431 let mut cmd_args: Vec<_> = vec![target.as_str()];
432 let mut str_args: Vec<_> = args.iter().map(String::as_str).collect();
433 cmd_args.append(&mut str_args);
434 self.run_command_raw_ignore_value("script-message-to", &cmd_args)
435 .await
436 }
437 MpvCommand::Seek { seconds, option } => {
438 self.run_command_raw_ignore_value(
439 "seek",
440 &[
441 &seconds.to_string(),
442 option.into_raw_command_part().as_str(),
443 ],
444 )
445 .await
446 }
447 MpvCommand::Stop => self.run_command_raw_ignore_value("stop", &[]).await,
448 MpvCommand::Unobserve(id) => {
449 let (res_tx, res_rx) = oneshot::channel();
450 self.command_sender
451 .send((MpvIpcCommand::UnobserveProperty(id), res_tx))
452 .await
453 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
454
455 match res_rx.await {
456 Ok(MpvIpcResponse(response)) => response.map(|_| ()),
457 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
458 }
459 }
460 };
461 log::trace!("Command result: {:?}", result);
462 result
463 }
464
465 pub async fn get_property<T: GetPropertyTypeHandler>(
494 &self,
495 property: &str,
496 ) -> Result<Option<T>, MpvError> {
497 T::get_property_generic(self, property).await
498 }
499
500 pub async fn get_property_value(&self, property: &str) -> Result<Option<Value>, MpvError> {
522 let (res_tx, res_rx) = oneshot::channel();
523 self.command_sender
524 .send((MpvIpcCommand::GetProperty(property.to_owned()), res_tx))
525 .await
526 .map_err(|err| MpvError::InternalConnectionError(err.to_string()))?;
527
528 match res_rx.await {
529 Ok(MpvIpcResponse(response)) => response,
530 Err(err) => Err(MpvError::InternalConnectionError(err.to_string())),
531 }
532 }
533
534 pub async fn set_property<T>(&self, property: &str, value: T) -> Result<(), MpvError>
559 where
560 T: SetPropertyTypeHandler<T> + Clone + fmt::Debug,
561 {
562 T::set_property_generic(self, property, value.clone()).await
563 }
564}