1
use std::sync::Arc;
2

            
3
use anyhow::Context;
4
use chrono::{Timelike, Utc};
5
use tokio::net::UdpSocket;
6

            
7
use crate::{
8
    proto::{Whod, WhodStatusUpdate},
9
    server::rwhod::RwhodStatusStore,
10
};
11

            
12
pub async fn rwhod_packet_receiver_task(
13
    socket: Arc<UdpSocket>,
14
    whod_status_store: RwhodStatusStore,
15
) -> anyhow::Result<()> {
16
    let mut buf = [0u8; Whod::MAX_SIZE];
17

            
18
    loop {
19
        let (len, src) = socket.recv_from(&mut buf).await?;
20

            
21
        tracing::debug!("Received rwhod packet of length {} bytes from {}", len, src);
22

            
23
        if len < Whod::HEADER_SIZE {
24
            tracing::error!(
25
                "Received too short packet from {src}: {len} bytes (needs to be at least {} bytes)",
26
                Whod::HEADER_SIZE
27
            );
28
            continue;
29
        }
30

            
31
        let result = Whod::from_bytes(&buf[..len])
32
            .context("Failed to parse whod packet")?
33
            .try_into()
34
            .map(|mut status_update: WhodStatusUpdate| {
35
                let timestamp = Utc::now().with_nanosecond(0).unwrap_or(Utc::now());
36
                status_update.recvtime = Some(timestamp);
37
                status_update
38
            })
39
            .map_err(|e| anyhow::anyhow!("Invalid whod packet: {}", e));
40

            
41
        match result {
42
            Ok(status_update) => {
43
                tracing::debug!("Processed whod packet from {src}: {:?}", status_update);
44

            
45
                let mut store = whod_status_store.write().await;
46
                store.insert(status_update.hostname.clone(), status_update);
47
            }
48
            Err(err) => {
49
                tracing::error!("Error processing whod packet from {src}: {err}");
50
            }
51
        }
52
    }
53
}