1
//! IPC handling thread/task. Handles communication between [`Mpv`](crate::Mpv) instances and mpv's unix socket
2

            
3
use futures::{SinkExt, StreamExt};
4
use serde_json::{json, Value};
5
use tokio::{
6
    net::UnixStream,
7
    sync::{broadcast, mpsc, oneshot},
8
};
9
use tokio_util::codec::{Framed, LinesCodec};
10

            
11
use crate::MpvError;
12

            
13
/// Container for all state that regards communication with the mpv IPC socket
14
/// and message passing with [`Mpv`](crate::Mpv) controllers.
15
pub(crate) struct MpvIpc {
16
    socket: Framed<UnixStream, LinesCodec>,
17
    command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
18
    event_channel: broadcast::Sender<MpvIpcEvent>,
19
}
20

            
21
/// Commands that can be sent to [`MpvIpc`]
22
#[derive(Debug, Clone, PartialEq, Eq)]
23
pub(crate) enum MpvIpcCommand {
24
    Command(Vec<String>),
25
    GetProperty(String),
26
    SetProperty(String, Value),
27
    ObserveProperty(u64, String),
28
    UnobserveProperty(u64),
29
    Exit,
30
}
31

            
32
/// [`MpvIpc`]'s response to a [`MpvIpcCommand`].
33
#[derive(Debug)]
34
pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, MpvError>);
35

            
36
/// A deserialized and partially parsed event from mpv.
37
#[derive(Debug, Clone)]
38
pub(crate) struct MpvIpcEvent(pub(crate) Value);
39

            
40
impl MpvIpc {
41
42
    pub(crate) fn new(
42
42
        socket: UnixStream,
43
42
        command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
44
42
        event_channel: broadcast::Sender<MpvIpcEvent>,
45
42
    ) -> Self {
46
42
        MpvIpc {
47
42
            socket: Framed::new(socket, LinesCodec::new()),
48
42
            command_channel,
49
42
            event_channel,
50
42
        }
51
42
    }
52

            
53
6040
    pub(crate) async fn send_command(
54
6040
        &mut self,
55
6040
        command: &[Value],
56
6040
    ) -> Result<Option<Value>, MpvError> {
57
3020
        let ipc_command = json!({ "command": command });
58
3020
        let ipc_command_str =
59
3020
            serde_json::to_string(&ipc_command).map_err(MpvError::JsonParseError)?;
60

            
61
3020
        log::trace!("Sending command: {}", ipc_command_str);
62

            
63
3020
        self.socket
64
3020
            .send(ipc_command_str)
65
3020
            .await
66
3020
            .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
67

            
68
3016
        let response = loop {
69
3019
            let response = self
70
3019
                .socket
71
3019
                .next()
72
3019
                .await
73
3017
                .ok_or(MpvError::MpvSocketConnectionError(
74
3017
                    "Could not receive response from mpv".to_owned(),
75
3017
                ))?
76
3017
                .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
77

            
78
3017
            let parsed_response =
79
3017
                serde_json::from_str::<Value>(&response).map_err(MpvError::JsonParseError);
80
3017

            
81
3017
            if parsed_response
82
3017
                .as_ref()
83
3017
                .ok()
84
3017
                .and_then(|v| v.as_object().map(|o| o.contains_key("event")))
85
3017
                .unwrap_or(false)
86
            {
87
1
                self.handle_event(parsed_response).await;
88
            } else {
89
3016
                break parsed_response;
90
3016
            }
91
3016
        };
92
3016

            
93
3016
        log::trace!("Received response: {:?}", response);
94

            
95
3016
        parse_mpv_response_data(response?, command)
96
3018
    }
97

            
98
2884
    pub(crate) async fn get_mpv_property(
99
2884
        &mut self,
100
2884
        property: &str,
101
2884
    ) -> Result<Option<Value>, MpvError> {
102
1442
        self.send_command(&[json!("get_property"), json!(property)])
103
1442
            .await
104
1441
    }
105

            
106
3130
    pub(crate) async fn set_mpv_property(
107
3130
        &mut self,
108
3130
        property: &str,
109
3130
        value: Value,
110
3130
    ) -> Result<Option<Value>, MpvError> {
111
1565
        self.send_command(&[json!("set_property"), json!(property), value])
112
1565
            .await
113
1564
    }
114

            
115
10
    pub(crate) async fn observe_property(
116
10
        &mut self,
117
10
        id: u64,
118
10
        property: &str,
119
10
    ) -> Result<Option<Value>, MpvError> {
120
5
        self.send_command(&[json!("observe_property"), json!(id), json!(property)])
121
5
            .await
122
5
    }
123

            
124
    pub(crate) async fn unobserve_property(&mut self, id: u64) -> Result<Option<Value>, MpvError> {
125
        self.send_command(&[json!("unobserve_property"), json!(id)])
126
            .await
127
    }
128

            
129
24
    async fn handle_event(&mut self, event: Result<Value, MpvError>) {
130
12
        match &event {
131
12
            Ok(event) => {
132
12
                log::trace!("Parsed event: {:?}", event);
133
                if let Err(broadcast::error::SendError(_)) =
134
12
                    self.event_channel.send(MpvIpcEvent(event.to_owned()))
135
                {
136
                    log::trace!("Failed to send event to channel, ignoring");
137
12
                }
138
            }
139
            Err(e) => {
140
                log::trace!("Error parsing event, ignoring:\n  {:?}\n  {:?}", &event, e);
141
            }
142
        }
143
12
    }
144

            
145
42
    pub(crate) async fn run(mut self) -> Result<(), MpvError> {
146
        loop {
147
3050
            tokio::select! {
148
3050
              Some(event) = self.socket.next() => {
149
11
                log::trace!("Got event: {:?}", event);
150

            
151
11
                let parsed_event = event
152
11
                    .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))
153
11
                    .and_then(|event|
154
11
                        serde_json::from_str::<Value>(&event)
155
11
                        .map_err(MpvError::JsonParseError));
156
11

            
157
11
                self.handle_event(parsed_event).await;
158
              }
159
3050
              Some((cmd, tx)) = self.command_channel.recv() => {
160
3020
                  log::trace!("Handling command: {:?}", cmd);
161
3020
                  match cmd {
162
8
                      MpvIpcCommand::Command(command) => {
163
8
                          let refs = command.iter().map(|s| json!(s)).collect::<Vec<Value>>();
164
8
                          let response = self.send_command(refs.as_slice()).await;
165
8
                          tx.send(MpvIpcResponse(response)).unwrap()
166
                      }
167
1442
                      MpvIpcCommand::GetProperty(property) => {
168
1442
                          let response = self.get_mpv_property(&property).await;
169
1441
                          tx.send(MpvIpcResponse(response)).unwrap()
170
                      }
171
1565
                      MpvIpcCommand::SetProperty(property, value) => {
172
1565
                          let response = self.set_mpv_property(&property, value).await;
173
1564
                          tx.send(MpvIpcResponse(response)).unwrap()
174
                      }
175
5
                      MpvIpcCommand::ObserveProperty(id, property) => {
176
5
                          let response = self.observe_property(id, &property).await;
177
5
                          tx.send(MpvIpcResponse(response)).unwrap()
178
                      }
179
                      MpvIpcCommand::UnobserveProperty(id) => {
180
                          let response = self.unobserve_property(id).await;
181
                          tx.send(MpvIpcResponse(response)).unwrap()
182
                      }
183
                      MpvIpcCommand::Exit => {
184
                        tx.send(MpvIpcResponse(Ok(None))).unwrap();
185
                        return Ok(());
186
                      }
187
                  }
188
              }
189
            }
190
        }
191
    }
192
}
193

            
194
/// This function does the most basic JSON parsing and error handling
195
/// for status codes and errors that all responses from mpv are
196
/// expected to contain.
197
6032
fn parse_mpv_response_data(value: Value, command: &[Value]) -> Result<Option<Value>, MpvError> {
198
6032
    log::trace!("Parsing mpv response data: {:?}", value);
199
6032
    let result = value
200
6032
        .as_object()
201
6032
        .ok_or(MpvError::ValueContainsUnexpectedType {
202
6032
            expected_type: "object".to_string(),
203
6032
            received: value.clone(),
204
6032
        })
205
6032
        .and_then(|o| {
206
6032
            let error = o
207
6032
                .get("error")
208
6032
                .ok_or(MpvError::MissingKeyInObject {
209
6032
                    key: "error".to_string(),
210
6032
                    map: o.clone(),
211
6032
                })?
212
6032
                .as_str()
213
6032
                .ok_or(MpvError::ValueContainsUnexpectedType {
214
6032
                    expected_type: "string".to_string(),
215
6032
                    received: o.get("error").unwrap().clone(),
216
6032
                })?;
217

            
218
6032
            let data = o.get("data");
219
6032

            
220
6032
            Ok((error, data))
221
6032
        })
222
6032
        .and_then(|(error, data)| match error {
223
6032
            "success" => Ok(data),
224
274
            "property unavailable" => Ok(None),
225
270
            err => Err(MpvError::MpvError {
226
270
                command: command.to_owned(),
227
270
                message: err.to_string(),
228
270
            }),
229
6032
        });
230
6032

            
231
6032
    match &result {
232
5762
        Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
233
270
        Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
234
    }
235

            
236
6032
    result.map(|opt| opt.cloned())
237
6032
}