Skip to content
Snippets Groups Projects
Verified Commit 5cb435e6 authored by orestis.malaspin's avatar orestis.malaspin
Browse files

en cours

parent ebc53229
No related branches found
No related tags found
No related merge requests found
Pipeline #38718 passed
---
title: "Runtimes, wakers et réacteurs-exécuteurs"
author: "Orestis Malaspinas"
date: "2025-03-24"
patat:
slideNumber: true
wrap: true
margins:
left: 10
right: 10
top: auto
slideLevel: 2
images:
backend: kitty
...
# Introduction
## Programme
* Pourquoi avons-nous besoin d'un runtime
* Amélioration de notre exemple de base
* Partie 1: Ajout d'un réacteur et d'un waker
* Partie 2: Implémentation d'un exécuteur
* Partie 3: Implémentation d'un réacteur
* Enjoy: quelques expériences
## Runtimes existants
* Embassy (systèmes embarqués)
* Tokio (très complet et populaire)
* Smol (plus simple et limité)
* Async-std (tentative de tout "async"-iser)
## Schéma
```console
ORDONNANCEMENT D'UNE TACHE
┌────────────────────┐ difficile
│Ordonnanceur de l'OS│◄───────────────┐
└────────────────────┘ │
│ │
▼ │
┌──────────────┐ │
│Tache (Thread)│ │
└───────┬──────┘ │
▼ │
plus simple ┌────────────────────┐ │
┌─────────────►│Ordonnanceur dans │ │
│ │l'espace utilisateur│ │
│ ┌─────────┴─────────┬──────────┴────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
┌┴───────┐ ┌────────┐ ┌────────┐ │
│ Tache │ │ Tache │ │ Tache ├──┘
│(Future)│ │(Future)│ │(Future)│
└────────┘ └────────┘ └────────┘
```
## Pourquoi avons nous besoin d'un runtime?
* Runtime: ordonnancer les tâches et faire progresser les `Future`
* Ordonnancement fait par l'OS pour les thread système: aucun cnotrôle (et très complexe)
* Contrôle plus fin et gérable à l'aide d'un orodonnanceur dans l'espace utilisateur (userspace)
* Plus léger que les threads système (on peut en avoir 100k)
## Architecture d'un Runtime
* Architecture: réacteur - exécuteur
* Réacteur:
* Réagis à un évènement et les dispatche à un gestionnaire (boucle d'évènements)
* P. ex. évènement I/O (Stream TCP qui est prêt à être lu), timer qui expire, etc.
* Nécessaire d'avoir un "système d'abonnment" (intégration forte avec l'I/O)
* Exécuteur:
* Décide qui a droit à du temps CPU pour fire progresser une tâche
* Responsable d'appeler `Future::poll()`
## Amélioration 1: éviter le sondage actif (1/)
```rust
coroutine fn async_main() -> impl Future<Output = String> {
println!("Program starting");
let txt = Http::get("/600/HelloAsyncAwait").wait;
println!("{txt}");
let txt = Http::get("/400/HelloAsyncAwait").wait;
println!("{txt}");
}
fn main() {
let mut future = async_main();
while let PollState::NotReady = future.poll() {
println!("Schedule other tasks");
}
}
```
* On `poll()` la `Future` principale (dans le `main()`)
* Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()`
* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`)
## Amélioration 1: éviter le sondage actif (2/)
```rust
fn main() {
let future = async_main();
let mut runtime = Runtime::new();
runtime.block_on(future);
}
```
* On `poll()` la `Future` principale (dans le `main()`)
* Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()`
* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`)
* On veut plus de la boucle inifinie: mais utiliser `epoll()`
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "coroutines_better"
version = "0.1.0"
dependencies = [
"mio",
]
[[package]]
name = "libc"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[package]
name = "coroutines_better"
version = "0.1.0"
edition = "2021"
[dependencies]
mio = { version = "1.0.3", features = ["net", "os-poll"] }
// Future trait looks like standard Future except for the missing context
// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
//}
pub trait Future {
type Output;
fn poll(&mut self) -> PollState<Self::Output>;
}
// Similar to the Poll #[derive(Debug)]
// pub enum Poll<T> {
// Ready(T),
// Pending,
// }
pub enum PollState<T> {
Ready(T),
NotReady,
}
use mio::{Interest, Token};
use crate::future::{Future, PollState};
use crate::runtime;
use std::io::{ErrorKind, Read, Write};
// Simple helper function to write GET requests
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}
pub struct Http;
impl Http {
pub fn get(path: &str) -> impl Future<Output = String> {
HttpGetFuture::new(path)
}
}
struct HttpGetFuture {
// Option since we will not conect upon creation
stream: Option<mio::net::TcpStream>,
// buffer to read from the TcpStream
buffer: Vec<u8>,
// The GET request we will construct
path: String,
}
impl HttpGetFuture {
// Create the new HttpGetFuture with default state
fn new(path: &str) -> Self {
Self {
stream: None,
buffer: vec![],
path: String::from(path),
}
}
// Writes the request to a non bocking stream. Here we are using mio t connect to the socket
// We just create the stream and do not perform any request here
fn write_request(&mut self) {
let stream = std::net::TcpStream::connect("localhost:8080").unwrap();
stream.set_nonblocking(true).unwrap();
let mut stream = mio::net::TcpStream::from_std(stream);
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
self.stream = Some(stream);
}
}
impl Future for HttpGetFuture {
type Output = String;
// We have three different states here:
// 1. stream is None, therefore we have not started executing anything (State would be
// NotStarted)
// 2. stream is Some and read returns WouldBlock (State would be Pending)
// 3. stream is Some and read returns 0 (State would be Resolved)
// These states are implicit here but we actually have some kind of state machine
fn poll(&mut self) -> PollState<Self::Output> {
// First poll we write the GET request to the server and register our interest into read
// evnts on te TcpStream. We then directly poll the stream immediatly (instead of returning
// NotReady).
//
// It is very important to note that we call poll() at least once before doing anything so
// we actually start doing something not when we create the future but when we await it
// This is an example of Lazy evaluation (we only do something when we poll not before)
if self.stream.is_none() {
println!("First poll - start operation");
self.write_request();
runtime::registry()
.register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE)
.unwrap();
}
let mut buf = vec![0u8; 4096];
// We loop until everything is read and return Ready when done
loop {
// We listen on the socket and read
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
break PollState::Ready(String::from(s));
}
// We read n bytes so we continue reading
Ok(n) => {
self.buffer.extend(&buf[0..n]);
continue;
}
// This error indicates that the data is not ready yet or that that there is more
// data we did not received yet
Err(e) if e.kind() == ErrorKind::WouldBlock => {
println!("Data not ready");
break PollState::NotReady;
}
// This error can happen if a signal interrupted the read (which can happen) we
// handle this case by reading again
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
// Nothing we can do. We panic!
Err(e) => panic!("{e:?}"),
}
}
}
}
mod future;
mod http;
mod runtime;
use future::{Future, PollState};
use http::Http;
use runtime::Runtime;
fn main() {
let future = async_main();
let mut runtime = Runtime::new();
runtime.block_on(future);
}
// =================================
// We rewrite this:
// =================================
// coroutine fn async_main() -> impl Future<Output = String> {
// println!("Program starting");
// let txt = Http::get("/600/HelloAsyncAwait").wait;
// println!("{txt}");
// let txt = Http::get("/400/HelloAsyncAwait").wait;
// println!("{txt}");
// }
// =================================
// Into this:
// =================================
fn async_main() -> impl Future<Output = String> {
Coroutine::new()
}
enum State {
Start,
Wait1(Box<dyn Future<Output = String>>),
Wait2(Box<dyn Future<Output = String>>),
Resolved,
}
struct Coroutine {
state: State,
}
impl Coroutine {
fn new() -> Self {
Self {
state: State::Start,
}
}
}
impl Future for Coroutine {
type Output = String;
fn poll(&mut self) -> PollState<Self::Output> {
loop {
match self.state {
State::Start => {
println!("Program starting");
let fut1 = Box::new(Http::get("/600/HelloAsyncAwait"));
self.state = State::Wait1(fut1);
}
State::Wait1(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
let fut2 = Box::new(Http::get("/400/HelloAsyncAwait"));
self.state = State::Wait2(fut2);
}
PollState::NotReady => break PollState::NotReady,
},
State::Wait2(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
self.state = State::Resolved;
break PollState::Ready(String::new());
}
PollState::NotReady => break PollState::NotReady,
},
State::Resolved => panic!("Polled a resolved future"),
}
}
}
}
use std::future;
use std::sync::OnceLock;
use mio::{Events, Poll, Registry};
use crate::future::{Future, PollState};
// Le Registry est la façon d'enregistrer notre intérêt pour une instance de Poll. En l'ocurrence,
// cea sera d'enregistrer les évènements sur notre `TcpStram` quand nous ferons une requête GET.
// Ainsi, nous voudrons accéder au Registry depuis l'intérieur de HttpGetFuture sans avoir besoin
// de passer des références de partout. C'est un peu moins élégant, mais cela est plus simple pour
// notre API.
//
// Ici nous utilisons un OnceLock<_> qui est une valeur dans laquelle on peut écrire qu'une seule fois
// et qui est non-initialisée quand elle est crée. Cette version est thread safe contrairement à sa
// soeur `OnceCell`. On veut intialiser le Registry que quand on démarre notre runtime et pas au
// démarrage du programme. Si le Runtime n'as pas été initialisé, toute tentative d'accès dans
// http::get(), on aura donc un message d'erreur. Le registry pourra être accédé que par la
// fonction `registry()`
static REGISTRY: OnceLock<Registry> = OnceLock::new();
pub fn registry() -> &'static Registry {
REGISTRY.get().expect("Called outside a runtime context")
}
// Rien d'autre que notre instance de Poll (la file d'évènements)
pub struct Runtime {
poll: Poll,
}
impl Runtime {
// On crée une nouvelle instance de notre file d'évènements et on initialise notre registe et
// notre Runtime avec. Ici on a une instance de `poll` qui est clonable (on pourrait l'accéder
// depuis plusieurs endroit), On stocke le registry comme une variable globale: c'est pas joli
// mais efficace
pub fn new() -> Self {
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
REGISTRY.set(registry).unwrap();
Self { poll }
}
// 1. On peut bloquer sur n'imorte quelle fonction qui implémente le trait `Future`
// 2. On prnd l'ownership de future et on la rend mutable
// 3. On loop jusqu'à ce que la `Future` soit ready et on `poll()` pour un évènement quelconque
// qui pourrait ne rien avoir à faire avec notre future actuelle
// poll() ici attend qu'on ait un évènement READABLE dans le stream
pub fn block_on<F>(&mut self, future: F)
where
F: Future<Output = String>,
{
let mut future = future;
while let PollState::NotReady = future.poll() {
println!("Schedule other tasks");
let mut events = Events::with_capacity(100);
self.poll.poll(&mut events, None).unwrap();
}
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment