diff --git a/zion/CMakeLists.txt b/zion/CMakeLists.txt index abf387a..63b9e1d 100644 --- a/zion/CMakeLists.txt +++ b/zion/CMakeLists.txt @@ -11,6 +11,7 @@ add_executable(zion interrupt/interrupt_enter.s interrupt/timer.cpp lib/mutex.cpp + lib/message_queue.cpp loader/init_loader.cpp memory/kernel_heap.cpp memory/kernel_stack_manager.cpp diff --git a/zion/interrupt/interrupt.cpp b/zion/interrupt/interrupt.cpp index d6ff5c2..d8243a7 100644 --- a/zion/interrupt/interrupt.cpp +++ b/zion/interrupt/interrupt.cpp @@ -142,7 +142,7 @@ RefPtr pci1_port; extern "C" void isr_pci1(); extern "C" void interrupt_pci1(InterruptFrame*) { dbgln("Interrupt PCI line 1"); - pci1_port->Write({}); + pci1_port->Write(0, nullptr, 0, nullptr); SignalEOI(); } diff --git a/zion/lib/message_queue.cpp b/zion/lib/message_queue.cpp new file mode 100644 index 0000000..62e0755 --- /dev/null +++ b/zion/lib/message_queue.cpp @@ -0,0 +1,71 @@ +#include "lib/message_queue.h" + +#include "scheduler/scheduler.h" + +z_err_t UnboundedMessageQueue::PushBack(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, + const z_cap_t* caps) { + if (num_bytes > 0x1000) { + dbgln("Large message size unimplemented: %x", num_bytes); + return Z_ERR_UNIMPLEMENTED; + } + + auto message = MakeShared(); + message->num_bytes = num_bytes; + message->bytes = new uint8_t[num_bytes]; + for (uint64_t i = 0; i < num_bytes; i++) { + message->bytes[i] = static_cast(bytes)[i]; + } + + for (uint64_t i = 0; i < num_caps; i++) { + // FIXME: This would feel safer closer to the relevant syscall. + auto cap = gScheduler->CurrentProcess().ReleaseCapability(caps[i]); + if (!cap) { + return Z_ERR_CAP_NOT_FOUND; + } + message->caps.PushBack(cap); + } + + pending_messages_.PushBack(message); + return Z_OK; +} + +z_err_t UnboundedMessageQueue::PopFront(uint64_t* num_bytes, void* bytes, + uint64_t* num_caps, z_cap_t* caps) { + auto next_msg = pending_messages_.PeekFront(); + if (next_msg->num_bytes > *num_bytes) { + return Z_ERR_BUFF_SIZE; + } + if (next_msg->caps.size() > *num_caps) { + return Z_ERR_BUFF_SIZE; + } + + next_msg = pending_messages_.PopFront(); + + *num_bytes = next_msg->num_bytes; + + for (uint64_t i = 0; i < *num_bytes; i++) { + static_cast(bytes)[i] = next_msg->bytes[i]; + } + + *num_caps = next_msg->caps.size(); + auto& proc = gScheduler->CurrentProcess(); + for (uint64_t i = 0; i < *num_caps; i++) { + caps[i] = proc.AddExistingCapability(next_msg->caps.PopFront()); + } + return Z_OK; +} + +void UnboundedMessageQueue::WriteKernel(uint64_t init, RefPtr cap) { + auto msg = MakeShared(); + msg->bytes = new uint8_t[8]; + msg->num_bytes = sizeof(init); + + uint8_t* data = reinterpret_cast(&init); + for (uint8_t i = 0; i < sizeof(init); i++) { + msg->bytes[i] = data[i]; + } + msg->caps.PushBack(cap); + + pending_messages_.PushBack(msg); +} diff --git a/zion/lib/message_queue.h b/zion/lib/message_queue.h new file mode 100644 index 0000000..420e9e0 --- /dev/null +++ b/zion/lib/message_queue.h @@ -0,0 +1,44 @@ +#pragma once + +#include "capability/capability.h" +#include "include/ztypes.h" +#include "lib/linked_list.h" +#include "lib/shared_ptr.h" + +class MessageQueue { + public: + virtual ~MessageQueue() {} + + virtual z_err_t PushBack(uint64_t num_bytes, const void* bytes, + uint64_t num_caps, const z_cap_t* caps) = 0; + virtual z_err_t PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps) = 0; +}; + +class UnboundedMessageQueue : public MessageQueue { + public: + UnboundedMessageQueue() {} + UnboundedMessageQueue(const UnboundedMessageQueue&) = delete; + UnboundedMessageQueue& operator=(const UnboundedMessageQueue&) = delete; + virtual ~UnboundedMessageQueue() override {} + + z_err_t PushBack(uint64_t num_bytes, const void* bytes, uint64_t num_caps, + const z_cap_t* caps) override; + z_err_t PopFront(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps) override; + + void WriteKernel(uint64_t init, RefPtr cap); + + uint64_t size() { return pending_messages_.size(); } + bool empty() { return size() == 0; } + + private: + struct Message { + uint64_t num_bytes; + uint8_t* bytes; + + LinkedList> caps; + }; + + LinkedList> pending_messages_; +}; diff --git a/zion/object/port.cpp b/zion/object/port.cpp index c8440d3..00221ae 100644 --- a/zion/object/port.cpp +++ b/zion/object/port.cpp @@ -4,29 +4,10 @@ Port::Port() {} -z_err_t Port::Write(const ZMessage& msg) { - if (msg.num_bytes > 0x1000) { - dbgln("Large message size unimplemented: %x", msg.num_bytes); - return Z_ERR_INVALID; - } - - auto message = MakeShared(); - message->num_bytes = msg.num_bytes; - message->bytes = new uint8_t[msg.num_bytes]; - for (uint64_t i = 0; i < msg.num_bytes; i++) { - message->bytes[i] = static_cast(msg.data)[i]; - } - - for (uint64_t i = 0; i < msg.num_caps; i++) { - auto cap = gScheduler->CurrentProcess().ReleaseCapability(msg.caps[i]); - if (!cap) { - return Z_ERR_CAP_NOT_FOUND; - } - message->caps.PushBack(cap); - } - - MutexHolder lock(mutex_); - pending_messages_.PushBack(message); +z_err_t Port::Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, + const z_cap_t* caps) { + MutexHolder h(mutex_); + RET_ERR(message_queue_.PushBack(num_bytes, bytes, num_caps, caps)); if (blocked_threads_.size() > 0) { auto thread = blocked_threads_.PopFront(); thread->SetState(Thread::RUNNABLE); @@ -35,9 +16,10 @@ z_err_t Port::Write(const ZMessage& msg) { return Z_OK; } -z_err_t Port::Read(ZMessage& msg) { +z_err_t Port::Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps) { mutex_.Lock(); - while (pending_messages_.size() < 1) { + while (message_queue_.empty()) { blocked_threads_.PushBack(gScheduler->CurrentThread()); mutex_.Unlock(); gScheduler->Yield(); @@ -46,48 +28,15 @@ z_err_t Port::Read(ZMessage& msg) { mutex_.Unlock(); MutexHolder lock(mutex_); - auto next_msg = pending_messages_.PeekFront(); - if (next_msg->num_bytes > msg.num_bytes) { - return Z_ERR_BUFF_SIZE; - } - if (next_msg->caps.size() > msg.num_caps) { - return Z_ERR_BUFF_SIZE; - } - - msg.num_bytes = next_msg->num_bytes; - - for (uint64_t i = 0; i < msg.num_bytes; i++) { - static_cast(msg.data)[i] = next_msg->bytes[i]; - } - - msg.num_caps = next_msg->caps.size(); - auto& proc = gScheduler->CurrentProcess(); - for (uint64_t i = 0; i < msg.num_caps; i++) { - msg.caps[i] = proc.AddExistingCapability(next_msg->caps.PopFront()); - } - - pending_messages_.PopFront(); - - return Z_OK; + return message_queue_.PopFront(num_bytes, bytes, num_caps, caps); } void Port::WriteKernel(uint64_t init, RefPtr cap) { MutexHolder h(mutex_); - - auto msg = MakeShared(); - msg->bytes = new uint8_t[8]; - msg->num_bytes = sizeof(init); - - uint8_t* data = reinterpret_cast(&init); - for (uint8_t i = 0; i < sizeof(init); i++) { - msg->bytes[i] = data[i]; - } - msg->caps.PushBack(cap); - - pending_messages_.PushBack(msg); + message_queue_.WriteKernel(init, cap); } bool Port::HasMessages() { MutexHolder h(mutex_); - return pending_messages_.size() != 0; + return !message_queue_.empty(); } diff --git a/zion/object/port.h b/zion/object/port.h index c0e5b1d..7bfb7e6 100644 --- a/zion/object/port.h +++ b/zion/object/port.h @@ -2,6 +2,7 @@ #include "capability/capability.h" #include "lib/linked_list.h" +#include "lib/message_queue.h" #include "lib/mutex.h" #include "lib/shared_ptr.h" #include "object/kernel_object.h" @@ -21,23 +22,17 @@ class Port : public KernelObject { Port(); - z_err_t Write(const ZMessage& msg); - z_err_t Read(ZMessage& msg); + z_err_t Write(uint64_t num_bytes, const void* bytes, uint64_t num_caps, + const z_cap_t* caps); + z_err_t Read(uint64_t* num_bytes, void* bytes, uint64_t* num_caps, + z_cap_t* caps); void WriteKernel(uint64_t init, RefPtr cap); bool HasMessages(); private: - struct Message { - uint64_t num_bytes; - uint8_t* bytes; - - LinkedList> caps; - }; - - LinkedList> pending_messages_; - + UnboundedMessageQueue message_queue_; LinkedList> blocked_threads_; Mutex mutex_{"Port"}; diff --git a/zion/syscall/port.cpp b/zion/syscall/port.cpp index 6014d07..c3b5472 100644 --- a/zion/syscall/port.cpp +++ b/zion/syscall/port.cpp @@ -18,13 +18,7 @@ z_err_t PortSend(ZPortSendReq* req) { auto port = port_cap->obj(); RET_IF_NULL(port); - ZMessage message{ - .num_bytes = req->num_bytes, - .data = const_cast(req->data), - .num_caps = req->num_caps, - .caps = req->caps, - }; - return port->Write(message); + return port->Write(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t PortRecv(ZPortRecvReq* req) { @@ -40,10 +34,7 @@ z_err_t PortRecv(ZPortRecvReq* req) { .num_caps = *req->num_caps, .caps = req->caps, }; - RET_ERR(port->Read(message)); - *req->num_bytes = message.num_bytes; - *req->num_caps = message.num_caps; - return Z_OK; + return port->Read(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t PortPoll(ZPortPollReq* req) { @@ -53,19 +44,12 @@ z_err_t PortPoll(ZPortPollReq* req) { auto port = port_cap->obj(); RET_IF_NULL(port); + // FIXME: Race condition here where this call could block if the last message + // is removed between this check and the port read. if (!port->HasMessages()) { return Z_ERR_EMPTY; } - ZMessage message{ - .num_bytes = *req->num_bytes, - .data = const_cast(req->data), - .num_caps = *req->num_caps, - .caps = req->caps, - }; - RET_ERR(port->Read(message)); - *req->num_bytes = message.num_bytes; - *req->num_caps = message.num_caps; - return Z_OK; + return port->Read(req->num_bytes, req->data, req->num_caps, req->caps); } z_err_t IrqRegister(ZIrqRegisterReq* req) {