diff --git a/08_runtimes_wakers_reactor_executor.md b/08_runtimes_wakers_reactor_executor.md index 782f98d1d9ec18d4f714d41eab40b056bccac54c..da41d9931ae6a12bb33fe42ca08883d9876535ef 100644 --- a/08_runtimes_wakers_reactor_executor.md +++ b/08_runtimes_wakers_reactor_executor.md @@ -1,4 +1,4 @@ ---- +-- title: "Runtimes, wakers et réacteurs-exécuteurs" author: "Orestis Malaspinas" date: "2025-03-24" @@ -111,7 +111,112 @@ fn main() { * On `poll()` la `Future` principale (dans le `main()`) * Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()` * On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`) +* On veut plus de la boucle inifinie: mais utiliser `epoll()` +## Amélioration 1: éviter le sondage actif (3/) +```rust +static REGISTRY: OnceLock<Registry> = OnceLock::new(); +pub fn registry() -> &'static Registry { + REGISTRY.get().expect("Called outside a runtime context") +} +pub struct Runtime { + poll: Poll, +} +``` -* On veut plus de la boucle inifinie: mais utiliser `epoll()` +* `Poll` est la file d'évènements de `mio` et constitue notre `Runtime` +* `Register` sert à enregistrer l'intérêt à une **source** d'évènements (ici `TcpStream`) +* Interaction avec le `Registry` que via la fonction `registry()` qui s'assure + +## Amélioration 1: éviter le sondage actif (4/) + +```rust +impl Runtime { + pub fn new() -> Self { + let poll = Poll::new().unwrap(); + let registry = poll.registry().try_clone().unwrap(); + REGISTRY.set(registry).unwrap(); + Self { poll } + } + pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> { + let mut future = future; + while let PollState::NotReady = future.poll() { + println!("Schedule other tasks"); + let mut events = Events::with_capacity(100); + self.poll.poll(&mut events, None).unwrap(); + } + } +} +``` + +* `new()`: nouvelle instance de `Poll` et du `Registry` associé + * Intialisation du `REGISTRY` global lors de la création du `Runtime` +* `block_on()`: argument qui implémente `Future` + * `future.poll()` des `Future` jusqu'à ce qu'ils soient `Ready(_)` + * `poll.poll()` est réveillé lorsque les évènements liés à la file surviennent + * rien à faire avec `events`, mais c'est ici qu'on va "bloquer" + +## Amélioration 1: éviter le sondage actif (5/) + +```rust +fn poll(&mut self) -> PollState<Self::Output> { + if self.stream.is_none() { + println!("First poll - start operation"); + self.write_request(); + // Registering the Tcp stream as a source of events in the poll instance + runtime::registry() + .register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE) + .unwrap(); + } + ... +} +``` + +* Lors du premier appel à `poll()`: + * on enregistre l'intérêt au `TcpStream` + * on essaie de lire `TcpStream` et on arrive dans `WouldBlock` (les données sont pas prêtes) + * on retourne `NotReady` +* On retourne à `block_on()` dans le `main()` et on attend le prochain réveil pour rappeler `poll()` + +## Amélioration 1: éviter le sondage actif (6/) + +```console +Program starting +First poll - start operation +Data not ready +Schedule other tasks +HTTP/1.1 200 OK +content-type: text/plain; charset=utf-8 +content-length: 15 +connection: close +date: Mon, 24 Mar 2025 07:56:51 GMT + +HelloAsyncAwait +First poll - start operation +Data not ready +Schedule other tasks +HTTP/1.1 200 OK +content-type: text/plain; charset=utf-8 +content-length: 15 +connection: close +date: Mon, 24 Mar 2025 07:56:52 GMT + +HelloAsyncAwait +``` + +* Un seul appel à "Schedule other tasks" et "Data not ready" +* On a bien le réveil qui se fait quand les tâches sont prêtes à continuer (pas d'appel continu à "Schedule other tasks") +* Pas de travail "inutile" + +## Amélioration 1: quelles améliorations encore + +* On rend le contrôle à l'OS quand on appelle `Poll::poll()` +* On a parlé de réacteur - exécuteur, mais deux sont très fortement couplés via `Poll::poll()` +* On aimerait séparer le couplage de la file d'évènement +* On va ajouer un `Waker` comme dans le design des `Future` de Rust + * Faire un lien entre réacteur et exécuteur + * Ajouter un Réacteur qui va gérer les `Waker` qui va être responsable de la communication avec l'exécuteur + + +## Amélioration 2: ajouter un réacteur et un waker (1/) diff --git a/codes/coroutines_better/src/http.rs b/codes/coroutines_better/src/http.rs index 976b1577ff53ad1cc95856210ebe3be5d396d070..e3a8246df612aa49e6e3c033ec540b8f270b4f86 100644 --- a/codes/coroutines_better/src/http.rs +++ b/codes/coroutines_better/src/http.rs @@ -72,6 +72,7 @@ impl Future for HttpGetFuture { if self.stream.is_none() { println!("First poll - start operation"); self.write_request(); + // Registering the Tcp stream as a source of events in the poll instance runtime::registry() .register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE) .unwrap(); diff --git a/codes/coroutines_better/src/main.rs b/codes/coroutines_better/src/main.rs index ac1fdc2670df8ace39aa641968d10e50a8a06a9f..45857d0fb11ed14d0a3ef47774d4d04cf1c658c2 100644 --- a/codes/coroutines_better/src/main.rs +++ b/codes/coroutines_better/src/main.rs @@ -11,6 +11,33 @@ fn main() { let mut runtime = Runtime::new(); runtime.block_on(future); } +// Our ptograms outputs the following. It is very important to note that there is only on Schedule +// other task and Data not ready calls that are shown. This is due to the event queue polling which +// only wakes the task when an event occured saving a lot of CPU cycles. In our older example we +// had to limit the number of calls in order to not get flooded with "Schedule other tasks" +// messages. This version is single threaded to keep things simple. +// +//Program starting +//First poll - start operation +//Data not ready +//Schedule other tasks +//HTTP/1.1 200 OK +//content-type: text/plain; charset=utf-8 +//content-length: 15 +//connection: close +//date: Mon, 24 Mar 2025 07:56:51 GMT +// +//HelloAsyncAwait +//First poll - start operation +//Data not ready +//Schedule other tasks +//HTTP/1.1 200 OK +//content-type: text/plain; charset=utf-8 +//content-length: 15 +//connection: close +//date: Mon, 24 Mar 2025 07:56:52 GMT +// +//HelloAsyncAwait // ================================= // We rewrite this: diff --git a/codes/coroutines_better/src/runtime.rs b/codes/coroutines_better/src/runtime.rs index dce52c283acc532577a3e4384c58f830897d25e8..905b1498167501d8592217cde78271ffd6a22688 100644 --- a/codes/coroutines_better/src/runtime.rs +++ b/codes/coroutines_better/src/runtime.rs @@ -1,50 +1,49 @@ -use std::future; use std::sync::OnceLock; use mio::{Events, Poll, Registry}; use crate::future::{Future, PollState}; -// Le Registry est la façon d'enregistrer notre intérêt pour une instance de Poll. En l'ocurrence, -// cea sera d'enregistrer les évènements sur notre `TcpStram` quand nous ferons une requête GET. -// Ainsi, nous voudrons accéder au Registry depuis l'intérieur de HttpGetFuture sans avoir besoin -// de passer des références de partout. C'est un peu moins élégant, mais cela est plus simple pour -// notre API. +// The Registry is the way we register our interest in a Poll instance. In this case, +// it will register events on our `TcpStream` when we make a GET request. +// In this way, we'll be able to access the Registry from inside HttpGetFuture without having to +// pass references all over the place. It's a little less elegant, but simpler for our API. // -// Ici nous utilisons un OnceLock<_> qui est une valeur dans laquelle on peut écrire qu'une seule fois -// et qui est non-initialisée quand elle est crée. Cette version est thread safe contrairement à sa -// soeur `OnceCell`. On veut intialiser le Registry que quand on démarre notre runtime et pas au -// démarrage du programme. Si le Runtime n'as pas été initialisé, toute tentative d'accès dans -// http::get(), on aura donc un message d'erreur. Le registry pourra être accédé que par la -// fonction `registry()` +// Here we're using an `OnceLock<_>`, which is a value that can only be written to once +// and is uninitialized when created. This version is thread safe, unlike its +// sister `OnceCell`. We want to intitialize the Registry only when we start our runtime and not at +// program startup. If the Runtime has not been initialized, any attempt to access it via +// http::get() will result in an error. The registry can only be accessed via the +// function `registry()` which throws an error if the Runtime was not properly initialized static REGISTRY: OnceLock<Registry> = OnceLock::new(); pub fn registry() -> &'static Registry { REGISTRY.get().expect("Called outside a runtime context") } -// Rien d'autre que notre instance de Poll (la file d'évènements) +// Nothing but our Poll instance (the event queue) pub struct Runtime { poll: Poll, } impl Runtime { - // On crée une nouvelle instance de notre file d'évènements et on initialise notre registe et - // notre Runtime avec. Ici on a une instance de `poll` qui est clonable (on pourrait l'accéder - // depuis plusieurs endroit), On stocke le registry comme une variable globale: c'est pas joli - // mais efficace + // We create a new instance of our event queue and initialize our registry and + // our Runtime with it. Here we have an instance of `poll` which is clonable (we could access it + // from several locations). We store the registry as a global variable: not pretty + // but efficient to keep things as simple as possible pub fn new() -> Self { let poll = Poll::new().unwrap(); + // creation of a copy of the Regfistry used to register a source of events let registry = poll.registry().try_clone().unwrap(); REGISTRY.set(registry).unwrap(); Self { poll } } - // 1. On peut bloquer sur n'imorte quelle fonction qui implémente le trait `Future` - // 2. On prnd l'ownership de future et on la rend mutable - // 3. On loop jusqu'à ce que la `Future` soit ready et on `poll()` pour un évènement quelconque - // qui pourrait ne rien avoir à faire avec notre future actuelle - // poll() ici attend qu'on ait un évènement READABLE dans le stream + // 1. Block on any function that implements the `Future` trait + // 2. Take ownership of future and make it mutable + // 3. Loop until `Future` is ready and `poll()` for any event + // that may have nothing to do with our current future + // poll() here waits until we have a READABLE event in the stream pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String>,