zbus/proxy/
mod.rs

1//! The client-side proxy API.
2
3use enumflags2::{bitflags, BitFlags};
4use event_listener::{Event, EventListener};
5use futures_core::{ready, stream};
6use ordered_stream::{join as join_streams, FromFuture, Join, Map, OrderedStream, PollResult};
7use std::{
8    collections::{HashMap, HashSet},
9    fmt,
10    future::Future,
11    ops::Deref,
12    pin::Pin,
13    sync::{Arc, OnceLock, RwLock, RwLockReadGuard},
14    task::{Context, Poll},
15};
16use tracing::{debug, info_span, instrument, trace, Instrument};
17
18use zbus_names::{BusName, InterfaceName, MemberName, UniqueName};
19use zvariant::{ObjectPath, OwnedValue, Str, Value};
20
21use crate::{
22    fdo::{self, IntrospectableProxy, NameOwnerChanged, PropertiesChangedStream, PropertiesProxy},
23    message::{Flags, Message, Sequence, Type},
24    AsyncDrop, Connection, Error, Executor, MatchRule, MessageStream, OwnedMatchRule, Result, Task,
25};
26
27mod builder;
28pub use builder::{Builder, CacheProperties};
29
30mod defaults;
31pub use defaults::Defaults;
32
33/// A client-side interface proxy.
34///
35/// A `Proxy` is a helper to interact with an interface on a remote object.
36///
37/// # Example
38///
39/// ```
40/// use std::result::Result;
41/// use std::error::Error;
42/// use zbus::{Connection, Proxy};
43///
44/// #[tokio::main]
45/// async fn main() -> Result<(), Box<dyn Error>> {
46///     let connection = Connection::session().await?;
47///     let p = Proxy::new(
48///         &connection,
49///         "org.freedesktop.DBus",
50///         "/org/freedesktop/DBus",
51///         "org.freedesktop.DBus",
52///     ).await?;
53///     // owned return value
54///     let _id: String = p.call("GetId", &()).await?;
55///     // borrowed return value
56///     let body = p.call_method("GetId", &()).await?.body();
57///     let _id: &str = body.deserialize()?;
58///
59///     Ok(())
60/// }
61/// ```
62///
63/// # Note
64///
65/// It is recommended to use the [`proxy`] macro, which provides a more convenient and
66/// type-safe *façade* `Proxy` derived from a Rust trait.
67///
68/// [`futures` crate]: https://crates.io/crates/futures
69/// [`proxy`]: attr.proxy.html
70#[derive(Clone, Debug)]
71pub struct Proxy<'a> {
72    pub(crate) inner: Arc<ProxyInner<'a>>,
73}
74
75/// This is required to avoid having the Drop impl extend the lifetime 'a, which breaks zbus_xmlgen
76/// (and possibly other crates).
77pub(crate) struct ProxyInnerStatic {
78    pub(crate) conn: Connection,
79    dest_owner_change_match_rule: OnceLock<OwnedMatchRule>,
80}
81
82impl fmt::Debug for ProxyInnerStatic {
83    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
84        f.debug_struct("ProxyInnerStatic")
85            .field(
86                "dest_owner_change_match_rule",
87                &self.dest_owner_change_match_rule,
88            )
89            .finish_non_exhaustive()
90    }
91}
92
93#[derive(Debug)]
94pub(crate) struct ProxyInner<'a> {
95    inner_without_borrows: ProxyInnerStatic,
96    pub(crate) destination: BusName<'a>,
97    pub(crate) path: ObjectPath<'a>,
98    pub(crate) interface: InterfaceName<'a>,
99
100    /// Cache of property values.
101    property_cache: Option<OnceLock<(Arc<PropertiesCache>, Task<()>)>>,
102    /// Set of properties which do not get cached, by name.
103    /// This overrides proxy-level caching behavior.
104    uncached_properties: HashSet<Str<'a>>,
105}
106
107impl Drop for ProxyInnerStatic {
108    fn drop(&mut self) {
109        if let Some(rule) = self.dest_owner_change_match_rule.take() {
110            self.conn.queue_remove_match(rule);
111        }
112    }
113}
114
115/// A property changed event.
116///
117/// The property changed event generated by [`PropertyStream`].
118pub struct PropertyChanged<'a, T> {
119    name: &'a str,
120    properties: Arc<PropertiesCache>,
121    proxy: Proxy<'a>,
122    phantom: std::marker::PhantomData<T>,
123}
124
125impl<T> PropertyChanged<'_, T> {
126    /// The name of the property that changed.
127    pub fn name(&self) -> &str {
128        self.name
129    }
130
131    /// Get the raw value of the property that changed.
132    ///
133    /// If the notification signal contained the new value, it has been cached already and this call
134    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
135    /// and cache the new value.
136    pub async fn get_raw(&self) -> Result<impl Deref<Target = Value<'static>> + '_> {
137        struct Wrapper<'w> {
138            name: &'w str,
139            values: RwLockReadGuard<'w, HashMap<String, PropertyValue>>,
140        }
141
142        impl Deref for Wrapper<'_> {
143            type Target = Value<'static>;
144
145            fn deref(&self) -> &Self::Target {
146                self.values
147                    .get(self.name)
148                    .expect("PropertyStream with no corresponding property")
149                    .value
150                    .as_ref()
151                    .expect("PropertyStream with no corresponding property")
152            }
153        }
154
155        {
156            let values = self.properties.values.read().expect("lock poisoned");
157            if values
158                .get(self.name)
159                .expect("PropertyStream with no corresponding property")
160                .value
161                .is_some()
162            {
163                return Ok(Wrapper {
164                    name: self.name,
165                    values,
166                });
167            }
168        }
169
170        // The property was invalidated, so we need to fetch the new value.
171        let properties_proxy = self.proxy.properties_proxy();
172        let value = properties_proxy
173            .get(self.proxy.inner.interface.clone(), self.name)
174            .await
175            .map_err(crate::Error::from)?;
176
177        // Save the new value
178        {
179            let mut values = self.properties.values.write().expect("lock poisoned");
180
181            values
182                .get_mut(self.name)
183                .expect("PropertyStream with no corresponding property")
184                .value = Some(value);
185        }
186
187        Ok(Wrapper {
188            name: self.name,
189            values: self.properties.values.read().expect("lock poisoned"),
190        })
191    }
192}
193
194impl<T> PropertyChanged<'_, T>
195where
196    T: TryFrom<zvariant::OwnedValue>,
197    T::Error: Into<crate::Error>,
198{
199    /// Get the value of the property that changed.
200    ///
201    /// If the notification signal contained the new value, it has been cached already and this call
202    /// will return that value. Otherwise (i.e. invalidated property), a D-Bus call is made to fetch
203    /// and cache the new value.
204    pub async fn get(&self) -> Result<T> {
205        self.get_raw()
206            .await
207            .and_then(|v| T::try_from(OwnedValue::try_from(&*v)?).map_err(Into::into))
208    }
209}
210
211/// A [`stream::Stream`] implementation that yields property change notifications.
212///
213/// Use [`Proxy::receive_property_changed`] to create an instance of this type.
214#[derive(Debug)]
215pub struct PropertyStream<'a, T> {
216    name: &'a str,
217    proxy: Proxy<'a>,
218    changed_listener: EventListener,
219    phantom: std::marker::PhantomData<T>,
220}
221
222impl<'a, T> stream::Stream for PropertyStream<'a, T>
223where
224    T: Unpin,
225{
226    type Item = PropertyChanged<'a, T>;
227
228    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
229        let m = self.get_mut();
230        let properties = match m.proxy.get_property_cache() {
231            Some(properties) => properties.clone(),
232            // With no cache, we will get no updates; return immediately
233            None => return Poll::Ready(None),
234        };
235        ready!(Pin::new(&mut m.changed_listener).poll(cx));
236
237        m.changed_listener = properties
238            .values
239            .read()
240            .expect("lock poisoned")
241            .get(m.name)
242            .expect("PropertyStream with no corresponding property")
243            .event
244            .listen();
245
246        Poll::Ready(Some(PropertyChanged {
247            name: m.name,
248            properties,
249            proxy: m.proxy.clone(),
250            phantom: std::marker::PhantomData,
251        }))
252    }
253}
254
255#[derive(Debug)]
256pub(crate) struct PropertiesCache {
257    values: RwLock<HashMap<String, PropertyValue>>,
258    caching_result: RwLock<CachingResult>,
259}
260
261#[derive(Debug)]
262enum CachingResult {
263    Caching { ready: Event },
264    Cached { result: Result<()> },
265}
266
267impl PropertiesCache {
268    #[instrument(skip_all)]
269    fn new(
270        proxy: PropertiesProxy<'static>,
271        interface: InterfaceName<'static>,
272        executor: &Executor<'_>,
273        uncached_properties: HashSet<zvariant::Str<'static>>,
274    ) -> (Arc<Self>, Task<()>) {
275        let cache = Arc::new(PropertiesCache {
276            values: Default::default(),
277            caching_result: RwLock::new(CachingResult::Caching {
278                ready: Event::new(),
279            }),
280        });
281
282        let cache_clone = cache.clone();
283        let task_name = format!("{interface} proxy caching");
284        let proxy_caching = async move {
285            let result = cache_clone
286                .init(proxy, interface, uncached_properties)
287                .await;
288            let (prop_changes, interface, uncached_properties) = {
289                let mut caching_result = cache_clone.caching_result.write().expect("lock poisoned");
290                let ready = match &*caching_result {
291                    CachingResult::Caching { ready } => ready,
292                    // SAFETY: This is the only part of the code that changes this state and it's
293                    // only run once.
294                    _ => unreachable!(),
295                };
296                match result {
297                    Ok((prop_changes, interface, uncached_properties)) => {
298                        ready.notify(usize::MAX);
299                        *caching_result = CachingResult::Cached { result: Ok(()) };
300
301                        (prop_changes, interface, uncached_properties)
302                    }
303                    Err(e) => {
304                        ready.notify(usize::MAX);
305                        *caching_result = CachingResult::Cached { result: Err(e) };
306
307                        return;
308                    }
309                }
310            };
311
312            if let Err(e) = cache_clone
313                .keep_updated(prop_changes, interface, uncached_properties)
314                .await
315            {
316                debug!("Error keeping properties cache updated: {e}");
317            }
318        }
319        .instrument(info_span!("{}", task_name));
320        let task = executor.spawn(proxy_caching, &task_name);
321
322        (cache, task)
323    }
324
325    /// new() runs this in a task it spawns for initialization of properties cache.
326    async fn init(
327        &self,
328        proxy: PropertiesProxy<'static>,
329        interface: InterfaceName<'static>,
330        uncached_properties: HashSet<zvariant::Str<'static>>,
331    ) -> Result<(
332        PropertiesChangedStream,
333        InterfaceName<'static>,
334        HashSet<zvariant::Str<'static>>,
335    )> {
336        use ordered_stream::OrderedStreamExt;
337
338        let prop_changes = proxy.receive_properties_changed().await?.map(Either::Left);
339
340        let get_all = proxy
341            .inner()
342            .connection()
343            .call_method_raw(
344                Some(proxy.inner().destination()),
345                proxy.inner().path(),
346                Some(proxy.inner().interface()),
347                "GetAll",
348                BitFlags::empty(),
349                &interface,
350            )
351            .await
352            .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
353
354        let mut join = join_streams(prop_changes, get_all);
355
356        loop {
357            match join.next().await {
358                Some(Either::Left(_update)) => {
359                    // discard updates prior to the initial population
360                }
361                Some(Either::Right(populate)) => {
362                    populate?.body().deserialize().map(|values| {
363                        self.update_cache(&uncached_properties, &values, &[], &interface);
364                    })?;
365                    break;
366                }
367                None => break,
368            }
369        }
370        if let Some((Either::Left(update), _)) = Pin::new(&mut join).take_buffered() {
371            // if an update was buffered, then it happened after the get_all returned and needs to
372            // be applied before we discard the join
373            if let Ok(args) = update.args() {
374                if args.interface_name == interface {
375                    self.update_cache(
376                        &uncached_properties,
377                        &args.changed_properties,
378                        &args.invalidated_properties,
379                        &interface,
380                    );
381                }
382            }
383        }
384        // This is needed to avoid a "implementation of `OrderedStream` is not general enough"
385        // error that occurs if you apply the map and join to Pin::new(&mut prop_changes) instead
386        // of directly to the stream.
387        let prop_changes = join.into_inner().0.into_inner();
388
389        Ok((prop_changes, interface, uncached_properties))
390    }
391
392    /// new() runs this in a task it spawns for keeping the cache in sync.
393    #[instrument(skip_all)]
394    async fn keep_updated(
395        &self,
396        mut prop_changes: PropertiesChangedStream,
397        interface: InterfaceName<'static>,
398        uncached_properties: HashSet<zvariant::Str<'static>>,
399    ) -> Result<()> {
400        use futures_lite::StreamExt;
401
402        trace!("Listening for property changes on {interface}...");
403        while let Some(update) = prop_changes.next().await {
404            if let Ok(args) = update.args() {
405                if args.interface_name == interface {
406                    self.update_cache(
407                        &uncached_properties,
408                        &args.changed_properties,
409                        &args.invalidated_properties,
410                        &interface,
411                    );
412                }
413            }
414        }
415
416        Ok(())
417    }
418
419    fn update_cache(
420        &self,
421        uncached_properties: &HashSet<Str<'_>>,
422        changed: &HashMap<&str, Value<'_>>,
423        invalidated: &[&str],
424        interface: &InterfaceName<'_>,
425    ) {
426        let mut values = self.values.write().expect("lock poisoned");
427
428        for inval in invalidated {
429            if uncached_properties.contains(&Str::from(*inval)) {
430                debug!(
431                    "Ignoring invalidation of uncached property `{}.{}`",
432                    interface, inval
433                );
434                continue;
435            }
436            trace!("Property `{interface}.{inval}` invalidated");
437
438            if let Some(entry) = values.get_mut(*inval) {
439                entry.value = None;
440                entry.event.notify(usize::MAX);
441            }
442        }
443
444        for (property_name, value) in changed {
445            if uncached_properties.contains(&Str::from(*property_name)) {
446                debug!(
447                    "Ignoring update of uncached property `{}.{}`",
448                    interface, property_name
449                );
450                continue;
451            }
452            trace!("Property `{interface}.{property_name}` updated");
453
454            let entry = values.entry(property_name.to_string()).or_default();
455
456            let value = match OwnedValue::try_from(value) {
457                Ok(value) => value,
458                Err(e) => {
459                    debug!(
460                        "Failed to convert property `{interface}.{property_name}` to OwnedValue: {e}"
461                    );
462                    continue;
463                }
464            };
465            entry.value = Some(value);
466            entry.event.notify(usize::MAX);
467        }
468    }
469
470    /// Wait for the cache to be populated and return any error encountered during population.
471    pub(crate) async fn ready(&self) -> Result<()> {
472        let listener = match &*self.caching_result.read().expect("lock poisoned") {
473            CachingResult::Caching { ready } => ready.listen(),
474            CachingResult::Cached { result } => return result.clone(),
475        };
476        listener.await;
477
478        // It must be ready now.
479        match &*self.caching_result.read().expect("lock poisoned") {
480            // SAFETY: We were just notified that state has changed to `Cached` and we never go back
481            // to `Caching` once in `Cached`.
482            CachingResult::Caching { .. } => unreachable!(),
483            CachingResult::Cached { result } => result.clone(),
484        }
485    }
486}
487
488impl<'a> ProxyInner<'a> {
489    pub(crate) fn new(
490        conn: Connection,
491        destination: BusName<'a>,
492        path: ObjectPath<'a>,
493        interface: InterfaceName<'a>,
494        cache: CacheProperties,
495        uncached_properties: HashSet<Str<'a>>,
496    ) -> Self {
497        let property_cache = match cache {
498            CacheProperties::Yes | CacheProperties::Lazily => Some(OnceLock::new()),
499            CacheProperties::No => None,
500        };
501        Self {
502            inner_without_borrows: ProxyInnerStatic {
503                conn,
504                dest_owner_change_match_rule: OnceLock::new(),
505            },
506            destination,
507            path,
508            interface,
509            property_cache,
510            uncached_properties,
511        }
512    }
513
514    /// Subscribe to the "NameOwnerChanged" signal on the bus for our destination.
515    ///
516    /// If the destination is a unique name, we will not subscribe to the signal.
517    pub(crate) async fn subscribe_dest_owner_change(&self) -> Result<()> {
518        if !self.inner_without_borrows.conn.is_bus() {
519            // Names don't mean much outside the bus context.
520            return Ok(());
521        }
522
523        let well_known_name = match &self.destination {
524            BusName::WellKnown(well_known_name) => well_known_name,
525            BusName::Unique(_) => return Ok(()),
526        };
527
528        if self
529            .inner_without_borrows
530            .dest_owner_change_match_rule
531            .get()
532            .is_some()
533        {
534            // Already watching over the bus for any name updates so nothing to do here.
535            return Ok(());
536        }
537
538        let conn = &self.inner_without_borrows.conn;
539        let signal_rule: OwnedMatchRule = MatchRule::builder()
540            .msg_type(Type::Signal)
541            .sender("org.freedesktop.DBus")?
542            .path("/org/freedesktop/DBus")?
543            .interface("org.freedesktop.DBus")?
544            .member("NameOwnerChanged")?
545            .add_arg(well_known_name.as_str())?
546            .build()
547            .to_owned()
548            .into();
549
550        conn.add_match(
551            signal_rule.clone(),
552            Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
553        )
554        .await?;
555
556        if self
557            .inner_without_borrows
558            .dest_owner_change_match_rule
559            .set(signal_rule.clone())
560            .is_err()
561        {
562            // we raced another destination_unique_name call and added it twice
563            conn.remove_match(signal_rule).await?;
564        }
565
566        Ok(())
567    }
568}
569
570const MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED: usize = 8;
571
572impl<'a> Proxy<'a> {
573    /// Create a new `Proxy` for the given destination/path/interface.
574    pub async fn new<D, P, I>(
575        conn: &Connection,
576        destination: D,
577        path: P,
578        interface: I,
579    ) -> Result<Proxy<'a>>
580    where
581        D: TryInto<BusName<'a>>,
582        P: TryInto<ObjectPath<'a>>,
583        I: TryInto<InterfaceName<'a>>,
584        D::Error: Into<Error>,
585        P::Error: Into<Error>,
586        I::Error: Into<Error>,
587    {
588        Builder::new(conn)
589            .destination(destination)?
590            .path(path)?
591            .interface(interface)?
592            .build()
593            .await
594    }
595
596    /// Create a new `Proxy` for the given destination/path/interface, taking ownership of all
597    /// passed arguments.
598    pub async fn new_owned<D, P, I>(
599        conn: Connection,
600        destination: D,
601        path: P,
602        interface: I,
603    ) -> Result<Proxy<'a>>
604    where
605        D: TryInto<BusName<'static>>,
606        P: TryInto<ObjectPath<'static>>,
607        I: TryInto<InterfaceName<'static>>,
608        D::Error: Into<Error>,
609        P::Error: Into<Error>,
610        I::Error: Into<Error>,
611    {
612        Builder::new(&conn)
613            .destination(destination)?
614            .path(path)?
615            .interface(interface)?
616            .build()
617            .await
618    }
619
620    /// Get a reference to the associated connection.
621    pub fn connection(&self) -> &Connection {
622        &self.inner.inner_without_borrows.conn
623    }
624
625    /// Get a reference to the destination service name.
626    pub fn destination(&self) -> &BusName<'a> {
627        &self.inner.destination
628    }
629
630    /// Get a reference to the object path.
631    pub fn path(&self) -> &ObjectPath<'a> {
632        &self.inner.path
633    }
634
635    /// Get a reference to the interface.
636    pub fn interface(&self) -> &InterfaceName<'a> {
637        &self.inner.interface
638    }
639
640    /// Introspect the associated object, and return the XML description.
641    ///
642    /// See the [xml](https://docs.rs/zbus_xml) crate for parsing the
643    /// result.
644    pub async fn introspect(&self) -> fdo::Result<String> {
645        let proxy = IntrospectableProxy::builder(&self.inner.inner_without_borrows.conn)
646            .destination(&self.inner.destination)?
647            .path(&self.inner.path)?
648            .build()
649            .await?;
650
651        proxy.introspect().await
652    }
653
654    fn properties_proxy(&self) -> PropertiesProxy<'_> {
655        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
656            // Safe because already checked earlier
657            .destination(self.inner.destination.as_ref())
658            .unwrap()
659            // Safe because already checked earlier
660            .path(self.inner.path.as_ref())
661            .unwrap()
662            // does not have properties
663            .cache_properties(CacheProperties::No)
664            .build_internal()
665            .unwrap()
666            .into()
667    }
668
669    fn owned_properties_proxy(&self) -> PropertiesProxy<'static> {
670        PropertiesProxy::builder(&self.inner.inner_without_borrows.conn)
671            // Safe because already checked earlier
672            .destination(self.inner.destination.to_owned())
673            .unwrap()
674            // Safe because already checked earlier
675            .path(self.inner.path.to_owned())
676            .unwrap()
677            // does not have properties
678            .cache_properties(CacheProperties::No)
679            .build_internal()
680            .unwrap()
681            .into()
682    }
683
684    /// Get the cache, starting it in the background if needed.
685    ///
686    /// Use PropertiesCache::ready() to wait for the cache to be populated and to get any errors
687    /// encountered in the population.
688    pub(crate) fn get_property_cache(&self) -> Option<&Arc<PropertiesCache>> {
689        let cache = match &self.inner.property_cache {
690            Some(cache) => cache,
691            None => return None,
692        };
693        let (cache, _) = &cache.get_or_init(|| {
694            let proxy = self.owned_properties_proxy();
695            let interface = self.interface().to_owned();
696            let uncached_properties: HashSet<zvariant::Str<'static>> = self
697                .inner
698                .uncached_properties
699                .iter()
700                .map(|s| s.to_owned())
701                .collect();
702            let executor = self.connection().executor();
703
704            PropertiesCache::new(proxy, interface, executor, uncached_properties)
705        });
706
707        Some(cache)
708    }
709
710    /// Get the cached value of the property `property_name`.
711    ///
712    /// This returns `None` if the property is not in the cache.  This could be because the cache
713    /// was invalidated by an update, because caching was disabled for this property or proxy, or
714    /// because the cache has not yet been populated.  Use `get_property` to fetch the value from
715    /// the peer.
716    pub fn cached_property<T>(&self, property_name: &str) -> Result<Option<T>>
717    where
718        T: TryFrom<OwnedValue>,
719        T::Error: Into<Error>,
720    {
721        self.cached_property_raw(property_name)
722            .as_deref()
723            .map(|v| T::try_from(OwnedValue::try_from(v)?).map_err(Into::into))
724            .transpose()
725    }
726
727    /// Get the cached value of the property `property_name`.
728    ///
729    /// Same as `cached_property`, but gives you access to the raw value stored in the cache. This
730    /// is useful if you want to avoid allocations and cloning.
731    pub fn cached_property_raw<'p>(
732        &'p self,
733        property_name: &'p str,
734    ) -> Option<impl Deref<Target = Value<'static>> + 'p> {
735        if let Some(values) = self
736            .inner
737            .property_cache
738            .as_ref()
739            .and_then(OnceLock::get)
740            .map(|c| c.0.values.read().expect("lock poisoned"))
741        {
742            // ensure that the property is in the cache.
743            values
744                .get(property_name)
745                // if the property value has not yet been cached, this will return None.
746                .and_then(|e| e.value.as_ref())?;
747
748            struct Wrapper<'a> {
749                values: RwLockReadGuard<'a, HashMap<String, PropertyValue>>,
750                property_name: &'a str,
751            }
752
753            impl Deref for Wrapper<'_> {
754                type Target = Value<'static>;
755
756                fn deref(&self) -> &Self::Target {
757                    self.values
758                        .get(self.property_name)
759                        .and_then(|e| e.value.as_ref())
760                        .map(|v| v.deref())
761                        .expect("inexistent property")
762                }
763            }
764
765            Some(Wrapper {
766                values,
767                property_name,
768            })
769        } else {
770            None
771        }
772    }
773
774    async fn get_proxy_property(&self, property_name: &str) -> Result<OwnedValue> {
775        Ok(self
776            .properties_proxy()
777            .get(self.inner.interface.as_ref(), property_name)
778            .await?)
779    }
780
781    /// Get the property `property_name`.
782    ///
783    /// Get the property value from the cache (if caching is enabled) or call the
784    /// `Get` method of the `org.freedesktop.DBus.Properties` interface.
785    pub async fn get_property<T>(&self, property_name: &str) -> Result<T>
786    where
787        T: TryFrom<OwnedValue>,
788        T::Error: Into<Error>,
789    {
790        if let Some(cache) = self.get_property_cache() {
791            cache.ready().await?;
792        }
793        if let Some(value) = self.cached_property(property_name)? {
794            return Ok(value);
795        }
796
797        let value = self.get_proxy_property(property_name).await?;
798        value.try_into().map_err(Into::into)
799    }
800
801    /// Set the property `property_name`.
802    ///
803    /// Effectively, call the `Set` method of the `org.freedesktop.DBus.Properties` interface.
804    pub async fn set_property<'t, T>(&self, property_name: &str, value: T) -> fdo::Result<()>
805    where
806        T: 't + Into<Value<'t>>,
807    {
808        self.properties_proxy()
809            .set(self.inner.interface.as_ref(), property_name, value.into())
810            .await
811    }
812
813    /// Call a method and return the reply.
814    ///
815    /// Typically, you would want to use [`call`] method instead. Use this method if you need to
816    /// deserialize the reply message manually (this way, you can avoid the memory
817    /// allocation/copying, by deserializing the reply to an unowned type).
818    ///
819    /// [`call`]: struct.Proxy.html#method.call
820    pub async fn call_method<'m, M, B>(&self, method_name: M, body: &B) -> Result<Message>
821    where
822        M: TryInto<MemberName<'m>>,
823        M::Error: Into<Error>,
824        B: serde::ser::Serialize + zvariant::DynamicType,
825    {
826        self.inner
827            .inner_without_borrows
828            .conn
829            .call_method(
830                Some(&self.inner.destination),
831                self.inner.path.as_str(),
832                Some(&self.inner.interface),
833                method_name,
834                body,
835            )
836            .await
837    }
838
839    /// Call a method and return the reply body.
840    ///
841    /// Use [`call_method`] instead if you need to deserialize the reply manually/separately.
842    ///
843    /// [`call_method`]: struct.Proxy.html#method.call_method
844    pub async fn call<'m, M, B, R>(&self, method_name: M, body: &B) -> Result<R>
845    where
846        M: TryInto<MemberName<'m>>,
847        M::Error: Into<Error>,
848        B: serde::ser::Serialize + zvariant::DynamicType,
849        R: for<'d> zvariant::DynamicDeserialize<'d>,
850    {
851        let reply = self.call_method(method_name, body).await?;
852
853        reply.body().deserialize()
854    }
855
856    /// Call a method and return the reply body, optionally supplying a set of
857    /// method flags to control the way the method call message is sent and handled.
858    ///
859    /// Use [`call`] instead if you do not need any special handling via additional flags.
860    /// If the `NoReplyExpected` flag is passed, this will return None immediately
861    /// after sending the message, similar to [`call_noreply`].
862    ///
863    /// [`call`]: struct.Proxy.html#method.call
864    /// [`call_noreply`]: struct.Proxy.html#method.call_noreply
865    pub async fn call_with_flags<'m, M, B, R>(
866        &self,
867        method_name: M,
868        flags: BitFlags<MethodFlags>,
869        body: &B,
870    ) -> Result<Option<R>>
871    where
872        M: TryInto<MemberName<'m>>,
873        M::Error: Into<Error>,
874        B: serde::ser::Serialize + zvariant::DynamicType,
875        R: for<'d> zvariant::DynamicDeserialize<'d>,
876    {
877        let flags = flags.iter().map(Flags::from).collect::<BitFlags<_>>();
878        match self
879            .inner
880            .inner_without_borrows
881            .conn
882            .call_method_raw(
883                Some(self.destination()),
884                self.path(),
885                Some(self.interface()),
886                method_name,
887                flags,
888                body,
889            )
890            .await?
891        {
892            Some(reply) => reply.await?.body().deserialize().map(Some),
893            None => Ok(None),
894        }
895    }
896
897    /// Call a method without expecting a reply.
898    ///
899    /// This sets the `NoReplyExpected` flag on the calling message and does not wait for a reply.
900    pub async fn call_noreply<'m, M, B>(&self, method_name: M, body: &B) -> Result<()>
901    where
902        M: TryInto<MemberName<'m>>,
903        M::Error: Into<Error>,
904        B: serde::ser::Serialize + zvariant::DynamicType,
905    {
906        self.call_with_flags::<_, _, ()>(method_name, MethodFlags::NoReplyExpected.into(), body)
907            .await?;
908        Ok(())
909    }
910
911    /// Create a stream for the signal named `signal_name`.
912    ///
913    /// # Errors
914    ///
915    /// Apart from general I/O errors that can result from socket communications, calling this
916    /// method will also result in an error if the destination service has not yet registered its
917    /// well-known name with the bus (assuming you're using the well-known name as destination).
918    pub async fn receive_signal<'m, M>(&self, signal_name: M) -> Result<SignalStream<'m>>
919    where
920        M: TryInto<MemberName<'m>>,
921        M::Error: Into<Error>,
922    {
923        self.receive_signal_with_args(signal_name, &[]).await
924    }
925
926    /// Same as [`Proxy::receive_signal`] but with a filter.
927    ///
928    /// The D-Bus specification allows you to filter signals by their arguments, which helps avoid
929    /// a lot of unnecessary traffic and processing since the filter is run on the server side. Use
930    /// this method where possible. Note that this filtering is limited to arguments of string
931    /// types.
932    ///
933    /// The arguments are passed as tuples of argument index and expected value.
934    pub async fn receive_signal_with_args<'m, M>(
935        &self,
936        signal_name: M,
937        args: &[(u8, &str)],
938    ) -> Result<SignalStream<'m>>
939    where
940        M: TryInto<MemberName<'m>>,
941        M::Error: Into<Error>,
942    {
943        let signal_name = signal_name.try_into().map_err(Into::into)?;
944        self.receive_signals(Some(signal_name), args).await
945    }
946
947    async fn receive_signals<'m>(
948        &self,
949        signal_name: Option<MemberName<'m>>,
950        args: &[(u8, &str)],
951    ) -> Result<SignalStream<'m>> {
952        self.inner.subscribe_dest_owner_change().await?;
953
954        SignalStream::new(self.clone(), signal_name, args).await
955    }
956
957    /// Create a stream for all signals emitted by this service.
958    pub async fn receive_all_signals(&self) -> Result<SignalStream<'static>> {
959        self.receive_signals(None, &[]).await
960    }
961
962    /// Get a stream to receive property changed events.
963    ///
964    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
965    /// will only receive the last update.
966    ///
967    /// If caching is not enabled on this proxy, the resulting stream will not return any events.
968    pub async fn receive_property_changed<'name: 'a, T>(
969        &self,
970        name: &'name str,
971    ) -> PropertyStream<'a, T> {
972        let properties = self.get_property_cache();
973        let changed_listener = if let Some(properties) = &properties {
974            let mut values = properties.values.write().expect("lock poisoned");
975            let entry = values
976                .entry(name.to_string())
977                .or_insert_with(PropertyValue::default);
978            entry.event.listen()
979        } else {
980            Event::new().listen()
981        };
982
983        PropertyStream {
984            name,
985            proxy: self.clone(),
986            changed_listener,
987            phantom: std::marker::PhantomData,
988        }
989    }
990
991    /// Get a stream to receive destination owner changed events.
992    ///
993    /// If the proxy destination is a unique name, the stream will be notified of the peer
994    /// disconnection from the bus (with a `None` value).
995    ///
996    /// If the proxy destination is a well-known name, the stream will be notified whenever the name
997    /// owner is changed, either by a new peer being granted ownership (`Some` value) or when the
998    /// name is released (with a `None` value).
999    ///
1000    /// Note that zbus doesn't queue the updates. If the listener is slower than the receiver, it
1001    /// will only receive the last update.
1002    pub async fn receive_owner_changed(&self) -> Result<OwnerChangedStream<'a>> {
1003        use ordered_stream::OrderedStreamExt;
1004        let dbus_proxy = fdo::DBusProxy::builder(self.connection())
1005            .cache_properties(CacheProperties::No)
1006            .build()
1007            .await?;
1008        Ok(OwnerChangedStream {
1009            stream: dbus_proxy
1010                .receive_name_owner_changed_with_args(&[(0, self.destination().as_str())])
1011                .await?
1012                .map(Box::new(move |signal| {
1013                    let args = signal.args().unwrap();
1014                    let new_owner = args.new_owner().as_ref().map(|owner| owner.to_owned());
1015
1016                    new_owner
1017                })),
1018            name: self.destination().clone(),
1019        })
1020    }
1021}
1022
1023#[derive(Debug, Default)]
1024struct PropertyValue {
1025    value: Option<OwnedValue>,
1026    event: Event,
1027}
1028
1029/// Flags to use with [`Proxy::call_with_flags`].
1030#[bitflags]
1031#[repr(u8)]
1032#[derive(Debug, Copy, Clone, PartialEq, Eq)]
1033pub enum MethodFlags {
1034    /// No response is expected from this method call, regardless of whether the
1035    /// signature for the interface method indicates a reply type. When passed,
1036    /// `call_with_flags` will return `Ok(None)` immediately after successfully
1037    /// sending the method call.
1038    ///
1039    /// Errors encountered while *making* the call will still be returned as
1040    /// an `Err` variant, but any errors that are triggered by the receiver's
1041    /// handling of the call will not be delivered.
1042    NoReplyExpected = 0x1,
1043
1044    /// When set on a call whose destination is a message bus, this flag will instruct
1045    /// the bus not to [launch][al] a service to handle the call if no application
1046    /// on the bus owns the requested name.
1047    ///
1048    /// This flag is ignored when using a peer-to-peer connection.
1049    ///
1050    /// [al]: https://dbus.freedesktop.org/doc/dbus-specification.html#message-bus-starting-services
1051    NoAutoStart = 0x2,
1052
1053    /// Indicates to the receiver that this client is prepared to wait for interactive
1054    /// authorization, which might take a considerable time to complete. For example, the receiver
1055    /// may query the user for confirmation via [polkit] or a similar framework.
1056    ///
1057    /// [polkit]: https://gitlab.freedesktop.org/polkit/polkit/
1058    AllowInteractiveAuth = 0x4,
1059}
1060
1061impl From<MethodFlags> for Flags {
1062    fn from(method_flag: MethodFlags) -> Self {
1063        match method_flag {
1064            MethodFlags::NoReplyExpected => Self::NoReplyExpected,
1065            MethodFlags::NoAutoStart => Self::NoAutoStart,
1066            MethodFlags::AllowInteractiveAuth => Self::AllowInteractiveAuth,
1067        }
1068    }
1069}
1070
1071type OwnerChangedStreamMap = Map<
1072    fdo::NameOwnerChangedStream,
1073    Box<dyn FnMut(fdo::NameOwnerChanged) -> Option<UniqueName<'static>> + Send + Sync + Unpin>,
1074>;
1075
1076/// A [`stream::Stream`] implementation that yields `UniqueName` when the bus owner changes.
1077///
1078/// Use [`Proxy::receive_owner_changed`] to create an instance of this type.
1079pub struct OwnerChangedStream<'a> {
1080    stream: OwnerChangedStreamMap,
1081    name: BusName<'a>,
1082}
1083
1084impl<'a> OwnerChangedStream<'a> {
1085    /// The bus name being tracked.
1086    pub fn name(&self) -> &BusName<'a> {
1087        &self.name
1088    }
1089}
1090
1091impl stream::Stream for OwnerChangedStream<'_> {
1092    type Item = Option<UniqueName<'static>>;
1093
1094    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1095        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1096    }
1097}
1098
1099impl OrderedStream for OwnerChangedStream<'_> {
1100    type Data = Option<UniqueName<'static>>;
1101    type Ordering = Sequence;
1102
1103    fn poll_next_before(
1104        self: Pin<&mut Self>,
1105        cx: &mut Context<'_>,
1106        before: Option<&Self::Ordering>,
1107    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1108        Pin::new(&mut self.get_mut().stream).poll_next_before(cx, before)
1109    }
1110}
1111
1112/// A [`stream::Stream`] implementation that yields signal [messages](`Message`).
1113///
1114/// Use [`Proxy::receive_signal`] to create an instance of this type.
1115///
1116/// This type uses a [`MessageStream::for_match_rule`] internally and therefore the note about match
1117/// rule registration and [`AsyncDrop`] in its documentation applies here as well.
1118#[derive(Debug)]
1119pub struct SignalStream<'a> {
1120    stream: Join<MessageStream, Option<MessageStream>>,
1121    src_unique_name: Option<UniqueName<'static>>,
1122    signal_name: Option<MemberName<'a>>,
1123}
1124
1125impl<'a> SignalStream<'a> {
1126    /// The signal name.
1127    pub fn name(&self) -> Option<&MemberName<'a>> {
1128        self.signal_name.as_ref()
1129    }
1130
1131    async fn new(
1132        proxy: Proxy<'_>,
1133        signal_name: Option<MemberName<'a>>,
1134        args: &[(u8, &str)],
1135    ) -> Result<SignalStream<'a>> {
1136        let mut rule_builder = MatchRule::builder()
1137            .msg_type(Type::Signal)
1138            .sender(proxy.destination())?
1139            .path(proxy.path())?
1140            .interface(proxy.interface())?;
1141        if let Some(name) = &signal_name {
1142            rule_builder = rule_builder.member(name)?;
1143        }
1144        for (i, arg) in args {
1145            rule_builder = rule_builder.arg(*i, *arg)?;
1146        }
1147        let signal_rule: OwnedMatchRule = rule_builder.build().to_owned().into();
1148        let conn = proxy.connection();
1149
1150        let (src_unique_name, stream) = match proxy.destination().to_owned() {
1151            BusName::Unique(name) => (
1152                Some(name),
1153                join_streams(
1154                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
1155                    None,
1156                ),
1157            ),
1158            BusName::WellKnown(name) => {
1159                use ordered_stream::OrderedStreamExt;
1160
1161                let name_owner_changed_rule = MatchRule::builder()
1162                    .msg_type(Type::Signal)
1163                    .sender("org.freedesktop.DBus")?
1164                    .path("/org/freedesktop/DBus")?
1165                    .interface("org.freedesktop.DBus")?
1166                    .member("NameOwnerChanged")?
1167                    .add_arg(name.as_str())?
1168                    .build();
1169                let name_owner_changed_stream = MessageStream::for_match_rule(
1170                    name_owner_changed_rule,
1171                    conn,
1172                    Some(MAX_NAME_OWNER_CHANGED_SIGNALS_QUEUED),
1173                )
1174                .await?
1175                .map(Either::Left);
1176
1177                let get_name_owner = conn
1178                    .call_method_raw(
1179                        Some("org.freedesktop.DBus"),
1180                        "/org/freedesktop/DBus",
1181                        Some("org.freedesktop.DBus"),
1182                        "GetNameOwner",
1183                        BitFlags::empty(),
1184                        &name,
1185                    )
1186                    .await
1187                    .map(|r| FromFuture::from(r.expect("no reply")).map(Either::Right))?;
1188
1189                let mut join = join_streams(name_owner_changed_stream, get_name_owner);
1190
1191                let mut src_unique_name = loop {
1192                    match join.next().await {
1193                        Some(Either::Left(Ok(msg))) => {
1194                            let signal = NameOwnerChanged::from_message(msg)
1195                                .expect("`NameOwnerChanged` signal stream got wrong message");
1196                            {
1197                                break signal
1198                                    .args()
1199                                    // SAFETY: The filtering code couldn't have let this through if
1200                                    // args were not in order.
1201                                    .expect("`NameOwnerChanged` signal has no args")
1202                                    .new_owner()
1203                                    .as_ref()
1204                                    .map(UniqueName::to_owned);
1205                            }
1206                        }
1207                        Some(Either::Left(Err(_))) => (),
1208                        Some(Either::Right(Ok(response))) => {
1209                            break Some(response.body().deserialize::<UniqueName<'_>>()?.to_owned())
1210                        }
1211                        Some(Either::Right(Err(e))) => {
1212                            // Probably the name is not owned. Not a problem but let's still log it.
1213                            debug!("Failed to get owner of {name}: {e}");
1214
1215                            break None;
1216                        }
1217                        None => {
1218                            return Err(Error::InputOutput(
1219                                std::io::Error::new(
1220                                    std::io::ErrorKind::BrokenPipe,
1221                                    "connection closed",
1222                                )
1223                                .into(),
1224                            ))
1225                        }
1226                    }
1227                };
1228
1229                // Let's take into account any buffered NameOwnerChanged signal.
1230                let (stream, _, queued) = join.into_inner();
1231                if let Some(msg) = queued.and_then(|e| match e.0 {
1232                    Either::Left(Ok(msg)) => Some(msg),
1233                    Either::Left(Err(_)) | Either::Right(_) => None,
1234                }) {
1235                    if let Some(signal) = NameOwnerChanged::from_message(msg) {
1236                        if let Ok(args) = signal.args() {
1237                            match (args.name(), args.new_owner().deref()) {
1238                                (BusName::WellKnown(n), Some(new_owner)) if n == &name => {
1239                                    src_unique_name = Some(new_owner.to_owned());
1240                                }
1241                                _ => (),
1242                            }
1243                        }
1244                    }
1245                }
1246                let name_owner_changed_stream = stream.into_inner();
1247
1248                let stream = join_streams(
1249                    MessageStream::for_match_rule(signal_rule, conn, None).await?,
1250                    Some(name_owner_changed_stream),
1251                );
1252
1253                (src_unique_name, stream)
1254            }
1255        };
1256
1257        Ok(SignalStream {
1258            stream,
1259            src_unique_name,
1260            signal_name,
1261        })
1262    }
1263
1264    fn filter(&mut self, msg: &Message) -> Result<bool> {
1265        let header = msg.header();
1266        let sender = header.sender();
1267        if sender == self.src_unique_name.as_ref() {
1268            return Ok(true);
1269        }
1270
1271        // The src_unique_name must be maintained in lock-step with the applied filter
1272        if let Some(signal) = NameOwnerChanged::from_message(msg.clone()) {
1273            let args = signal.args()?;
1274            self.src_unique_name = args.new_owner().as_ref().map(|n| n.to_owned());
1275        }
1276
1277        Ok(false)
1278    }
1279}
1280
1281impl stream::Stream for SignalStream<'_> {
1282    type Item = Message;
1283
1284    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
1285        OrderedStream::poll_next_before(self, cx, None).map(|res| res.into_data())
1286    }
1287}
1288
1289impl OrderedStream for SignalStream<'_> {
1290    type Data = Message;
1291    type Ordering = Sequence;
1292
1293    fn poll_next_before(
1294        self: Pin<&mut Self>,
1295        cx: &mut Context<'_>,
1296        before: Option<&Self::Ordering>,
1297    ) -> Poll<PollResult<Self::Ordering, Self::Data>> {
1298        let this = self.get_mut();
1299        loop {
1300            match ready!(OrderedStream::poll_next_before(
1301                Pin::new(&mut this.stream),
1302                cx,
1303                before
1304            )) {
1305                PollResult::Item { data, ordering } => {
1306                    if let Ok(msg) = data {
1307                        if let Ok(true) = this.filter(&msg) {
1308                            return Poll::Ready(PollResult::Item {
1309                                data: msg,
1310                                ordering,
1311                            });
1312                        }
1313                    }
1314                }
1315                PollResult::Terminated => return Poll::Ready(PollResult::Terminated),
1316                PollResult::NoneBefore => return Poll::Ready(PollResult::NoneBefore),
1317            }
1318        }
1319    }
1320}
1321
1322impl stream::FusedStream for SignalStream<'_> {
1323    fn is_terminated(&self) -> bool {
1324        ordered_stream::FusedOrderedStream::is_terminated(&self.stream)
1325    }
1326}
1327
1328#[async_trait::async_trait]
1329impl AsyncDrop for SignalStream<'_> {
1330    async fn async_drop(self) {
1331        let (signals, names, _buffered) = self.stream.into_inner();
1332        signals.async_drop().await;
1333        if let Some(names) = names {
1334            names.async_drop().await;
1335        }
1336    }
1337}
1338
1339#[cfg(feature = "blocking-api")]
1340impl<'a> From<crate::blocking::Proxy<'a>> for Proxy<'a> {
1341    fn from(proxy: crate::blocking::Proxy<'a>) -> Self {
1342        proxy.into_inner()
1343    }
1344}
1345
1346/// This trait is implemented by all async proxies, which are generated with the
1347/// [`proxy`](macro@zbus::proxy) macro.
1348pub trait ProxyImpl<'c>
1349where
1350    Self: Sized,
1351{
1352    /// Return a customizable builder for this proxy.
1353    fn builder(conn: &Connection) -> Builder<'c, Self>;
1354
1355    /// Consume `self`, returning the underlying `zbus::Proxy`.
1356    fn into_inner(self) -> Proxy<'c>;
1357
1358    /// The reference to the underlying `zbus::Proxy`.
1359    fn inner(&self) -> &Proxy<'c>;
1360}
1361
1362enum Either<L, R> {
1363    Left(L),
1364    Right(R),
1365}
1366
1367#[cfg(test)]
1368mod tests {
1369    use super::*;
1370    use crate::{connection, interface, object_server::SignalEmitter, proxy, utils::block_on};
1371    use futures_util::StreamExt;
1372    use ntest::timeout;
1373    use test_log::test;
1374
1375    #[test]
1376    #[timeout(15000)]
1377    fn signal() {
1378        block_on(test_signal()).unwrap();
1379    }
1380
1381    async fn test_signal() -> Result<()> {
1382        // Register a well-known name with the session bus and ensure we get the appropriate
1383        // signals called for that.
1384        let conn = Connection::session().await?;
1385        let dest_conn = Connection::session().await?;
1386        let unique_name = dest_conn.unique_name().unwrap().clone();
1387
1388        let well_known = "org.freedesktop.zbus.async.ProxySignalStreamTest";
1389        let proxy: Proxy<'_> = Builder::new(&conn)
1390            .destination(well_known)?
1391            .path("/does/not/matter")?
1392            .interface("does.not.matter")?
1393            .build()
1394            .await?;
1395        let mut owner_changed_stream = proxy.receive_owner_changed().await?;
1396
1397        let proxy = fdo::DBusProxy::new(&dest_conn).await?;
1398        let mut name_acquired_stream = proxy
1399            .inner()
1400            .receive_signal_with_args("NameAcquired", &[(0, well_known)])
1401            .await?;
1402
1403        let prop_stream = proxy
1404            .inner()
1405            .receive_property_changed("SomeProp")
1406            .await
1407            .filter_map(|changed| async move {
1408                let v: Option<u32> = changed.get().await.ok();
1409                dbg!(v)
1410            });
1411        drop(proxy);
1412        drop(prop_stream);
1413
1414        dest_conn.request_name(well_known).await?;
1415
1416        let (new_owner, acquired_signal) =
1417            futures_util::join!(owner_changed_stream.next(), name_acquired_stream.next(),);
1418
1419        assert_eq!(&new_owner.unwrap().unwrap(), &*unique_name);
1420
1421        let acquired_signal = acquired_signal.unwrap();
1422        assert_eq!(
1423            acquired_signal.body().deserialize::<&str>().unwrap(),
1424            well_known
1425        );
1426
1427        let proxy = Proxy::new(&conn, &unique_name, "/does/not/matter", "does.not.matter").await?;
1428        let mut unique_name_changed_stream = proxy.receive_owner_changed().await?;
1429
1430        drop(dest_conn);
1431        name_acquired_stream.async_drop().await;
1432
1433        // There shouldn't be an owner anymore.
1434        let new_owner = owner_changed_stream.next().await;
1435        assert!(new_owner.unwrap().is_none());
1436
1437        let new_unique_owner = unique_name_changed_stream.next().await;
1438        assert!(new_unique_owner.unwrap().is_none());
1439
1440        Ok(())
1441    }
1442
1443    #[test]
1444    #[timeout(15000)]
1445    fn signal_stream_deadlock() {
1446        block_on(test_signal_stream_deadlock()).unwrap();
1447    }
1448
1449    /// Tests deadlocking in signal reception when the message queue is full.
1450    ///
1451    /// Creates a connection with a small message queue, and a service that
1452    /// emits signals at a high rate. First a listener is created that listens
1453    /// for that signal which should fill the small queue. Then another signal
1454    /// signal listener is created against another signal. Previously, this second
1455    /// call to add the match rule never resolved and resulted in a deadlock.
1456    async fn test_signal_stream_deadlock() -> Result<()> {
1457        #[proxy(
1458            gen_blocking = false,
1459            default_path = "/org/zbus/Test",
1460            default_service = "org.zbus.Test.MR501",
1461            interface = "org.zbus.Test"
1462        )]
1463        trait Test {
1464            #[zbus(signal)]
1465            fn my_signal(&self, msg: &str) -> Result<()>;
1466        }
1467
1468        struct TestIface;
1469
1470        #[interface(name = "org.zbus.Test")]
1471        impl TestIface {
1472            #[zbus(signal)]
1473            async fn my_signal(context: &SignalEmitter<'_>, msg: &'static str) -> Result<()>;
1474        }
1475
1476        let test_iface = TestIface;
1477        let server_conn = connection::Builder::session()?
1478            .name("org.zbus.Test.MR501")?
1479            .serve_at("/org/zbus/Test", test_iface)?
1480            .build()
1481            .await?;
1482
1483        let client_conn = connection::Builder::session()?
1484            .max_queued(1)
1485            .build()
1486            .await?;
1487
1488        let test_proxy = TestProxy::new(&client_conn).await?;
1489        let test_prop_proxy = PropertiesProxy::builder(&client_conn)
1490            .destination("org.zbus.Test.MR501")?
1491            .path("/org/zbus/Test")?
1492            .build()
1493            .await?;
1494
1495        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1496
1497        let handle = {
1498            let tx = tx.clone();
1499            let conn = server_conn.clone();
1500            let server_fut = async move {
1501                use std::time::Duration;
1502
1503                #[cfg(not(feature = "tokio"))]
1504                use async_io::Timer;
1505
1506                #[cfg(feature = "tokio")]
1507                use tokio::time::sleep;
1508
1509                let iface_ref = conn
1510                    .object_server()
1511                    .interface::<_, TestIface>("/org/zbus/Test")
1512                    .await
1513                    .unwrap();
1514
1515                let context = iface_ref.signal_emitter();
1516                while !tx.is_closed() {
1517                    for _ in 0..10 {
1518                        TestIface::my_signal(context, "This is a test")
1519                            .await
1520                            .unwrap();
1521                    }
1522
1523                    #[cfg(not(feature = "tokio"))]
1524                    Timer::after(Duration::from_millis(5)).await;
1525
1526                    #[cfg(feature = "tokio")]
1527                    sleep(Duration::from_millis(5)).await;
1528                }
1529            };
1530            server_conn.executor().spawn(server_fut, "server_task")
1531        };
1532
1533        let signal_fut = async {
1534            let mut signal_stream = test_proxy.receive_my_signal().await.unwrap();
1535
1536            tx.send(()).await.unwrap();
1537
1538            while let Some(_signal) = signal_stream.next().await {}
1539        };
1540
1541        let prop_fut = async move {
1542            rx.recv().await.unwrap();
1543            let _prop_stream = test_prop_proxy.receive_properties_changed().await.unwrap();
1544        };
1545
1546        futures_util::pin_mut!(signal_fut);
1547        futures_util::pin_mut!(prop_fut);
1548
1549        futures_util::future::select(signal_fut, prop_fut).await;
1550
1551        handle.await;
1552
1553        Ok(())
1554    }
1555}