From dc63084d61e7bbcd5fd1794c50ae65286b6b56c4 Mon Sep 17 00:00:00 2001 From: Drew Galbraith Date: Wed, 21 Jun 2023 23:57:23 -0700 Subject: [PATCH] [zion] Move synchronization to the message queue --- zion/lib/linked_list.h | 1 + zion/lib/message_queue.cpp | 42 +++++++++++++++++++++++++++++++++++++- zion/lib/message_queue.h | 18 ++++++++++++++-- zion/object/ipc_object.cpp | 22 +------------------- zion/object/ipc_object.h | 12 +---------- zion/object/port.cpp | 1 - 6 files changed, 60 insertions(+), 36 deletions(-) diff --git a/zion/lib/linked_list.h b/zion/lib/linked_list.h index b6d5570..8ed6b05 100644 --- a/zion/lib/linked_list.h +++ b/zion/lib/linked_list.h @@ -11,6 +11,7 @@ class LinkedList { LinkedList(const LinkedList&) = delete; + bool empty() const { return size_ == 0; } uint64_t size() const { return size_; } void PushBack(const T& item) { diff --git a/zion/lib/message_queue.cpp b/zion/lib/message_queue.cpp index 6bcced6..cd9dc7b 100644 --- a/zion/lib/message_queue.cpp +++ b/zion/lib/message_queue.cpp @@ -26,12 +26,31 @@ z_err_t UnboundedMessageQueue::PushBack(uint64_t num_bytes, const void* bytes, message->caps.PushBack(cap); } + MutexHolder h(mutex_); pending_messages_.PushBack(message); + + if (blocked_threads_.size() > 0) { + auto thread = blocked_threads_.PopFront(); + thread->SetState(Thread::RUNNABLE); + gScheduler->Enqueue(thread); + } return glcr::OK; } z_err_t UnboundedMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) { + mutex_.Lock(); + while (pending_messages_.empty()) { + auto thread = gScheduler->CurrentThread(); + thread->SetState(Thread::BLOCKED); + blocked_threads_.PushBack(thread); + mutex_.Unlock(); + gScheduler->Yield(); + mutex_.Lock(); + } + mutex_.Unlock(); + + MutexHolder lock(mutex_); auto next_msg = pending_messages_.PeekFront(); if (next_msg->num_bytes > *num_bytes) { return glcr::BUFFER_SIZE; @@ -58,6 +77,7 @@ z_err_t UnboundedMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, void UnboundedMessageQueue::WriteKernel(uint64_t init, glcr::RefPtr cap) { + // FIXME: Add synchronization here in case it is ever used outside of init. auto msg = glcr::MakeShared(); msg->bytes = new uint8_t[8]; msg->num_bytes = sizeof(init); @@ -70,10 +90,12 @@ void UnboundedMessageQueue::WriteKernel(uint64_t init, pending_messages_.PushBack(msg); } + glcr::ErrorCode SingleMessageQueue::PushBack(uint64_t num_bytes, const void* bytes, uint64_t num_caps, const z_cap_t* caps) { + MutexHolder h(mutex_); if (has_written_) { return glcr::FAILED_PRECONDITION; } @@ -95,13 +117,31 @@ glcr::ErrorCode SingleMessageQueue::PushBack(uint64_t num_bytes, has_written_ = true; + if (blocked_threads_.size() > 0) { + auto thread = blocked_threads_.PopFront(); + thread->SetState(Thread::RUNNABLE); + gScheduler->Enqueue(thread); + } + return glcr::OK; } glcr::ErrorCode SingleMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) { - if (!has_written_ || has_read_) { + mutex_.Lock(); + while (!has_written_) { + auto thread = gScheduler->CurrentThread(); + thread->SetState(Thread::BLOCKED); + blocked_threads_.PushBack(thread); + mutex_.Unlock(); + gScheduler->Yield(); + mutex_.Lock(); + } + mutex_.Unlock(); + + MutexHolder lock(mutex_); + if (has_read_) { return glcr::FAILED_PRECONDITION; } diff --git a/zion/lib/message_queue.h b/zion/lib/message_queue.h index 14b0acd..c58d0c6 100644 --- a/zion/lib/message_queue.h +++ b/zion/lib/message_queue.h @@ -1,5 +1,6 @@ #pragma once +#include #include #include #include @@ -7,6 +8,7 @@ #include "capability/capability.h" #include "include/ztypes.h" #include "lib/linked_list.h" +#include "lib/mutex.h" class MessageQueue { public: @@ -17,6 +19,12 @@ class MessageQueue { virtual glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) = 0; virtual bool empty() = 0; + + protected: + Mutex mutex_{"message"}; + // FIXME: This maybe shouldn't be shared between classes since the + // SingleMessageQueue should only ever have one blocked thread. + glcr::IntrusiveList blocked_threads_; }; class UnboundedMessageQueue : public MessageQueue { @@ -33,7 +41,10 @@ class UnboundedMessageQueue : public MessageQueue { void WriteKernel(uint64_t init, glcr::RefPtr cap); - bool empty() override { return pending_messages_.size() == 0; } + bool empty() override { + MutexHolder h(mutex_); + return pending_messages_.size() == 0; + } private: struct Message { @@ -58,7 +69,10 @@ class SingleMessageQueue : public MessageQueue { glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) override; - bool empty() override { return has_written_ == false; }; + bool empty() override { + MutexHolder h(mutex_); + return has_written_ == false; + }; private: bool has_written_ = false; diff --git a/zion/object/ipc_object.cpp b/zion/object/ipc_object.cpp index b1ef2e9..a10f0fb 100644 --- a/zion/object/ipc_object.cpp +++ b/zion/object/ipc_object.cpp @@ -5,31 +5,11 @@ glcr::ErrorCode IpcObject::Send(uint64_t num_bytes, const void* bytes, uint64_t num_caps, const z_cap_t* caps) { auto& message_queue = GetSendMessageQueue(); - MutexHolder lock(mutex_); - RET_ERR(message_queue.PushBack(num_bytes, bytes, num_caps, caps)); - - if (blocked_threads_.size() > 0) { - auto thread = blocked_threads_.PopFront(); - thread->SetState(Thread::RUNNABLE); - gScheduler->Enqueue(thread); - } - return glcr::OK; + return message_queue.PushBack(num_bytes, bytes, num_caps, caps); } glcr::ErrorCode IpcObject::Recv(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) { auto& message_queue = GetRecvMessageQueue(); - mutex_.Lock(); - while (message_queue.empty()) { - auto thread = gScheduler->CurrentThread(); - thread->SetState(Thread::BLOCKED); - blocked_threads_.PushBack(thread); - mutex_.Unlock(); - gScheduler->Yield(); - mutex_.Lock(); - } - mutex_.Unlock(); - - MutexHolder lock(mutex_); return message_queue.PopFront(num_bytes, bytes, num_caps, caps); } diff --git a/zion/object/ipc_object.h b/zion/object/ipc_object.h index 0f01086..69595a7 100644 --- a/zion/object/ipc_object.h +++ b/zion/object/ipc_object.h @@ -1,11 +1,9 @@ #pragma once -#include #include #include "include/ztypes.h" #include "lib/message_queue.h" -#include "lib/mutex.h" #include "object/kernel_object.h" class IpcObject : public KernelObject { @@ -18,16 +16,8 @@ class IpcObject : public KernelObject { virtual glcr::ErrorCode Recv(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) final; - bool HasMessages() { - MutexHolder h(mutex_); - return !GetRecvMessageQueue().empty(); - } + bool HasMessages() { return !GetRecvMessageQueue().empty(); } virtual MessageQueue& GetSendMessageQueue() = 0; virtual MessageQueue& GetRecvMessageQueue() = 0; - - protected: - // FIXME: move locking and blocked threads to the message queue itself. - Mutex mutex_{"ipc"}; - glcr::IntrusiveList blocked_threads_; }; diff --git a/zion/object/port.cpp b/zion/object/port.cpp index 647477d..8f9ff5a 100644 --- a/zion/object/port.cpp +++ b/zion/object/port.cpp @@ -3,6 +3,5 @@ #include "scheduler/scheduler.h" void Port::WriteKernel(uint64_t init, glcr::RefPtr cap) { - MutexHolder h(mutex_); message_queue_.WriteKernel(init, cap); }