mpvipc_async/
ipc.rs

1//! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket
2
3use futures::{SinkExt, StreamExt};
4use serde_json::{json, Value};
5use tokio::{
6    net::UnixStream,
7    sync::{broadcast, mpsc, oneshot},
8};
9use tokio_util::codec::{Framed, LinesCodec};
10
11use crate::MpvError;
12
13/// Container for all state that regards communication with the mpv IPC socket
14/// and message passing with [`Mpv`](crate::Mpv) controllers.
15pub(crate) struct MpvIpc {
16    socket: Framed<UnixStream, LinesCodec>,
17    command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
18    event_channel: broadcast::Sender<MpvIpcEvent>,
19}
20
21/// Commands that can be sent to [`MpvIpc`]
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub(crate) enum MpvIpcCommand {
24    Command(Vec<String>),
25    GetProperty(String),
26    SetProperty(String, Value),
27    ObserveProperty(u64, String),
28    UnobserveProperty(u64),
29    Exit,
30}
31
32/// [`MpvIpc`]'s response to a [`MpvIpcCommand`].
33#[derive(Debug)]
34pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, MpvError>);
35
36/// A deserialized and partially parsed event from mpv.
37#[derive(Debug, Clone)]
38pub(crate) struct MpvIpcEvent(pub(crate) Value);
39
40impl MpvIpc {
41    pub(crate) fn new(
42        socket: UnixStream,
43        command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
44        event_channel: broadcast::Sender<MpvIpcEvent>,
45    ) -> Self {
46        MpvIpc {
47            socket: Framed::new(socket, LinesCodec::new()),
48            command_channel,
49            event_channel,
50        }
51    }
52
53    pub(crate) async fn send_command(
54        &mut self,
55        command: &[Value],
56    ) -> Result<Option<Value>, MpvError> {
57        let ipc_command = json!({ "command": command });
58        let ipc_command_str =
59            serde_json::to_string(&ipc_command).map_err(MpvError::JsonParseError)?;
60
61        log::trace!("Sending command: {}", ipc_command_str);
62
63        self.socket
64            .send(ipc_command_str)
65            .await
66            .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
67
68        let response = loop {
69            let response = self
70                .socket
71                .next()
72                .await
73                .ok_or(MpvError::MpvSocketConnectionError(
74                    "Could not receive response from mpv".to_owned(),
75                ))?
76                .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
77
78            let parsed_response =
79                serde_json::from_str::<Value>(&response).map_err(MpvError::JsonParseError);
80
81            if parsed_response
82                .as_ref()
83                .ok()
84                .and_then(|v| v.as_object().map(|o| o.contains_key("event")))
85                .unwrap_or(false)
86            {
87                self.handle_event(parsed_response).await;
88            } else {
89                break parsed_response;
90            }
91        };
92
93        log::trace!("Received response: {:?}", response);
94
95        parse_mpv_response_data(response?, command)
96    }
97
98    pub(crate) async fn get_mpv_property(
99        &mut self,
100        property: &str,
101    ) -> Result<Option<Value>, MpvError> {
102        self.send_command(&[json!("get_property"), json!(property)])
103            .await
104    }
105
106    pub(crate) async fn set_mpv_property(
107        &mut self,
108        property: &str,
109        value: Value,
110    ) -> Result<Option<Value>, MpvError> {
111        self.send_command(&[json!("set_property"), json!(property), value])
112            .await
113    }
114
115    pub(crate) async fn observe_property(
116        &mut self,
117        id: u64,
118        property: &str,
119    ) -> Result<Option<Value>, MpvError> {
120        self.send_command(&[json!("observe_property"), json!(id), json!(property)])
121            .await
122    }
123
124    pub(crate) async fn unobserve_property(&mut self, id: u64) -> Result<Option<Value>, MpvError> {
125        self.send_command(&[json!("unobserve_property"), json!(id)])
126            .await
127    }
128
129    async fn handle_event(&mut self, event: Result<Value, MpvError>) {
130        match &event {
131            Ok(event) => {
132                log::trace!("Parsed event: {:?}", event);
133                if let Err(broadcast::error::SendError(_)) =
134                    self.event_channel.send(MpvIpcEvent(event.to_owned()))
135                {
136                    log::trace!("Failed to send event to channel, ignoring");
137                }
138            }
139            Err(e) => {
140                log::trace!("Error parsing event, ignoring:\n  {:?}\n  {:?}", &event, e);
141            }
142        }
143    }
144
145    pub(crate) async fn run(mut self) -> Result<(), MpvError> {
146        loop {
147            tokio::select! {
148              Some(event) = self.socket.next() => {
149                log::trace!("Got event: {:?}", event);
150
151                let parsed_event = event
152                    .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))
153                    .and_then(|event|
154                        serde_json::from_str::<Value>(&event)
155                        .map_err(MpvError::JsonParseError));
156
157                self.handle_event(parsed_event).await;
158              }
159              Some((cmd, tx)) = self.command_channel.recv() => {
160                  log::trace!("Handling command: {:?}", cmd);
161                  match cmd {
162                      MpvIpcCommand::Command(command) => {
163                          let refs = command.iter().map(|s| json!(s)).collect::<Vec<Value>>();
164                          let response = self.send_command(refs.as_slice()).await;
165                          tx.send(MpvIpcResponse(response)).unwrap()
166                      }
167                      MpvIpcCommand::GetProperty(property) => {
168                          let response = self.get_mpv_property(&property).await;
169                          tx.send(MpvIpcResponse(response)).unwrap()
170                      }
171                      MpvIpcCommand::SetProperty(property, value) => {
172                          let response = self.set_mpv_property(&property, value).await;
173                          tx.send(MpvIpcResponse(response)).unwrap()
174                      }
175                      MpvIpcCommand::ObserveProperty(id, property) => {
176                          let response = self.observe_property(id, &property).await;
177                          tx.send(MpvIpcResponse(response)).unwrap()
178                      }
179                      MpvIpcCommand::UnobserveProperty(id) => {
180                          let response = self.unobserve_property(id).await;
181                          tx.send(MpvIpcResponse(response)).unwrap()
182                      }
183                      MpvIpcCommand::Exit => {
184                        tx.send(MpvIpcResponse(Ok(None))).unwrap();
185                        return Ok(());
186                      }
187                  }
188              }
189            }
190        }
191    }
192}
193
194/// This function does the most basic JSON parsing and error handling
195/// for status codes and errors that all responses from mpv are
196/// expected to contain.
197fn parse_mpv_response_data(value: Value, command: &[Value]) -> Result<Option<Value>, MpvError> {
198    log::trace!("Parsing mpv response data: {:?}", value);
199    let result = value
200        .as_object()
201        .ok_or(MpvError::ValueContainsUnexpectedType {
202            expected_type: "object".to_string(),
203            received: value.clone(),
204        })
205        .and_then(|o| {
206            let error = o
207                .get("error")
208                .ok_or(MpvError::MissingKeyInObject {
209                    key: "error".to_string(),
210                    map: o.clone(),
211                })?
212                .as_str()
213                .ok_or(MpvError::ValueContainsUnexpectedType {
214                    expected_type: "string".to_string(),
215                    received: o.get("error").unwrap().clone(),
216                })?;
217
218            let data = o.get("data");
219
220            Ok((error, data))
221        })
222        .and_then(|(error, data)| match error {
223            "success" => Ok(data),
224            "property unavailable" => Ok(None),
225            err => Err(MpvError::MpvError {
226                command: command.to_owned(),
227                message: err.to_string(),
228            }),
229        });
230
231    match &result {
232        Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
233        Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
234    }
235
236    result.map(|opt| opt.cloned())
237}