[zion] Move synchronization to the message queue
This commit is contained in:
		
							parent
							
								
									9dd457391c
								
							
						
					
					
						commit
						dc63084d61
					
				|  | @ -11,6 +11,7 @@ class LinkedList { | |||
| 
 | ||||
|   LinkedList(const LinkedList&) = delete; | ||||
| 
 | ||||
|   bool empty() const { return size_ == 0; } | ||||
|   uint64_t size() const { return size_; } | ||||
| 
 | ||||
|   void PushBack(const T& item) { | ||||
|  |  | |||
|  | @ -26,12 +26,31 @@ z_err_t UnboundedMessageQueue::PushBack(uint64_t num_bytes, const void* bytes, | |||
|     message->caps.PushBack(cap); | ||||
|   } | ||||
| 
 | ||||
|   MutexHolder h(mutex_); | ||||
|   pending_messages_.PushBack(message); | ||||
| 
 | ||||
|   if (blocked_threads_.size() > 0) { | ||||
|     auto thread = blocked_threads_.PopFront(); | ||||
|     thread->SetState(Thread::RUNNABLE); | ||||
|     gScheduler->Enqueue(thread); | ||||
|   } | ||||
|   return glcr::OK; | ||||
| } | ||||
| 
 | ||||
| z_err_t UnboundedMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, | ||||
|                                         uint64_t* num_caps, z_cap_t* caps) { | ||||
|   mutex_.Lock(); | ||||
|   while (pending_messages_.empty()) { | ||||
|     auto thread = gScheduler->CurrentThread(); | ||||
|     thread->SetState(Thread::BLOCKED); | ||||
|     blocked_threads_.PushBack(thread); | ||||
|     mutex_.Unlock(); | ||||
|     gScheduler->Yield(); | ||||
|     mutex_.Lock(); | ||||
|   } | ||||
|   mutex_.Unlock(); | ||||
| 
 | ||||
|   MutexHolder lock(mutex_); | ||||
|   auto next_msg = pending_messages_.PeekFront(); | ||||
|   if (next_msg->num_bytes > *num_bytes) { | ||||
|     return glcr::BUFFER_SIZE; | ||||
|  | @ -58,6 +77,7 @@ z_err_t UnboundedMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, | |||
| 
 | ||||
| void UnboundedMessageQueue::WriteKernel(uint64_t init, | ||||
|                                         glcr::RefPtr<Capability> cap) { | ||||
|   // FIXME: Add synchronization here in case it is ever used outside of init.
 | ||||
|   auto msg = glcr::MakeShared<Message>(); | ||||
|   msg->bytes = new uint8_t[8]; | ||||
|   msg->num_bytes = sizeof(init); | ||||
|  | @ -70,10 +90,12 @@ void UnboundedMessageQueue::WriteKernel(uint64_t init, | |||
| 
 | ||||
|   pending_messages_.PushBack(msg); | ||||
| } | ||||
| 
 | ||||
| glcr::ErrorCode SingleMessageQueue::PushBack(uint64_t num_bytes, | ||||
|                                              const void* bytes, | ||||
|                                              uint64_t num_caps, | ||||
|                                              const z_cap_t* caps) { | ||||
|   MutexHolder h(mutex_); | ||||
|   if (has_written_) { | ||||
|     return glcr::FAILED_PRECONDITION; | ||||
|   } | ||||
|  | @ -95,13 +117,31 @@ glcr::ErrorCode SingleMessageQueue::PushBack(uint64_t num_bytes, | |||
| 
 | ||||
|   has_written_ = true; | ||||
| 
 | ||||
|   if (blocked_threads_.size() > 0) { | ||||
|     auto thread = blocked_threads_.PopFront(); | ||||
|     thread->SetState(Thread::RUNNABLE); | ||||
|     gScheduler->Enqueue(thread); | ||||
|   } | ||||
| 
 | ||||
|   return glcr::OK; | ||||
| } | ||||
| 
 | ||||
| glcr::ErrorCode SingleMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, | ||||
|                                              uint64_t* num_caps, | ||||
|                                              z_cap_t* caps) { | ||||
|   if (!has_written_ || has_read_) { | ||||
|   mutex_.Lock(); | ||||
|   while (!has_written_) { | ||||
|     auto thread = gScheduler->CurrentThread(); | ||||
|     thread->SetState(Thread::BLOCKED); | ||||
|     blocked_threads_.PushBack(thread); | ||||
|     mutex_.Unlock(); | ||||
|     gScheduler->Yield(); | ||||
|     mutex_.Lock(); | ||||
|   } | ||||
|   mutex_.Unlock(); | ||||
| 
 | ||||
|   MutexHolder lock(mutex_); | ||||
|   if (has_read_) { | ||||
|     return glcr::FAILED_PRECONDITION; | ||||
|   } | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,5 +1,6 @@ | |||
| #pragma once | ||||
| 
 | ||||
| #include <glacier/container/intrusive_list.h> | ||||
| #include <glacier/memory/ref_ptr.h> | ||||
| #include <glacier/memory/shared_ptr.h> | ||||
| #include <glacier/status/error.h> | ||||
|  | @ -7,6 +8,7 @@ | |||
| #include "capability/capability.h" | ||||
| #include "include/ztypes.h" | ||||
| #include "lib/linked_list.h" | ||||
| #include "lib/mutex.h" | ||||
| 
 | ||||
| class MessageQueue { | ||||
|  public: | ||||
|  | @ -17,6 +19,12 @@ class MessageQueue { | |||
|   virtual glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, | ||||
|                                    uint64_t* num_caps, z_cap_t* caps) = 0; | ||||
|   virtual bool empty() = 0; | ||||
| 
 | ||||
|  protected: | ||||
|   Mutex mutex_{"message"}; | ||||
|   // FIXME: This maybe shouldn't be shared between classes since the
 | ||||
|   // SingleMessageQueue should only ever have one blocked thread.
 | ||||
|   glcr::IntrusiveList<Thread> blocked_threads_; | ||||
| }; | ||||
| 
 | ||||
| class UnboundedMessageQueue : public MessageQueue { | ||||
|  | @ -33,7 +41,10 @@ class UnboundedMessageQueue : public MessageQueue { | |||
| 
 | ||||
|   void WriteKernel(uint64_t init, glcr::RefPtr<Capability> cap); | ||||
| 
 | ||||
|   bool empty() override { return pending_messages_.size() == 0; } | ||||
|   bool empty() override { | ||||
|     MutexHolder h(mutex_); | ||||
|     return pending_messages_.size() == 0; | ||||
|   } | ||||
| 
 | ||||
|  private: | ||||
|   struct Message { | ||||
|  | @ -58,7 +69,10 @@ class SingleMessageQueue : public MessageQueue { | |||
|   glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, | ||||
|                            z_cap_t* caps) override; | ||||
| 
 | ||||
|   bool empty() override { return has_written_ == false; }; | ||||
|   bool empty() override { | ||||
|     MutexHolder h(mutex_); | ||||
|     return has_written_ == false; | ||||
|   }; | ||||
| 
 | ||||
|  private: | ||||
|   bool has_written_ = false; | ||||
|  |  | |||
|  | @ -5,31 +5,11 @@ | |||
| glcr::ErrorCode IpcObject::Send(uint64_t num_bytes, const void* bytes, | ||||
|                                 uint64_t num_caps, const z_cap_t* caps) { | ||||
|   auto& message_queue = GetSendMessageQueue(); | ||||
|   MutexHolder lock(mutex_); | ||||
|   RET_ERR(message_queue.PushBack(num_bytes, bytes, num_caps, caps)); | ||||
| 
 | ||||
|   if (blocked_threads_.size() > 0) { | ||||
|     auto thread = blocked_threads_.PopFront(); | ||||
|     thread->SetState(Thread::RUNNABLE); | ||||
|     gScheduler->Enqueue(thread); | ||||
|   } | ||||
|   return glcr::OK; | ||||
|   return message_queue.PushBack(num_bytes, bytes, num_caps, caps); | ||||
| } | ||||
| 
 | ||||
| glcr::ErrorCode IpcObject::Recv(uint64_t* num_bytes, void* bytes, | ||||
|                                 uint64_t* num_caps, z_cap_t* caps) { | ||||
|   auto& message_queue = GetRecvMessageQueue(); | ||||
|   mutex_.Lock(); | ||||
|   while (message_queue.empty()) { | ||||
|     auto thread = gScheduler->CurrentThread(); | ||||
|     thread->SetState(Thread::BLOCKED); | ||||
|     blocked_threads_.PushBack(thread); | ||||
|     mutex_.Unlock(); | ||||
|     gScheduler->Yield(); | ||||
|     mutex_.Lock(); | ||||
|   } | ||||
|   mutex_.Unlock(); | ||||
| 
 | ||||
|   MutexHolder lock(mutex_); | ||||
|   return message_queue.PopFront(num_bytes, bytes, num_caps, caps); | ||||
| } | ||||
|  |  | |||
|  | @ -1,11 +1,9 @@ | |||
| #pragma once | ||||
| 
 | ||||
| #include <glacier/container/intrusive_list.h> | ||||
| #include <glacier/status/error.h> | ||||
| 
 | ||||
| #include "include/ztypes.h" | ||||
| #include "lib/message_queue.h" | ||||
| #include "lib/mutex.h" | ||||
| #include "object/kernel_object.h" | ||||
| 
 | ||||
| class IpcObject : public KernelObject { | ||||
|  | @ -18,16 +16,8 @@ class IpcObject : public KernelObject { | |||
|   virtual glcr::ErrorCode Recv(uint64_t* num_bytes, void* bytes, | ||||
|                                uint64_t* num_caps, z_cap_t* caps) final; | ||||
| 
 | ||||
|   bool HasMessages() { | ||||
|     MutexHolder h(mutex_); | ||||
|     return !GetRecvMessageQueue().empty(); | ||||
|   } | ||||
|   bool HasMessages() { return !GetRecvMessageQueue().empty(); } | ||||
| 
 | ||||
|   virtual MessageQueue& GetSendMessageQueue() = 0; | ||||
|   virtual MessageQueue& GetRecvMessageQueue() = 0; | ||||
| 
 | ||||
|  protected: | ||||
|   // FIXME: move locking and blocked threads to the message queue itself.
 | ||||
|   Mutex mutex_{"ipc"}; | ||||
|   glcr::IntrusiveList<Thread> blocked_threads_; | ||||
| }; | ||||
|  |  | |||
|  | @ -3,6 +3,5 @@ | |||
| #include "scheduler/scheduler.h" | ||||
| 
 | ||||
| void Port::WriteKernel(uint64_t init, glcr::RefPtr<Capability> cap) { | ||||
|   MutexHolder h(mutex_); | ||||
|   message_queue_.WriteKernel(init, cap); | ||||
| } | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue