From d60b2bdc61898f6a72d17cc0fcde356c50641066 Mon Sep 17 00:00:00 2001 From: Drew Galbraith Date: Tue, 20 Jun 2023 15:36:17 -0700 Subject: [PATCH] [zion] Move channel to use the message queue. --- zion/object/channel.cpp | 67 ++++++++-------------------------------- zion/object/channel.h | 21 +++++-------- zion/object/port.cpp | 4 ++- zion/object/thread.h | 1 + zion/syscall/channel.cpp | 23 ++------------ 5 files changed, 27 insertions(+), 89 deletions(-) diff --git a/zion/object/channel.cpp b/zion/object/channel.cpp index 8736d42..5ec1670 100644 --- a/zion/object/channel.cpp +++ b/zion/object/channel.cpp @@ -11,14 +11,18 @@ Pair, RefPtr> Channel::CreateChannelPair() { return {c1, c2}; } -z_err_t Channel::Write(const ZMessage& msg) { - return peer_->EnqueueMessage(msg); +z_err_t Channel::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, + const z_cap_t* caps) { + return peer_->WriteInternal(num_bytes, bytes, num_caps, caps); } -z_err_t Channel::Read(ZMessage& msg) { +z_err_t Channel::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps) { mutex_.Lock(); - while (pending_messages_.size() == 0) { - blocked_threads_.PushBack(gScheduler->CurrentThread()); + while (message_queue_.empty()) { + auto thread = gScheduler->CurrentThread(); + thread->SetState(Thread::BLOCKED); + blocked_threads_.PushBack(thread); mutex_.Unlock(); gScheduler->Yield(); mutex_.Lock(); @@ -26,58 +30,13 @@ z_err_t Channel::Read(ZMessage& msg) { mutex_.Unlock(); MutexHolder lock(mutex_); - auto next_msg = pending_messages_.PeekFront(); - if (next_msg->num_bytes > msg.num_bytes) { - return Z_ERR_BUFF_SIZE; - } - if (next_msg->caps.size() > msg.num_caps) { - return Z_ERR_BUFF_SIZE; - } - - msg.num_bytes = next_msg->num_bytes; - - for (uint64_t i = 0; i < msg.num_bytes; i++) { - static_cast(msg.data)[i] = next_msg->bytes[i]; - } - - msg.num_caps = next_msg->caps.size(); - auto& proc = gScheduler->CurrentProcess(); - for (uint64_t i = 0; i < msg.num_caps; i++) { - msg.caps[i] = proc.AddExistingCapability(next_msg->caps.PopFront()); - } - - pending_messages_.PopFront(); - - return Z_OK; + return message_queue_.PopFront(num_bytes, bytes, num_caps, caps); } -z_err_t Channel::EnqueueMessage(const ZMessage& msg) { - if (msg.num_bytes > 0x1000) { - dbgln("Large message size unimplemented: %x", msg.num_bytes); - return Z_ERR_INVALID; - } - - auto message = MakeShared(); - - // Copy Message body. - message->num_bytes = msg.num_bytes; - message->bytes = new uint8_t[msg.num_bytes]; - for (uint64_t i = 0; i < msg.num_bytes; i++) { - message->bytes[i] = static_cast(msg.data)[i]; - } - - // Release and store capabilities. - for (uint64_t i = 0; i < msg.num_caps; i++) { - auto cap = gScheduler->CurrentProcess().ReleaseCapability(msg.caps[i]); - if (!cap) { - return Z_ERR_CAP_NOT_FOUND; - } - message->caps.PushBack(cap); - } - - // Enqueue. +z_err_t Channel::WriteInternal(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, const z_cap_t* caps) { MutexHolder lock(mutex_); - pending_messages_.PushBack(message); + RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps)); if (blocked_threads_.size() > 0) { auto thread = blocked_threads_.PopFront(); diff --git a/zion/object/channel.h b/zion/object/channel.h index 6349cf2..bc23c47 100644 --- a/zion/object/channel.h +++ b/zion/object/channel.h @@ -3,6 +3,7 @@ #include "capability/capability.h" #include "include/ztypes.h" #include "lib/linked_list.h" +#include "lib/message_queue.h" #include "lib/mutex.h" #include "lib/pair.h" #include "lib/ref_ptr.h" @@ -24,8 +25,10 @@ class Channel : public KernelObject { RefPtr peer() { return peer_; } - z_err_t Write(const ZMessage& msg); - z_err_t Read(ZMessage& msg); + z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, + const z_cap_t* caps); + z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps); private: // FIXME: We will likely never close the channel based on this @@ -33,22 +36,14 @@ class Channel : public KernelObject { RefPtr peer_{nullptr}; Mutex mutex_{"channel"}; + UnboundedMessageQueue message_queue_; - struct Message { - uint64_t num_bytes; - uint8_t* bytes; - - LinkedList> caps; - }; - - // FIXME: This is probably dangerous because of an - // implicit shallow copy. - LinkedList> pending_messages_; LinkedList> blocked_threads_; friend class MakeRefCountedFriend; Channel() {} void SetPeer(const RefPtr& peer) { peer_ = peer; } - z_err_t EnqueueMessage(const ZMessage& msg); + z_err_t WriteInternal(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, const z_cap_t* caps); }; diff --git a/zion/object/port.cpp b/zion/object/port.cpp index 00221ae..22e8b8f 100644 --- a/zion/object/port.cpp +++ b/zion/object/port.cpp @@ -20,7 +20,9 @@ z_err_t Port::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) { mutex_.Lock(); while (message_queue_.empty()) { - blocked_threads_.PushBack(gScheduler->CurrentThread()); + auto thread = gScheduler->CurrentThread(); + thread->SetState(Thread::BLOCKED); + blocked_threads_.PushBack(thread); mutex_.Unlock(); gScheduler->Yield(); mutex_.Lock(); diff --git a/zion/object/thread.h b/zion/object/thread.h index 2dd3ba9..0353636 100644 --- a/zion/object/thread.h +++ b/zion/object/thread.h @@ -22,6 +22,7 @@ class Thread : public KernelObject { CREATED, RUNNING, RUNNABLE, + BLOCKED, FINISHED, }; static RefPtr RootThread(Process& root_proc); diff --git a/zion/syscall/channel.cpp b/zion/syscall/channel.cpp index b7275e2..da54dac 100644 --- a/zion/syscall/channel.cpp +++ b/zion/syscall/channel.cpp @@ -19,15 +19,7 @@ z_err_t ChannelSend(ZChannelSendReq* req) { auto chan = chan_cap->obj(); RET_IF_NULL(chan); - // FIXME: Get rid of this hack. - ZMessage message{ - .num_bytes = req->num_bytes, - .data = const_cast(req->data), - .num_caps = req->num_caps, - .caps = req->caps, - - }; - return chan->Write(message); + return chan->Write(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t ChannelRecv(ZChannelRecvReq* req) { @@ -37,16 +29,5 @@ z_err_t ChannelRecv(ZChannelRecvReq* req) { auto chan = chan_cap->obj(); RET_IF_NULL(chan); - // FIXME: Get rid of this hack. - ZMessage message{ - .num_bytes = *req->num_bytes, - .data = const_cast(req->data), - .num_caps = *req->num_caps, - .caps = req->caps, - - }; - RET_ERR(chan->Read(message)); - *req->num_bytes = message.num_bytes; - *req->num_caps = message.num_caps; - return Z_OK; + return chan->Read(req->num_bytes, req->data, req->num_caps, req->caps); }