Skip to main content

zlink_tokio/notified/
state.rs

1use std::{
2    fmt::Debug,
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use crate::Reply;
8use pin_project_lite::pin_project;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::BroadcastStream;
11
12/// A notified state (e.g a field) of a service implementation.
13#[derive(Debug, Clone)]
14pub struct State<T, ReplyParams> {
15    value: T,
16    tx: broadcast::Sender<ReplyParams>,
17}
18
19impl<T, ReplyParams> zlink_core::notified::State<T, ReplyParams> for State<T, ReplyParams>
20where
21    T: Into<ReplyParams> + Clone + Debug + Send,
22    ReplyParams: Clone + Send + 'static + Debug,
23{
24    type Stream = Stream<ReplyParams>;
25
26    /// Create a new notified field.
27    fn new(value: T) -> Self {
28        let (tx, _) = broadcast::channel(1);
29
30        Self { value, tx }
31    }
32
33    /// Set the value of the notified field and notify all listeners.
34    async fn set(&mut self, value: T) {
35        self.value = value.clone();
36        // Failure means that there are currently no receivers and that's ok.
37        let _ = self.tx.send(value.into());
38    }
39
40    /// Get the value of the notified field.
41    fn get(&self) -> T {
42        self.value.clone()
43    }
44
45    /// Get a stream of replies for the notified field.
46    fn stream(&self) -> Stream<ReplyParams> {
47        Stream {
48            inner: self.tx.subscribe().into(),
49            cached: None,
50            once: false,
51        }
52    }
53
54    /// A stream of replies for this state, that only yields one reply: the current state.
55    fn stream_once(&self) -> Stream<ReplyParams> {
56        Stream {
57            inner: self.tx.subscribe().into(),
58            cached: Some(self.get().into()),
59            once: true,
60        }
61    }
62}
63
64pin_project! {
65    /// The stream to use as the [`crate::Service::ReplyStream`] in service implementation when
66    /// using [`State`].
67    #[derive(Debug)]
68    pub struct Stream<ReplyParams> {
69        #[pin]
70        inner: BroadcastStream<ReplyParams>,
71        cached: Option<ReplyParams>,
72        once: bool,
73    }
74}
75
76impl<ReplyParams> futures_util::Stream for Stream<ReplyParams>
77where
78    ReplyParams: Clone + Send + 'static,
79{
80    type Item = Reply<ReplyParams>;
81
82    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83        let this = self.project();
84        if *this.once {
85            return Poll::Ready(
86                this.cached
87                    .take()
88                    .map(|reply| Reply::new(Some(reply)).set_continues(Some(false))),
89            );
90        }
91        let mut stream = this.inner;
92        loop {
93            match futures_util::ready!(stream.as_mut().poll_next(cx)) {
94                Some(Ok(reply)) => {
95                    // Cache and yield immediately with continues=true.
96                    *this.cached = Some(reply.clone());
97                    break Poll::Ready(Some(Reply::new(Some(reply)).set_continues(Some(true))));
98                }
99                // Some intermediate values were missed. That's OK, as long as we get the
100                // latest value.
101                Some(Err(_)) => continue,
102                // Channel closed - yield cached value with continues=false.
103                None => {
104                    break Poll::Ready(
105                        this.cached
106                            .take()
107                            .map(|reply| Reply::new(Some(reply)).set_continues(Some(false))),
108                    );
109                }
110            }
111        }
112    }
113}