Skip to content
Snippets Groups Projects
Select Git revision
  • 4d824ef7600011eaa02f6742c710cf89e9eeb22d
  • main default protected
2 results

08_runtimes_wakers_reactor_executor.md

Blame
  • Orestis's avatar
    orestis.malaspin authored
    4d824ef7
    History

    -- title: "Runtimes, wakers et réacteurs-exécuteurs" author: "Orestis Malaspinas" date: "2025-03-24" patat: slideNumber: true wrap: true margins: left: 10 right: 10 top: auto slideLevel: 2 images: backend: kitty ...

    Introduction

    Programme

    • Pourquoi avons-nous besoin d'un runtime
    • Amélioration de notre exemple de base
    • Partie 1: Ajout d'un réacteur et d'un waker
    • Partie 2: Implémentation d'un exécuteur
    • Partie 3: Implémentation d'un réacteur
    • Enjoy: quelques expériences

    Runtimes existants

    • Embassy (systèmes embarqués)
    • Tokio (très complet et populaire)
    • Smol (plus simple et limité)
    • Async-std (tentative de tout "async"-iser)

    Schéma

                   ORDONNANCEMENT D'UNE TACHE               
                                                            
                     ┌────────────────────┐   difficile     
                     │Ordonnanceur de l'OS│◄───────────────┐
                     └────────────────────┘                │
                               │                           │
                               ▼                           │
                       ┌──────────────┐                    │
                       │Tache (Thread)│                    │
                       └───────┬──────┘                    │
                               ▼                           │
       plus simple   ┌────────────────────┐                │
      ┌─────────────►│Ordonnanceur dans   │                │
      │              │l'espace utilisateur│                │
      │    ┌─────────┴─────────┬──────────┴────────┐       │
      │    │                   │                   │       │
      │    ▼                   ▼                   ▼       │
     ┌┴───────┐            ┌────────┐          ┌────────┐  │
     │ Tache  │            │ Tache  │          │ Tache  ├──┘
     │(Future)│            │(Future)│          │(Future)│   
     └────────┘            └────────┘          └────────┘

    Pourquoi avons nous besoin d'un runtime?

    • Runtime: ordonnancer les tâches et faire progresser les Future
    • Ordonnancement fait par l'OS pour les thread système: aucun cnotrôle (et très complexe)
    • Contrôle plus fin et gérable à l'aide d'un orodonnanceur dans l'espace utilisateur (userspace)
    • Plus léger que les threads système (on peut en avoir 100k)

    Architecture d'un Runtime

    • Architecture: réacteur - exécuteur
    • Réacteur:
      • Réagis à un évènement et les dispatche à un gestionnaire (boucle d'évènements)
      • P. ex. évènement I/O (Stream TCP qui est prêt à être lu), timer qui expire, etc.
      • Nécessaire d'avoir un "système d'abonnment" (intégration forte avec l'I/O)
    • Exécuteur:
      • Décide qui a droit à du temps CPU pour fire progresser une tâche
      • Responsable d'appeler Future::poll()

    Amélioration 1: éviter le sondage actif 1/

    coroutine fn async_main() -> impl Future<Output = String> {
        println!("Program starting");
        let txt = Http::get("/600/HelloAsyncAwait").wait;
        println!("{txt}");
        let txt = Http::get("/400/HelloAsyncAwait").wait;
        println!("{txt}");
    }
    fn main() {
        let mut future = async_main();
        while let PollState::NotReady = future.poll() {
            println!("Schedule other tasks");
        }
    }
    • On poll() la Future principale (dans le main())
    • Cette Future doit poll() ses Future enfants et retourne que lorsque les deux sont Ready()
    • On veut remplacer ces poll() continus, par une file d'évènements (cf. 04_syscall_event_queue.md)

    Amélioration 1: éviter le sondage actif 2/

    fn main() {
        let future = async_main();
        let mut runtime = Runtime::new();
        runtime.block_on(future); 
    }
    • On poll() la Future principale (dans le main())
    • Cette Future doit poll() ses Future enfants et retourne que lorsque les deux sont Ready()
    • On veut remplacer ces poll() continus, par une file d'évènements (cf. 04_syscall_event_queue.md)
    • On veut plus de la boucle inifinie: mais utiliser epoll()

    Amélioration 1: éviter le sondage actif 3/

    static REGISTRY: OnceLock<Registry> = OnceLock::new();
    pub fn registry() -> &'static Registry {
        REGISTRY.get().expect("Called outside a runtime context")
    }
    pub struct Runtime {
        poll: Poll,
    }
    • Poll est la file d'évènements de mio et constitue notre Runtime
    • Register sert à enregistrer l'intérêt à une source d'évènements (ici TcpStream)
    • Interaction avec le Registry que via la fonction registry() qui s'assure

    Amélioration 1: éviter le sondage actif 4/

    impl Runtime {
        pub fn new() -> Self {
            let poll = Poll::new().unwrap();
            let registry = poll.registry().try_clone().unwrap();
            REGISTRY.set(registry).unwrap();
            Self { poll }
        }
        pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> {
            let mut future = future;
            while let PollState::NotReady = future.poll() {
                println!("Schedule other tasks");
                let mut events = Events::with_capacity(100);
                self.poll.poll(&mut events, None).unwrap();
            }
        }
    }
    • new(): nouvelle instance de Poll et du Registry associé
      • Intialisation du REGISTRY global lors de la création du Runtime
    • block_on(): argument qui implémente Future
      • future.poll() des Future jusqu'à ce qu'ils soient Ready(_)
      • poll.poll() est réveillé lorsque les évènements liés à la file surviennent
      • rien à faire avec events, mais c'est ici qu'on va "bloquer"

    Amélioration 1: éviter le sondage actif 5/

    fn poll(&mut self) -> PollState<Self::Output> {
        if self.stream.is_none() {
            println!("First poll - start operation");
            self.write_request();
            // Registering the Tcp stream as a source of events in the poll instance
            runtime::registry()
                .register(self.stream.as_mut().unwrap(), Token(0), Interest::READABLE)
                .unwrap();
        }
        ...
    }
    • Lors du premier appel à poll():
      • on enregistre l'intérêt au TcpStream
      • on essaie de lire TcpStream et on arrive dans WouldBlock (les données sont pas prêtes)
      • on retourne NotReady
    • On retourne à block_on() dans le main() et on attend le prochain réveil pour rappeler poll()

    Amélioration 1: éviter le sondage actif 6/

    Program starting
    First poll - start operation
    Data not ready
    Schedule other tasks
    HTTP/1.1 200 OK
    content-type: text/plain; charset=utf-8
    content-length: 15
    connection: close
    date: Mon, 24 Mar 2025 07:56:51 GMT
    
    HelloAsyncAwait
    First poll - start operation
    Data not ready
    Schedule other tasks
    HTTP/1.1 200 OK
    content-type: text/plain; charset=utf-8
    content-length: 15
    connection: close
    date: Mon, 24 Mar 2025 07:56:52 GMT
    
    HelloAsyncAwait
    • Un seul appel à "Schedule other tasks" et "Data not ready"
    • On a bien le réveil qui se fait quand les tâches sont prêtes à continuer (pas d'appel continu à "Schedule other tasks")
    • Pas de travail "inutile"

    Amélioration 1: quelles améliorations encore

    • On rend le contrôle à l'OS quand on appelle Poll::poll()
    • On a parlé de réacteur - exécuteur, mais deux sont très fortement couplés via Poll::poll()
    • On aimerait séparer le couplage de la file d'évènement
    • On va ajouer un Waker comme dans le design des Future de Rust
      • Faire un lien entre réacteur et exécuteur
      • Ajouter un Réacteur qui va gérer les Waker qui va être responsable de la communication avec l'exécuteur

    Amélioration 2: ajouter un exécuteur, un réacteur et un waker

    • On sépare le Runtime en Reactor et Executor
    • On ajoute un Waker pour faire le lien entre les deux
    • On doit modifier Future pour prendre en compte le Waker

    Le nouveau main()

    fn main() {
        let mut executor = runtime::init();
        executor.block_on(async_main());
    }
    • Très similaire à la version précédente

    Amélioration 2: ajouter un Executor

    L'exécuteur aura comme capacités

    • Stocke les Future du plus haut niveau
    • Permet de créer des Future de haut niveau dans notre programme asynchrone
    • Permet d'avoir plusieurs exécuteurs chacun sur son propre thread système
    • Distribuer les Waker pour que les tâches puissent dormir quand il n'y a rien à faire et se réveiller quand un des futurs de haut niveau peut progresser

    Ce qu'il ne fera pas

    • Ne sera pas vraiment multi-threadé: pas de communication entre les threads possible (pas moyen de "voler" des tâches)
    • Chaque exécuteur est agnostique des autres (pas besoin d'avoir Sync et Send qui est satisfait)

    Amélioration 2: l'exécuteur 1/

    type Task = Box<dyn Future<Output = String>>;
    thread_local! {
        static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default();
    }
    struct ExecutorCore {
        tasks: RefCell<HashMap<usize, Task>>,
        ready_queue: Arc<Mutex<Vec<usize>>>,
        next_id: Cell<usize>,
    }
    • Une tâche est juste un type implémentant Future
    • L'état de l'exécuteur est:
      • une liste de tâche et d'identifiants qui doivent être exécutés
      • une liste d'identifiants pour les tâches qui doivent être réveillées
      • un identifiant pour la prochaîne tâche
    • L'exécuteur pour chaque thread est stocké dans une variable statique CURRENT_EXECUTOR
    • RefCell<_> gère la mutabilité intlérieure &mut T à l'exécution
    • Cell<_> gère la mutabilité intérieur en sortant les valeurs et en les insérant à nouveau

    Amélioration 2: l'exécuteur 2/

    pub struct Executor;
    impl Executor {
        fn pop_ready(&self) -> Option<usize> {
            CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap())
        }
        fn get_future(&self, id: usize) -> Option<Task> {
            CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id))
        }
        fn get_waker(&self, id: usize) -> Waker {
            Waker {
                id,
                thread: thread::current(),
                ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()),
            }
        }
        fn insert_task(&self, id: usize, task: Task) {
            CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task));
        }
        fn task_count(&self) -> usize {
            CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len())
        }
    }
    • L'interface est composée de:
      • pop_ready() qui reoutrne l'id de la prochaîne tache à exécuter
      • get_future() qui retourne la prochaine tâche à poll() par id
      • get_waker() qui crée un nouveau Waker correspondant à l'id de la tâche et au thread actuel
      • insert_task() qui ajoute une une paire id, task à la liste de tâche à exécuter
      • task_count() fonction utilitaire qui nous dit combien de tâche il reste à exécuter

    Amélioration 2: l'exécuteur 3/

    pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> + 'static {
        spawn(future);
        loop {
            while let Some(id) = self.pop_ready() {
                let mut future = match self.get_future(id) {
                    Some(f) => f,
                    None => continue,
                };
                let waker = self.get_waker(id);
                match future.poll(&waker) {
                    PollState::NotReady => self.insert_task(id, future),
                    PollState::Ready(_) => continue,
                }
            }
            let task_count = self.task_count();
            let name = String::from(thread::current().name().unwrap_or_default());
            if task_count > 0 {
                println!("{name}:  {task_count}, pending tasks. Sleep until notified.");
                thread::park();
            } else {
                println!("{name}: All tasks finished");
                break;
            }
        }
    }
    • On spawn() un nouveau Future puis on boucle:
    • Tant qu'il reste des tâches à exécuter:
      • on récupère l'id de la tâche
      • on crée un Waker avec l'id de la tâche
      • on poll() la tâche:
        • si la tâche est Ready(_) on fait rien
        • si la tâche est NotReady on la rajoute avec son id dans la liste des tâches
      • on compte combien il reste de tâche pour décider si on a terminé ou pas:
        • si on a pas terminé on park() le thread (on le met en attente)

    Amélioration 2: l'exécuteur 4/

    pub fn spawn<F>(future: F) where F: Future<Output = String> + 'static {
        CURRENT_EXECUTOR.with(|e| {
            let id = e.next_id.get();
            e.tasks.borrow_mut().insert(id, Box::new(future));
            e.ready_queue
                .lock()
                .map(|mut queue| queue.push(id))
                .unwrap();
            e.next_id.set(id + 1);
        });
    }
    • spawn() un Future:
      1. récupére l'identifiant actuel
      2. ajoute le Future et son identifiant dans la liste des tâche
      3. ajoute l'id dans la liste des tâches prêtes à être poll()
      4. incrémente l'identifiant

    Amélioration 2: le Waker 1/

    • Utilisé pour réveiller une tâche sans avoir besoin de Poll directement
    • Fera partie de l'Executor et du trait Future
    • Utilisation de thread::park()/thread::unpark() pour arrêter / redémarrer un thread.

    Un Waker

    pub struct Waker {
        thread: Thread,
        id: usize,
        ready_queue: Arc<Mutex<Vec<usize>>>,
    }
    • Un Waker est appelé depuis un Thread système
    • il a un identifiant
    • Et gère les tâches à réveiller: celles qui sont prêtes à être exécutées

    Amélioration 2: le Waker 2/

    impl Waker {
        pub fn wake(&self) {
            self.ready_queue
                .lock()
                .map(|mut queue| queue.push(self.id))
                .unwrap();
            self.thread.unpark();
        }
    }
    • wake() réveille un thread:
      • ajoute le la tâche dans la liste des tâches à redémarrer
      • réveille le thread appeland: unpark()
      • wake() peut être appelé de différents endroits => ready_queue: Arc<Mutex<_>>

    Amélioration 2: le Waker 3/

    pub trait Future {
        type Output;
        fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
    }
    • Future prend Waker en argument (cf. la lib standard)
    • On doit adapter HttpGetFuture (argument de poll() et les poll() subséquents)

    Amélioration 2: ajouter un Reactor

    Ce qu'il fera

    • Gérer de façon efficace les attentes et les évènements auxquels notre Ruintime sera intéressé
    • Stocker une collection de Waker et s'assurer que le Waker correct est réveillé en fonction de la notification qu'il reçoit
    • Permettre aux Future de s'abonner / désabonner aux évènements

    Amélioration 2: le Reactor 1/

    type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
    static REACTOR: OnceLock<Reactor> = OnceLock::new();
    pub fn reactor() -> &'static Reactor {
        REACTOR.get().expect("Called outside a runtime context")
    }
    pub struct Reactor {
        wakers: Wakers,
        registry: Registry,
        next_id: AtomicUsize,
    }
    • une HashMap de Waker et leur id
    • un Registry qui permet de spécifier une source d'évènements
    • un identifiant pour avoir spécifier un Token() pour nos abonnements

    Amélioration 2: le Reactor 2/

    impl Future for HttpGetFuture {
        fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
            if self.stream.is_none() {
                self.write_request();
                runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id);
                runtime::reactor().set_waker(waker, self.id);
            }
            let mut buf = vec![0u8; 4096];
            loop {
                match self.stream.as_mut().unwrap().read(&mut buf) {
                    // 0 bytes read so we are finished we got the entire GET response
                    Ok(0) => {
                        let s = String::from_utf8_lossy(&self.buffer);
                        runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
                        break PollState::Ready(String::from(s));
                    }
                    ...
                    Err(e) if e.kind() == ErrorKind::WouldBlock => {
                        runtime::reactor().set_waker(waker, self.id);
                        break PollState::NotReady;
                    }
                    ...
                }
            }
        }
    }
    • Au premier poll():
      • on enregistre l'intérêt pour le TcpStream
      • on ajoute le Waker
    • Quand on a fini on retire le Waker et son identifiant
    • Quand on est pas prêt on met à jour le Waker (pour faire comme l'API standard)

    Amélioration 2: le Reactor 3/

    impl Reactor {
        pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
            self.registry.register(stream, Token(id), interest).unwrap()
        }
        pub fn set_waker(&self, waker: &Waker, id: usize) {
            self.wakers
                .lock()
                .map(|mut w| w.insert(id, waker.clone()))
                .unwrap();
        }
        pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
            self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
            self.registry.deregister(stream).unwrap();
        }
        pub fn next_id(&self) -> usize {
            self.next_id
                .fetch_add(1, std::sync::atomic::Ordering::Relaxed)
        }
    }
    • register(): S'abonne une source d'évènements TcpStream avec le type d'évènements (READABLE ici) et l'identifiant
    • deregister(): Se désabonne du TcpStream (quand on a fini avec notre Future), et retire le Waker de la HashMap
    • set_waker(): Insère le Waker et son id dans la HashMap des Waker
    • next_id(): passe au prochain id de façon atomique

    Amélioration 2: le Reactor 4/

    pub fn start() {
        let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new()));
        let poll = Poll::new().unwrap();
        let registry = poll.registry().try_clone().unwrap();
        let next_id = AtomicUsize::new(1);
        let reactor = Reactor {
            wakers: wakers.clone(),
            registry,
            next_id,
        };
        REACTOR.set(reactor).ok().expect("Rector already running");
        std::thread::spawn(move || event_loop(poll, wakers));
    }
    • Au démarrage du réacteur:
      • On crée les structures de données: Poll, Registry, Reactor
      • On crée le REACTOR qui permet de partager les Waker
      • On déplace la boucle d'évènements dans un thread séparé

    Amélioration 2: le Reactor 5/

    fn event_loop(mut poll: Poll, wakers: Wakers) {
        let mut events = Events::with_capacity(100);
        loop {
            poll.poll(&mut events, None).unwrap();
            events.iter().for_each(|e| {
                let Token(id) = e.token();
                let wakers = wakers.lock().unwrap();
    
                if let Some(waker) = wakers.get(&id) {
                    waker.wake();
                }
            });
        }
    }
    • On a une boucle infinie qui monitore le système
    • Chaque nouvel évènement, on récupère l'identifiant et on wake() le Waker correspondant