From 5cb435e6f12906d24ac05860ceba579e3800e62e Mon Sep 17 00:00:00 2001 From: Orestis <orestis.malaspinas@pm.me> Date: Sun, 23 Mar 2025 23:13:22 +0100 Subject: [PATCH] en cours --- 08_runtimes_wakers_reactor_executor.md | 117 +++++++++++++++++++++++++ codes/coroutines_better/Cargo.lock | 113 ++++++++++++++++++++++++ codes/coroutines_better/Cargo.toml | 7 ++ codes/coroutines_better/src/future.rs | 20 +++++ codes/coroutines_better/src/http.rs | 110 +++++++++++++++++++++++ codes/coroutines_better/src/main.rs | 87 ++++++++++++++++++ codes/coroutines_better/src/runtime.rs | 59 +++++++++++++ 7 files changed, 513 insertions(+) create mode 100644 08_runtimes_wakers_reactor_executor.md create mode 100644 codes/coroutines_better/Cargo.lock create mode 100644 codes/coroutines_better/Cargo.toml create mode 100644 codes/coroutines_better/src/future.rs create mode 100644 codes/coroutines_better/src/http.rs create mode 100644 codes/coroutines_better/src/main.rs create mode 100644 codes/coroutines_better/src/runtime.rs diff --git a/08_runtimes_wakers_reactor_executor.md b/08_runtimes_wakers_reactor_executor.md new file mode 100644 index 0000000..782f98d --- /dev/null +++ b/08_runtimes_wakers_reactor_executor.md @@ -0,0 +1,117 @@ +--- +title: "Runtimes, wakers et réacteurs-exécuteurs" +author: "Orestis Malaspinas" +date: "2025-03-24" +patat: + slideNumber: true + wrap: true + margins: + left: 10 + right: 10 + top: auto + slideLevel: 2 + images: + backend: kitty +... + +# Introduction + +## Programme + +* Pourquoi avons-nous besoin d'un runtime +* Amélioration de notre exemple de base +* Partie 1: Ajout d'un réacteur et d'un waker +* Partie 2: Implémentation d'un exécuteur +* Partie 3: Implémentation d'un réacteur +* Enjoy: quelques expériences + +## Runtimes existants + +* Embassy (systèmes embarqués) +* Tokio (très complet et populaire) +* Smol (plus simple et limité) +* Async-std (tentative de tout "async"-iser) + +## Schéma + +```console + ORDONNANCEMENT D'UNE TACHE + + ┌────────────────────┐ difficile + │Ordonnanceur de l'OS│◄───────────────┐ + └────────────────────┘ │ + │ │ + ▼ │ + ┌──────────────┐ │ + │Tache (Thread)│ │ + └───────┬──────┘ │ + ▼ │ + plus simple ┌────────────────────┐ │ + ┌─────────────►│Ordonnanceur dans │ │ + │ │l'espace utilisateur│ │ + │ ┌─────────┴─────────┬──────────┴────────┐ │ + │ │ │ │ │ + │ ▼ ▼ ▼ │ + ┌┴───────┐ ┌────────┐ ┌────────┐ │ + │ Tache │ │ Tache │ │ Tache ├──┘ + │(Future)│ │(Future)│ │(Future)│ + └────────┘ └────────┘ └────────┘ +``` + +## Pourquoi avons nous besoin d'un runtime? + +* Runtime: ordonnancer les tâches et faire progresser les `Future` +* Ordonnancement fait par l'OS pour les thread système: aucun cnotrôle (et très complexe) +* Contrôle plus fin et gérable à l'aide d'un orodonnanceur dans l'espace utilisateur (userspace) +* Plus léger que les threads système (on peut en avoir 100k) + +## Architecture d'un Runtime + +* Architecture: réacteur - exécuteur +* Réacteur: + * Réagis à un évènement et les dispatche à un gestionnaire (boucle d'évènements) + * P. ex. évènement I/O (Stream TCP qui est prêt à être lu), timer qui expire, etc. + * Nécessaire d'avoir un "système d'abonnment" (intégration forte avec l'I/O) +* Exécuteur: + * Décide qui a droit à du temps CPU pour fire progresser une tâche + * Responsable d'appeler `Future::poll()` + +## Amélioration 1: éviter le sondage actif (1/) + +```rust +coroutine fn async_main() -> impl Future<Output = String> { + println!("Program starting"); + let txt = Http::get("/600/HelloAsyncAwait").wait; + println!("{txt}"); + let txt = Http::get("/400/HelloAsyncAwait").wait; + println!("{txt}"); +} +fn main() { + let mut future = async_main(); + while let PollState::NotReady = future.poll() { + println!("Schedule other tasks"); + } +} +``` + +* On `poll()` la `Future` principale (dans le `main()`) +* Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()` +* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`) + +## Amélioration 1: éviter le sondage actif (2/) + +```rust +fn main() { + let future = async_main(); + let mut runtime = Runtime::new(); + runtime.block_on(future); +} +``` + +* On `poll()` la `Future` principale (dans le `main()`) +* Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()` +* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`) + + + +* On veut plus de la boucle inifinie: mais utiliser `epoll()` diff --git a/codes/coroutines_better/Cargo.lock b/codes/coroutines_better/Cargo.lock new file mode 100644 index 0000000..426d5ed --- /dev/null +++ b/codes/coroutines_better/Cargo.lock @@ -0,0 +1,113 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "coroutines_better" +version = "0.1.0" +dependencies = [ + "mio", +] + +[[package]] +name = "libc" +version = "0.2.171" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6" + +[[package]] +name = "log" +version = "0.4.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" + +[[package]] +name = "mio" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" +dependencies = [ + "libc", + "log", + "wasi", + "windows-sys", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" diff --git a/codes/coroutines_better/Cargo.toml b/codes/coroutines_better/Cargo.toml new file mode 100644 index 0000000..8318b4d --- /dev/null +++ b/codes/coroutines_better/Cargo.toml @@ -0,0 +1,7 @@ +[package] +name = "coroutines_better" +version = "0.1.0" +edition = "2021" + +[dependencies] +mio = { version = "1.0.3", features = ["net", "os-poll"] } diff --git a/codes/coroutines_better/src/future.rs b/codes/coroutines_better/src/future.rs new file mode 100644 index 0000000..40ec06d --- /dev/null +++ b/codes/coroutines_better/src/future.rs @@ -0,0 +1,20 @@ +// Future trait looks like standard Future except for the missing context +// pub trait Future { +// type Output; +// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>; +//} +pub trait Future { + type Output; + + fn poll(&mut self) -> PollState<Self::Output>; +} + +// Similar to the Poll #[derive(Debug)] +// pub enum Poll<T> { +// Ready(T), +// Pending, +// } +pub enum PollState<T> { + Ready(T), + NotReady, +} diff --git a/codes/coroutines_better/src/http.rs b/codes/coroutines_better/src/http.rs new file mode 100644 index 0000000..976b157 --- /dev/null +++ b/codes/coroutines_better/src/http.rs @@ -0,0 +1,110 @@ +use mio::{Interest, Token}; + +use crate::future::{Future, PollState}; +use crate::runtime; +use std::io::{ErrorKind, Read, Write}; + +// Simple helper function to write GET requests +fn get_req(path: &str) -> String { + format!( + "GET {path} HTTP/1.1\r\n\ + Host: localhost\r\n\ + Connection: close\r\n\ + \r\n" + ) +} + +pub struct Http; + +impl Http { + pub fn get(path: &str) -> impl Future<Output = String> { + HttpGetFuture::new(path) + } +} + +struct HttpGetFuture { + // Option since we will not conect upon creation + stream: Option<mio::net::TcpStream>, + // buffer to read from the TcpStream + buffer: Vec<u8>, + // The GET request we will construct + path: String, +} + +impl HttpGetFuture { + // Create the new HttpGetFuture with default state + fn new(path: &str) -> Self { + Self { + stream: None, + buffer: vec![], + path: String::from(path), + } + } + + // Writes the request to a non bocking stream. Here we are using mio t connect to the socket + // We just create the stream and do not perform any request here + fn write_request(&mut self) { + let stream = std::net::TcpStream::connect("localhost:8080").unwrap(); + stream.set_nonblocking(true).unwrap(); + let mut stream = mio::net::TcpStream::from_std(stream); + stream.write_all(get_req(&self.path).as_bytes()).unwrap(); + self.stream = Some(stream); + } +} + +impl Future for HttpGetFuture { + type Output = String; + + // We have three different states here: + // 1. stream is None, therefore we have not started executing anything (State would be + // NotStarted) + // 2. stream is Some and read returns WouldBlock (State would be Pending) + // 3. stream is Some and read returns 0 (State would be Resolved) + // These states are implicit here but we actually have some kind of state machine + fn poll(&mut self) -> PollState<Self::Output> { + // First poll we write the GET request to the server and register our interest into read + // evnts on te TcpStream. We then directly poll the stream immediatly (instead of returning + // NotReady). + // + // It is very important to note that we call poll() at least once before doing anything so + // we actually start doing something not when we create the future but when we await it + // This is an example of Lazy evaluation (we only do something when we poll not before) + if self.stream.is_none() { + println!("First poll - start operation"); + self.write_request(); + runtime::registry() + .register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE) + .unwrap(); + } + let mut buf = vec![0u8; 4096]; + // We loop until everything is read and return Ready when done + loop { + // We listen on the socket and read + match self.stream.as_mut().unwrap().read(&mut buf) { + // 0 bytes read so we are finished we got the entire GET response + Ok(0) => { + let s = String::from_utf8_lossy(&self.buffer); + break PollState::Ready(String::from(s)); + } + // We read n bytes so we continue reading + Ok(n) => { + self.buffer.extend(&buf[0..n]); + continue; + } + // This error indicates that the data is not ready yet or that that there is more + // data we did not received yet + Err(e) if e.kind() == ErrorKind::WouldBlock => { + println!("Data not ready"); + break PollState::NotReady; + } + // This error can happen if a signal interrupted the read (which can happen) we + // handle this case by reading again + Err(e) if e.kind() == ErrorKind::Interrupted => { + continue; + } + // Nothing we can do. We panic! + Err(e) => panic!("{e:?}"), + } + } + } +} diff --git a/codes/coroutines_better/src/main.rs b/codes/coroutines_better/src/main.rs new file mode 100644 index 0000000..ac1fdc2 --- /dev/null +++ b/codes/coroutines_better/src/main.rs @@ -0,0 +1,87 @@ +mod future; +mod http; +mod runtime; + +use future::{Future, PollState}; +use http::Http; +use runtime::Runtime; + +fn main() { + let future = async_main(); + let mut runtime = Runtime::new(); + runtime.block_on(future); +} + +// ================================= +// We rewrite this: +// ================================= +// coroutine fn async_main() -> impl Future<Output = String> { +// println!("Program starting"); +// let txt = Http::get("/600/HelloAsyncAwait").wait; +// println!("{txt}"); +// let txt = Http::get("/400/HelloAsyncAwait").wait; +// println!("{txt}"); +// } + +// ================================= +// Into this: +// ================================= + +fn async_main() -> impl Future<Output = String> { + Coroutine::new() +} + +enum State { + Start, + Wait1(Box<dyn Future<Output = String>>), + Wait2(Box<dyn Future<Output = String>>), + Resolved, +} + +struct Coroutine { + state: State, +} + +impl Coroutine { + fn new() -> Self { + Self { + state: State::Start, + } + } +} + +impl Future for Coroutine { + type Output = String; + + fn poll(&mut self) -> PollState<Self::Output> { + loop { + match self.state { + State::Start => { + println!("Program starting"); + let fut1 = Box::new(Http::get("/600/HelloAsyncAwait")); + self.state = State::Wait1(fut1); + } + + State::Wait1(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + let fut2 = Box::new(Http::get("/400/HelloAsyncAwait")); + self.state = State::Wait2(fut2); + } + PollState::NotReady => break PollState::NotReady, + }, + + State::Wait2(ref mut future) => match future.poll() { + PollState::Ready(txt) => { + println!("{txt}"); + self.state = State::Resolved; + break PollState::Ready(String::new()); + } + PollState::NotReady => break PollState::NotReady, + }, + + State::Resolved => panic!("Polled a resolved future"), + } + } + } +} diff --git a/codes/coroutines_better/src/runtime.rs b/codes/coroutines_better/src/runtime.rs new file mode 100644 index 0000000..dce52c2 --- /dev/null +++ b/codes/coroutines_better/src/runtime.rs @@ -0,0 +1,59 @@ +use std::future; +use std::sync::OnceLock; + +use mio::{Events, Poll, Registry}; + +use crate::future::{Future, PollState}; + +// Le Registry est la façon d'enregistrer notre intérêt pour une instance de Poll. En l'ocurrence, +// cea sera d'enregistrer les évènements sur notre `TcpStram` quand nous ferons une requête GET. +// Ainsi, nous voudrons accéder au Registry depuis l'intérieur de HttpGetFuture sans avoir besoin +// de passer des références de partout. C'est un peu moins élégant, mais cela est plus simple pour +// notre API. +// +// Ici nous utilisons un OnceLock<_> qui est une valeur dans laquelle on peut écrire qu'une seule fois +// et qui est non-initialisée quand elle est crée. Cette version est thread safe contrairement à sa +// soeur `OnceCell`. On veut intialiser le Registry que quand on démarre notre runtime et pas au +// démarrage du programme. Si le Runtime n'as pas été initialisé, toute tentative d'accès dans +// http::get(), on aura donc un message d'erreur. Le registry pourra être accédé que par la +// fonction `registry()` +static REGISTRY: OnceLock<Registry> = OnceLock::new(); + +pub fn registry() -> &'static Registry { + REGISTRY.get().expect("Called outside a runtime context") +} + +// Rien d'autre que notre instance de Poll (la file d'évènements) +pub struct Runtime { + poll: Poll, +} + +impl Runtime { + // On crée une nouvelle instance de notre file d'évènements et on initialise notre registe et + // notre Runtime avec. Ici on a une instance de `poll` qui est clonable (on pourrait l'accéder + // depuis plusieurs endroit), On stocke le registry comme une variable globale: c'est pas joli + // mais efficace + pub fn new() -> Self { + let poll = Poll::new().unwrap(); + let registry = poll.registry().try_clone().unwrap(); + REGISTRY.set(registry).unwrap(); + Self { poll } + } + + // 1. On peut bloquer sur n'imorte quelle fonction qui implémente le trait `Future` + // 2. On prnd l'ownership de future et on la rend mutable + // 3. On loop jusqu'à ce que la `Future` soit ready et on `poll()` pour un évènement quelconque + // qui pourrait ne rien avoir à faire avec notre future actuelle + // poll() ici attend qu'on ait un évènement READABLE dans le stream + pub fn block_on<F>(&mut self, future: F) + where + F: Future<Output = String>, + { + let mut future = future; + while let PollState::NotReady = future.poll() { + println!("Schedule other tasks"); + let mut events = Events::with_capacity(100); + self.poll.poll(&mut events, None).unwrap(); + } + } +} -- GitLab