zlink_tokio/notified/
state.rs1use std::{
2 fmt::Debug,
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use crate::Reply;
8use pin_project_lite::pin_project;
9use tokio::sync::broadcast;
10use tokio_stream::wrappers::BroadcastStream;
11
12#[derive(Debug, Clone)]
14pub struct State<T, ReplyParams> {
15 value: T,
16 tx: broadcast::Sender<ReplyParams>,
17}
18
19impl<T, ReplyParams> zlink_core::notified::State<T, ReplyParams> for State<T, ReplyParams>
20where
21 T: Into<ReplyParams> + Clone + Debug + Send,
22 ReplyParams: Clone + Send + 'static + Debug,
23{
24 type Stream = Stream<ReplyParams>;
25
26 fn new(value: T) -> Self {
28 let (tx, _) = broadcast::channel(1);
29
30 Self { value, tx }
31 }
32
33 async fn set(&mut self, value: T) {
35 self.value = value.clone();
36 let _ = self.tx.send(value.into());
38 }
39
40 fn get(&self) -> T {
42 self.value.clone()
43 }
44
45 fn stream(&self) -> Stream<ReplyParams> {
47 Stream {
48 inner: self.tx.subscribe().into(),
49 cached: None,
50 once: false,
51 }
52 }
53
54 fn stream_once(&self) -> Stream<ReplyParams> {
56 Stream {
57 inner: self.tx.subscribe().into(),
58 cached: Some(self.get().into()),
59 once: true,
60 }
61 }
62}
63
64pin_project! {
65 #[derive(Debug)]
68 pub struct Stream<ReplyParams> {
69 #[pin]
70 inner: BroadcastStream<ReplyParams>,
71 cached: Option<ReplyParams>,
72 once: bool,
73 }
74}
75
76impl<ReplyParams> futures_util::Stream for Stream<ReplyParams>
77where
78 ReplyParams: Clone + Send + 'static,
79{
80 type Item = Reply<ReplyParams>;
81
82 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 let this = self.project();
84 if *this.once {
85 return Poll::Ready(
86 this.cached
87 .take()
88 .map(|reply| Reply::new(Some(reply)).set_continues(Some(false))),
89 );
90 }
91 let mut stream = this.inner;
92 loop {
93 match futures_util::ready!(stream.as_mut().poll_next(cx)) {
94 Some(Ok(reply)) => {
95 *this.cached = Some(reply.clone());
97 break Poll::Ready(Some(Reply::new(Some(reply)).set_continues(Some(true))));
98 }
99 Some(Err(_)) => continue,
102 None => {
104 break Poll::Ready(
105 this.cached
106 .take()
107 .map(|reply| Reply::new(Some(reply)).set_continues(Some(false))),
108 );
109 }
110 }
111 }
112 }
113}