Coverage Report

Created: 2024-05-16 12:16

/__w/smoldot/smoldot/repo/lib/src/json_rpc/service/deliver_channel.rs
Line
Count
Source (jump to first uncovered line)
1
// Smoldot
2
// Copyright (C) 2023  Pierre Krieger
3
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5
// This program is free software: you can redistribute it and/or modify
6
// it under the terms of the GNU General Public License as published by
7
// the Free Software Foundation, either version 3 of the License, or
8
// (at your option) any later version.
9
10
// This program is distributed in the hope that it will be useful,
11
// but WITHOUT ANY WARRANTY; without even the implied warranty of
12
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13
// GNU General Public License for more details.
14
15
// You should have received a copy of the GNU General Public License
16
// along with this program.  If not, see <http://www.gnu.org/licenses/>.
17
18
use alloc::sync::Arc;
19
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
20
21
// TODO: this implementation could probably be optimized, but well, it works
22
23
// TODO: should be bounded
24
0
pub fn deliver_channel<T>() -> (DeliverSender<T>, DeliverReceiver<T>) {
25
0
    let common = Arc::new(Common {
26
0
        queue: crossbeam_queue::SegQueue::new(),
27
0
        num_senders_alive: AtomicUsize::new(1),
28
0
        sender_did_something: event_listener::Event::new(),
29
0
        receiver_is_dead: AtomicBool::new(false),
30
0
        receiver_did_something: event_listener::Event::new(),
31
0
    });
32
0
33
0
    let tx = DeliverSender {
34
0
        common: common.clone(),
35
0
    };
36
0
    let rx = DeliverReceiver { common };
37
0
    (tx, rx)
38
0
}
Unexecuted instantiation: _RINvNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channel15deliver_channelpEB8_
Unexecuted instantiation: _RINvNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channel15deliver_channelpEB8_
39
40
pub struct DeliverSender<T> {
41
    common: Arc<Common<T>>,
42
}
43
44
impl<T> DeliverSender<T> {
45
    /// Sends the given payload to the receiver and waits for it to have been received.
46
    ///
47
    /// If the [`DeliverReceiver`] has been dropped, an error is returned with the original value.
48
0
    pub async fn deliver(&mut self, payload: T) -> Result<(), T> {
Unexecuted instantiation: _RNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channelINtB2_13DeliverSenderpE7deliverB8_
Unexecuted instantiation: _RNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channelINtB2_13DeliverSenderpE7deliverB8_
49
0
        let message = Arc::new(atomic_take::AtomicTake::new(payload));
50
0
51
0
        self.common.queue.push(message.clone());
52
0
        self.common.sender_did_something.notify(1);
53
0
54
0
        let mut waiter = None;
55
0
        loop {
56
0
            if message.is_taken() {
57
0
                return Ok(());
58
0
            }
59
0
60
0
            if self.common.receiver_is_dead.load(Ordering::Acquire) {
61
0
                return match message.take() {
62
0
                    Some(payload) => Err(payload),
63
0
                    None => Ok(()),
64
                };
65
0
            }
66
67
0
            if let Some(waiter) = waiter.take() {
68
0
                waiter.await;
69
0
            } else {
70
0
                waiter = Some(self.common.receiver_did_something.listen());
71
0
            }
72
        }
73
0
    }
Unexecuted instantiation: _RNCNvMNtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channelINtB4_13DeliverSenderpE7deliver0Ba_
Unexecuted instantiation: _RNCNvMNtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channelINtB4_13DeliverSenderpE7deliver0Ba_
74
}
75
76
impl<T> Clone for DeliverSender<T> {
77
0
    fn clone(&self) -> Self {
78
0
        self.common
79
0
            .num_senders_alive
80
0
            .fetch_add(1, Ordering::Release);
81
0
        DeliverSender {
82
0
            common: self.common.clone(),
83
0
        }
84
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channels_0pEINtB5_13DeliverSenderpENtNtCsaYZPK01V26L_4core5clone5Clone5cloneBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channels_0pEINtB5_13DeliverSenderpENtNtCsaYZPK01V26L_4core5clone5Clone5cloneBb_
85
}
86
87
impl<T> Drop for DeliverSender<T> {
88
0
    fn drop(&mut self) {
89
0
        let _num_remain = self
90
0
            .common
91
0
            .num_senders_alive
92
0
            .fetch_sub(1, Ordering::Release);
93
0
        debug_assert!(_num_remain != usize::MAX); // Check for underflow.
94
0
        self.common.sender_did_something.notify(usize::MAX);
95
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channels0_0pEINtB5_13DeliverSenderpENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channels0_0pEINtB5_13DeliverSenderpENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_
96
}
97
98
pub struct DeliverReceiver<T> {
99
    common: Arc<Common<T>>,
100
}
101
102
impl<T> DeliverReceiver<T> {
103
    /// Returns the next item that was sent. If no item is available, waits until one is.
104
    ///
105
    /// Returns `None` if all the [`DeliverSender`]s have been dropped.
106
0
    pub async fn next(&mut self) -> Option<T> {
Unexecuted instantiation: _RNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channelINtB5_15DeliverReceiverpE4nextBb_
Unexecuted instantiation: _RNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channelINtB5_15DeliverReceiverpE4nextBb_
107
0
        let message: Arc<atomic_take::AtomicTake<T>> = {
108
0
            let mut waiter = None;
109
            loop {
110
0
                if let Some(item) = self.common.queue.pop() {
111
0
                    break item;
112
0
                }
113
0
114
0
                if self.common.num_senders_alive.load(Ordering::Acquire) == 0 {
115
0
                    return None;
116
0
                }
117
118
0
                if let Some(waiter) = waiter.take() {
119
0
                    waiter.await;
120
0
                } else {
121
0
                    waiter = Some(self.common.sender_did_something.listen());
122
0
                }
123
            }
124
        };
125
126
0
        let payload = message.take().unwrap();
127
0
        self.common.receiver_did_something.notify(usize::MAX);
128
0
        Some(payload)
129
0
    }
Unexecuted instantiation: _RNCNvMs1_NtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channelINtB7_15DeliverReceiverpE4next0Bd_
Unexecuted instantiation: _RNCNvMs1_NtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channelINtB7_15DeliverReceiverpE4next0Bd_
130
}
131
132
impl<T> Drop for DeliverReceiver<T> {
133
0
    fn drop(&mut self) {
134
0
        self.common.receiver_is_dead.store(true, Ordering::Release);
135
0
        self.common.receiver_did_something.notify(usize::MAX);
136
0
    }
Unexecuted instantiation: _RNvXININtNtNtCsN16ciHI6Qf_7smoldot8json_rpc7service15deliver_channels2_0pEINtB5_15DeliverReceiverpENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_
Unexecuted instantiation: _RNvXININtNtNtCseuYC0Zibziv_7smoldot8json_rpc7service15deliver_channels2_0pEINtB5_15DeliverReceiverpENtNtNtCsaYZPK01V26L_4core3ops4drop4Drop4dropBb_
137
}
138
139
struct Common<T> {
140
    queue: crossbeam_queue::SegQueue<Arc<atomic_take::AtomicTake<T>>>,
141
    /// `true` if the sending side is dead.
142
    num_senders_alive: AtomicUsize,
143
    /// Notified after an element has been pushed to the queue, or after a sender died.
144
    sender_did_something: event_listener::Event,
145
    /// `true` if the receiving side is dead.
146
    receiver_is_dead: AtomicBool,
147
    /// Notified after an element has been popped from the queue, or after a receiver died.
148
    receiver_did_something: event_listener::Event,
149
}