From 6f0dfa8719558c68116e22ab0ce54e1cfa14a675 Mon Sep 17 00:00:00 2001 From: Drew Galbraith Date: Sat, 1 Feb 2025 13:09:42 -0800 Subject: [PATCH] Register denali on yellowstone, handle read requests. --- rust/lib/mammoth/src/task/mod.rs | 41 ++++++++++++++++++++------ rust/lib/yunq/src/server.rs | 11 ++++--- rust/sys/denali/src/ahci/controller.rs | 1 - rust/sys/denali/src/bin/denali.rs | 17 +++++++++-- rust/sys/denali/src/denali_server.rs | 2 +- yunq/rust/src/codegen.rs | 6 ++-- 6 files changed, 56 insertions(+), 22 deletions(-) diff --git a/rust/lib/mammoth/src/task/mod.rs b/rust/lib/mammoth/src/task/mod.rs index aeba68b..4d8e8d1 100644 --- a/rust/lib/mammoth/src/task/mod.rs +++ b/rust/lib/mammoth/src/task/mod.rs @@ -26,9 +26,7 @@ impl TaskId { pub struct Task { id: TaskId, - // FIXME: This only needs to be sync because of the CURRENT_EXECUTOR - // needing to be shared between threads. - future: Pin + Sync + Send>>, + future: Pin + Send>>, } impl Task { @@ -72,7 +70,7 @@ impl Wake for TaskWaker { } pub struct Executor { - tasks: BTreeMap, + tasks: Arc>>, // TODO: Consider a better datastructure for this. task_queue: Arc>>, waker_cache: BTreeMap, @@ -81,7 +79,7 @@ pub struct Executor { impl Executor { pub fn new() -> Executor { Executor { - tasks: BTreeMap::new(), + tasks: Arc::new(Mutex::new(BTreeMap::new())), task_queue: Arc::new(Mutex::new(VecDeque::new())), waker_cache: BTreeMap::new(), } @@ -89,7 +87,7 @@ impl Executor { pub fn spawn(&mut self, task: Task) { let task_id = task.id; - if self.tasks.insert(task_id, task).is_some() { + if self.tasks.lock().insert(task_id, task).is_some() { panic!("Task is already existed in executor map"); } self.task_queue.lock().push_back(task_id); @@ -97,7 +95,8 @@ impl Executor { fn run_ready_tasks(&mut self) { while let Some(task_id) = self.task_queue.lock().pop_front() { - let task = self.tasks.get_mut(&task_id).unwrap(); + let mut tasks = self.tasks.lock(); + let task = tasks.get_mut(&task_id).unwrap(); let waker = self .waker_cache .entry(task_id) @@ -105,7 +104,7 @@ impl Executor { let mut ctx = Context::from_waker(waker); match task.poll(&mut ctx) { Poll::Ready(()) => { - self.tasks.remove(&task_id); + tasks.remove(&task_id); self.waker_cache.remove(&task_id); } Poll::Pending => {} @@ -120,6 +119,30 @@ impl Executor { syscall::thread_sleep(50).unwrap(); } } + + pub fn new_spawner(&self) -> Spawner { + Spawner::new(self.tasks.clone(), self.task_queue.clone()) + } } -static CURRENT_EXECUTOR: Option = None; +pub struct Spawner { + tasks: Arc>>, + task_queue: Arc>>, +} + +impl Spawner { + fn new( + tasks: Arc>>, + task_queue: Arc>>, + ) -> Self { + Spawner { tasks, task_queue } + } + + pub fn spawn(&self, task: Task) { + let task_id = task.id; + if self.tasks.lock().insert(task_id, task).is_some() { + panic!("Task is already existed in executor map"); + } + self.task_queue.lock().push_back(task_id); + } +} diff --git a/rust/lib/yunq/src/server.rs b/rust/lib/yunq/src/server.rs index b4e3340..a5ded0b 100644 --- a/rust/lib/yunq/src/server.rs +++ b/rust/lib/yunq/src/server.rs @@ -4,9 +4,8 @@ use crate::buffer::ByteBuffer; use alloc::sync::Arc; use alloc::vec::Vec; use mammoth::cap::Capability; -use mammoth::sync::Mutex; use mammoth::syscall; -use mammoth::task::Executor; +use mammoth::task::Spawner; use mammoth::task::Task; use mammoth::thread; use mammoth::thread::JoinHandle; @@ -70,7 +69,7 @@ pub trait AsyncYunqServer where Self: Send + Sync + 'static, { - fn server_loop(self: Arc, executor: Arc>) { + fn server_loop(self: Arc, spawner: Spawner) { loop { let mut byte_buffer = ByteBuffer::<1024>::new(); let mut cap_buffer = vec![0; 10]; @@ -85,7 +84,7 @@ where .at::(8) .expect("Failed to access request length."); let self_clone = self.clone(); - executor.lock().spawn(Task::new((async move || { + spawner.spawn(Task::new((async move || { self_clone .handle_request_and_response(method, byte_buffer, cap_buffer, reply_port_cap) .await; @@ -137,11 +136,11 @@ where ) -> impl Future> + Sync + Send; } -pub fn spawn_async_server_thread(server: Arc, executor: Arc>) -> JoinHandle +pub fn spawn_async_server_thread(server: Arc, spawner: Spawner) -> JoinHandle where T: AsyncYunqServer + Send + Sync + 'static, { thread::spawn(move || { - server.server_loop(executor); + server.server_loop(spawner); }) } diff --git a/rust/sys/denali/src/ahci/controller.rs b/rust/sys/denali/src/ahci/controller.rs index e4f5e97..6a46440 100644 --- a/rust/sys/denali/src/ahci/controller.rs +++ b/rust/sys/denali/src/ahci/controller.rs @@ -197,7 +197,6 @@ pub fn spawn_irq_thread(controller: Arc) -> thread::JoinHandle { }); loop { irq_port.recv_null().unwrap(); - mammoth::debug!("Interrupt!"); controller.handle_irq(); } }; diff --git a/rust/sys/denali/src/bin/denali.rs b/rust/sys/denali/src/bin/denali.rs index ee84990..0e831d3 100644 --- a/rust/sys/denali/src/bin/denali.rs +++ b/rust/sys/denali/src/bin/denali.rs @@ -11,8 +11,10 @@ use mammoth::{ zion::z_err_t, }; -use denali::ahci::{self, identify_ports, spawn_irq_thread, AhciController}; +use denali::ahci::{identify_ports, spawn_irq_thread, AhciController}; use denali::{denali_server::DenaliServerImpl, AsyncDenaliServer}; +use yellowstone_yunq::RegisterEndpointRequest; +use yunq::server::AsyncYunqServer; define_entry!(); @@ -42,7 +44,18 @@ extern "C" fn main() -> z_err_t { let denali_server = Arc::new(AsyncDenaliServer::new(DenaliServerImpl::new(ahci_controller.clone())).unwrap()); - let server_thread = yunq::server::spawn_async_server_thread(denali_server, executor.clone()); + let server_thread = yunq::server::spawn_async_server_thread( + denali_server.clone(), + executor.lock().new_spawner(), + ); + + let yellowstone = yellowstone_yunq::from_init_endpoint(); + yellowstone + .register_endpoint(&RegisterEndpointRequest { + endpoint_name: "denali".into(), + endpoint_capability: denali_server.create_client_cap().unwrap().release(), + }) + .unwrap(); executor.clone().lock().run(); diff --git a/rust/sys/denali/src/denali_server.rs b/rust/sys/denali/src/denali_server.rs index ebea567..a48cace 100644 --- a/rust/sys/denali/src/denali_server.rs +++ b/rust/sys/denali/src/denali_server.rs @@ -32,7 +32,7 @@ impl AsyncDenaliServerHandler for DenaliServerImpl { }) } - async fn read_many(&self, req: ReadManyRequest) -> Result { + async fn read_many(&self, _req: ReadManyRequest) -> Result { Err(ZError::UNIMPLEMENTED) } } diff --git a/yunq/rust/src/codegen.rs b/yunq/rust/src/codegen.rs index b022a33..d80ef67 100644 --- a/yunq/rust/src/codegen.rs +++ b/yunq/rust/src/codegen.rs @@ -435,13 +435,13 @@ fn generate_async_server_method(method: &Method) -> TokenStream { let maybe_resp = method.response.clone().map(|r| ident(&r)); match (maybe_req, maybe_resp) { (Some(req), Some(resp)) => quote! { - fn #name (&self, req: #req) -> impl Future> + Sync; + fn #name (&self, req: #req) -> impl Future> + Sync + Send; }, (Some(req), None) => quote! { - fn #name (&self, req: #req) -> impl Future> + Sync; + fn #name (&self, req: #req) -> impl Future> + Sync + Send; }, (None, Some(resp)) => quote! { - fn #name (&self) -> impl Future> + Sync; + fn #name (&self) -> impl Future> + Sync + Send; }, _ => unreachable!(), }