diff --git a/exemples/reactor_executor/Cargo.lock b/exemples/reactor_executor/Cargo.lock new file mode 100644 index 0000000000000000000000000000000000000000..4254be511d9da3b724268412d8f69e315dc5d5bd --- /dev/null +++ b/exemples/reactor_executor/Cargo.lock @@ -0,0 +1,113 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 4 + +[[package]] +name = "libc" +version = "0.2.171" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" + +[[package]] +name = "log" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "reactor_executor" +version = "0.1.0" +dependencies = [ + "mio", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/exemples/reactor_executor/Cargo.toml b/exemples/reactor_executor/Cargo.toml new file mode 100644 index 0000000000000000000000000000000000000000..117fa460fa78e25abd955fb3db2ec3c08ead0357 --- /dev/null +++ b/exemples/reactor_executor/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "reactor_executor" +version = "0.1.0" +edition = "2021" + +[dependencies] +mio = { version = "1.0.3", features = ["net", "os-poll"] } diff --git a/exemples/reactor_executor/src/future.rs b/exemples/reactor_executor/src/future.rs new file mode 100644 index 0000000000000000000000000000000000000000..1023ee2e62f26014c90d010d684c3c2314d0011f --- /dev/null +++ b/exemples/reactor_executor/src/future.rs @@ -0,0 +1,28 @@ +use crate::runtime::Waker; + +// Future trait looks like standard Future except for the missing context +// pub trait Future { +// type Output; +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; +//} +// We now also have the `Waker` as an argument of the `Future` trait. We see that in the standard +// library the Waker is hidden in a Context which we do not need here +pub trait Future { + type Output; + + // TODO: + // Add the Waker into poll + fn poll(&mut self) -> PollState<Self::Output>; +} + +// Similar to the Poll #[derive(Debug)] +// pub enum Poll<T> { +// Ready(T), +// Pending, +// } +pub enum PollState<T> { + Ready(T), + NotReady, +} + +// Exercise adapt the join_all function to accept the Waker structs diff --git a/exemples/reactor_executor/src/http.rs b/exemples/reactor_executor/src/http.rs new file mode 100644 index 0000000000000000000000000000000000000000..07b782f8b9d778444e9573a68aa823c2bbf5ed15 --- /dev/null +++ b/exemples/reactor_executor/src/http.rs @@ -0,0 +1,112 @@ +use mio::Interest; + +use crate::future::{Future, PollState}; +use crate::runtime::{self, reactor, Waker}; +use std::io::{ErrorKind, Read, Write}; + +// Simple helper function to write GET requests +fn get_req(path: &str) -> String { + format!( + "GET {path} HTTP/1.1\r\n\ + Host: localhost\r\n\ + Connection: close\r\n\ + \r\n" + ) +} + +pub struct Http; + +impl Http { + pub fn get(path: &str) -> impl Future<Output = String> { + HttpGetFuture::new(path) + } +} + +// The id of the `Future` is also a needed information to register it for the `Waker` and `Poll` +struct HttpGetFuture { + // Option since we will not conect upon creation + stream: Option<mio::net::TcpStream>, + // buffer to read from the TcpStream + buffer: Vec<u8>, + // The GET request we will construct + path: String, + // NEW: The id of our Future + id: usize, +} + +impl HttpGetFuture { + fn new(path: &str) -> Self { + unimplemented!(); + // TODO: + // get the id from the next_id of the reactor + let id = 1; + Self { + stream: None, + buffer: vec![], + path: String::from(path), + id, + } + } + + // Writes the request to a non bocking stream. Here we are using mio t connect to the socket + // We just create the stream and do not perform any request here + fn write_request(&mut self) { + let stream = std::net::TcpStream::connect("localhost:8080").unwrap(); + stream.set_nonblocking(true).unwrap(); + let mut stream = mio::net::TcpStream::from_std(stream); + stream.write_all(get_req(&self.path).as_bytes()).unwrap(); + self.stream = Some(stream); + } +} + +impl Future for HttpGetFuture { + type Output = String; + + // TODO: + // Add the waker into poll() + fn poll(&mut self) -> PollState<Self::Output> { + if self.stream.is_none() { + println!("First poll - start operation"); + self.write_request(); + //TODO: + // From the reactor() + // 1. Registering the Tcp stream as a source of events in the poll instance + // 2. We also set the `Waker` responsible for handling the `Future` with the self.id + } + let mut buf = vec![0u8; 4096]; + // We loop until everything is read and return Ready when done + loop { + // We listen on the socket and read + match self.stream.as_mut().unwrap().read(&mut buf) { + // 0 bytes read so we are finished we got the entire GET response + Ok(0) => { + let s = String::from_utf8_lossy(&self.buffer); + // TODO: + // The future is over we deregister our interest from this source of events for + // the current id + break PollState::Ready(String::from(s)); + } + // We read n bytes so we continue reading + Ok(n) => { + self.buffer.extend(&buf[0..n]); + continue; + } + // This error indicates that the data is not ready yet or that that there is more + // data we did not received yet + Err(e) if e.kind() == ErrorKind::WouldBlock => { + println!("Data not ready"); + // TODO: + // We need to set the `waker` again since the data is not ready + break PollState::NotReady; + } + // This error can happen if a signal interrupted the read (which can happen) we + // handle this case by reading again + Err(e) if e.kind() == ErrorKind::Interrupted => { + continue; + } + // Nothing we can do. We panic! + Err(e) => panic!("{e:?}"), + } + } + } +} diff --git a/exemples/reactor_executor/src/main.rs b/exemples/reactor_executor/src/main.rs new file mode 100644 index 0000000000000000000000000000000000000000..6532eb6226ca0fb3f6d82db100537e1a6ba75df3 --- /dev/null +++ b/exemples/reactor_executor/src/main.rs @@ -0,0 +1,79 @@ +mod future; +mod http; +mod runtime; + +use future::{Future, PollState}; +use http::Http; +use runtime::Waker; + +fn main() { + let mut executor = runtime::init(); + executor.block_on(async_main()); +} + +fn async_main() -> impl Future<Output = String> { + Coroutine::new() +} + +enum State { + Start, + Wait1(Box<dyn Future<Output = String>>), + Wait2(Box<dyn Future<Output = String>>), + Resolved, +} + +struct Coroutine { + state: State, +} + +impl Coroutine { + fn new() -> Self { + Self { + state: State::Start, + } + } +} + +// We need to adapt `poll()` and the subsequent calls of the children `Future` to have an extra +// argument: the `Waker`. +impl Future for Coroutine { + type Output = String; + + //TODO: + //add Waker into poll + fn poll(&mut self) -> PollState<Self::Output> { + loop { + match self.state { + State::Start => { + println!("Program starting"); + let fut1 = Box::new(Http::get("/600/HelloAsyncAwait")); + self.state = State::Wait1(fut1); + } + + //TODO: + //add Waker into poll + State::Wait1(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + let fut2 = Box::new(Http::get("/400/HelloAsyncAwait")); + self.state = State::Wait2(fut2); + } + PollState::NotReady => break PollState::NotReady, + }, + + //TODO: + //add Waker into poll + State::Wait2(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + self.state = State::Resolved; + break PollState::Ready(String::new()); + } + PollState::NotReady => break PollState::NotReady, + }, + + State::Resolved => panic!("Polled a resolved future"), + } + } + } +} diff --git a/exemples/reactor_executor/src/runtime.rs b/exemples/reactor_executor/src/runtime.rs new file mode 100644 index 0000000000000000000000000000000000000000..e87aed2a804a8ce64b68fd5970ba77a04ceb3843 --- /dev/null +++ b/exemples/reactor_executor/src/runtime.rs @@ -0,0 +1,10 @@ +mod executor; +mod reactor; + +pub use executor::{Executor, Waker}; +pub use reactor::reactor; + +pub fn init() -> Executor { + reactor::start(); + Executor::new() +} diff --git a/exemples/reactor_executor/src/runtime/executor.rs b/exemples/reactor_executor/src/runtime/executor.rs new file mode 100644 index 0000000000000000000000000000000000000000..7b1827c24a38d2a448f419576a6012abf4d84411 --- /dev/null +++ b/exemples/reactor_executor/src/runtime/executor.rs @@ -0,0 +1,114 @@ +use std::{ + cell::{Cell, RefCell}, + collections::HashMap, + sync::{Arc, Mutex}, + thread::{self, Thread}, +}; + +use crate::future::{Future, PollState}; + +#[derive(Clone)] +pub struct Waker { + thread: Thread, + id: usize, + ready_queue: Arc<Mutex<Vec<usize>>>, +} + +impl Waker { + pub fn wake(&self) { + // TODO: + // 1. push id into the ready_queue + // 2. unpark the thread + unimplemented!() + } +} + +type Task = Box<dyn Future<Output = String>>; + +thread_local! { + static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default(); +} + +#[derive(Default)] +struct ExecutorCore { + tasks: RefCell<HashMap<usize, Task>>, + ready_queue: Arc<Mutex<Vec<usize>>>, + next_id: Cell<usize>, +} + +pub fn spawn<F>(future: F) +where + F: Future<Output = String> + 'static, +{ + unimplemented!(); + // TODO: + CURRENT_EXECUTOR.with(|e| { + // 1. insert (next_id and future) + // 2. push id into ready_queue + // 3. add one to next_id + }); +} + +pub struct Executor; +impl Executor { + pub fn new() -> Self { + Self {} + } + + fn pop_ready(&self) -> Option<usize> { + // TODO: + // pop from ready queue, CURRENT_EXECUTOR.with + // dont forget the lock() + unimplemented!() + } + fn get_future(&self, id: usize) -> Option<Task> { + // TODO: + // remove &id from tasks thanks to borrow_mut + unimplemented!() + } + fn get_waker(&self, id: usize) -> Waker { + // TODO: + // create a new waker from: id, thread::current(), and a clone from ready_queue + unimplemented!() + } + + // Inserts a new task with a given id into the tasks HashMap + fn insert_task(&self, id: usize, task: Task) { + // TODO: + //insert a pair id, task into tasks + unimplemented!() + } + + // Counts how many tasks are left to be executed + fn task_count(&self) -> usize { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len()) + } + + pub fn block_on<F>(&mut self, future: F) + where + F: Future<Output = String> + 'static, + { + // TODO: + // 1. spawn a future + loop { + // TODO: + // 1. while there are something left into ready_queue, pop_ready the id + // 2. get the future by id + // 3. get a waker by id + // 4. poll the future + // 5. if NotReady => insert the (id, future) + // 6. else continue + // + let task_count = self.task_count(); + let name = String::from(thread::current().name().unwrap_or_default()); + + // TODO: if not finished park() + // otherwise break + if task_count > 0 { + println!("{name}: {task_count}, pending tasks. Sleep until notified."); + } else { + println!("{name}: All tasks finished"); + } + } + } +} diff --git a/exemples/reactor_executor/src/runtime/reactor.rs b/exemples/reactor_executor/src/runtime/reactor.rs new file mode 100644 index 0000000000000000000000000000000000000000..bae730151c8b87e95e7212b452a7bf78da889b3d --- /dev/null +++ b/exemples/reactor_executor/src/runtime/reactor.rs @@ -0,0 +1,71 @@ +use std::{ + collections::HashMap, + sync::{atomic::AtomicUsize, Arc, Mutex, OnceLock}, +}; + +use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token}; + +use crate::runtime::Waker; + +type Wakers = Arc<Mutex<HashMap<usize, Waker>>>; + +static REACTOR: OnceLock<Reactor> = OnceLock::new(); + +pub fn reactor() -> &'static Reactor { + REACTOR.get().expect("Called outside a runtime context") +} + +pub struct Reactor { + wakers: Wakers, + registry: Registry, + next_id: AtomicUsize, +} + +impl Reactor { + pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) { + // TODO: + // register on the registry for stream, id, and interest + unimplemented!() + } + + pub fn set_waker(&self, waker: &Waker, id: usize) { + // TODO: + // insert a pair id, waker into wakers + unimplemented!() + } + + pub fn deregister(&self, stream: &mut TcpStream, id: usize) { + //TODO: + // 1. remove the if from the wakers (don't forget to lock/map) + // 2. deregister the stream from the registry + unimplemented!() + } + pub fn next_id(&self) -> usize { + self.next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } +} + +fn event_loop(mut poll: Poll, wakers: Wakers) { + unimplemented!(); + let mut events = Events::with_capacity(100); + loop { + // TODO: + // 1. poll event with None timeout + // 2. for_each event get the .token() + // 3. lock the wakers and the the waker by id + // 4. wake() the waker + } +} + +pub fn start() { + unimplemented!(); + // TODO: + // 1. create a new empty wakers HashMap<usize, Waker> + // 2. create a new Poll + // 3. create a try_clone poll.registry() + // 4. initialize a new next_id to 1 + // 5. create a new Reactor + REACTOR.set(reactor).ok().expect("Rector already running"); + // 6. move the event loop of poll and wakers into a new system thread: +}