diff --git a/exemples/coroutines-variables/Cargo.lock b/exemples/coroutines-variables/Cargo.lock new file mode 100644 index 0000000000000000000000000000000000000000..a630948b299130ee97539a7c8e9a17c861c9f03e --- /dev/null +++ b/exemples/coroutines-variables/Cargo.lock @@ -0,0 +1,113 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "coroutines-variables" +version = "0.1.0" +dependencies = [ + "mio", +] + +[[package]] +name = "libc" +version = "0.2.171" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" + +[[package]] +name = "log" +version = "0.4.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94" + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/exemples/coroutines-variables/Cargo.toml b/exemples/coroutines-variables/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..c279bf6ab03f8f0b9ae23f162d7b3c0c42d50fc0 --- /dev/null +++ b/exemples/coroutines-variables/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "coroutines-variables" +version = "0.1.0" +edition = "2024" + +[dependencies] +mio = { version = "1.0.3", features = ["net", "os-poll"] } diff --git a/exemples/coroutines-variables/src/future.rs b/exemples/coroutines-variables/src/future.rs new file mode 100644 index 0000000000000000000000000000000000000000..2fe6d932dcd6d0218f382ec389b915b757e405d7 --- /dev/null +++ b/exemples/coroutines-variables/src/future.rs @@ -0,0 +1,26 @@ +use crate::runtime::Waker; + +// Future trait looks like standard Future except for the missing context +// pub trait Future { +// type Output; +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; +//} +// We now also have the `Waker` as an argument of the `Future` trait. We see that in the standard +// library the Waker is hidden in a Context which we do not need here +pub trait Future { + type Output; + + fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>; +} + +// Similar to the Poll #[derive(Debug)] +// pub enum Poll<T> { +// Ready(T), +// Pending, +// } +pub enum PollState<T> { + Ready(T), + NotReady, +} + +// Exercise adapt the join_all function to accept the Waker structs diff --git a/exemples/coroutines-variables/src/http.rs b/exemples/coroutines-variables/src/http.rs new file mode 100644 index 0000000000000000000000000000000000000000..58cc23e717426772be7a38d71b26bd154769e8cc --- /dev/null +++ b/exemples/coroutines-variables/src/http.rs @@ -0,0 +1,122 @@ +use mio::Interest; + +use crate::future::{Future, PollState}; +use crate::runtime::{self, reactor, Waker}; +use std::io::{ErrorKind, Read, Write}; + +// Simple helper function to write GET requests +fn get_req(path: &str) -> String { + format!( + "GET {path} HTTP/1.1\r\n\ + Host: localhost\r\n\ + Connection: close\r\n\ + \r\n" + ) +} + +pub struct Http; + +impl Http { + pub fn get(path: &str) -> impl Future<Output = String> { + HttpGetFuture::new(path) + } +} + +// The id of the `Future` is also a needed information to register it for the `Waker` and `Poll` +struct HttpGetFuture { + // Option since we will not conect upon creation + stream: Option<mio::net::TcpStream>, + // buffer to read from the TcpStream + buffer: Vec<u8>, + // The GET request we will construct + path: String, + // The id of our Future + id: usize, +} + +impl HttpGetFuture { + // Create the new HttpGetFuture with default state + // We increment the `next_id()` and assign it to the id field + fn new(path: &str) -> Self { + let id = reactor().next_id(); + Self { + stream: None, + buffer: vec![], + path: String::from(path), + id, + } + } + + // Writes the request to a non bocking stream. Here we are using mio t connect to the socket + // We just create the stream and do not perform any request here + fn write_request(&mut self) { + let stream = std::net::TcpStream::connect("localhost:8080").unwrap(); + stream.set_nonblocking(true).unwrap(); + let mut stream = mio::net::TcpStream::from_std(stream); + stream.write_all(get_req(&self.path).as_bytes()).unwrap(); + self.stream = Some(stream); + } +} + +impl Future for HttpGetFuture { + type Output = String; + + // We have three different states here: + // 1. stream is None, therefore we have not started executing anything (State would be + // NotStarted) + // 2. stream is Some and read returns WouldBlock (State would be Pending) + // 3. stream is Some and read returns 0 (State would be Resolved) + // These states are implicit here but we actually have some kind of state machine + fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> { + // First poll we write the GET request to the server and register our interest into read + // evnts on te TcpStream. We then directly poll the stream immediatly (instead of returning + // NotReady). + // + // It is very important to note that we call poll() at least once before doing anything so + // we actually start doing something not when we create the future but when we await it + // This is an example of Lazy evaluation (we only do something when we poll not before) + if self.stream.is_none() { + println!("First poll - start operation"); + self.write_request(); + // Registering the Tcp stream as a source of events in the poll instance + runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id); + // We also set the `Waker` responsible for handling the `Future` + runtime::reactor().set_waker(waker, self.id); + } + let mut buf = vec![0u8; 4096]; + // We loop until everything is read and return Ready when done + loop { + // We listen on the socket and read + match self.stream.as_mut().unwrap().read(&mut buf) { + // 0 bytes read so we are finished we got the entire GET response + Ok(0) => { + let s = String::from_utf8_lossy(&self.buffer); + // The future is over we deregister our interest from this source of events for + // the current id + runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id); + break PollState::Ready(String::from(s)); + } + // We read n bytes so we continue reading + Ok(n) => { + self.buffer.extend(&buf[0..n]); + continue; + } + // This error indicates that the data is not ready yet or that that there is more + // data we did not received yet + Err(e) if e.kind() == ErrorKind::WouldBlock => { + println!("Data not ready"); + // We need to + runtime::reactor().set_waker(waker, self.id); + break PollState::NotReady; + } + // This error can happen if a signal interrupted the read (which can happen) we + // handle this case by reading again + Err(e) if e.kind() == ErrorKind::Interrupted => { + continue; + } + // Nothing we can do. We panic! + Err(e) => panic!("{e:?}"), + } + } + } +} diff --git a/exemples/coroutines-variables/src/main.rs b/exemples/coroutines-variables/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..49a72d50167fa233af2543be708a6c0b85be9256 --- /dev/null +++ b/exemples/coroutines-variables/src/main.rs @@ -0,0 +1,131 @@ +mod future; +mod http; +mod runtime; + +use future::{Future, PollState}; +use http::Http; +use runtime::Waker; +use std::fmt::Write; + +fn main() { + let mut executor = runtime::init(); + executor.block_on(async_main()); +} + +// ================================= +// We rewrite this: +// ================================= + +// The code we want to write into a buffer instead of printing in the terminal +// Here we use the writeln!() macro which is similar to println! but instead of writing in the +// stadard output we write in a buffer. Also writer represents the buffer and a mutable reference +// to it. +// coroutine fn async_main() { +// let mut buffer = String::from("\nBUFFER:\n------\n"); +// let writer = &mut buffer; +// let mut counter = 0; +// println!("Program starting"); +// let txt = Http::get("/600/HelloAsyncWait").wait; +// writeln!(writer, "{txt}").unwrap(); +// counter += 1; +// let txt = Http::get("/400/HelloAsyncWait").wait; +// writeln!(writer, "{txt}").unwrap(); +// writeln!(writer, "------").unwrap(); +// counter += 1; +// println!("Received {} responses", counter); +// println!("{buffer}"); +// } + +// ================================= +// Into this: +// ================================= + +fn async_main() -> impl Future<Output = String> { + Coroutine::new() +} + +// New: Only stores an Option<usize> which will be our counter +// We first want a counter, then a self referencial String +// all in options +struct Stack {} + +enum State { + Start, + Wait1(Box<dyn Future<Output = String>>), + Wait2(Box<dyn Future<Output = String>>), + Resolved, +} + +struct Coroutine { + state: State, + // New: We add a stack that will store an internal state +} + +impl Coroutine { + fn new() -> Self { + // New: Add initial stack state + Self { + state: State::Start, + } + } +} + +impl Future for Coroutine { + type Output = String; + + fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> { + loop { + match self.state { + State::Start => { + // New: We initialize the counter, and buffer/writer when the Coroutine starts + // We create the buffer and the writer by taking the mutable reference from it + // and storing into the Option. Here the type in the Option is coerced from + // &mut to *mut + + println!("Program starting"); + let fut1 = Box::new(Http::get("/600/HelloAsyncWait")); + self.state = State::Wait1(fut1); + } + + State::Wait1(ref mut f1) => { + match f1.poll(waker) { + PollState::Ready(txt) => { + // New: restore the stack + // take() gets the value stored in the Option and leaves a None in its + // place. It like "sapping" the internal state with "nothing". We thus + // take the ownership of the internal stake in order to modify it. + // Add txt to the writer, and incremetn the counter + let fut2 = Box::new(Http::get("/400/HelloAsyncWait")); + self.state = State::Wait2(fut2); + // New: We need to save the stack (overwrite the None part) + } + PollState::NotReady => break PollState::NotReady, + } + } + + State::Wait2(ref mut f2) => { + match f2.poll(waker) { + PollState::Ready(txt) => { + // New: restore the stack + // take() gets the value stored in the Option and leaves a None in its + // place. It like "sapping" the internal state with "nothing". We thus + // take the ownership of the internal stake in order to modify it. + // if we took ownershiup here, we would invalidate the pointer of the + // writer. Add txt to the writer, and incremetn the counter + // Println! the buffer and the counte + self.state = State::Resolved; + // New: We have finished. The internal State must be back to None, which is + // what `take()` did for us. Except for the buffer which we should also + // release here. If we do not to that here the memory would leave until + // the coroutine goes out of scope. + break PollState::Ready(String::new()); + } + PollState::NotReady => break PollState::NotReady, + } + } + + State::Resolved => panic!("Polled a resolved future"), + } + } + } +} diff --git a/exemples/coroutines-variables/src/runtime.rs b/exemples/coroutines-variables/src/runtime.rs new file mode 100644 index 0000000000000000000000000000000000000000..e87aed2a804a8ce64b68fd5970ba77a04ceb3843 --- /dev/null +++ b/exemples/coroutines-variables/src/runtime.rs @@ -0,0 +1,10 @@ +mod executor; +mod reactor; + +pub use executor::{Executor, Waker}; +pub use reactor::reactor; + +pub fn init() -> Executor { + reactor::start(); + Executor::new() +} diff --git a/exemples/coroutines-variables/src/runtime/executor.rs b/exemples/coroutines-variables/src/runtime/executor.rs new file mode 100644 index 0000000000000000000000000000000000000000..47fbcb9365c818d6a281304caa1be8cc88acf41b --- /dev/null +++ b/exemples/coroutines-variables/src/runtime/executor.rs @@ -0,0 +1,184 @@ +use std::{ + cell::{Cell, RefCell}, + collections::HashMap, + sync::{Arc, Mutex}, + thread::{self, Thread}, +}; + +use crate::future::{Future, PollState}; + +// A `Waker` contains the thread, `std::thread::Thread`, +// from (`thread::current()`) which it is called in order to be `park()` or `unpark()`. This +// is not a robust method, as `park()` or `unpark()` could be called from several +// places in the code, resulting in potential deadlocks. We will use this +// method to ask the OS to wake up a thread at the right moment (or to put it to sleep). +// To be really generic, we should have a `Waker` feature, but that would be too +// complicated in the scope of this course. +// +// A waker contains its thread, an `id` which identifies the associated task, and a list of +// of task identifiers that are ready to be executed. This is necessary to add +// the current identifier to the list of tasks to be executed +#[derive(Clone)] +pub struct Waker { + thread: Thread, + id: usize, + ready_queue: Arc<Mutex<Vec<usize>>>, +} + +impl Waker { + // Wake means add the id of the task to the ready_queue and `unpark()` the associated thread. + // We can then call `poll()` on the task to see its progress. + // Of course we must lock the ready_queue first otherwise there may be a place where we are + // modifying concurrently the ready_queue which would cause a concurrent access + pub fn wake(&self) { + self.ready_queue + .lock() + .map(|mut queue| queue.push(self.id)) + .unwrap(); + self.thread.unpark(); + } +} + +// A Task is just something that implements the `Future` trait +type Task = Box<dyn Future<Output = String>>; + +// CURRENT_EXECUTOR is a static variable that is unique to the thread it was called from. It can +// not be accessed from any other thread +thread_local! { + static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default(); +} + +// The `ExecutorCore` holds the state of the `Executor`. +// +// The `tasks` is a `HashMap` between its `id` and the actual `Taksk` (anything that implements the +// `Future` trait). We only need a RefCell since it will never be accessed by different threads. +// A `RefCell` allow **interior** mutability (the thing stored inside can be mutated) but rules are +// checked at runtime instead of at compile time. Only one instance can hold a mutable reference at +// any given time (no need for Arc). `RefCell` allows to access a value by &mut T +// +// The `ready_queue` is the same as the queue for the `Waker`. It contains the list of tasks ready +// for execution. Each `Waker` having a copy and each `Waker` being on a different thread an +// `Arc<Mutex<_>>` of the ids is necessary here. +// +// The `next_id` is a counter giving us the top level id of each `Future` that is spawned. It is a +// simple `Cell` because the value inside will be replaced everything we need to mutate it. `Cell` +// does not allow to access a `&mut T` but we need to take the value out of the `Cell` and replace +// it wil something else. +#[derive(Default)] +struct ExecutorCore { + tasks: RefCell<HashMap<usize, Task>>, + ready_queue: Arc<Mutex<Vec<usize>>>, + next_id: Cell<usize>, +} + +// The `spawn` function does the following things: +// 1. Gets the next available id for our top level `Future` +// 2. Adds the id and the future, into our tasks HashMap +// 3. Adds the id into the list of ids that are ready to be executes (this would be their first +// executrion) +// 4. Prepares the next id (by incrementation by 1) +// +// Here `F` must have a 'static lifetime which means that the `Future` must live until the end of +// our program. In other terms it must outlive `spawn()` +pub fn spawn<F>(future: F) +where + F: Future<Output = String> + 'static, +{ + CURRENT_EXECUTOR.with(|e| { + let id = e.next_id.get(); + e.tasks.borrow_mut().insert(id, Box::new(future)); + e.ready_queue + .lock() + .map(|mut queue| queue.push(id)) + .unwrap(); + e.next_id.set(id + 1); + }); +} + +// The executor is all stored in tthe `ExecutorCore` which is a local static variable anyway, so we +// just create an empty streuct for a namespacing purpose +pub struct Executor; + +impl Executor { + // Just a new instance, since everything is done in the CURRENT_EXECUTOR static variable when a + // new thread is spawned + pub fn new() -> Self { + Self {} + } + + // Pops an id from the ready_queue list. Must of course be locked before doing anything. The + // queue works rather as a stack than as a queue but it's not very important. + fn pop_ready(&self) -> Option<usize> { + CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap()) + } + + // Returns the task that is related to the id and removes it from the tasks hashmap + // This will be used for polling. If the task returns not ready we will have to put it back + // again later + fn get_future(&self, id: usize) -> Option<Task> { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id)) + } + + // Creates a new Waker from an id + fn get_waker(&self, id: usize) -> Waker { + Waker { + id, + thread: thread::current(), + ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()), + } + } + + // Inserts a new task with a given id into the tasks HashMap + fn insert_task(&self, id: usize, task: Task) { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task)); + } + + // Counts how many tasks are left to be executed + fn task_count(&self) -> usize { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len()) + } + + pub fn block_on<F>(&mut self, future: F) + where + F: Future<Output = String> + 'static, + { + // New: OPTIMIZATION: It could be that the future is ready on the first call to block_on() and + // instead to do all these things here, we just want to return realy. + // We therefore create a waker with any ID, poll() and return early if we are Ready. The + // top level future is the one we are waiting and if it's ready we don't want to block + // until the end of ages + //let waker = self.get_waker(usize::MAX); + //let mut future = future; + //match future.poll(&waker) { + // PollState::NotReady => (), + // PollState::Ready(_) => return, + //} + // END OPTIMIZATION + + spawn(future); + loop { + while let Some(id) = self.pop_ready() { + let mut future = match self.get_future(id) { + Some(f) => f, + // guard against false wakeups + None => continue, + }; + let waker = self.get_waker(id); + match future.poll(&waker) { + PollState::NotReady => self.insert_task(id, future), + PollState::Ready(_) => continue, + } + } + let task_count = self.task_count(); + let name = String::from(thread::current().name().unwrap_or_default()); + + if task_count > 0 { + println!("{name}: {task_count}, pending tasks. Sleep until notified."); + thread::park(); + } else { + println!("{name}: All tasks finished"); + break; + } + } + } +} diff --git a/exemples/coroutines-variables/src/runtime/reactor.rs b/exemples/coroutines-variables/src/runtime/reactor.rs new file mode 100644 index 0000000000000000000000000000000000000000..9e1a264a096c059c070e3cb399a51bae8b6b3945 --- /dev/null +++ b/exemples/coroutines-variables/src/runtime/reactor.rs @@ -0,0 +1,102 @@ +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc, Mutex, OnceLock}, +}; + +use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token}; + +use crate::runtime::Waker; + +type Wakers = Arc<Mutex<HashMap<usize, Waker>>>; + +// The REACTOR may be accessed from variuous threads and must therefore be hidden behind a lock. +// Also it must never change one initialized when we start the reactor. +static REACTOR: OnceLock<Reactor> = OnceLock::new(); + +// To access the Reractor that will be borrowed from various places +pub fn reactor() -> &'static Reactor { + REACTOR.get().expect("Called outside a runtime context") +} + +// The reactor will be made of three fields: +// +// 1. A list of wakers that is a HashMap of id, Waker +// 2. The registry that will help us interact with the Poll event queue +// 3. The next available id for a Waker. +pub struct Reactor { + wakers: Wakers, + registry: Registry, + next_id: AtomicUsize, +} + +impl Reactor { + // We register interest for the `TcpStream` for a given interest and assign it the `id` in + // argumetn as a Token + pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) { + self.registry.register(stream, Token(id), interest).unwrap() + } + + // Adds a waker into the `Wakers` `HashMap` along with its `id`. We replace the `Waker` if one + // with the same `id` is already present + pub fn set_waker(&self, waker: &Waker, id: usize) { + self.wakers + .lock() + .map(|mut w| w.insert(id, waker.clone())) + .unwrap(); + } + + // Removes a `Waker` from our `HashMap` and deregisters the `TcpStream` from the `Poll` + // instance + pub fn deregister(&self, stream: &mut TcpStream, id: usize) { + self.wakers.lock().map(|mut w| w.remove(&id)).unwrap(); + self.registry.deregister(stream).unwrap(); + } + + // Increments the counter atomically. Relaxed just means atomicity. Other possiblities of + // Ordering can garantee that there is a certain ordering in how values are added from + // different threads. + pub fn next_id(&self) -> usize { + self.next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } +} + +// 1. We poll events without a timeout in an infinite loop +// 2. For each event we get the token associated to the events (it is the id that was passed upon +// creation of the interest) +// 3. We `wake()` the waker by id +fn event_loop(mut poll: Poll, wakers: Wakers) { + let mut events = Events::with_capacity(100); + loop { + poll.poll(&mut events, None).unwrap(); + events.iter().for_each(|e| { + let Token(id) = e.token(); + let wakers = wakers.lock().unwrap(); + + if let Some(waker) = wakers.get(&id) { + waker.wake(); + } + }); + } +} + +// This function starts the Reactor by: +// 1. Creating the Wakers HashMap and the Poll instance. +// 2. Cloning the registry used for the source (the TcpStream) +// 3. Creating a new Reactor +// 4. Initializing the static variable REACTOR and making sure no other instance is running +// 5. Starting the `envent_loop()` for this Poll and Wakers and move it into a separate thread (we +// never take ownership again of Poll) +pub fn start() { + let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new())); + let poll = Poll::new().unwrap(); + let registry = poll.registry().try_clone().unwrap(); + let next_id = AtomicUsize::new(1); + let reactor = Reactor { + wakers: wakers.clone(), + registry, + next_id, + }; + REACTOR.set(reactor).ok().expect("Rector already running"); + std::thread::spawn(move || event_loop(poll, wakers)); +}