diff --git a/rust/lib/mammoth/src/lib.rs b/rust/lib/mammoth/src/lib.rs index cf5d3f0..2058a6b 100644 --- a/rust/lib/mammoth/src/lib.rs +++ b/rust/lib/mammoth/src/lib.rs @@ -16,5 +16,6 @@ pub mod mem; pub mod port; pub mod sync; pub mod syscall; +pub mod task; pub mod thread; pub mod zion; diff --git a/rust/lib/mammoth/src/task/mod.rs b/rust/lib/mammoth/src/task/mod.rs new file mode 100644 index 0000000..0b23a67 --- /dev/null +++ b/rust/lib/mammoth/src/task/mod.rs @@ -0,0 +1,125 @@ +use core::{ + future::Future, + pin::Pin, + sync::atomic::{AtomicU64, Ordering}, + task::{Context, Poll, Waker}, +}; + +use alloc::{ + boxed::Box, + collections::{BTreeMap, VecDeque}, + sync::Arc, + task::Wake, +}; + +use crate::{sync::Mutex, syscall}; + +#[derive(PartialEq, Eq, PartialOrd, Ord, Copy, Clone)] +struct TaskId(u64); + +impl TaskId { + pub fn new() -> TaskId { + static NEXT_ID: AtomicU64 = AtomicU64::new(0); + TaskId(NEXT_ID.fetch_add(1, Ordering::Relaxed)) + } +} + +pub struct Task { + id: TaskId, + // FIXME: This only needs to be sync because of the CURRENT_EXECUTOR + // needing to be shared between threads. + future: Pin + Sync>>, +} + +impl Task { + pub fn new(future: impl Future + Sync + 'static) -> Task { + Task { + id: TaskId::new(), + future: Box::pin(future), + } + } + + pub fn poll(&mut self, context: &mut Context) -> Poll<()> { + self.future.as_mut().poll(context) + } +} + +struct TaskWaker { + task_id: TaskId, + task_queue: Arc>>, +} + +impl TaskWaker { + fn new(task_id: TaskId, task_queue: Arc>>) -> Waker { + Waker::from(Arc::new(TaskWaker { + task_id, + task_queue, + })) + } + fn wake_task(&self) { + self.task_queue.lock().push_back(self.task_id); + } +} + +impl Wake for TaskWaker { + fn wake(self: Arc) { + self.wake_task(); + } + + fn wake_by_ref(self: &Arc) { + self.wake_task(); + } +} + +pub struct Executor { + tasks: BTreeMap, + // TODO: Consider a better datastructure for this. + task_queue: Arc>>, + waker_cache: BTreeMap, +} + +impl Executor { + pub fn new() -> Executor { + Executor { + tasks: BTreeMap::new(), + task_queue: Arc::new(Mutex::new(VecDeque::new())), + waker_cache: BTreeMap::new(), + } + } + + pub fn spawn(&mut self, task: Task) { + let task_id = task.id; + if self.tasks.insert(task_id, task).is_some() { + panic!("Task is already existed in executor map"); + } + self.task_queue.lock().push_back(task_id); + } + + fn run_ready_tasks(&mut self) { + while let Some(task_id) = self.task_queue.lock().pop_front() { + let task = self.tasks.get_mut(&task_id).unwrap(); + let waker = self + .waker_cache + .entry(task_id) + .or_insert_with(|| TaskWaker::new(task_id, self.task_queue.clone())); + let mut ctx = Context::from_waker(waker); + match task.poll(&mut ctx) { + Poll::Ready(()) => { + self.tasks.remove(&task_id); + self.waker_cache.remove(&task_id); + } + Poll::Pending => {} + }; + } + } + + pub fn run(&mut self) { + loop { + self.run_ready_tasks(); + // TODO: We need some sort of semaphore wait here. + syscall::thread_sleep(50).unwrap(); + } + } +} + +static CURRENT_EXECUTOR: Option = None; diff --git a/rust/usr/testbed/src/main.rs b/rust/usr/testbed/src/main.rs index c782312..2ea144b 100644 --- a/rust/usr/testbed/src/main.rs +++ b/rust/usr/testbed/src/main.rs @@ -7,6 +7,8 @@ use alloc::boxed::Box; use alloc::string::ToString; use mammoth::debug; use mammoth::define_entry; +use mammoth::task::Executor; +use mammoth::task::Task; use mammoth::thread; use mammoth::zion::z_err_t; use yellowstone_yunq::GetEndpointRequest; @@ -17,6 +19,22 @@ pub fn testthread() { debug!("Testing 1, 8 ,9"); } +async fn inner_async() { + debug!("Inner Async executed"); +} + +async fn test_async() { + debug!("Async executed"); + + let i = inner_async(); + + debug!("inner async created"); + + i.await; + + debug!("inner async returned"); +} + #[no_mangle] pub extern "C" fn main() -> z_err_t { debug!("Testing!"); @@ -47,5 +65,11 @@ pub extern "C" fn main() -> z_err_t { let t = thread::spawn(|| debug!("Testing 4, 5, 6")); t.join().expect("Failed to wait."); + let mut executor = Executor::new(); + + executor.spawn(Task::new(test_async())); + + executor.run(); + 0 }