diff --git a/zion/CMakeLists.txt b/zion/CMakeLists.txt index 4057051..60eb6f8 100644 --- a/zion/CMakeLists.txt +++ b/zion/CMakeLists.txt @@ -21,6 +21,7 @@ add_executable(zion object/address_space.cpp object/channel.cpp object/endpoint.cpp + object/ipc_object.cpp object/memory_object.cpp object/port.cpp object/process.cpp diff --git a/zion/interrupt/interrupt.cpp b/zion/interrupt/interrupt.cpp index ccaab0b..7988065 100644 --- a/zion/interrupt/interrupt.cpp +++ b/zion/interrupt/interrupt.cpp @@ -142,7 +142,7 @@ glcr::RefPtr pci1_port; extern "C" void isr_pci1(); extern "C" void interrupt_pci1(InterruptFrame*) { dbgln("Interrupt PCI line 1"); - pci1_port->Write(0, nullptr, 0, nullptr); + pci1_port->Send(0, nullptr, 0, nullptr); SignalEOI(); } diff --git a/zion/lib/message_queue.h b/zion/lib/message_queue.h index 43ef538..14b0acd 100644 --- a/zion/lib/message_queue.h +++ b/zion/lib/message_queue.h @@ -16,6 +16,7 @@ class MessageQueue { uint64_t num_caps, const z_cap_t* caps) = 0; virtual glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) = 0; + virtual bool empty() = 0; }; class UnboundedMessageQueue : public MessageQueue { @@ -32,8 +33,7 @@ class UnboundedMessageQueue : public MessageQueue { void WriteKernel(uint64_t init, glcr::RefPtr cap); - uint64_t size() { return pending_messages_.size(); } - bool empty() { return size() == 0; } + bool empty() override { return pending_messages_.size() == 0; } private: struct Message { @@ -58,7 +58,7 @@ class SingleMessageQueue : public MessageQueue { glcr::ErrorCode PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, z_cap_t* caps) override; - bool empty() { return has_written_ == false; }; + bool empty() override { return has_written_ == false; }; private: bool has_written_ = false; diff --git a/zion/object/channel.cpp b/zion/object/channel.cpp index 4dd4b87..cfd8634 100644 --- a/zion/object/channel.cpp +++ b/zion/object/channel.cpp @@ -5,44 +5,9 @@ glcr::Pair, glcr::RefPtr> Channel::CreateChannelPair() { - auto c1 = glcr::MakeRefCounted(); - auto c2 = glcr::MakeRefCounted(); + auto c1 = glcr::AdoptPtr(new Channel); + auto c2 = glcr::AdoptPtr(new Channel); c1->SetPeer(c2); c2->SetPeer(c1); return {c1, c2}; } - -z_err_t Channel::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, - const z_cap_t* caps) { - return peer_->WriteInternal(num_bytes, bytes, num_caps, caps); -} - -z_err_t Channel::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, - z_cap_t* caps) { - mutex_.Lock(); - while (message_queue_.empty()) { - auto thread = gScheduler->CurrentThread(); - thread->SetState(Thread::BLOCKED); - blocked_threads_.PushBack(thread); - mutex_.Unlock(); - gScheduler->Yield(); - mutex_.Lock(); - } - mutex_.Unlock(); - - MutexHolder lock(mutex_); - return message_queue_.PopFront(num_bytes, bytes, num_caps, caps); -} - -z_err_t Channel::WriteInternal(uint64_t num_bytes, const void* bytes, - uint64_t num_caps, const z_cap_t* caps) { - MutexHolder lock(mutex_); - RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps)); - - if (blocked_threads_.size() > 0) { - auto thread = blocked_threads_.PopFront(); - thread->SetState(Thread::RUNNABLE); - gScheduler->Enqueue(thread); - } - return glcr::OK; -} diff --git a/zion/object/channel.h b/zion/object/channel.h index 8124221..6203487 100644 --- a/zion/object/channel.h +++ b/zion/object/channel.h @@ -1,6 +1,5 @@ #pragma once -#include #include #include @@ -8,6 +7,7 @@ #include "include/ztypes.h" #include "lib/message_queue.h" #include "lib/mutex.h" +#include "object/ipc_object.h" #include "object/kernel_object.h" #include "usr/zcall_internal.h" @@ -18,7 +18,7 @@ struct KernelObjectTag { static const uint64_t type = KernelObject::CHANNEL; }; -class Channel : public KernelObject { +class Channel : public IpcObject { public: uint64_t TypeTag() override { return KernelObject::CHANNEL; } static glcr::Pair, glcr::RefPtr> @@ -26,25 +26,20 @@ class Channel : public KernelObject { glcr::RefPtr peer() { return peer_; } - z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, - const z_cap_t* caps); - z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, - z_cap_t* caps); + virtual MessageQueue& GetSendMessageQueue() override { + return peer_->message_queue_; + } + virtual MessageQueue& GetRecvMessageQueue() override { + return message_queue_; + } private: // FIXME: We will likely never close the channel based on this // circular dependency. glcr::RefPtr peer_{nullptr}; - Mutex mutex_{"channel"}; UnboundedMessageQueue message_queue_; - glcr::IntrusiveList blocked_threads_; - - friend class glcr::MakeRefCountedFriend; Channel() {} void SetPeer(const glcr::RefPtr& peer) { peer_ = peer; } - - z_err_t WriteInternal(uint64_t num_bytes, const void* bytes, - uint64_t num_caps, const z_cap_t* caps); }; diff --git a/zion/object/endpoint.cpp b/zion/object/endpoint.cpp index 3bcb340..7689876 100644 --- a/zion/object/endpoint.cpp +++ b/zion/object/endpoint.cpp @@ -5,39 +5,3 @@ glcr::RefPtr Endpoint::Create() { return glcr::AdoptPtr(new Endpoint); } - -glcr::ErrorCode Endpoint::Write(uint64_t num_bytes, const void* data, - z_cap_t reply_port_cap) { - MutexHolder h(mutex_); - RET_ERR(message_queue_.PushBack(num_bytes, data, 1, &reply_port_cap)); - - if (blocked_threads_.size() > 0) { - auto thread = blocked_threads_.PopFront(); - thread->SetState(Thread::RUNNABLE); - gScheduler->Enqueue(thread); - } - - return glcr::OK; -} -glcr::ErrorCode Endpoint::Read(uint64_t* num_bytes, void* data, - z_cap_t* reply_port_cap) { - mutex_.Lock(); - while (message_queue_.empty()) { - auto thread = gScheduler->CurrentThread(); - thread->SetState(Thread::BLOCKED); - mutex_.Unlock(); - gScheduler->Yield(); - mutex_.Lock(); - } - mutex_.Unlock(); - - MutexHolder h(mutex_); - - uint64_t num_caps = 1; - RET_ERR(message_queue_.PopFront(num_bytes, data, &num_caps, reply_port_cap)); - - if (num_caps != 1) { - return glcr::INTERNAL; - } - return glcr::OK; -} diff --git a/zion/object/endpoint.h b/zion/object/endpoint.h index 5b9b2f1..5aa6ca5 100644 --- a/zion/object/endpoint.h +++ b/zion/object/endpoint.h @@ -6,6 +6,7 @@ #include "lib/message_queue.h" #include "lib/mutex.h" +#include "object/ipc_object.h" #include "object/kernel_object.h" class Endpoint; @@ -16,22 +17,23 @@ struct KernelObjectTag { static const uint64_t type = KernelObject::ENDPOINT; }; -class Endpoint : public KernelObject { +class Endpoint : public IpcObject { public: uint64_t TypeTag() override { return KernelObject::ENDPOINT; } static glcr::RefPtr Create(); - glcr::ErrorCode Write(uint64_t num_bytes, const void* data, - z_cap_t reply_port_cap); - glcr::ErrorCode Read(uint64_t* num_bytes, void* data, z_cap_t* reply_port_cap); - private: - Mutex mutex_{"endpoint"}; - UnboundedMessageQueue message_queue_; + virtual MessageQueue& GetSendMessageQueue() override { + return message_queue_; + } + virtual MessageQueue& GetRecvMessageQueue() override { + return message_queue_; + } - glcr::IntrusiveList blocked_threads_; + private: + UnboundedMessageQueue message_queue_; Endpoint() {} }; diff --git a/zion/object/ipc_object.cpp b/zion/object/ipc_object.cpp new file mode 100644 index 0000000..b1ef2e9 --- /dev/null +++ b/zion/object/ipc_object.cpp @@ -0,0 +1,35 @@ +#include "object/ipc_object.h" + +#include "scheduler/scheduler.h" + +glcr::ErrorCode IpcObject::Send(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, const z_cap_t* caps) { + auto& message_queue = GetSendMessageQueue(); + MutexHolder lock(mutex_); + RET_ERR(message_queue.PushBack(num_bytes, bytes, num_caps, caps)); + + if (blocked_threads_.size() > 0) { + auto thread = blocked_threads_.PopFront(); + thread->SetState(Thread::RUNNABLE); + gScheduler->Enqueue(thread); + } + return glcr::OK; +} + +glcr::ErrorCode IpcObject::Recv(uint64_t* num_bytes, void* bytes, + uint64_t* num_caps, z_cap_t* caps) { + auto& message_queue = GetRecvMessageQueue(); + mutex_.Lock(); + while (message_queue.empty()) { + auto thread = gScheduler->CurrentThread(); + thread->SetState(Thread::BLOCKED); + blocked_threads_.PushBack(thread); + mutex_.Unlock(); + gScheduler->Yield(); + mutex_.Lock(); + } + mutex_.Unlock(); + + MutexHolder lock(mutex_); + return message_queue.PopFront(num_bytes, bytes, num_caps, caps); +} diff --git a/zion/object/ipc_object.h b/zion/object/ipc_object.h new file mode 100644 index 0000000..0f01086 --- /dev/null +++ b/zion/object/ipc_object.h @@ -0,0 +1,33 @@ +#pragma once + +#include +#include + +#include "include/ztypes.h" +#include "lib/message_queue.h" +#include "lib/mutex.h" +#include "object/kernel_object.h" + +class IpcObject : public KernelObject { + public: + IpcObject(){}; + virtual ~IpcObject() {} + + virtual glcr::ErrorCode Send(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, const z_cap_t* caps) final; + virtual glcr::ErrorCode Recv(uint64_t* num_bytes, void* bytes, + uint64_t* num_caps, z_cap_t* caps) final; + + bool HasMessages() { + MutexHolder h(mutex_); + return !GetRecvMessageQueue().empty(); + } + + virtual MessageQueue& GetSendMessageQueue() = 0; + virtual MessageQueue& GetRecvMessageQueue() = 0; + + protected: + // FIXME: move locking and blocked threads to the message queue itself. + Mutex mutex_{"ipc"}; + glcr::IntrusiveList blocked_threads_; +}; diff --git a/zion/object/port.cpp b/zion/object/port.cpp index 6374195..647477d 100644 --- a/zion/object/port.cpp +++ b/zion/object/port.cpp @@ -2,43 +2,7 @@ #include "scheduler/scheduler.h" -Port::Port() {} - -z_err_t Port::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, - const z_cap_t* caps) { - MutexHolder h(mutex_); - RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps)); - if (blocked_threads_.size() > 0) { - auto thread = blocked_threads_.PopFront(); - thread->SetState(Thread::RUNNABLE); - gScheduler->Enqueue(thread); - } - return glcr::OK; -} - -z_err_t Port::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, - z_cap_t* caps) { - mutex_.Lock(); - while (message_queue_.empty()) { - auto thread = gScheduler->CurrentThread(); - thread->SetState(Thread::BLOCKED); - blocked_threads_.PushBack(thread); - mutex_.Unlock(); - gScheduler->Yield(); - mutex_.Lock(); - } - mutex_.Unlock(); - - MutexHolder lock(mutex_); - return message_queue_.PopFront(num_bytes, bytes, num_caps, caps); -} - void Port::WriteKernel(uint64_t init, glcr::RefPtr cap) { MutexHolder h(mutex_); message_queue_.WriteKernel(init, cap); } - -bool Port::HasMessages() { - MutexHolder h(mutex_); - return !message_queue_.empty(); -} diff --git a/zion/object/port.h b/zion/object/port.h index 8c303f1..d757d4d 100644 --- a/zion/object/port.h +++ b/zion/object/port.h @@ -6,6 +6,7 @@ #include "capability/capability.h" #include "lib/message_queue.h" #include "lib/mutex.h" +#include "object/ipc_object.h" #include "object/kernel_object.h" #include "object/thread.h" #include "usr/zcall_internal.h" @@ -17,24 +18,21 @@ struct KernelObjectTag { static const uint64_t type = KernelObject::PORT; }; -class Port : public KernelObject { +class Port : public IpcObject { public: uint64_t TypeTag() override { return KernelObject::PORT; } - Port(); - - z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, - const z_cap_t* caps); - z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, - z_cap_t* caps); + Port() = default; void WriteKernel(uint64_t init, glcr::RefPtr cap); - bool HasMessages(); + virtual MessageQueue& GetSendMessageQueue() override { + return message_queue_; + } + virtual MessageQueue& GetRecvMessageQueue() override { + return message_queue_; + } private: UnboundedMessageQueue message_queue_; - glcr::IntrusiveList blocked_threads_; - - Mutex mutex_{"Port"}; }; diff --git a/zion/object/reply_port.cpp b/zion/object/reply_port.cpp index a2b4e49..19f07f0 100644 --- a/zion/object/reply_port.cpp +++ b/zion/object/reply_port.cpp @@ -5,37 +5,3 @@ glcr::RefPtr ReplyPort::Create() { return glcr::AdoptPtr(new ReplyPort); } - -uint64_t ReplyPort::Write(uint64_t num_bytes, const void* data, - uint64_t num_caps, uint64_t* caps) { - MutexHolder h(mutex_); - RET_ERR(message_holder_.PushBack(num_bytes, data, num_caps, caps)); - - if (blocked_thread_) { - // FIXME: We need to handle the case where the blocked thread has died I - // think. - blocked_thread_->SetState(Thread::RUNNABLE); - gScheduler->Enqueue(blocked_thread_); - blocked_thread_ = nullptr; - } - return glcr::OK; -} - -uint64_t ReplyPort::Read(uint64_t* num_bytes, void* data, uint64_t* num_caps, - uint64_t* caps) { - mutex_.Lock(); - if (message_holder_.empty()) { - // Multiple threads can't block on a reply port. - if (blocked_thread_) { - mutex_.Unlock(); - return glcr::FAILED_PRECONDITION; - } - blocked_thread_ = gScheduler->CurrentThread(); - blocked_thread_->SetState(Thread::BLOCKED); - mutex_.Unlock(); - gScheduler->Yield(); - mutex_.Lock(); - } - - return message_holder_.PopFront(num_bytes, data, num_caps, caps); -} diff --git a/zion/object/reply_port.h b/zion/object/reply_port.h index c160909..8dff55a 100644 --- a/zion/object/reply_port.h +++ b/zion/object/reply_port.h @@ -4,6 +4,7 @@ #include "lib/message_queue.h" #include "lib/mutex.h" +#include "object/ipc_object.h" #include "object/kernel_object.h" class ReplyPort; @@ -13,21 +14,20 @@ struct KernelObjectTag { static const uint64_t type = KernelObject::REPLY_PORT; }; -class ReplyPort : public KernelObject { +class ReplyPort : public IpcObject { public: uint64_t TypeTag() override { return KernelObject::REPLY_PORT; } static glcr::RefPtr Create(); - uint64_t Write(uint64_t num_bytes, const void* data, uint64_t num_caps, - uint64_t* caps); - uint64_t Read(uint64_t* num_bytes, void* data, uint64_t* num_caps, - uint64_t* caps); + virtual MessageQueue& GetSendMessageQueue() override { + return message_holder_; + } + virtual MessageQueue& GetRecvMessageQueue() override { + return message_holder_; + } private: - Mutex mutex_{"reply_port"}; SingleMessageQueue message_holder_; - glcr::RefPtr blocked_thread_; - ReplyPort() {} }; diff --git a/zion/syscall/ipc.cpp b/zion/syscall/ipc.cpp index a29c9ce..226ee0a 100644 --- a/zion/syscall/ipc.cpp +++ b/zion/syscall/ipc.cpp @@ -21,7 +21,7 @@ z_err_t ChannelSend(ZChannelSendReq* req) { RET_ERR(ValidateCapability(chan_cap, ZC_WRITE)); auto chan = chan_cap->obj(); - return chan->Write(req->num_bytes, req->data, req->num_caps, req->caps); + return chan->Send(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t ChannelRecv(ZChannelRecvReq* req) { @@ -30,7 +30,7 @@ z_err_t ChannelRecv(ZChannelRecvReq* req) { RET_ERR(ValidateCapability(chan_cap, ZC_READ)); auto chan = chan_cap->obj(); - return chan->Read(req->num_bytes, req->data, req->num_caps, req->caps); + return chan->Recv(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t PortCreate(ZPortCreateReq* req) { @@ -46,7 +46,7 @@ z_err_t PortSend(ZPortSendReq* req) { RET_ERR(ValidateCapability(port_cap, ZC_WRITE)); auto port = port_cap->obj(); - return port->Write(req->num_bytes, req->data, req->num_caps, req->caps); + return port->Send(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t PortRecv(ZPortRecvReq* req) { @@ -61,7 +61,7 @@ z_err_t PortRecv(ZPortRecvReq* req) { .num_caps = *req->num_caps, .caps = req->caps, }; - return port->Read(req->num_bytes, req->data, req->num_caps, req->caps); + return port->Recv(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t PortPoll(ZPortPollReq* req) { @@ -75,7 +75,7 @@ z_err_t PortPoll(ZPortPollReq* req) { if (!port->HasMessages()) { return glcr::EMPTY; } - return port->Read(req->num_bytes, req->data, req->num_caps, req->caps); + return port->Recv(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t IrqRegister(ZIrqRegisterReq* req) { @@ -107,7 +107,7 @@ glcr::ErrorCode EndpointSend(ZEndpointSendReq* req) { auto reply_port = ReplyPort::Create(); *req->reply_port_cap = proc.AddNewCapability(reply_port, ZC_READ); uint64_t reply_port_cap_to_send = proc.AddNewCapability(reply_port, ZC_WRITE); - return endpoint->Write(req->num_bytes, req->data, reply_port_cap_to_send); + return endpoint->Send(req->num_bytes, req->data, 1, &reply_port_cap_to_send); } glcr::ErrorCode EndpointRecv(ZEndpointRecvReq* req) { @@ -117,7 +117,13 @@ glcr::ErrorCode EndpointRecv(ZEndpointRecvReq* req) { ValidateCapability(endpoint_cap, ZC_READ); auto endpoint = endpoint_cap->obj(); - return endpoint->Read(req->num_bytes, req->data, req->reply_port_cap); + uint64_t num_caps = 1; + RET_ERR(endpoint->Recv(req->num_bytes, req->data, &num_caps, + req->reply_port_cap)); + if (num_caps != 1) { + return glcr::INTERNAL; + } + return glcr::OK; } glcr::ErrorCode ReplyPortSend(ZReplyPortSendReq* req) { @@ -126,7 +132,7 @@ glcr::ErrorCode ReplyPortSend(ZReplyPortSendReq* req) { ValidateCapability(reply_port_cap, ZC_WRITE); auto reply_port = reply_port_cap->obj(); - return reply_port->Write(req->num_bytes, req->data, req->num_caps, req->caps); + return reply_port->Send(req->num_bytes, req->data, req->num_caps, req->caps); } glcr::ErrorCode ReplyPortRecv(ZReplyPortRecvReq* req) { auto& proc = gScheduler->CurrentProcess(); @@ -135,5 +141,5 @@ glcr::ErrorCode ReplyPortRecv(ZReplyPortRecvReq* req) { ValidateCapability(reply_port_cap, ZC_READ); auto reply_port = reply_port_cap->obj(); - return reply_port->Read(req->num_bytes, req->data, req->num_caps, req->caps); + return reply_port->Recv(req->num_bytes, req->data, req->num_caps, req->caps); }