Skip to content
Snippets Groups Projects
Verified Commit 4d824ef7 authored by orestis.malaspin's avatar orestis.malaspin
Browse files

finished i think

parent 8c9b0391
Branches
No related tags found
No related merge requests found
......@@ -76,7 +76,7 @@ patat:
* Décide qui a droit à du temps CPU pour fire progresser une tâche
* Responsable d'appeler `Future::poll()`
## Amélioration 1: éviter le sondage actif (1/)
## Amélioration 1: éviter le sondage actif 1/
```rust
coroutine fn async_main() -> impl Future<Output = String> {
......@@ -98,7 +98,7 @@ fn main() {
* Cette `Future` doit `poll()` ses `Future` enfants et retourne que lorsque les deux sont `Ready()`
* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`)
## Amélioration 1: éviter le sondage actif (2/)
## Amélioration 1: éviter le sondage actif 2/
```rust
fn main() {
......@@ -113,7 +113,7 @@ fn main() {
* On veut remplacer ces `poll()` continus, par une file d'évènements (cf. `04_syscall_event_queue.md`)
* On veut plus de la boucle inifinie: mais utiliser `epoll()`
## Amélioration 1: éviter le sondage actif (3/)
## Amélioration 1: éviter le sondage actif 3/
```rust
static REGISTRY: OnceLock<Registry> = OnceLock::new();
......@@ -129,7 +129,7 @@ pub struct Runtime {
* `Register` sert à enregistrer l'intérêt à une **source** d'évènements (ici `TcpStream`)
* Interaction avec le `Registry` que via la fonction `registry()` qui s'assure
## Amélioration 1: éviter le sondage actif (4/)
## Amélioration 1: éviter le sondage actif 4/
```rust
impl Runtime {
......@@ -157,7 +157,7 @@ impl Runtime {
* `poll.poll()` est réveillé lorsque les évènements liés à la file surviennent
* rien à faire avec `events`, mais c'est ici qu'on va "bloquer"
## Amélioration 1: éviter le sondage actif (5/)
## Amélioration 1: éviter le sondage actif 5/
```rust
fn poll(&mut self) -> PollState<Self::Output> {
......@@ -179,7 +179,7 @@ fn poll(&mut self) -> PollState<Self::Output> {
* on retourne `NotReady`
* On retourne à `block_on()` dans le `main()` et on attend le prochain réveil pour rappeler `poll()`
## Amélioration 1: éviter le sondage actif (6/)
## Amélioration 1: éviter le sondage actif 6/
```console
Program starting
......@@ -219,4 +219,343 @@ HelloAsyncAwait
* Ajouter un Réacteur qui va gérer les `Waker` qui va être responsable de la communication avec l'exécuteur
## Amélioration 2: ajouter un réacteur et un waker (1/)
## Amélioration 2: ajouter un exécuteur, un réacteur et un waker
* On sépare le `Runtime` en `Reactor` et `Executor`
* On ajoute un `Waker` pour faire le lien entre les deux
* On doit modifier `Future` pour prendre en compte le `Waker`
### Le nouveau `main()`
```rust
fn main() {
let mut executor = runtime::init();
executor.block_on(async_main());
}
```
* Très similaire à la version précédente
## Amélioration 2: ajouter un `Executor`
### L'exécuteur aura comme capacités
* Stocke les `Future` du plus haut niveau
* Permet de créer des `Future` de haut niveau dans notre programme asynchrone
* Permet d'avoir plusieurs exécuteurs chacun sur son propre thread système
* Distribuer les `Waker` pour que les tâches puissent dormir quand il n'y a rien à faire et se réveiller quand un des futurs de haut niveau peut progresser
### Ce qu'il ne fera pas
* Ne sera pas vraiment multi-threadé: pas de communication entre les threads possible (pas moyen de "voler" des tâches)
* Chaque exécuteur est agnostique des autres (pas besoin d'avoir `Sync` et `Send` qui est satisfait)
## Amélioration 2: l'exécuteur 1/
```rust
type Task = Box<dyn Future<Output = String>>;
thread_local! {
static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default();
}
struct ExecutorCore {
tasks: RefCell<HashMap<usize, Task>>,
ready_queue: Arc<Mutex<Vec<usize>>>,
next_id: Cell<usize>,
}
```
* Une tâche est juste un type implémentant `Future`
* L'état de l'exécuteur est:
* une liste de tâche et d'identifiants qui doivent être exécutés
* une liste d'identifiants pour les tâches qui doivent être réveillées
* un identifiant pour la prochaîne tâche
* L'exécuteur pour **chaque thread** est stocké dans une variable statique `CURRENT_EXECUTOR`
* `RefCell<_>` gère la mutabilité intlérieure `&mut T` à l'exécution
* `Cell<_>` gère la mutabilité intérieur en sortant les valeurs et en les insérant à nouveau
## Amélioration 2: l'exécuteur 2/
```rust
pub struct Executor;
impl Executor {
fn pop_ready(&self) -> Option<usize> {
CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap())
}
fn get_future(&self, id: usize) -> Option<Task> {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id))
}
fn get_waker(&self, id: usize) -> Waker {
Waker {
id,
thread: thread::current(),
ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()),
}
}
fn insert_task(&self, id: usize, task: Task) {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task));
}
fn task_count(&self) -> usize {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len())
}
}
```
* L'interface est composée de:
* `pop_ready()` qui reoutrne l'id de la prochaîne tache à exécuter
* `get_future()` qui retourne la prochaine tâche à `poll()` par `id`
* `get_waker()` qui crée un nouveau `Waker` correspondant à l'id de la tâche et au thread actuel
* `insert_task()` qui ajoute une une paire `id`, `task` à la liste de tâche à exécuter
* `task_count()` fonction utilitaire qui nous dit combien de tâche il reste à exécuter
## Amélioration 2: l'exécuteur 3/
```rust
pub fn block_on<F>(&mut self, future: F) where F: Future<Output = String> + 'static {
spawn(future);
loop {
while let Some(id) = self.pop_ready() {
let mut future = match self.get_future(id) {
Some(f) => f,
None => continue,
};
let waker = self.get_waker(id);
match future.poll(&waker) {
PollState::NotReady => self.insert_task(id, future),
PollState::Ready(_) => continue,
}
}
let task_count = self.task_count();
let name = String::from(thread::current().name().unwrap_or_default());
if task_count > 0 {
println!("{name}: {task_count}, pending tasks. Sleep until notified.");
thread::park();
} else {
println!("{name}: All tasks finished");
break;
}
}
}
```
* On `spawn()` un nouveau `Future` puis on boucle:
* Tant qu'il reste des tâches à exécuter:
* on récupère l'id de la tâche
* on crée un `Waker` avec l'id de la tâche
* on `poll()` la tâche:
* si la tâche est `Ready(_)` on fait rien
* si la tâche est `NotReady` on la rajoute avec son id dans la liste des tâches
* on compte combien il reste de tâche pour décider si on a terminé ou pas:
* si on a pas terminé on `park()` le thread (on le met en attente)
## Amélioration 2: l'exécuteur 4/
```rust
pub fn spawn<F>(future: F) where F: Future<Output = String> + 'static {
CURRENT_EXECUTOR.with(|e| {
let id = e.next_id.get();
e.tasks.borrow_mut().insert(id, Box::new(future));
e.ready_queue
.lock()
.map(|mut queue| queue.push(id))
.unwrap();
e.next_id.set(id + 1);
});
}
```
* `spawn()` un `Future`:
1. récupére l'identifiant actuel
2. ajoute le `Future` et son identifiant dans la liste des tâche
3. ajoute l'id dans la liste des tâches prêtes à être `poll()`
4. incrémente l'identifiant
## Amélioration 2: le `Waker` 1/
* Utilisé pour réveiller une tâche sans avoir besoin de `Poll` directement
* Fera partie de l'`Executor` et du trait `Future`
* Utilisation de `thread::park()/thread::unpark()` pour arrêter / redémarrer un thread.
### Un `Waker`
```rust
pub struct Waker {
thread: Thread,
id: usize,
ready_queue: Arc<Mutex<Vec<usize>>>,
}
```
* Un `Waker` est appelé depuis un `Thread` système
* il a un identifiant
* Et gère les tâches à réveiller: celles qui sont prêtes à être exécutées
## Amélioration 2: le `Waker` 2/
```rust
impl Waker {
pub fn wake(&self) {
self.ready_queue
.lock()
.map(|mut queue| queue.push(self.id))
.unwrap();
self.thread.unpark();
}
}
```
* `wake()` réveille un thread:
* ajoute le la tâche dans la liste des tâches à redémarrer
* réveille le thread appeland: `unpark()`
* `wake()` peut être appelé de différents endroits => `ready_queue: Arc<Mutex<_>>`
## Amélioration 2: le `Waker` 3/
```rust
pub trait Future {
type Output;
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
}
```
* `Future` prend `Waker` en argument (cf. la lib standard)
* On doit adapter `HttpGetFuture` (argument de `poll()` et les `poll()` subséquents)
## Amélioration 2: ajouter un `Reactor`
### Ce qu'il fera
* Gérer de façon efficace les attentes et les évènements auxquels notre `Ruintime` sera intéressé
* Stocker une collection de `Waker` et s'assurer que le `Waker` correct est réveillé en fonction de la notification qu'il reçoit
* Permettre aux `Future` de s'abonner / désabonner aux évènements
## Amélioration 2: le `Reactor` 1/
```rust
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
static REACTOR: OnceLock<Reactor> = OnceLock::new();
pub fn reactor() -> &'static Reactor {
REACTOR.get().expect("Called outside a runtime context")
}
pub struct Reactor {
wakers: Wakers,
registry: Registry,
next_id: AtomicUsize,
}
```
* une `HashMap` de `Waker` et leur `id`
* un `Registry` qui permet de spécifier une source d'évènements
* un identifiant pour avoir spécifier un `Token()` pour nos abonnements
## Amélioration 2: le `Reactor` 2/
```rust
impl Future for HttpGetFuture {
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
if self.stream.is_none() {
self.write_request();
runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id);
runtime::reactor().set_waker(waker, self.id);
}
let mut buf = vec![0u8; 4096];
loop {
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
break PollState::Ready(String::from(s));
}
...
Err(e) if e.kind() == ErrorKind::WouldBlock => {
runtime::reactor().set_waker(waker, self.id);
break PollState::NotReady;
}
...
}
}
}
}
```
* Au premier `poll()`:
* on enregistre l'intérêt pour le `TcpStream`
* on ajoute le `Waker`
* Quand on a fini on retire le `Waker` et son identifiant
* Quand on est pas prêt on met à jour le `Waker` (pour faire comme l'API standard)
## Amélioration 2: le `Reactor` 3/
```rust
impl Reactor {
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
self.registry.register(stream, Token(id), interest).unwrap()
}
pub fn set_waker(&self, waker: &Waker, id: usize) {
self.wakers
.lock()
.map(|mut w| w.insert(id, waker.clone()))
.unwrap();
}
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
self.registry.deregister(stream).unwrap();
}
pub fn next_id(&self) -> usize {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
}
```
* `register()`: S'abonne une source d'évènements `TcpStream` avec le type d'évènements (`READABLE` ici) et l'identifiant
* `deregister()`: Se désabonne du `TcpStream` (quand on a fini avec notre `Future`), et retire le `Waker` de la `HashMap`
* `set_waker()`: Insère le `Waker` et son `id` dans la `HashMap` des `Waker`
* `next_id()`: passe au prochain `id` de façon atomique
## Amélioration 2: le `Reactor` 4/
```rust
pub fn start() {
let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new()));
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
let next_id = AtomicUsize::new(1);
let reactor = Reactor {
wakers: wakers.clone(),
registry,
next_id,
};
REACTOR.set(reactor).ok().expect("Rector already running");
std::thread::spawn(move || event_loop(poll, wakers));
}
```
* Au démarrage du réacteur:
* On crée les structures de données: `Poll`, `Registry`, `Reactor`
* On crée le `REACTOR` qui permet de partager les `Waker`
* On déplace la boucle d'évènements dans un thread séparé
## Amélioration 2: le `Reactor` 5/
```rust
fn event_loop(mut poll: Poll, wakers: Wakers) {
let mut events = Events::with_capacity(100);
loop {
poll.poll(&mut events, None).unwrap();
events.iter().for_each(|e| {
let Token(id) = e.token();
let wakers = wakers.lock().unwrap();
if let Some(waker) = wakers.get(&id) {
waker.wake();
}
});
}
}
```
* On a une boucle infinie qui monitore le système
* Chaque nouvel évènement, on récupère l'identifiant et on `wake()` le `Waker` correspondant
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment