Skip to content
Snippets Groups Projects
Verified Commit a0fcfd00 authored by orestis.malaspin's avatar orestis.malaspin
Browse files

added first example

parent 4de5f402
No related branches found
No related tags found
No related merge requests found
Pipeline #38461 passed
---
title: "Coroutines sans pile"
author: "Orestis Malaspinas"
date: "2025-03-17"
patat:
slideNumber: true
wrap: true
margins:
left: 10
right: 10
top: auto
slideLevel: 2
images:
backend: kitty
...
# Introduction aux coroutines sans pile
## Stackless coroutines
* Simplemnt une tâche qu'on peut mettre en pause et redémarrer: rendre le contrôle au scheduler (ou à une autre coroutine)
* En opposition aux coroutines avec pile (fibres/green-threads)
* On utilise une machine d'états contenant les informations pour arrêter-redémarrer une tâche (on met pas ces informations dans une pile séparée)
## Ce qu'on va faire
* Écrire notre propre trait `Future`
* Faire un client HTTP simplifié qui fera de requêtes `GET`
* Une tâche qu'on peut arrêter-redémarrer implémentée sous la forme d'une machine d'états
* Notre syntaxe `async/await` à nous (nommé `coroutine/wait` ben oui on peut pas réutiliser `async/await`)
* Un préprocesseur qui va transformer notre syntaxe en machine d'états
## Ce qu'on va pas faire
* Gérer les erreurs (erreur => panique)
* Pas de généricité
* Pas de macros (meilleure lisibilité)
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
[[package]]
name = "coroutines"
version = "0.1.0"
dependencies = [
"mio",
]
[[package]]
name = "libc"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[package]
name = "coroutines"
version = "0.1.0"
edition = "2021"
[dependencies]
mio = { version = "1.0.3", features = ["net", "os-poll"] }
// Future trait looks like standard Future except for the missing context
// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
//}
pub trait Future {
type Output;
fn poll(&mut self) -> PollState<Self::Output>;
}
// Similar to the Poll #[derive(Debug)]
// pub enum Poll<T> {
// Ready(T),
// Pending,
// }
pub enum PollState<T> {
Ready(T),
NotReady,
}
use crate::future::{Future, PollState};
use std::io::{ErrorKind, Read, Write};
// Simple helper function to write GET requests
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}
pub struct Http;
impl Http {
pub fn get(path: &str) -> impl Future<Output = String> {
HttpGetFuture::new(path)
}
}
struct HttpGetFuture {
// Option since we will not conect upon creation
stream: Option<mio::net::TcpStream>,
// buffer to read from the TcpStream
buffer: Vec<u8>,
// The GET request we will construct
path: String,
}
impl HttpGetFuture {
// Create the new HttpGetFuture with default state
fn new(path: &str) -> Self {
Self {
stream: None,
buffer: vec![],
path: String::from(path),
}
}
// Writes the request to a non bocking stream. Here we are using mio t connect to the socket
// We just create the stream and do not perform any request here
fn write_request(&mut self) {
let stream = std::net::TcpStream::connect("localhost:8080").unwrap();
stream.set_nonblocking(true).unwrap();
let mut stream = mio::net::TcpStream::from_std(stream);
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
self.stream = Some(stream);
}
}
impl Future for HttpGetFuture {
type Output = String;
// We have three different states here:
// 1. stream is None, therefore we have not started executing anything (State would be
// NotStarted)
// 2. stream is Some and read returns WouldBlock (State would be Pending)
// 3. stream is Some and read returns 0 (State would be Resolved)
// These states are implicit here but we actually have some kind of state machine
fn poll(&mut self) -> PollState<Self::Output> {
// First poll we write the GET request to the server and become NotReady
// It is very important to note that we call poll() at least once before doing anything so
// we actually start doing something not when we create the future but when we await it
// This is an example of Lazy evaluation (we only do something when we poll not before)
if self.stream.is_none() {
println!("First poll - start operation");
self.write_request();
return PollState::NotReady;
}
let mut buf = vec![0u8; 4096];
// We loop until everything is read and return Ready when done
loop {
// We listen on the socket and read
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
break PollState::Ready(String::from(s));
}
// We read n bytes so we continue reading
Ok(n) => {
self.buffer.extend(&buf[0..n]);
continue;
}
// This error indicates that the data is not ready yet or that that there is more
// data we did not received yet
Err(e) if e.kind() == ErrorKind::WouldBlock => {
break PollState::NotReady;
}
// This error can happen if a signal interrupted the read (which can happen) we
// handle this case by reading again
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
// Nothing we can do. We panic!
Err(e) => panic!("{e:?}"),
}
}
}
}
pub mod future;
pub mod http;
use std::thread;
use std::time::Duration;
use coroutines::future::{Future, PollState};
use coroutines::http::Http;
// We create a task that we can pause and resume
// as a state machine
//
// We will use our model of pausable tasks to write code that looks a lot like async / await
//
// Our program will do the following
// 1. Print a message when our pausable task is starting
// 2. Make a GET request to a server
// 3. Wait for the GET request
// 4. Print the response form the server
// 5. Make a second GET request
// 6. Wait for a second response from the server
// 7. Print the response from the server
// 8. Exit the program
// For the moment no executor, or reactor
//
// With the standard syntax this code would look like:
// async fn async_main() {
// println!("Program starting")
// let txt = Http::get("/1000/HelloWorld").await;
// println!("{txt}");
// let txt2 = Http::("500/HelloWorld2").await;
// println!("{txt2}");
//}
struct Coroutine {
state: CoroutineState,
}
impl Coroutine {
// when creating the Coroutine we never called poll on it
fn new() -> Self {
Self {
state: CoroutineState::Start,
}
}
}
impl Future for Coroutine {
type Output = (); // We return nothing from the Coroutine Future
fn poll(&mut self) -> PollState<Self::Output> {
// We have an infinite loop in the poll method
// that constantly matches the state to drive the state forward where we poll our children
// futures that will either return NotReady either Ready leading to the mother Future to
// finally go into a Ready state
loop {
match self.state {
// First poll called on Coroutine. We output something and then progress the state
// to the next CoroutineState which is Wait1 which must Wait1 until our first child
// future is resolved
CoroutineState::Start => {
// That's what we promised to do after starting the task
println!("Program starting");
self.state = CoroutineState::Wait1(Box::new(Http::get("/600/HelloWorld1")));
}
// We must first resolve the first child future which is the Http::get function
// which is polled until it is completed. Allowing to go to the ext state, which is
// Wait2. If the first child Future is NotReady we break out of the loop and return
// NotReady
CoroutineState::Wait1(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
self.state = CoroutineState::Wait2(Box::new(Http::get("/400/HelloWorld2")));
}
PollState::NotReady => break PollState::NotReady,
},
// Wait2 in turn stores the second child Future and either breaks if it is not
// ready by returning NotReady or prints the second get response and changes the
// CoroutineState to Resolved and breaking with a Ready return. Also the state
// changes to Resolved as the next state of our Coroutine
CoroutineState::Wait2(ref mut future) => match future.poll() {
PollState::Ready(txt) => {
println!("{txt}");
self.state = CoroutineState::Resolved;
break PollState::Ready(());
}
PollState::NotReady => break PollState::NotReady,
},
// If anybody polls Coroutine again we just panic!
CoroutineState::Resolved => panic!("Polled a resolved Future which is illegal!"),
}
}
}
}
enum CoroutineState {
// The coroutine has been created but never polled yet
Start,
// When we call Http::get we get get the HttpGetFuture that we store in the CoroutineState
// variant. We then return the control back to the calling function
// We are generic over all Future traits implementing types
Wait1(Box<dyn Future<Output = String>>),
// Same as Wait1 but a second time
Wait2(Box<dyn Future<Output = String>>),
// The future is over nothing else to be done
Resolved,
}
//
fn async_main() -> impl Future<Output = ()> {
Coroutine::new()
}
fn main() {
// We call the new Coroutine that returns a Future
let mut future = async_main();
// We poll the Future until completion writing Schedule other tasks each time the Future is in
// NotReady State. We alo wait for 100 milliseconds to not be overwhelmed by messages
loop {
match future.poll() {
PollState::NotReady => {
println!("Schedule other tasks");
}
PollState::Ready(_) => break,
}
thread::sleep(Duration::from_millis(100));
}
}
// The Output will look like:
//
//Program starting
//First poll - start operation
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//HTTP/1.1 200 OK
//content-type: text/plain; charset=utf-8
//content-length: 11
//connection: close
//date: Sun, 16 Mar 2025 16:28:55 GMT
//
//HelloWorld1
//First poll - start operation
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//Schedule other tasks
//HTTP/1.1 200 OK
//content-type: text/plain; charset=utf-8
//content-length: 11
//connection: close
//date: Sun, 16 Mar 2025 16:28:55 GMT
//
//HelloWorld2
//
// The flow of the program is the following:
//
// 1. Program starting (just after the first poll of Coroutine)
// 2. First poll - start operation is when we first poll the http::get
// 3. Back into the main function as we are NotReady and printing for scheduler calls. We could be
// running other tasks here.
// 4. After about 600ms we get the responses
// 5. The same happens after another 400ms and get the response
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment