diff --git a/08_runtimes_wakers_reactor_executor.md b/08_runtimes_wakers_reactor_executor.md index da41d9931ae6a12bb33fe42ca08883d9876535ef..e6078d08f54a71dd6cba20391d66c9f6aa4bc735 100644 --- a/08_runtimes_wakers_reactor_executor.md +++ b/08_runtimes_wakers_reactor_executor.md @@ -76,7 +76,7 @@ patat: * Décide qui a droit à du temps CPU pour fire progresser une tâche * Responsable d'appeler `Future::poll()` -## Amélioration 1: éviter le sondage actif (1/) +## Amélioration 1: éviter le sondage actif 1/ ```rust coroutine fn async_main() -> impl Future<Output = String> { @@ -98,7 +98,7 @@ fn main() { * Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()` * On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`) -## Amélioration 1: éviter le sondage actif (2/) +## Amélioration 1: éviter le sondage actif 2/ ```rust fn main() { @@ -113,7 +113,7 @@ fn main() { * On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`) * On veut plus de la boucle inifinie: mais utiliser `epoll()` -## Amélioration 1: éviter le sondage actif (3/) +## Amélioration 1: éviter le sondage actif 3/ ```rust static REGISTRY: OnceLock<Registry> = OnceLock::new(); @@ -129,7 +129,7 @@ pub struct Runtime { * `Register` sert à enregistrer l'intérêt à une **source** d'évènements (ici `TcpStream`) * Interaction avec le `Registry` que via la fonction `registry()` qui s'assure -## Amélioration 1: éviter le sondage actif (4/) +## Amélioration 1: éviter le sondage actif 4/ ```rust impl Runtime { @@ -157,7 +157,7 @@ impl Runtime { * `poll.poll()` est réveillé lorsque les évènements liés à la file surviennent * rien à faire avec `events`, mais c'est ici qu'on va "bloquer" -## Amélioration 1: éviter le sondage actif (5/) +## Amélioration 1: éviter le sondage actif 5/ ```rust fn poll(&mut self) -> PollState<Self::Output> { @@ -179,7 +179,7 @@ fn poll(&mut self) -> PollState<Self::Output> { * on retourne `NotReady` * On retourne à `block_on()` dans le `main()` et on attend le prochain réveil pour rappeler `poll()` -## Amélioration 1: éviter le sondage actif (6/) +## Amélioration 1: éviter le sondage actif 6/ ```console Program starting @@ -219,4 +219,343 @@ HelloAsyncAwait * Ajouter un Réacteur qui va gérer les `Waker` qui va être responsable de la communication avec l'exécuteur -## Amélioration 2: ajouter un réacteur et un waker (1/) +## Amélioration 2: ajouter un exécuteur, un réacteur et un waker + +* On sépare le `Runtime` en `Reactor` et `Executor` +* On ajoute un `Waker` pour faire le lien entre les deux +* On doit modifier `Future` pour prendre en compte le `Waker` + +### Le nouveau `main()` + +```rust +fn main() { + let mut executor = runtime::init(); + executor.block_on(async_main()); +} +``` + +* Très similaire à la version précédente + +## Amélioration 2: ajouter un `Executor` + +### L'exécuteur aura comme capacités + +* Stocke les `Future` du plus haut niveau +* Permet de créer des `Future` de haut niveau dans notre programme asynchrone +* Permet d'avoir plusieurs exécuteurs chacun sur son propre thread système +* Distribuer les `Waker` pour que les tâches puissent dormir quand il n'y a rien à faire et se réveiller quand un des futurs de haut niveau peut progresser + +### Ce qu'il ne fera pas + +* Ne sera pas vraiment multi-threadé: pas de communication entre les threads possible (pas moyen de "voler" des tâches) +* Chaque exécuteur est agnostique des autres (pas besoin d'avoir `Sync` et `Send` qui est satisfait) + +## Amélioration 2: l'exécuteur 1/ + +```rust +type Task = Box<dyn Future<Output = String>>; +thread_local! { + static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default(); +} +struct ExecutorCore { + tasks: RefCell<HashMap<usize, Task>>, + ready_queue: Arc<Mutex<Vec<usize>>>, + next_id: Cell<usize>, +} +``` + +* Une tâche est juste un type implémentant `Future` +* L'état de l'exécuteur est: + * une liste de tâche et d'identifiants qui doivent être exécutés + * une liste d'identifiants pour les tâches qui doivent être réveillées + * un identifiant pour la prochaîne tâche +* L'exécuteur pour **chaque thread** est stocké dans une variable statique `CURRENT_EXECUTOR` +* `RefCell<_>` gère la mutabilité intlérieure `&mut T` à l'exécution +* `Cell<_>` gère la mutabilité intérieur en sortant les valeurs et en les insérant à nouveau + +## Amélioration 2: l'exécuteur 2/ + +```rust +pub struct Executor; +impl Executor { + fn pop_ready(&self) -> Option<usize> { + CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap()) + } + fn get_future(&self, id: usize) -> Option<Task> { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id)) + } + fn get_waker(&self, id: usize) -> Waker { + Waker { + id, + thread: thread::current(), + ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()), + } + } + fn insert_task(&self, id: usize, task: Task) { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task)); + } + fn task_count(&self) -> usize { + CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len()) + } +} +``` + +* L'interface est composée de: + * `pop_ready()` qui reoutrne l'id de la prochaîne tache à exécuter + * `get_future()` qui retourne la prochaine tâche à `poll()` par `id` + * `get_waker()` qui crée un nouveau `Waker` correspondant à l'id de la tâche et au thread actuel + * `insert_task()` qui ajoute une une paire `id`, `task` à la liste de tâche à exécuter + * `task_count()` fonction utilitaire qui nous dit combien de tâche il reste à exécuter + +## Amélioration 2: l'exécuteur 3/ + +```rust +pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> + 'static { + spawn(future); + loop { + while let Some(id) = self.pop_ready() { + let mut future = match self.get_future(id) { + Some(f) => f, + None => continue, + }; + let waker = self.get_waker(id); + match future.poll(&waker) { + PollState::NotReady => self.insert_task(id, future), + PollState::Ready(_) => continue, + } + } + let task_count = self.task_count(); + let name = String::from(thread::current().name().unwrap_or_default()); + if task_count > 0 { + println!("{name}: {task_count}, pending tasks. Sleep until notified."); + thread::park(); + } else { + println!("{name}: All tasks finished"); + break; + } + } +} +``` + +* On `spawn()` un nouveau `Future` puis on boucle: +* Tant qu'il reste des tâches à exécuter: + * on récupère l'id de la tâche + * on crée un `Waker` avec l'id de la tâche + * on `poll()` la tâche: + * si la tâche est `Ready(_)` on fait rien + * si la tâche est `NotReady` on la rajoute avec son id dans la liste des tâches + * on compte combien il reste de tâche pour décider si on a terminé ou pas: + * si on a pas terminé on `park()` le thread (on le met en attente) + +## Amélioration 2: l'exécuteur 4/ + +```rust +pub fn spawn<F>(future: F) where F: Future<Output = String> + 'static { + CURRENT_EXECUTOR.with(|e| { + let id = e.next_id.get(); + e.tasks.borrow_mut().insert(id, Box::new(future)); + e.ready_queue + .lock() + .map(|mut queue| queue.push(id)) + .unwrap(); + e.next_id.set(id + 1); + }); +} +``` + +* `spawn()` un `Future`: + 1. récupére l'identifiant actuel + 2. ajoute le `Future` et son identifiant dans la liste des tâche + 3. ajoute l'id dans la liste des tâches prêtes à être `poll()` + 4. incrémente l'identifiant + +## Amélioration 2: le `Waker` 1/ + +* Utilisé pour réveiller une tâche sans avoir besoin de `Poll` directement +* Fera partie de l'`Executor` et du trait `Future` +* Utilisation de `thread::park()/thread::unpark()` pour arrêter / redémarrer un thread. + +### Un `Waker` + +```rust +pub struct Waker { + thread: Thread, + id: usize, + ready_queue: Arc<Mutex<Vec<usize>>>, +} +``` + +* Un `Waker` est appelé depuis un `Thread` système +* il a un identifiant +* Et gère les tâches à réveiller: celles qui sont prêtes à être exécutées + +## Amélioration 2: le `Waker` 2/ + +```rust +impl Waker { + pub fn wake(&self) { + self.ready_queue + .lock() + .map(|mut queue| queue.push(self.id)) + .unwrap(); + self.thread.unpark(); + } +} +``` + +* `wake()` réveille un thread: + * ajoute le la tâche dans la liste des tâches à redémarrer + * réveille le thread appeland: `unpark()` + * `wake()` peut être appelé de différents endroits => `ready_queue: Arc<Mutex<_>>` + +## Amélioration 2: le `Waker` 3/ + +```rust +pub trait Future { + type Output; + fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>; +} +``` + +* `Future` prend `Waker` en argument (cf. la lib standard) +* On doit adapter `HttpGetFuture` (argument de `poll()` et les `poll()` subséquents) + +## Amélioration 2: ajouter un `Reactor` + +### Ce qu'il fera + +* Gérer de façon efficace les attentes et les évènements auxquels notre `Ruintime` sera intéressé +* Stocker une collection de `Waker` et s'assurer que le `Waker` correct est réveillé en fonction de la notification qu'il reçoit +* Permettre aux `Future` de s'abonner / désabonner aux évènements + +## Amélioration 2: le `Reactor` 1/ + +```rust +type Wakers = Arc<Mutex<HashMap<usize, Waker>>>; +static REACTOR: OnceLock<Reactor> = OnceLock::new(); +pub fn reactor() -> &'static Reactor { + REACTOR.get().expect("Called outside a runtime context") +} +pub struct Reactor { + wakers: Wakers, + registry: Registry, + next_id: AtomicUsize, +} +``` + +* une `HashMap` de `Waker` et leur `id` +* un `Registry` qui permet de spécifier une source d'évènements +* un identifiant pour avoir spécifier un `Token()` pour nos abonnements + +## Amélioration 2: le `Reactor` 2/ + +```rust +impl Future for HttpGetFuture { + fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> { + if self.stream.is_none() { + self.write_request(); + runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id); + runtime::reactor().set_waker(waker, self.id); + } + let mut buf = vec![0u8; 4096]; + loop { + match self.stream.as_mut().unwrap().read(&mut buf) { + // 0 bytes read so we are finished we got the entire GET response + Ok(0) => { + let s = String::from_utf8_lossy(&self.buffer); + runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id); + break PollState::Ready(String::from(s)); + } + ... + Err(e) if e.kind() == ErrorKind::WouldBlock => { + runtime::reactor().set_waker(waker, self.id); + break PollState::NotReady; + } + ... + } + } + } +} +``` + +* Au premier `poll()`: + * on enregistre l'intérêt pour le `TcpStream` + * on ajoute le `Waker` +* Quand on a fini on retire le `Waker` et son identifiant +* Quand on est pas prêt on met à jour le `Waker` (pour faire comme l'API standard) + + +## Amélioration 2: le `Reactor` 3/ + +```rust +impl Reactor { + pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) { + self.registry.register(stream, Token(id), interest).unwrap() + } + pub fn set_waker(&self, waker: &Waker, id: usize) { + self.wakers + .lock() + .map(|mut w| w.insert(id, waker.clone())) + .unwrap(); + } + pub fn deregister(&self, stream: &mut TcpStream, id: usize) { + self.wakers.lock().map(|mut w| w.remove(&id)).unwrap(); + self.registry.deregister(stream).unwrap(); + } + pub fn next_id(&self) -> usize { + self.next_id + .fetch_add(1, std::sync::atomic::Ordering::Relaxed) + } +} +``` + +* `register()`: S'abonne une source d'évènements `TcpStream` avec le type d'évènements (`READABLE` ici) et l'identifiant +* `deregister()`: Se désabonne du `TcpStream` (quand on a fini avec notre `Future`), et retire le `Waker` de la `HashMap` +* `set_waker()`: Insère le `Waker` et son `id` dans la `HashMap` des `Waker` +* `next_id()`: passe au prochain `id` de façon atomique + +## Amélioration 2: le `Reactor` 4/ + +```rust +pub fn start() { + let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new())); + let poll = Poll::new().unwrap(); + let registry = poll.registry().try_clone().unwrap(); + let next_id = AtomicUsize::new(1); + let reactor = Reactor { + wakers: wakers.clone(), + registry, + next_id, + }; + REACTOR.set(reactor).ok().expect("Rector already running"); + std::thread::spawn(move || event_loop(poll, wakers)); +} +``` + +* Au démarrage du réacteur: + * On crée les structures de données: `Poll`, `Registry`, `Reactor` + * On crée le `REACTOR` qui permet de partager les `Waker` + * On déplace la boucle d'évènements dans un thread séparé + +## Amélioration 2: le `Reactor` 5/ + +```rust +fn event_loop(mut poll: Poll, wakers: Wakers) { + let mut events = Events::with_capacity(100); + loop { + poll.poll(&mut events, None).unwrap(); + events.iter().for_each(|e| { + let Token(id) = e.token(); + let wakers = wakers.lock().unwrap(); + + if let Some(waker) = wakers.get(&id) { + waker.wake(); + } + }); + } +} +``` + +* On a une boucle infinie qui monitore le système +* Chaque nouvel évènement, on récupère l'identifiant et on `wake()` le `Waker` correspondant +