411 lines
		
	
	
		
			9.1 KiB
		
	
	
	
		
			C++
		
	
	
	
			
		
		
	
	
			411 lines
		
	
	
		
			9.1 KiB
		
	
	
	
		
			C++
		
	
	
	
| /* threaded_queue.cc
 | |
| 
 | |
|    Copyright 2001, 2002, 2003, 2014 Red Hat Inc.
 | |
| 
 | |
|    Written by Robert Collins <rbtcollins@hotmail.com>
 | |
| 
 | |
| This file is part of Cygwin.
 | |
| 
 | |
| This software is a copyrighted work licensed under the terms of the
 | |
| Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
 | |
| details. */
 | |
| 
 | |
| #ifdef __OUTSIDE_CYGWIN__
 | |
| #include "woutsup.h"
 | |
| 
 | |
| #include <assert.h>
 | |
| #include <errno.h>
 | |
| #include <stdio.h>
 | |
| #include <unistd.h>
 | |
| #include <sys/types.h>
 | |
| #include <stdlib.h>
 | |
| #include "threaded_queue.h"
 | |
| 
 | |
| /*****************************************************************************/
 | |
| 
 | |
| /* queue_request */
 | |
| 
 | |
| queue_request::~queue_request ()
 | |
| {}
 | |
| 
 | |
| /*****************************************************************************/
 | |
| 
 | |
| /* threaded_queue */
 | |
| 
 | |
| threaded_queue::threaded_queue (const size_t initial_workers)
 | |
|   : _workers_count (0),
 | |
|     _running (false),
 | |
|     _submitters_head (NULL),
 | |
|     _requests_count (0),
 | |
|     _requests_head (NULL),
 | |
|     _requests_sem (NULL)
 | |
| {
 | |
|   InitializeCriticalSection (&_queue_lock);
 | |
| 
 | |
|   // This semaphore's count is the number of requests on the queue.
 | |
|   // The maximum count (129792) is calculated as MAXIMUM_WAIT_OBJECTS
 | |
|   // multiplied by max. threads per process (2028?), which is (a few)
 | |
|   // more requests than could ever be pending with the current design.
 | |
| 
 | |
|   _requests_sem = CreateSemaphore (NULL,   // SECURITY_ATTRIBUTES
 | |
| 				   0,	   // Initial count
 | |
| 				   129792, // Maximum count
 | |
| 				   NULL);  // Anonymous
 | |
| 
 | |
|   if (!_requests_sem)
 | |
|     {
 | |
|       system_printf (("failed to create the request queue semaphore, "
 | |
| 		      "error = %u"),
 | |
| 		     GetLastError ());
 | |
|       abort ();
 | |
|     }
 | |
| 
 | |
|   create_workers (initial_workers);
 | |
| }
 | |
| 
 | |
| threaded_queue::~threaded_queue ()
 | |
| {
 | |
|   if (_running)
 | |
|     stop ();
 | |
| 
 | |
|   debug_printf ("deleting all pending queue requests");
 | |
|   queue_request *reqptr = _requests_head;
 | |
|   while (reqptr)
 | |
|     {
 | |
|       queue_request *const ptr = reqptr;
 | |
|       reqptr = reqptr->_next;
 | |
|       delete ptr;
 | |
|     }
 | |
| 
 | |
|   DeleteCriticalSection (&_queue_lock);
 | |
|   if (_requests_sem)
 | |
|     (void) CloseHandle (_requests_sem);
 | |
| }
 | |
| 
 | |
| /* FIXME: return success or failure rather than quitting */
 | |
| void
 | |
| threaded_queue::add_submission_loop (queue_submission_loop *const submitter)
 | |
| {
 | |
|   assert (this);
 | |
|   assert (submitter);
 | |
|   assert (submitter->_queue == this);
 | |
|   assert (!submitter->_next);
 | |
| 
 | |
|   submitter->_next =
 | |
|     TInterlockedExchangePointer (&_submitters_head, submitter);
 | |
| 
 | |
|   if (_running)
 | |
|     submitter->start ();
 | |
| }
 | |
| 
 | |
| bool
 | |
| threaded_queue::start ()
 | |
| {
 | |
|   EnterCriticalSection (&_queue_lock);
 | |
|   const bool was_running = _running;
 | |
|   _running = true;
 | |
|   queue_submission_loop *loopptr = _submitters_head;
 | |
|   LeaveCriticalSection (&_queue_lock);
 | |
| 
 | |
|   if (!was_running)
 | |
|     {
 | |
|       debug_printf ("starting all queue submission loops");
 | |
| 
 | |
|       while (loopptr)
 | |
| 	{
 | |
| 	  queue_submission_loop *const ptr = loopptr;
 | |
| 	  loopptr = loopptr->_next;
 | |
| 	  ptr->start ();
 | |
| 	}
 | |
|     }
 | |
| 
 | |
|   return was_running;
 | |
| }
 | |
| 
 | |
| bool
 | |
| threaded_queue::stop ()
 | |
| {
 | |
|   EnterCriticalSection (&_queue_lock);
 | |
|   const bool was_running = _running;
 | |
|   _running = false;
 | |
|   queue_submission_loop *loopptr = _submitters_head;
 | |
|   LeaveCriticalSection (&_queue_lock);
 | |
| 
 | |
|   if (was_running)
 | |
|     {
 | |
|       debug_printf ("stopping all queue submission loops");
 | |
|       while (loopptr)
 | |
| 	{
 | |
| 	  queue_submission_loop *const ptr = loopptr;
 | |
| 	  loopptr = loopptr->_next;
 | |
| 	  ptr->stop ();
 | |
| 	}
 | |
| 
 | |
|       ReleaseSemaphore (_requests_sem, _workers_count, NULL);
 | |
|       while (_workers_count)
 | |
| 	{
 | |
| 	  debug_printf (("waiting for worker threads to terminate: "
 | |
| 			 "%u still running"),
 | |
| 			_workers_count);
 | |
| 	  Sleep (1000);
 | |
| 	}
 | |
|       debug_printf ("all worker threads have terminated");
 | |
|     }
 | |
| 
 | |
|   return was_running;
 | |
| }
 | |
| 
 | |
| /* FIXME: return success or failure */
 | |
| void
 | |
| threaded_queue::add (queue_request *const therequest)
 | |
| {
 | |
|   assert (this);
 | |
|   assert (therequest);
 | |
|   assert (!therequest->_next);
 | |
| 
 | |
|   if (!_workers_count)
 | |
|     {
 | |
|       system_printf ("warning: no worker threads to handle request!");
 | |
|       // FIXME: And then what?
 | |
|     }
 | |
| 
 | |
|   EnterCriticalSection (&_queue_lock);
 | |
|   if (!_requests_head)
 | |
|     _requests_head = therequest;
 | |
|   else
 | |
|     {
 | |
|       /* Add to the queue end. */
 | |
|       queue_request *reqptr = _requests_head;
 | |
|       for (; reqptr->_next; reqptr = reqptr->_next)
 | |
| 	{}
 | |
|       assert (reqptr);
 | |
|       assert (!reqptr->_next);
 | |
|       reqptr->_next = therequest;
 | |
|     }
 | |
| 
 | |
|   _requests_count += 1;
 | |
|   assert (_requests_count > 0);
 | |
|   LeaveCriticalSection (&_queue_lock);
 | |
| 
 | |
|   (void) ReleaseSemaphore (_requests_sem, 1, NULL);
 | |
| }
 | |
| 
 | |
| /*static*/ DWORD WINAPI
 | |
| threaded_queue::start_routine (const LPVOID lpParam)
 | |
| {
 | |
|   class threaded_queue *const queue = (class threaded_queue *) lpParam;
 | |
|   assert (queue);
 | |
| 
 | |
|   queue->worker_loop ();
 | |
| 
 | |
|   const long count = InterlockedDecrement (&queue->_workers_count);
 | |
|   assert (count >= 0);
 | |
| 
 | |
|   if (queue->_running)
 | |
|     debug_printf ("worker loop has exited; thread about to terminate");
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /* Called from the constructor: so no need to be thread-safe until the
 | |
|  * worker threads start to be created; thus the interlocked increment
 | |
|  * of the `_workers_count' field.
 | |
|  */
 | |
| 
 | |
| void
 | |
| threaded_queue::create_workers (const size_t initial_workers)
 | |
| {
 | |
|   assert (initial_workers > 0);
 | |
| 
 | |
|   for (unsigned int i = 0; i != initial_workers; i++)
 | |
|     {
 | |
|       const long count = InterlockedIncrement (&_workers_count);
 | |
|       assert (count > 0);
 | |
| 
 | |
|       DWORD tid;
 | |
|       const HANDLE hThread =
 | |
| 	CreateThread (NULL, 0, start_routine, this, 0, &tid);
 | |
| 
 | |
|       if (!hThread)
 | |
| 	{
 | |
| 	  system_printf ("failed to create thread, error = %u",
 | |
| 			 GetLastError ());
 | |
| 	  abort ();
 | |
| 	}
 | |
| 
 | |
|       (void) CloseHandle (hThread);
 | |
|     }
 | |
| }
 | |
| 
 | |
| void
 | |
| threaded_queue::worker_loop ()
 | |
| {
 | |
|   while (true)
 | |
|     {
 | |
|       const DWORD rc = WaitForSingleObject (_requests_sem, INFINITE);
 | |
|       if (rc == WAIT_FAILED)
 | |
| 	{
 | |
| 	  system_printf ("wait for request semaphore failed, error = %u",
 | |
| 			 GetLastError ());
 | |
| 	  return;
 | |
| 	}
 | |
|       assert (rc == WAIT_OBJECT_0);
 | |
| 
 | |
|       EnterCriticalSection (&_queue_lock);
 | |
|       if (!_running)
 | |
| 	{
 | |
| 	  LeaveCriticalSection (&_queue_lock);
 | |
| 	  return;
 | |
| 	}
 | |
| 
 | |
|       assert (_requests_head);
 | |
|       queue_request *const reqptr = _requests_head;
 | |
|       _requests_head = reqptr->_next;
 | |
| 
 | |
|       _requests_count -= 1;
 | |
|       assert (_requests_count >= 0);
 | |
|       LeaveCriticalSection (&_queue_lock);
 | |
| 
 | |
|       assert (reqptr);
 | |
|       reqptr->process ();
 | |
|       delete reqptr;
 | |
|     }
 | |
| }
 | |
| 
 | |
| /*****************************************************************************/
 | |
| 
 | |
| /* queue_submission_loop */
 | |
| 
 | |
| queue_submission_loop::queue_submission_loop (threaded_queue *const queue,
 | |
| 					      const bool ninterruptible)
 | |
|   : _running (false),
 | |
|     _interrupt_event (NULL),
 | |
|     _queue (queue),
 | |
|     _interruptible (ninterruptible),
 | |
|     _hThread (NULL),
 | |
|     _tid (0),
 | |
|     _next (NULL)
 | |
| {
 | |
|   if (_interruptible)
 | |
|     {
 | |
|       // verbose: debug_printf ("creating an interruptible processing thread");
 | |
| 
 | |
|       _interrupt_event = CreateEvent (NULL,  // SECURITY_ATTRIBUTES
 | |
| 				      FALSE, // Auto-reset
 | |
| 				      FALSE, // Initially non-signalled
 | |
| 				      NULL); // Anonymous
 | |
| 
 | |
|       if (!_interrupt_event)
 | |
| 	{
 | |
| 	  system_printf ("failed to create interrupt event, error = %u",
 | |
| 			 GetLastError ());
 | |
| 	  abort ();
 | |
| 	}
 | |
|     }
 | |
| }
 | |
| 
 | |
| queue_submission_loop::~queue_submission_loop ()
 | |
| {
 | |
|   if (_running)
 | |
|     stop ();
 | |
|   if (_interrupt_event)
 | |
|     (void) CloseHandle (_interrupt_event);
 | |
|   if (_hThread)
 | |
|     (void) CloseHandle (_hThread);
 | |
| }
 | |
| 
 | |
| bool
 | |
| queue_submission_loop::start ()
 | |
| {
 | |
|   assert (this);
 | |
|   assert (!_hThread);
 | |
| 
 | |
|   const bool was_running = _running;
 | |
| 
 | |
|   if (!was_running)
 | |
|     {
 | |
|       _running = true;
 | |
| 
 | |
|       _hThread = CreateThread (NULL, 0, start_routine, this, 0, &_tid);
 | |
|       if (!_hThread)
 | |
| 	{
 | |
| 	  system_printf ("failed to create thread, error = %u",
 | |
| 			 GetLastError ());
 | |
| 	  abort ();
 | |
| 	}
 | |
|     }
 | |
| 
 | |
|   return was_running;
 | |
| }
 | |
| 
 | |
| bool
 | |
| queue_submission_loop::stop ()
 | |
| {
 | |
|   assert (this);
 | |
|   assert (_hThread && _hThread != INVALID_HANDLE_VALUE);
 | |
| 
 | |
|   const bool was_running = _running;
 | |
| 
 | |
|   if (_running)
 | |
|     {
 | |
|       _running = false;
 | |
| 
 | |
|       if (_interruptible)
 | |
| 	{
 | |
| 	  assert (_interrupt_event
 | |
| 		  && _interrupt_event != INVALID_HANDLE_VALUE);
 | |
| 
 | |
| 	  SetEvent (_interrupt_event);
 | |
| 
 | |
| 	  if (WaitForSingleObject (_hThread, 1000) == WAIT_TIMEOUT)
 | |
| 	    {
 | |
| 	      system_printf (("request loop thread %u failed to shutdown "
 | |
| 			      "when asked politely: about to get heavy"),
 | |
| 			     _tid);
 | |
| 
 | |
| 	      if (!TerminateThread (_hThread, 0))
 | |
| 		{
 | |
| 		  system_printf (("failed to kill request loop thread %u"
 | |
| 				  ", error = %u"),
 | |
| 				 _tid, GetLastError ());
 | |
| 		  abort ();
 | |
| 		}
 | |
| 	    }
 | |
| 	}
 | |
|       else
 | |
| 	{
 | |
| 	  // FIXME: could wait to see if the request loop notices that
 | |
| 	  // the submission loop is no longer running and shuts down
 | |
| 	  // voluntarily.
 | |
| 
 | |
| 	  debug_printf ("killing request loop thread %u", _tid);
 | |
| 
 | |
| 	  if (!TerminateThread (_hThread, 0))
 | |
| 	    system_printf (("failed to kill request loop thread %u"
 | |
| 			    ", error = %u"),
 | |
| 			   _tid, GetLastError ());
 | |
| 	}
 | |
|     }
 | |
| 
 | |
|   return was_running;
 | |
| }
 | |
| 
 | |
| /*static*/ DWORD WINAPI
 | |
| queue_submission_loop::start_routine (const LPVOID lpParam)
 | |
| {
 | |
|   class queue_submission_loop *const submission_loop =
 | |
|     (class queue_submission_loop *) lpParam;
 | |
|   assert (submission_loop);
 | |
| 
 | |
|   submission_loop->request_loop ();
 | |
| 
 | |
|   debug_printf ("submission loop has exited; thread about to terminate");
 | |
| 
 | |
|   submission_loop->stop ();
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| /*****************************************************************************/
 | |
| #endif /* __OUTSIDE_CYGWIN__ */
 |