roowho2_lib/server/
rwhod.rs1use std::{
2 collections::{HashMap, HashSet},
3 net::{IpAddr, SocketAddr},
4 path::Path,
5 sync::Arc,
6};
7
8use anyhow::Context;
9use chrono::{DateTime, Duration, Timelike, Utc};
10use nix::{
11 ifaddrs::getifaddrs,
12 net::if_::InterfaceFlags,
13 sys::{stat::stat, sysinfo::sysinfo},
14 unistd::gethostname,
15};
16use tokio::{
17 net::UdpSocket,
18 sync::RwLock,
19 time::{Duration as TokioDuration, interval},
20};
21use uucore::utmpx::Utmpx;
22
23use crate::proto::{Whod, WhodStatusUpdate, WhodUserEntry};
24
/// UDP port that rwhod status packets are addressed to.
pub const RWHOD_BROADCAST_PORT: u16 = 513;
27
28pub type RwhodStatusStore = Arc<RwLock<HashMap<String, WhodStatusUpdate>>>;
29
30pub fn generate_rwhod_user_entries(now: DateTime<Utc>) -> anyhow::Result<Vec<WhodUserEntry>> {
32 Utmpx::iter_all_records()
33 .filter(|entry| entry.is_user_process())
34 .map(|entry| {
35 let login_time = entry
36 .login_time()
37 .checked_to_utc()
38 .and_then(|t| DateTime::<Utc>::from_timestamp_secs(t.unix_timestamp()))
39 .ok_or_else(|| anyhow::anyhow!("Failed to convert login time to UTC"))?;
40
41 let idle_time = stat(&Path::new("/dev").join(entry.tty_device()))
42 .ok()
43 .and_then(|st| {
44 let last_active = DateTime::<Utc>::from_timestamp_secs(st.st_atime)?;
45 Some((now - last_active).max(Duration::zero()))
46 })
47 .unwrap_or(Duration::zero());
48
49 debug_assert!(
50 idle_time.num_seconds() >= 0,
51 "Idle time should never be negative"
52 );
53
54 Ok(WhodUserEntry::new(
55 entry.tty_device(),
56 entry.user(),
57 login_time,
58 idle_time,
59 ))
60 })
61 .collect()
62}
63
64pub fn generate_rwhod_status_update() -> anyhow::Result<WhodStatusUpdate> {
66 let sysinfo = sysinfo().unwrap();
67 let load_average = sysinfo.load_average();
68 let uptime = sysinfo.uptime();
69 let hostname = gethostname()?.to_str().unwrap().to_string();
70 let now = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
71
72 let result = WhodStatusUpdate::new(
73 now,
74 None,
75 hostname,
76 (
77 (load_average.0 * 100.0).abs() as i32,
78 (load_average.1 * 100.0).abs() as i32,
79 (load_average.2 * 100.0).abs() as i32,
80 ),
81 now - uptime,
82 generate_rwhod_user_entries(now)?,
83 );
84
85 Ok(result)
86}
87
/// A destination that rwhod packets are sent to, tied to one network interface.
#[derive(Clone, Debug)]
pub struct RwhodSendTarget {
    /// Name of the network interface this target belongs to.
    pub name: String,

    /// Address that packets for this interface are sent to: the interface's
    /// broadcast address, or the peer address of a point-to-point link.
    pub addr: IpAddr,
}
98
99pub fn determine_relevant_interfaces() -> anyhow::Result<Vec<RwhodSendTarget>> {
101 getifaddrs().map_err(|e| e.into()).map(|ifaces| {
102 ifaces
103 .filter(|iface| iface.flags.contains(InterfaceFlags::IFF_UP))
105 .filter(|iface| {
107 iface
108 .flags
109 .intersects(InterfaceFlags::IFF_BROADCAST | InterfaceFlags::IFF_POINTOPOINT)
110 })
111 .filter_map(|iface| {
112 let neighbor_addr = if iface.flags.contains(InterfaceFlags::IFF_BROADCAST) {
113 iface.broadcast
114 } else if iface.flags.contains(InterfaceFlags::IFF_POINTOPOINT) {
115 iface.destination
116 } else {
117 None
118 };
119
120 match neighbor_addr {
121 Some(addr) => addr
122 .as_sockaddr_in()
123 .map(|sa| IpAddr::V4(sa.ip()))
124 .or_else(|| addr.as_sockaddr_in6().map(|sa| IpAddr::V6(sa.ip())))
125 .map(|ip_addr| RwhodSendTarget {
126 name: iface.interface_name,
127 addr: ip_addr,
128 }),
129 None => None,
130 }
131 })
132 .scan(HashSet::new(), |seen, n| {
134 if seen.insert(n.name.clone()) {
135 Some(n)
136 } else {
137 None
138 }
139 })
140 .collect::<Vec<RwhodSendTarget>>()
141 })
142}
143
144pub async fn send_rwhod_packet_to_interface(
145 socket: Arc<UdpSocket>,
146 interface: &RwhodSendTarget,
147 packet: &Whod,
148) -> anyhow::Result<()> {
149 let serialized_packet = packet.to_bytes();
150
151 let target_addr = match interface.addr {
153 IpAddr::V4(addr) => SocketAddr::new(IpAddr::V4(addr), RWHOD_BROADCAST_PORT),
154 IpAddr::V6(addr) => SocketAddr::new(IpAddr::V6(addr), RWHOD_BROADCAST_PORT),
155 };
156
157 tracing::debug!(
158 "Sending rwhod packet to interface {} at address {}",
159 interface.name,
160 target_addr
161 );
162
163 socket
164 .send_to(&serialized_packet, &target_addr)
165 .await
166 .map_err(|e| anyhow::anyhow!("Failed to send rwhod packet: {}", e))?;
167
168 Ok(())
169}
170
171pub async fn rwhod_packet_receiver_task(
172 socket: Arc<UdpSocket>,
173 whod_status_store: RwhodStatusStore,
174) -> anyhow::Result<()> {
175 let mut buf = [0u8; Whod::MAX_SIZE];
176
177 loop {
178 let (len, src) = socket.recv_from(&mut buf).await?;
179
180 tracing::debug!("Received rwhod packet of length {} bytes from {}", len, src);
181
182 if len < Whod::HEADER_SIZE {
183 tracing::error!(
184 "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
185 Whod::HEADER_SIZE
186 );
187 continue;
188 }
189
190 let result = Whod::from_bytes(&buf[..len])
191 .context("Failed to parse whod packet")?
192 .try_into()
193 .map(|mut status_update: WhodStatusUpdate| {
194 let timestamp = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
195 status_update.recvtime = Some(timestamp);
196 status_update
197 })
198 .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e));
199
200 match result {
201 Ok(status_update) => {
202 tracing::debug!("Processed whod packet from {src}: {:?}", status_update);
203
204 let mut store = whod_status_store.write().await;
205 store.insert(status_update.hostname.clone(), status_update);
206 }
207 Err(err) => {
208 tracing::error!("Error processing whod packet from {src}: {err}");
209 }
210 }
211 }
212}
213
214pub async fn rwhod_packet_sender_task(
215 socket: Arc<UdpSocket>,
216 interfaces: Vec<RwhodSendTarget>,
217) -> anyhow::Result<()> {
218 let mut interval = interval(TokioDuration::from_secs(60));
219
220 loop {
221 interval.tick().await;
222
223 let status_update = generate_rwhod_status_update()?;
224
225 tracing::debug!("Generated rwhod packet: {:?}", status_update);
226
227 let packet = status_update
228 .try_into()
229 .map_err(|e| anyhow::anyhow!("{}", e))?;
230
231 for interface in &interfaces {
232 if let Err(e) = send_rwhod_packet_to_interface(socket.clone(), interface, &packet).await
233 {
234 tracing::error!(
235 "Failed to send rwhod packet on interface {}: {}",
236 interface.name,
237 e
238 );
239 }
240 }
241 }
242}