1use futures::{SinkExt, StreamExt};
4use serde_json::{json, Value};
5use tokio::{
6 net::UnixStream,
7 sync::{broadcast, mpsc, oneshot},
8};
9use tokio_util::codec::{Framed, LinesCodec};
10
11use crate::MpvError;
12
13pub(crate) struct MpvIpc {
16 socket: Framed<UnixStream, LinesCodec>,
17 command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
18 event_channel: broadcast::Sender<MpvIpcEvent>,
19}
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub(crate) enum MpvIpcCommand {
24 Command(Vec<String>),
25 GetProperty(String),
26 SetProperty(String, Value),
27 ObserveProperty(u64, String),
28 UnobserveProperty(u64),
29 Exit,
30}
31
32#[derive(Debug)]
34pub(crate) struct MpvIpcResponse(pub(crate) Result<Option<Value>, MpvError>);
35
36#[derive(Debug, Clone)]
38pub(crate) struct MpvIpcEvent(pub(crate) Value);
39
40impl MpvIpc {
41 pub(crate) fn new(
42 socket: UnixStream,
43 command_channel: mpsc::Receiver<(MpvIpcCommand, oneshot::Sender<MpvIpcResponse>)>,
44 event_channel: broadcast::Sender<MpvIpcEvent>,
45 ) -> Self {
46 MpvIpc {
47 socket: Framed::new(socket, LinesCodec::new()),
48 command_channel,
49 event_channel,
50 }
51 }
52
53 pub(crate) async fn send_command(
54 &mut self,
55 command: &[Value],
56 ) -> Result<Option<Value>, MpvError> {
57 let ipc_command = json!({ "command": command });
58 let ipc_command_str =
59 serde_json::to_string(&ipc_command).map_err(MpvError::JsonParseError)?;
60
61 log::trace!("Sending command: {}", ipc_command_str);
62
63 self.socket
64 .send(ipc_command_str)
65 .await
66 .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
67
68 let response = loop {
69 let response = self
70 .socket
71 .next()
72 .await
73 .ok_or(MpvError::MpvSocketConnectionError(
74 "Could not receive response from mpv".to_owned(),
75 ))?
76 .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))?;
77
78 let parsed_response =
79 serde_json::from_str::<Value>(&response).map_err(MpvError::JsonParseError);
80
81 if parsed_response
82 .as_ref()
83 .ok()
84 .and_then(|v| v.as_object().map(|o| o.contains_key("event")))
85 .unwrap_or(false)
86 {
87 self.handle_event(parsed_response).await;
88 } else {
89 break parsed_response;
90 }
91 };
92
93 log::trace!("Received response: {:?}", response);
94
95 parse_mpv_response_data(response?, command)
96 }
97
98 pub(crate) async fn get_mpv_property(
99 &mut self,
100 property: &str,
101 ) -> Result<Option<Value>, MpvError> {
102 self.send_command(&[json!("get_property"), json!(property)])
103 .await
104 }
105
106 pub(crate) async fn set_mpv_property(
107 &mut self,
108 property: &str,
109 value: Value,
110 ) -> Result<Option<Value>, MpvError> {
111 self.send_command(&[json!("set_property"), json!(property), value])
112 .await
113 }
114
115 pub(crate) async fn observe_property(
116 &mut self,
117 id: u64,
118 property: &str,
119 ) -> Result<Option<Value>, MpvError> {
120 self.send_command(&[json!("observe_property"), json!(id), json!(property)])
121 .await
122 }
123
124 pub(crate) async fn unobserve_property(&mut self, id: u64) -> Result<Option<Value>, MpvError> {
125 self.send_command(&[json!("unobserve_property"), json!(id)])
126 .await
127 }
128
129 async fn handle_event(&mut self, event: Result<Value, MpvError>) {
130 match &event {
131 Ok(event) => {
132 log::trace!("Parsed event: {:?}", event);
133 if let Err(broadcast::error::SendError(_)) =
134 self.event_channel.send(MpvIpcEvent(event.to_owned()))
135 {
136 log::trace!("Failed to send event to channel, ignoring");
137 }
138 }
139 Err(e) => {
140 log::trace!("Error parsing event, ignoring:\n {:?}\n {:?}", &event, e);
141 }
142 }
143 }
144
145 pub(crate) async fn run(mut self) -> Result<(), MpvError> {
146 loop {
147 tokio::select! {
148 Some(event) = self.socket.next() => {
149 log::trace!("Got event: {:?}", event);
150
151 let parsed_event = event
152 .map_err(|why| MpvError::MpvSocketConnectionError(why.to_string()))
153 .and_then(|event|
154 serde_json::from_str::<Value>(&event)
155 .map_err(MpvError::JsonParseError));
156
157 self.handle_event(parsed_event).await;
158 }
159 Some((cmd, tx)) = self.command_channel.recv() => {
160 log::trace!("Handling command: {:?}", cmd);
161 match cmd {
162 MpvIpcCommand::Command(command) => {
163 let refs = command.iter().map(|s| json!(s)).collect::<Vec<Value>>();
164 let response = self.send_command(refs.as_slice()).await;
165 tx.send(MpvIpcResponse(response)).unwrap()
166 }
167 MpvIpcCommand::GetProperty(property) => {
168 let response = self.get_mpv_property(&property).await;
169 tx.send(MpvIpcResponse(response)).unwrap()
170 }
171 MpvIpcCommand::SetProperty(property, value) => {
172 let response = self.set_mpv_property(&property, value).await;
173 tx.send(MpvIpcResponse(response)).unwrap()
174 }
175 MpvIpcCommand::ObserveProperty(id, property) => {
176 let response = self.observe_property(id, &property).await;
177 tx.send(MpvIpcResponse(response)).unwrap()
178 }
179 MpvIpcCommand::UnobserveProperty(id) => {
180 let response = self.unobserve_property(id).await;
181 tx.send(MpvIpcResponse(response)).unwrap()
182 }
183 MpvIpcCommand::Exit => {
184 tx.send(MpvIpcResponse(Ok(None))).unwrap();
185 return Ok(());
186 }
187 }
188 }
189 }
190 }
191 }
192}
193
194fn parse_mpv_response_data(value: Value, command: &[Value]) -> Result<Option<Value>, MpvError> {
198 log::trace!("Parsing mpv response data: {:?}", value);
199 let result = value
200 .as_object()
201 .ok_or(MpvError::ValueContainsUnexpectedType {
202 expected_type: "object".to_string(),
203 received: value.clone(),
204 })
205 .and_then(|o| {
206 let error = o
207 .get("error")
208 .ok_or(MpvError::MissingKeyInObject {
209 key: "error".to_string(),
210 map: o.clone(),
211 })?
212 .as_str()
213 .ok_or(MpvError::ValueContainsUnexpectedType {
214 expected_type: "string".to_string(),
215 received: o.get("error").unwrap().clone(),
216 })?;
217
218 let data = o.get("data");
219
220 Ok((error, data))
221 })
222 .and_then(|(error, data)| match error {
223 "success" => Ok(data),
224 "property unavailable" => Ok(None),
225 err => Err(MpvError::MpvError {
226 command: command.to_owned(),
227 message: err.to_string(),
228 }),
229 });
230
231 match &result {
232 Ok(v) => log::trace!("Successfully parsed mpv response data: {:?}", v),
233 Err(e) => log::trace!("Error parsing mpv response data: {:?}", e),
234 }
235
236 result.map(|opt| opt.cloned())
237}