1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
#![allow(missing_docs)]

use std::cell::RefCell;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::mpsc::SyncSender;

use super::reactor::Reactor;
use super::SubgraphId;

/// A sink that values of type `T` can be handed to, one at a time.
///
/// This file implements it for the in-memory [`Buffer`] and for the standard
/// library's [`SyncSender`] channel endpoint.
pub trait Give<T> {
    /// Hands `item` over to the sink.
    ///
    /// Returns `true` if the sink accepted the value and `false` otherwise.
    fn give(&self, item: T) -> bool;
}

/// A shared, growable in-memory sink.
///
/// The buffer is a thin wrapper around an `Rc<RefCell<Vec<T>>>`; every handle
/// that holds the same `Rc` observes the same vector.
pub struct Buffer<T>(pub(crate) Rc<RefCell<Vec<T>>>);

impl<T> Give<T> for Buffer<T> {
    /// Appends `t` to the end of the underlying vector and returns `true`.
    ///
    /// # Panics
    ///
    /// Panics if the inner `RefCell` is already borrowed when this is called.
    fn give(&self, t: T) -> bool {
        // Name `RefCell::borrow_mut` explicitly so the call always goes to the
        // cell behind the `Rc`; the mutable borrow ends with this statement.
        RefCell::borrow_mut(&self.0).push(t);
        true
    }
}

impl<T> Default for Buffer<T> {
    fn default() -> Self {
        Buffer(Rc::new(RefCell::new(Vec::new())))
    }
}

impl<T> Clone for Buffer<T> {
    fn clone(&self) -> Self {
        Buffer(self.0.clone())
    }
}

impl<T> Give<T> for SyncSender<T> {
    /// Sends `t` over the channel using [`SyncSender::send`].
    ///
    /// Returns `true` when the send succeeds and `false` when it reports an
    /// error; the error (and the value it carries back) is dropped.
    fn give(&self, t: T) -> bool {
        let outcome = self.send(t);
        outcome.is_ok()
    }
}

// TODO(justin): this thing should probably give Vecs to the Givable, and buffer
// stuff up and automatically flush, but postponing that until we have occasion
// to benchmark it.

/// A handle for pushing values of type `T` into a [`Give`] sink and for
/// triggering the subgraph identified by its [`SubgraphId`] on a [`Reactor`].
pub struct Input<T, G: Give<T>> {
    /// Reactor on which `flush` triggers the subgraph.
    reactor: Reactor,
    /// Identifier passed to the reactor when flushing.
    sg_id: SubgraphId,
    /// Sink that `give` forwards values to.
    givable: G,
    /// `T` appears only in the `G: Give<T>` bound, so it is anchored here to
    /// keep the type parameter in use.
    _marker: PhantomData<T>,
}
impl<T, G: Give<T>> Input<T, G> {
    /// Builds an `Input` that forwards values to `givable` and, when flushed,
    /// triggers subgraph `sg_id` on `reactor`.
    pub fn new(reactor: Reactor, sg_id: SubgraphId, givable: G) -> Self {
        let _marker = PhantomData;
        Self {
            reactor,
            sg_id,
            givable,
            _marker,
        }
    }

    /// Forwards `t` to the underlying sink.
    ///
    /// The sink's `bool` result is discarded, so callers are not told whether
    /// the value was accepted.
    pub fn give(&self, t: T) {
        let _accepted = self.givable.give(t);
    }

    /// Triggers this input's subgraph on the reactor.
    ///
    /// # Panics
    ///
    /// Panics if `Reactor::trigger` does not succeed, because its result is
    /// unwrapped.
    pub fn flush(&self) {
        let triggered = self.reactor.trigger(self.sg_id);
        // TODO(justin): handle a failed trigger instead of unwrapping.
        triggered.unwrap();
    }
}