Select Git revision
08_runtimes_wakers_reactor_executor.md
orestis.malaspin authored
08_runtimes_wakers_reactor_executor.md 19.02 KiB
- Introduction
- Programme
- Runtimes existants
- Schéma
- Pourquoi avons nous besoin d'un runtime?
- Architecture d'un Runtime
- Amélioration 1: éviter le sondage actif 1/
- Amélioration 1: éviter le sondage actif 2/
- Amélioration 1: éviter le sondage actif 3/
- Amélioration 1: éviter le sondage actif 4/
- Amélioration 1: éviter le sondage actif 5/
- Amélioration 1: éviter le sondage actif 6/
- Amélioration 1: quelles améliorations encore
- Amélioration 2: ajouter un exécuteur, un réacteur et un waker
- Le nouveau main()
- Amélioration 2: ajouter un Executor
- L'exécuteur aura comme capacités
- Ce qu'il ne fera pas
- Amélioration 2: l'exécuteur 1/
- Amélioration 2: l'exécuteur 2/
- Amélioration 2: l'exécuteur 3/
- Amélioration 2: l'exécuteur 4/
- Amélioration 2: le Waker 1/
- Un Waker
- Amélioration 2: le Waker 2/
- Amélioration 2: le Waker 3/
- Amélioration 2: ajouter un Reactor
- Ce qu'il fera
- Amélioration 2: le Reactor 1/
- Amélioration 2: le Reactor 2/
- Amélioration 2: le Reactor 3/
- Amélioration 2: le Reactor 4/
- Amélioration 2: le Reactor 5/
-- title: "Runtimes, wakers et réacteurs-exécuteurs" author: "Orestis Malaspinas" date: "2025-03-24" patat: slideNumber: true wrap: true margins: left: 10 right: 10 top: auto slideLevel: 2 images: backend: kitty ...
Introduction
Programme
- Pourquoi avons-nous besoin d'un runtime
- Amélioration de notre exemple de base
- Partie 1: Ajout d'un réacteur et d'un waker
- Partie 2: Implémentation d'un exécuteur
- Partie 3: Implémentation d'un réacteur
- Enjoy: quelques expériences
Runtimes existants
- Embassy (systèmes embarqués)
- Tokio (très complet et populaire)
- Smol (plus simple et limité)
- Async-std (tentative de tout "async"-iser)
Schéma
ORDONNANCEMENT D'UNE TACHE
┌────────────────────┐ difficile
│Ordonnanceur de l'OS│◄───────────────┐
└────────────────────┘ │
│ │
▼ │
┌──────────────┐ │
│Tache (Thread)│ │
└───────┬──────┘ │
▼ │
plus simple ┌────────────────────┐ │
┌─────────────►│Ordonnanceur dans │ │
│ │l'espace utilisateur│ │
│ ┌─────────┴─────────┬──────────┴────────┐ │
│ │ │ │ │
│ ▼ ▼ ▼ │
┌┴───────┐ ┌────────┐ ┌────────┐ │
│ Tache │ │ Tache │ │ Tache ├──┘
│(Future)│ │(Future)│ │(Future)│
└────────┘ └────────┘ └────────┘
Pourquoi avons nous besoin d'un runtime?
- Runtime: ordonnancer les tâches et faire progresser les
Future - Ordonnancement fait par l'OS pour les thread système: aucun cnotrôle (et très complexe)
- Contrôle plus fin et gérable à l'aide d'un orodonnanceur dans l'espace utilisateur (userspace)
- Plus léger que les threads système (on peut en avoir 100k)
Architecture d'un Runtime
- Architecture: réacteur - exécuteur
- Réacteur:
- Réagis à un évènement et les dispatche à un gestionnaire (boucle d'évènements)
- P. ex. évènement I/O (Stream TCP qui est prêt à être lu), timer qui expire, etc.
- Nécessaire d'avoir un "système d'abonnment" (intégration forte avec l'I/O)
- Exécuteur:
- Décide qui a droit à du temps CPU pour fire progresser une tâche
- Responsable d'appeler
Future::poll()
Amélioration 1: éviter le sondage actif 1/
coroutine fn async_main() -> impl Future<Output = String> {
println!("Program starting");
let txt = Http::get("/600/HelloAsyncAwait").wait;
println!("{txt}");
let txt = Http::get("/400/HelloAsyncAwait").wait;
println!("{txt}");
}
fn main() {
let mut future = async_main();
while let PollState::NotReady = future.poll() {
println!("Schedule other tasks");
}
}
- On
poll()laFutureprincipale (dans lemain()) - Cette
Futuredoitpoll()sesFutureenfants et retourne que lorsque les deux sontReady() - On veut remplacer ces
poll()continus, par une file d'évènements (cf.04_syscall_event_queue.md)
Amélioration 1: éviter le sondage actif 2/
fn main() {
let future = async_main();
let mut runtime = Runtime::new();
runtime.block_on(future);
}
- On
poll()laFutureprincipale (dans lemain()) - Cette
Futuredoitpoll()sesFutureenfants et retourne que lorsque les deux sontReady() - On veut remplacer ces
poll()continus, par une file d'évènements (cf.04_syscall_event_queue.md) - On veut plus de la boucle inifinie: mais utiliser
epoll()
Amélioration 1: éviter le sondage actif 3/
static REGISTRY: OnceLock<Registry> = OnceLock::new();
pub fn registry() -> &'static Registry {
REGISTRY.get().expect("Called outside a runtime context")
}
pub struct Runtime {
poll: Poll,
}
-
Pollest la file d'évènements demioet constitue notreRuntime -
Registersert à enregistrer l'intérêt à une source d'évènements (iciTcpStream) - Interaction avec le
Registryque via la fonctionregistry()qui s'assure
Amélioration 1: éviter le sondage actif 4/
impl Runtime {
pub fn new() -> Self {
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
REGISTRY.set(registry).unwrap();
Self { poll }
}
pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> {
let mut future = future;
while let PollState::NotReady = future.poll() {
println!("Schedule other tasks");
let mut events = Events::with_capacity(100);
self.poll.poll(&mut events, None).unwrap();
}
}
}
-
new(): nouvelle instance dePollet duRegistryassocié- Intialisation du
REGISTRYglobal lors de la création duRuntime
- Intialisation du
-
block_on(): argument qui implémenteFuture-
future.poll()desFuturejusqu'à ce qu'ils soientReady(_) -
poll.poll()est réveillé lorsque les évènements liés à la file surviennent - rien à faire avec
events, mais c'est ici qu'on va "bloquer"
-
Amélioration 1: éviter le sondage actif 5/
fn poll(&mut self) -> PollState<Self::Output> {
if self.stream.is_none() {
println!("First poll - start operation");
self.write_request();
// Registering the Tcp stream as a source of events in the poll instance
runtime::registry()
.register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE)
.unwrap();
}
...
}
- Lors du premier appel à
poll():- on enregistre l'intérêt au
TcpStream - on essaie de lire
TcpStreamet on arrive dansWouldBlock(les données sont pas prêtes) - on retourne
NotReady
- on enregistre l'intérêt au
- On retourne à
block_on()dans lemain()et on attend le prochain réveil pour rappelerpoll()
Amélioration 1: éviter le sondage actif 6/
Program starting
First poll - start operation
Data not ready
Schedule other tasks
HTTP/1.1 200 OK
content-type: text/plain; charset=utf-8
content-length: 15
connection: close
date: Mon, 24 Mar 2025 07:56:51 GMT
HelloAsyncAwait
First poll - start operation
Data not ready
Schedule other tasks
HTTP/1.1 200 OK
content-type: text/plain; charset=utf-8
content-length: 15
connection: close
date: Mon, 24 Mar 2025 07:56:52 GMT
HelloAsyncAwait
- Un seul appel à "Schedule other tasks" et "Data not ready"
- On a bien le réveil qui se fait quand les tâches sont prêtes à continuer (pas d'appel continu à "Schedule other tasks")
- Pas de travail "inutile"
Amélioration 1: quelles améliorations encore
- On rend le contrôle à l'OS quand on appelle
Poll::poll() - On a parlé de réacteur - exécuteur, mais deux sont très fortement couplés via
Poll::poll() - On aimerait séparer le couplage de la file d'évènement
- On va ajouer un
Wakercomme dans le design desFuturede Rust- Faire un lien entre réacteur et exécuteur
- Ajouter un Réacteur qui va gérer les
Wakerqui va être responsable de la communication avec l'exécuteur
Amélioration 2: ajouter un exécuteur, un réacteur et un waker
- On sépare le
RuntimeenReactoretExecutor - On ajoute un
Wakerpour faire le lien entre les deux - On doit modifier
Futurepour prendre en compte leWaker
Le nouveau main()
fn main() {
let mut executor = runtime::init();
executor.block_on(async_main());
}
- Très similaire à la version précédente
Amélioration 2: ajouter un Executor
L'exécuteur aura comme capacités
- Stocke les
Futuredu plus haut niveau - Permet de créer des
Futurede haut niveau dans notre programme asynchrone - Permet d'avoir plusieurs exécuteurs chacun sur son propre thread système
- Distribuer les
Wakerpour que les tâches puissent dormir quand il n'y a rien à faire et se réveiller quand un des futurs de haut niveau peut progresser
Ce qu'il ne fera pas
- Ne sera pas vraiment multi-threadé: pas de communication entre les threads possible (pas moyen de "voler" des tâches)
- Chaque exécuteur est agnostique des autres (pas besoin d'avoir
SyncetSendqui est satisfait)
Amélioration 2: l'exécuteur 1/
type Task = Box<dyn Future<Output = String>>;
thread_local! {
static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default();
}
struct ExecutorCore {
tasks: RefCell<HashMap<usize, Task>>,
ready_queue: Arc<Mutex<Vec<usize>>>,
next_id: Cell<usize>,
}
- Une tâche est juste un type implémentant
Future - L'état de l'exécuteur est:
- une liste de tâche et d'identifiants qui doivent être exécutés
- une liste d'identifiants pour les tâches qui doivent être réveillées
- un identifiant pour la prochaîne tâche
- L'exécuteur pour chaque thread est stocké dans une variable statique
CURRENT_EXECUTOR -
RefCell<_>gère la mutabilité intlérieure&mut Tà l'exécution -
Cell<_>gère la mutabilité intérieur en sortant les valeurs et en les insérant à nouveau
Amélioration 2: l'exécuteur 2/
pub struct Executor;
impl Executor {
fn pop_ready(&self) -> Option<usize> {
CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap())
}
fn get_future(&self, id: usize) -> Option<Task> {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id))
}
fn get_waker(&self, id: usize) -> Waker {
Waker {
id,
thread: thread::current(),
ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()),
}
}
fn insert_task(&self, id: usize, task: Task) {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task));
}
fn task_count(&self) -> usize {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len())
}
}
- L'interface est composée de:
-
pop_ready()qui reoutrne l'id de la prochaîne tache à exécuter -
get_future()qui retourne la prochaine tâche àpoll()parid -
get_waker()qui crée un nouveauWakercorrespondant à l'id de la tâche et au thread actuel -
insert_task()qui ajoute une une paireid,taskà la liste de tâche à exécuter -
task_count()fonction utilitaire qui nous dit combien de tâche il reste à exécuter
-
Amélioration 2: l'exécuteur 3/
pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> + 'static {
spawn(future);
loop {
while let Some(id) = self.pop_ready() {
let mut future = match self.get_future(id) {
Some(f) => f,
None => continue,
};
let waker = self.get_waker(id);
match future.poll(&waker) {
PollState::NotReady => self.insert_task(id, future),
PollState::Ready(_) => continue,
}
}
let task_count = self.task_count();
let name = String::from(thread::current().name().unwrap_or_default());
if task_count > 0 {
println!("{name}: {task_count}, pending tasks. Sleep until notified.");
thread::park();
} else {
println!("{name}: All tasks finished");
break;
}
}
}
- On
spawn()un nouveauFuturepuis on boucle: - Tant qu'il reste des tâches à exécuter:
- on récupère l'id de la tâche
- on crée un
Wakeravec l'id de la tâche - on
poll()la tâche:- si la tâche est
Ready(_)on fait rien - si la tâche est
NotReadyon la rajoute avec son id dans la liste des tâches
- si la tâche est
- on compte combien il reste de tâche pour décider si on a terminé ou pas:
- si on a pas terminé on
park()le thread (on le met en attente)
- si on a pas terminé on
Amélioration 2: l'exécuteur 4/
pub fn spawn<F>(future: F) where F: Future<Output = String> + 'static {
CURRENT_EXECUTOR.with(|e| {
let id = e.next_id.get();
e.tasks.borrow_mut().insert(id, Box::new(future));
e.ready_queue
.lock()
.map(|mut queue| queue.push(id))
.unwrap();
e.next_id.set(id + 1);
});
}
-
spawn()unFuture:- récupére l'identifiant actuel
- ajoute le
Futureet son identifiant dans la liste des tâche - ajoute l'id dans la liste des tâches prêtes à être
poll() - incrémente l'identifiant
Amélioration 2: le Waker 1/
- Utilisé pour réveiller une tâche sans avoir besoin de
Polldirectement - Fera partie de l'
Executoret du traitFuture - Utilisation de
thread::park()/thread::unpark()pour arrêter / redémarrer un thread.
Un Waker
pub struct Waker {
thread: Thread,
id: usize,
ready_queue: Arc<Mutex<Vec<usize>>>,
}
- Un
Wakerest appelé depuis unThreadsystème - il a un identifiant
- Et gère les tâches à réveiller: celles qui sont prêtes à être exécutées
Amélioration 2: le Waker 2/
impl Waker {
pub fn wake(&self) {
self.ready_queue
.lock()
.map(|mut queue| queue.push(self.id))
.unwrap();
self.thread.unpark();
}
}
-
wake()réveille un thread:- ajoute le la tâche dans la liste des tâches à redémarrer
- réveille le thread appeland:
unpark() -
wake()peut être appelé de différents endroits =>ready_queue: Arc<Mutex<_>>
Amélioration 2: le Waker 3/
pub trait Future {
type Output;
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
}
-
FutureprendWakeren argument (cf. la lib standard) - On doit adapter
HttpGetFuture(argument depoll()et lespoll()subséquents)
Amélioration 2: ajouter un Reactor
Ce qu'il fera
- Gérer de façon efficace les attentes et les évènements auxquels notre
Ruintimesera intéressé - Stocker une collection de
Wakeret s'assurer que leWakercorrect est réveillé en fonction de la notification qu'il reçoit - Permettre aux
Futurede s'abonner / désabonner aux évènements
Amélioration 2: le Reactor 1/
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
static REACTOR: OnceLock<Reactor> = OnceLock::new();
pub fn reactor() -> &'static Reactor {
REACTOR.get().expect("Called outside a runtime context")
}
pub struct Reactor {
wakers: Wakers,
registry: Registry,
next_id: AtomicUsize,
}
- une
HashMapdeWakeret leurid - un
Registryqui permet de spécifier une source d'évènements - un identifiant pour avoir spécifier un
Token()pour nos abonnements
Amélioration 2: le Reactor 2/
impl Future for HttpGetFuture {
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
if self.stream.is_none() {
self.write_request();
runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id);
runtime::reactor().set_waker(waker, self.id);
}
let mut buf = vec![0u8; 4096];
loop {
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
break PollState::Ready(String::from(s));
}
...
Err(e) if e.kind() == ErrorKind::WouldBlock => {
runtime::reactor().set_waker(waker, self.id);
break PollState::NotReady;
}
...
}
}
}
}
- Au premier
poll():- on enregistre l'intérêt pour le
TcpStream - on ajoute le
Waker
- on enregistre l'intérêt pour le
- Quand on a fini on retire le
Wakeret son identifiant - Quand on est pas prêt on met à jour le
Waker(pour faire comme l'API standard)
Amélioration 2: le Reactor 3/
impl Reactor {
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
self.registry.register(stream, Token(id), interest).unwrap()
}
pub fn set_waker(&self, waker: &Waker, id: usize) {
self.wakers
.lock()
.map(|mut w| w.insert(id, waker.clone()))
.unwrap();
}
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
self.registry.deregister(stream).unwrap();
}
pub fn next_id(&self) -> usize {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
}
-
register(): S'abonne une source d'évènementsTcpStreamavec le type d'évènements (READABLEici) et l'identifiant -
deregister(): Se désabonne duTcpStream(quand on a fini avec notreFuture), et retire leWakerde laHashMap -
set_waker(): Insère leWakeret soniddans laHashMapdesWaker -
next_id(): passe au prochainidde façon atomique
Amélioration 2: le Reactor 4/
pub fn start() {
let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new()));
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
let next_id = AtomicUsize::new(1);
let reactor = Reactor {
wakers: wakers.clone(),
registry,
next_id,
};
REACTOR.set(reactor).ok().expect("Rector already running");
std::thread::spawn(move || event_loop(poll, wakers));
}
- Au démarrage du réacteur:
- On crée les structures de données:
Poll,Registry,Reactor - On crée le
REACTORqui permet de partager lesWaker - On déplace la boucle d'évènements dans un thread séparé
- On crée les structures de données:
Amélioration 2: le Reactor 5/
fn event_loop(mut poll: Poll, wakers: Wakers) {
let mut events = Events::with_capacity(100);
loop {
poll.poll(&mut events, None).unwrap();
events.iter().for_each(|e| {
let Token(id) = e.token();
let wakers = wakers.lock().unwrap();
if let Some(waker) = wakers.get(&id) {
waker.wake();
}
});
}
}
- On a une boucle infinie qui monitore le système
- Chaque nouvel évènement, on récupère l'identifiant et on
wake()leWakercorrespondant