Skip to content
Snippets Groups Projects
Verified Commit 7278a74d authored by orestis.malaspin's avatar orestis.malaspin
Browse files

added exemple

parent 1825f705
No related branches found
No related tags found
No related merge requests found
Pipeline #38992 passed
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 4
[[package]]
name = "coroutines-variables"
version = "0.1.0"
dependencies = [
"mio",
]
[[package]]
name = "libc"
version = "0.2.171"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c19937216e9d3aa9956d9bb8dfc0b0c8beb6058fc4f7a4dc4d850edf86a237d6"
[[package]]
name = "log"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13dc2df351e3202783a1fe0d44375f7295ffb4049267b0f3018346dc122a1d94"
[[package]]
name = "mio"
version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys",
]
[[package]]
name = "wasi"
version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "windows-sys"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_gnullvm",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3"
[[package]]
name = "windows_aarch64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469"
[[package]]
name = "windows_i686_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b"
[[package]]
name = "windows_i686_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66"
[[package]]
name = "windows_i686_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66"
[[package]]
name = "windows_x86_64_gnu"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d"
[[package]]
name = "windows_x86_64_msvc"
version = "0.52.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[package]
name = "coroutines-variables"
version = "0.1.0"
edition = "2024"
[dependencies]
mio = { version = "1.0.3", features = ["net", "os-poll"] }
use crate::runtime::Waker;
// Future trait looks like standard Future except for the missing context
// pub trait Future {
// type Output;
// fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
//}
// We now also have the `Waker` as an argument of the `Future` trait. We see that in the standard
// library the Waker is hidden in a Context which we do not need here
pub trait Future {
type Output;
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output>;
}
// Similar to the Poll #[derive(Debug)]
// pub enum Poll<T> {
// Ready(T),
// Pending,
// }
pub enum PollState<T> {
Ready(T),
NotReady,
}
// Exercise adapt the join_all function to accept the Waker structs
use mio::Interest;
use crate::future::{Future, PollState};
use crate::runtime::{self, reactor, Waker};
use std::io::{ErrorKind, Read, Write};
// Simple helper function to write GET requests
fn get_req(path: &str) -> String {
format!(
"GET {path} HTTP/1.1\r\n\
Host: localhost\r\n\
Connection: close\r\n\
\r\n"
)
}
pub struct Http;
impl Http {
pub fn get(path: &str) -> impl Future<Output = String> {
HttpGetFuture::new(path)
}
}
// The id of the `Future` is also a needed information to register it for the `Waker` and `Poll`
struct HttpGetFuture {
// Option since we will not conect upon creation
stream: Option<mio::net::TcpStream>,
// buffer to read from the TcpStream
buffer: Vec<u8>,
// The GET request we will construct
path: String,
// The id of our Future
id: usize,
}
impl HttpGetFuture {
// Create the new HttpGetFuture with default state
// We increment the `next_id()` and assign it to the id field
fn new(path: &str) -> Self {
let id = reactor().next_id();
Self {
stream: None,
buffer: vec![],
path: String::from(path),
id,
}
}
// Writes the request to a non bocking stream. Here we are using mio t connect to the socket
// We just create the stream and do not perform any request here
fn write_request(&mut self) {
let stream = std::net::TcpStream::connect("localhost:8080").unwrap();
stream.set_nonblocking(true).unwrap();
let mut stream = mio::net::TcpStream::from_std(stream);
stream.write_all(get_req(&self.path).as_bytes()).unwrap();
self.stream = Some(stream);
}
}
impl Future for HttpGetFuture {
type Output = String;
// We have three different states here:
// 1. stream is None, therefore we have not started executing anything (State would be
// NotStarted)
// 2. stream is Some and read returns WouldBlock (State would be Pending)
// 3. stream is Some and read returns 0 (State would be Resolved)
// These states are implicit here but we actually have some kind of state machine
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
// First poll we write the GET request to the server and register our interest into read
// evnts on te TcpStream. We then directly poll the stream immediatly (instead of returning
// NotReady).
//
// It is very important to note that we call poll() at least once before doing anything so
// we actually start doing something not when we create the future but when we await it
// This is an example of Lazy evaluation (we only do something when we poll not before)
if self.stream.is_none() {
println!("First poll - start operation");
self.write_request();
// Registering the Tcp stream as a source of events in the poll instance
runtime::reactor().register(self.stream.as_mut().unwrap(), Interest::READABLE, self.id);
// We also set the `Waker` responsible for handling the `Future`
runtime::reactor().set_waker(waker, self.id);
}
let mut buf = vec![0u8; 4096];
// We loop until everything is read and return Ready when done
loop {
// We listen on the socket and read
match self.stream.as_mut().unwrap().read(&mut buf) {
// 0 bytes read so we are finished we got the entire GET response
Ok(0) => {
let s = String::from_utf8_lossy(&self.buffer);
// The future is over we deregister our interest from this source of events for
// the current id
runtime::reactor().deregister(self.stream.as_mut().unwrap(), self.id);
break PollState::Ready(String::from(s));
}
// We read n bytes so we continue reading
Ok(n) => {
self.buffer.extend(&buf[0..n]);
continue;
}
// This error indicates that the data is not ready yet or that that there is more
// data we did not received yet
Err(e) if e.kind() == ErrorKind::WouldBlock => {
println!("Data not ready");
// We need to
runtime::reactor().set_waker(waker, self.id);
break PollState::NotReady;
}
// This error can happen if a signal interrupted the read (which can happen) we
// handle this case by reading again
Err(e) if e.kind() == ErrorKind::Interrupted => {
continue;
}
// Nothing we can do. We panic!
Err(e) => panic!("{e:?}"),
}
}
}
}
mod future;
mod http;
mod runtime;
use future::{Future, PollState};
use http::Http;
use runtime::Waker;
use std::fmt::Write;
fn main() {
let mut executor = runtime::init();
executor.block_on(async_main());
}
// =================================
// We rewrite this:
// =================================
// The code we want to write into a buffer instead of printing in the terminal
// Here we use the writeln!() macro which is similar to println! but instead of writing in the
// stadard output we write in a buffer. Also writer represents the buffer and a mutable reference
// to it.
// coroutine fn async_main() {
// let mut buffer = String::from("\nBUFFER:\n------\n");
// let writer = &mut buffer;
// let mut counter = 0;
// println!("Program starting");
// let txt = Http::get("/600/HelloAsyncWait").wait;
// writeln!(writer, "{txt}").unwrap();
// counter += 1;
// let txt = Http::get("/400/HelloAsyncWait").wait;
// writeln!(writer, "{txt}").unwrap();
// writeln!(writer, "------").unwrap();
// counter += 1;
// println!("Received {} responses", counter);
// println!("{buffer}");
// }
// =================================
// Into this:
// =================================
fn async_main() -> impl Future<Output = String> {
Coroutine::new()
}
// New: Only stores an Option<usize> which will be our counter
// We first want a counter, then a self referencial String
// all in options
struct Stack {}
enum State {
Start,
Wait1(Box<dyn Future<Output = String>>),
Wait2(Box<dyn Future<Output = String>>),
Resolved,
}
struct Coroutine {
state: State,
// New: We add a stack that will store an internal state
}
impl Coroutine {
fn new() -> Self {
// New: Add initial stack state
Self {
state: State::Start,
}
}
}
impl Future for Coroutine {
type Output = String;
fn poll(&mut self, waker: &Waker) -> PollState<Self::Output> {
loop {
match self.state {
State::Start => {
// New: We initialize the counter, and buffer/writer when the Coroutine starts
// We create the buffer and the writer by taking the mutable reference from it
// and storing into the Option. Here the type in the Option is coerced from
// &mut to *mut
println!("Program starting");
let fut1 = Box::new(Http::get("/600/HelloAsyncWait"));
self.state = State::Wait1(fut1);
}
State::Wait1(ref mut f1) => {
match f1.poll(waker) {
PollState::Ready(txt) => {
// New: restore the stack
// take() gets the value stored in the Option and leaves a None in its
// place. It like "sapping" the internal state with "nothing". We thus
// take the ownership of the internal stake in order to modify it.
// Add txt to the writer, and incremetn the counter
let fut2 = Box::new(Http::get("/400/HelloAsyncWait"));
self.state = State::Wait2(fut2);
// New: We need to save the stack (overwrite the None part)
}
PollState::NotReady => break PollState::NotReady,
}
}
State::Wait2(ref mut f2) => {
match f2.poll(waker) {
PollState::Ready(txt) => {
// New: restore the stack
// take() gets the value stored in the Option and leaves a None in its
// place. It like "sapping" the internal state with "nothing". We thus
// take the ownership of the internal stake in order to modify it.
// if we took ownershiup here, we would invalidate the pointer of the
// writer. Add txt to the writer, and incremetn the counter
// Println! the buffer and the counte
self.state = State::Resolved;
// New: We have finished. The internal State must be back to None, which is
// what `take()` did for us. Except for the buffer which we should also
// release here. If we do not to that here the memory would leave until
// the coroutine goes out of scope.
break PollState::Ready(String::new());
}
PollState::NotReady => break PollState::NotReady,
}
}
State::Resolved => panic!("Polled a resolved future"),
}
}
}
}
mod executor;
mod reactor;
pub use executor::{Executor, Waker};
pub use reactor::reactor;
pub fn init() -> Executor {
reactor::start();
Executor::new()
}
use std::{
cell::{Cell, RefCell},
collections::HashMap,
sync::{Arc, Mutex},
thread::{self, Thread},
};
use crate::future::{Future, PollState};
// A `Waker` contains the thread, `std::thread::Thread`,
// from (`thread::current()`) which it is called in order to be `park()` or `unpark()`. This
// is not a robust method, as `park()` or `unpark()` could be called from several
// places in the code, resulting in potential deadlocks. We will use this
// method to ask the OS to wake up a thread at the right moment (or to put it to sleep).
// To be really generic, we should have a `Waker` feature, but that would be too
// complicated in the scope of this course.
//
// A waker contains its thread, an `id` which identifies the associated task, and a list of
// of task identifiers that are ready to be executed. This is necessary to add
// the current identifier to the list of tasks to be executed
#[derive(Clone)]
pub struct Waker {
thread: Thread,
id: usize,
ready_queue: Arc<Mutex<Vec<usize>>>,
}
impl Waker {
// Wake means add the id of the task to the ready_queue and `unpark()` the associated thread.
// We can then call `poll()` on the task to see its progress.
// Of course we must lock the ready_queue first otherwise there may be a place where we are
// modifying concurrently the ready_queue which would cause a concurrent access
pub fn wake(&self) {
self.ready_queue
.lock()
.map(|mut queue| queue.push(self.id))
.unwrap();
self.thread.unpark();
}
}
// A Task is just something that implements the `Future` trait
type Task = Box<dyn Future<Output = String>>;
// CURRENT_EXECUTOR is a static variable that is unique to the thread it was called from. It can
// not be accessed from any other thread
thread_local! {
static CURRENT_EXECUTOR: ExecutorCore = ExecutorCore::default();
}
// The `ExecutorCore` holds the state of the `Executor`.
//
// The `tasks` is a `HashMap` between its `id` and the actual `Taksk` (anything that implements the
// `Future` trait). We only need a RefCell since it will never be accessed by different threads.
// A `RefCell` allow **interior** mutability (the thing stored inside can be mutated) but rules are
// checked at runtime instead of at compile time. Only one instance can hold a mutable reference at
// any given time (no need for Arc). `RefCell` allows to access a value by &mut T
//
// The `ready_queue` is the same as the queue for the `Waker`. It contains the list of tasks ready
// for execution. Each `Waker` having a copy and each `Waker` being on a different thread an
// `Arc<Mutex<_>>` of the ids is necessary here.
//
// The `next_id` is a counter giving us the top level id of each `Future` that is spawned. It is a
// simple `Cell` because the value inside will be replaced everything we need to mutate it. `Cell`
// does not allow to access a `&mut T` but we need to take the value out of the `Cell` and replace
// it wil something else.
#[derive(Default)]
struct ExecutorCore {
tasks: RefCell<HashMap<usize, Task>>,
ready_queue: Arc<Mutex<Vec<usize>>>,
next_id: Cell<usize>,
}
// The `spawn` function does the following things:
// 1. Gets the next available id for our top level `Future`
// 2. Adds the id and the future, into our tasks HashMap
// 3. Adds the id into the list of ids that are ready to be executes (this would be their first
// executrion)
// 4. Prepares the next id (by incrementation by 1)
//
// Here `F` must have a 'static lifetime which means that the `Future` must live until the end of
// our program. In other terms it must outlive `spawn()`
pub fn spawn<F>(future: F)
where
F: Future<Output = String> + 'static,
{
CURRENT_EXECUTOR.with(|e| {
let id = e.next_id.get();
e.tasks.borrow_mut().insert(id, Box::new(future));
e.ready_queue
.lock()
.map(|mut queue| queue.push(id))
.unwrap();
e.next_id.set(id + 1);
});
}
// The executor is all stored in tthe `ExecutorCore` which is a local static variable anyway, so we
// just create an empty streuct for a namespacing purpose
pub struct Executor;
impl Executor {
// Just a new instance, since everything is done in the CURRENT_EXECUTOR static variable when a
// new thread is spawned
pub fn new() -> Self {
Self {}
}
// Pops an id from the ready_queue list. Must of course be locked before doing anything. The
// queue works rather as a stack than as a queue but it's not very important.
fn pop_ready(&self) -> Option<usize> {
CURRENT_EXECUTOR.with(|e| e.ready_queue.lock().map(|mut queue| queue.pop()).unwrap())
}
// Returns the task that is related to the id and removes it from the tasks hashmap
// This will be used for polling. If the task returns not ready we will have to put it back
// again later
fn get_future(&self, id: usize) -> Option<Task> {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().remove(&id))
}
// Creates a new Waker from an id
fn get_waker(&self, id: usize) -> Waker {
Waker {
id,
thread: thread::current(),
ready_queue: CURRENT_EXECUTOR.with(|e| e.ready_queue.clone()),
}
}
// Inserts a new task with a given id into the tasks HashMap
fn insert_task(&self, id: usize, task: Task) {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow_mut().insert(id, task));
}
// Counts how many tasks are left to be executed
fn task_count(&self) -> usize {
CURRENT_EXECUTOR.with(|e| e.tasks.borrow().len())
}
pub fn block_on<F>(&mut self, future: F)
where
F: Future<Output = String> + 'static,
{
// New: OPTIMIZATION: It could be that the future is ready on the first call to block_on() and
// instead to do all these things here, we just want to return realy.
// We therefore create a waker with any ID, poll() and return early if we are Ready. The
// top level future is the one we are waiting and if it's ready we don't want to block
// until the end of ages
//let waker = self.get_waker(usize::MAX);
//let mut future = future;
//match future.poll(&waker) {
// PollState::NotReady => (),
// PollState::Ready(_) => return,
//}
// END OPTIMIZATION
spawn(future);
loop {
while let Some(id) = self.pop_ready() {
let mut future = match self.get_future(id) {
Some(f) => f,
// guard against false wakeups
None => continue,
};
let waker = self.get_waker(id);
match future.poll(&waker) {
PollState::NotReady => self.insert_task(id, future),
PollState::Ready(_) => continue,
}
}
let task_count = self.task_count();
let name = String::from(thread::current().name().unwrap_or_default());
if task_count > 0 {
println!("{name}: {task_count}, pending tasks. Sleep until notified.");
thread::park();
} else {
println!("{name}: All tasks finished");
break;
}
}
}
}
use std::{
collections::HashMap,
sync::{atomic::AtomicUsize, Arc, Mutex, OnceLock},
};
use mio::{net::TcpStream, Events, Interest, Poll, Registry, Token};
use crate::runtime::Waker;
type Wakers = Arc<Mutex<HashMap<usize, Waker>>>;
// The REACTOR may be accessed from variuous threads and must therefore be hidden behind a lock.
// Also it must never change one initialized when we start the reactor.
static REACTOR: OnceLock<Reactor> = OnceLock::new();
// To access the Reractor that will be borrowed from various places
pub fn reactor() -> &'static Reactor {
REACTOR.get().expect("Called outside a runtime context")
}
// The reactor will be made of three fields:
//
// 1. A list of wakers that is a HashMap of id, Waker
// 2. The registry that will help us interact with the Poll event queue
// 3. The next available id for a Waker.
pub struct Reactor {
wakers: Wakers,
registry: Registry,
next_id: AtomicUsize,
}
impl Reactor {
// We register interest for the `TcpStream` for a given interest and assign it the `id` in
// argumetn as a Token
pub fn register(&self, stream: &mut TcpStream, interest: Interest, id: usize) {
self.registry.register(stream, Token(id), interest).unwrap()
}
// Adds a waker into the `Wakers` `HashMap` along with its `id`. We replace the `Waker` if one
// with the same `id` is already present
pub fn set_waker(&self, waker: &Waker, id: usize) {
self.wakers
.lock()
.map(|mut w| w.insert(id, waker.clone()))
.unwrap();
}
// Removes a `Waker` from our `HashMap` and deregisters the `TcpStream` from the `Poll`
// instance
pub fn deregister(&self, stream: &mut TcpStream, id: usize) {
self.wakers.lock().map(|mut w| w.remove(&id)).unwrap();
self.registry.deregister(stream).unwrap();
}
// Increments the counter atomically. Relaxed just means atomicity. Other possiblities of
// Ordering can garantee that there is a certain ordering in how values are added from
// different threads.
pub fn next_id(&self) -> usize {
self.next_id
.fetch_add(1, std::sync::atomic::Ordering::Relaxed)
}
}
// 1. We poll events without a timeout in an infinite loop
// 2. For each event we get the token associated to the events (it is the id that was passed upon
// creation of the interest)
// 3. We `wake()` the waker by id
fn event_loop(mut poll: Poll, wakers: Wakers) {
let mut events = Events::with_capacity(100);
loop {
poll.poll(&mut events, None).unwrap();
events.iter().for_each(|e| {
let Token(id) = e.token();
let wakers = wakers.lock().unwrap();
if let Some(waker) = wakers.get(&id) {
waker.wake();
}
});
}
}
// This function starts the Reactor by:
// 1. Creating the Wakers HashMap and the Poll instance.
// 2. Cloning the registry used for the source (the TcpStream)
// 3. Creating a new Reactor
// 4. Initializing the static variable REACTOR and making sure no other instance is running
// 5. Starting the `envent_loop()` for this Poll and Wakers and move it into a separate thread (we
// never take ownership again of Poll)
pub fn start() {
let wakers = Arc::new(Mutex::new(HashMap::<usize, Waker>::new()));
let poll = Poll::new().unwrap();
let registry = poll.registry().try_clone().unwrap();
let next_id = AtomicUsize::new(1);
let reactor = Reactor {
wakers: wakers.clone(),
registry,
next_id,
};
REACTOR.set(reactor).ok().expect("Rector already running");
std::thread::spawn(move || event_loop(poll, wakers));
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment