1005 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
			
		
		
	
	
			1005 lines
		
	
	
		
			27 KiB
		
	
	
	
		
			C++
		
	
	
	
| /* aio.cc: Posix asynchronous i/o functions.
 | |
| 
 | |
| This file is part of Cygwin.
 | |
| 
 | |
| This software is a copyrighted work licensed under the terms of the
 | |
| Cygwin license.  Please consult the file "CYGWIN_LICENSE" for
 | |
| details. */
 | |
| 
 | |
| #include "winsup.h"
 | |
| #include "path.h"
 | |
| #include "fhandler.h"
 | |
| #include "dtable.h"
 | |
| #include "cygheap.h"
 | |
| #include "sigproc.h"
 | |
| #include <aio.h>
 | |
| #include <fcntl.h>
 | |
| #include <semaphore.h>
 | |
| #include <unistd.h>
 | |
| 
 | |
| #ifdef __cplusplus
 | |
| extern "C" {
 | |
| #endif
 | |
| 
 | |
| /* 'aioinitialized' is a thread-safe status of AIO feature initialization:
 | |
|  * 0 means uninitialized, >0 means initializing, <0 means initialized
 | |
|  */
 | |
| static NO_COPY volatile LONG    aioinitialized = 0;
 | |
| 
 | |
| /* This implementation supports two flavors of asynchronous operation:
 | |
|  * "inline" and "queued".  Inline AIOs are used when:
 | |
|  *     (1) fd refers to a local non-locked disk file opened in binary mode,
 | |
|  *     (2) no more than AIO_MAX inline AIOs will be in progress at same time.
 | |
|  * In all other cases queued AIOs will be used.
 | |
|  *
 | |
|  * An inline AIO is performed by the calling app's thread as a pread|pwrite on
 | |
|  * a shadow fd that permits Windows asynchronous i/o, with event notification
 | |
|  * on completion.  Event arrival causes AIO context for the fd to be updated.
 | |
|  *
 | |
|  * A queued AIO is performed in a similar manner, but by an AIO worker thread
 | |
|  * rather than the calling app's thread.  The queued flavor can also operate
 | |
|  * on sockets, pipes, non-binary files, mandatory-locked files, and files
 | |
|  * that don't support pread|pwrite.  Generally all these cases are handled as
 | |
|  * synchronous read|write operations, but still don't delay the app because
 | |
|  * they're taken care of by AIO worker threads.
 | |
|  */
 | |
| 
 | |
| /* These variables support inline AIO operations */
 | |
| static NO_COPY HANDLE            evt_handles[AIO_MAX];
 | |
| static NO_COPY struct aiocb     *evt_aiocbs[AIO_MAX];
 | |
| static NO_COPY CRITICAL_SECTION  evt_locks[AIO_MAX]; /* per-slot locks */
 | |
| static NO_COPY CRITICAL_SECTION  slotcrit; /* lock for slot variables in toto */
 | |
| 
 | |
| /* These variables support queued AIO operations */
 | |
| static NO_COPY sem_t             worksem;   /* tells whether AIOs are queued */
 | |
| static NO_COPY CRITICAL_SECTION  workcrit;        /* lock for AIO work queue */
 | |
| TAILQ_HEAD(queue, aiocb) worklist = TAILQ_HEAD_INITIALIZER(worklist);
 | |
| 
 | |
| static int
 | |
| aiochkslot (struct aiocb *aio)
 | |
| {
 | |
|   EnterCriticalSection (&slotcrit);
 | |
| 
 | |
|   /* Sanity check.. make sure this AIO is not already busy */
 | |
|   for (int slot = 0; slot < AIO_MAX; ++slot)
 | |
|     if (evt_aiocbs[slot] == aio)
 | |
|       {
 | |
|         debug_printf ("aio %p is already busy in slot %d", aio, slot);
 | |
|         LeaveCriticalSection (&slotcrit);
 | |
|         return slot;
 | |
|       }
 | |
| 
 | |
|   LeaveCriticalSection (&slotcrit);
 | |
|   return -1;
 | |
| }
 | |
| 
 | |
| static int
 | |
| aiogetslot (struct aiocb *aio)
 | |
| {
 | |
|   EnterCriticalSection (&slotcrit);
 | |
| 
 | |
|   /* Find free slot for this inline AIO; if none available AIO will be queued */
 | |
|   for (int slot = 0; slot < AIO_MAX; ++slot)
 | |
|     if (evt_aiocbs[slot] == NULL)
 | |
|       {
 | |
|         /* If aio is NULL this is just an availability check.. no change made */
 | |
|         if (aio)
 | |
|           evt_aiocbs[slot] = aio;
 | |
|         LeaveCriticalSection (&slotcrit);
 | |
|         return slot;
 | |
|       }
 | |
| 
 | |
|   LeaveCriticalSection (&slotcrit);
 | |
|   return -1;
 | |
| }
 | |
| 
 | |
| static int
 | |
| aiorelslot (struct aiocb *aio)
 | |
| {
 | |
|   EnterCriticalSection (&slotcrit);
 | |
| 
 | |
|   /* Find slot associated with this inline AIO and free it */
 | |
|   for (int slot = 0; slot < AIO_MAX; ++slot)
 | |
|     if (evt_aiocbs[slot] == aio)
 | |
|       {
 | |
|         evt_aiocbs[slot] = NULL;
 | |
|         LeaveCriticalSection (&slotcrit);
 | |
|         return slot;
 | |
|       }
 | |
| 
 | |
|   LeaveCriticalSection (&slotcrit);
 | |
|   return -1;
 | |
| }
 | |
| 
 | |
| static void
 | |
| aionotify_on_pthread (struct sigevent *evp)
 | |
| {
 | |
|   pthread_attr_t *attr;
 | |
|   pthread_attr_t  default_attr;
 | |
|   int             rc;
 | |
|   pthread_t       vaquita; /* == "little porpoise", endangered, see below */
 | |
| 
 | |
|   if (evp->sigev_notify_attributes)
 | |
|     attr = evp->sigev_notify_attributes;
 | |
|   else
 | |
|     {
 | |
|       pthread_attr_init (attr = &default_attr);
 | |
|       pthread_attr_setdetachstate (attr, PTHREAD_CREATE_DETACHED);
 | |
|     }
 | |
| 
 | |
|   /* A "vaquita" thread is a temporary pthread created to deliver a signal to
 | |
|    * the application.  We don't wait around for the thread to return from the
 | |
|    * app.  There's some symbolism here of sending a little creature off to tell
 | |
|    * the app something important.  If all the vaquitas end up wiped out in the
 | |
|    * wild, a distinct near-term possibility, at least this code remembers them.
 | |
|    */
 | |
|   rc = pthread_create (&vaquita, attr,
 | |
|                        (void * (*) (void *)) evp->sigev_notify_function,
 | |
|                        evp->sigev_value.sival_ptr);
 | |
| 
 | |
|   /* The following error is not expected. If seen often, develop a recovery. */
 | |
|   if (rc)
 | |
|     debug_printf ("aio vaquita thread creation failed, %E");
 | |
| 
 | |
|   /* Should we wait for the signal delivery thread to finish?  We can't: Who
 | |
|    * knows what mischief the app coder may have in their handler?  Worst case
 | |
|    * is they accidentally used non-signal-safe functions in their handler.  We
 | |
|    * return hoping for the best and finish cleaning up our end of notification.
 | |
|    */
 | |
|   return;
 | |
| }
 | |
| 
 | |
| static void
 | |
| aionotify (struct aiocb *aio)
 | |
| {
 | |
|   siginfo_t si = {0};
 | |
|   si.si_code = SI_ASYNCIO;
 | |
| 
 | |
|   /* If signal notification wanted, send AIO-complete signal */
 | |
|   switch (aio->aio_sigevent.sigev_notify) {
 | |
|   case SIGEV_NONE:
 | |
|     break;
 | |
| 
 | |
|   case SIGEV_SIGNAL:
 | |
|     si.si_signo = aio->aio_sigevent.sigev_signo;
 | |
|     si.si_value = aio->aio_sigevent.sigev_value;
 | |
|     if (si.si_signo)
 | |
|       sig_send (myself, si);
 | |
|     break;
 | |
| 
 | |
|   case SIGEV_THREAD:
 | |
|     aionotify_on_pthread (&aio->aio_sigevent);
 | |
|     break;
 | |
|   }
 | |
| 
 | |
|   /* If this op is on LIO list and is last op, send LIO-complete signal */
 | |
|   if (aio->aio_liocb)
 | |
|     {
 | |
|       if (1 == InterlockedExchangeAdd (&aio->aio_liocb->lio_count, -1))
 | |
|         {
 | |
|           /* LIO's count has decremented to zero */
 | |
|           switch (aio->aio_liocb->lio_sigevent->sigev_notify) {
 | |
|           case SIGEV_NONE:
 | |
|             break;
 | |
| 
 | |
|           case SIGEV_SIGNAL:
 | |
|             si.si_signo = aio->aio_liocb->lio_sigevent->sigev_signo;
 | |
|             si.si_value = aio->aio_liocb->lio_sigevent->sigev_value;
 | |
|             if (si.si_signo)
 | |
|               sig_send (myself, si);
 | |
|             break;
 | |
| 
 | |
|           case SIGEV_THREAD:
 | |
|             aionotify_on_pthread (aio->aio_liocb->lio_sigevent);
 | |
|             break;
 | |
|           }
 | |
| 
 | |
|           free (aio->aio_liocb);
 | |
|           aio->aio_liocb = NULL;
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| static DWORD WINAPI __attribute__ ((noreturn))
 | |
| aiowaiter (void *unused)
 | |
| { /* One instance, called on its own cygthread; runs until program exits */
 | |
|   struct aiocb *aio;
 | |
| 
 | |
|   while (1)
 | |
|     {
 | |
|       /* Wait forever for at least one event to be set */
 | |
|       DWORD res = WaitForMultipleObjects(AIO_MAX, evt_handles, FALSE, INFINITE);
 | |
|       switch (res)
 | |
|         {
 | |
|           case WAIT_FAILED:
 | |
|             api_fatal ("aiowaiter fatal error, %E");
 | |
| 
 | |
|           default:
 | |
|             if (res < WAIT_OBJECT_0 || res >= WAIT_OBJECT_0 + AIO_MAX)
 | |
|               api_fatal ("aiowaiter unexpected WFMO result %d", res);
 | |
|             int slot = res - WAIT_OBJECT_0;
 | |
| 
 | |
|             /* Guard against "saw completion before request finished" gotcha */
 | |
|             EnterCriticalSection (&evt_locks[slot]);
 | |
|             LeaveCriticalSection (&evt_locks[slot]);
 | |
| 
 | |
|             aio = evt_aiocbs[slot];
 | |
|             debug_printf ("WFMO returns %d, aio %p", res, aio);
 | |
| 
 | |
|             if (aio->aio_errno == EBUSY)
 | |
|               {
 | |
|                 /* Capture Windows status and convert to Cygwin status */
 | |
|                 NTSTATUS status = (NTSTATUS) aio->aio_wincb.status;
 | |
|                 if (NT_SUCCESS (status))
 | |
|                   {
 | |
|                     aio->aio_rbytes = (ssize_t) aio->aio_wincb.info;
 | |
|                     aio->aio_errno = 0;
 | |
|                   }
 | |
|                 else
 | |
|                   {
 | |
|                     aio->aio_rbytes = -1;
 | |
|                     aio->aio_errno = geterrno_from_nt_status (status);
 | |
|                   }
 | |
|               }
 | |
|             else
 | |
|               {
 | |
|                 /* Async operation was simulated; AIO status already updated */
 | |
|               }
 | |
| 
 | |
|             /* Send completion signal if user requested it */
 | |
|             aionotify (aio);
 | |
| 
 | |
|             /* Free up the slot used for this inline AIO.  We do this
 | |
|              * manually rather than calling aiorelslot() because we
 | |
|              * already have the slot number handy.
 | |
|              */
 | |
|             EnterCriticalSection (&slotcrit);
 | |
|             evt_aiocbs[slot] = NULL;
 | |
|             LeaveCriticalSection (&slotcrit);
 | |
|             debug_printf ("retired aio %p; slot %d released", aio, slot);
 | |
| 
 | |
|             /* Notify workers that a slot has opened up */
 | |
|             sem_post (&worksem);
 | |
|         }
 | |
|     }
 | |
| }
 | |
| 
 | |
| static ssize_t
 | |
| asyncread (struct aiocb *aio)
 | |
| { /* Try to initiate an asynchronous read, either from app or worker thread */
 | |
|   ssize_t       res = 0;
 | |
| 
 | |
|   cygheap_fdget cfd (aio->aio_fildes);
 | |
|   if (cfd < 0)
 | |
|     res = -1; /* errno has been set to EBADF */
 | |
|   else
 | |
|     {
 | |
|       int slot = aiogetslot (aio);
 | |
|       debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
 | |
|       if (slot >= 0)
 | |
|         {
 | |
|           EnterCriticalSection (&evt_locks[slot]);
 | |
|           aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
 | |
|           aio->aio_wincb.event = (void *) evt_handles[slot];
 | |
|           res = cfd->pread ((void *) aio->aio_buf, aio->aio_nbytes,
 | |
|                             aio->aio_offset, (void *) aio);
 | |
|           LeaveCriticalSection (&evt_locks[slot]);
 | |
|         }
 | |
|       else
 | |
|         {
 | |
|           set_errno (ENOBUFS); /* Internal use only */
 | |
|           res = -1;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| static ssize_t
 | |
| asyncwrite (struct aiocb *aio)
 | |
| { /* Try to initiate an asynchronous write, either from app or worker thread */
 | |
|   ssize_t       res = 0;
 | |
| 
 | |
|   cygheap_fdget cfd (aio->aio_fildes);
 | |
|   if (cfd < 0)
 | |
|     res = -1; /* errno has been set to EBADF */
 | |
|   else
 | |
|     {
 | |
|       int slot = aiogetslot (aio);
 | |
|       debug_printf ("slot %d%s", slot, slot >= 0 ? " acquired" : "");
 | |
|       if (slot >= 0)
 | |
|         {
 | |
|           EnterCriticalSection (&evt_locks[slot]);
 | |
|           aio->aio_errno = EBUSY; /* Mark AIO as physically underway now */
 | |
|           aio->aio_wincb.event = (void *) evt_handles[slot];
 | |
|           res = cfd->pwrite ((void *) aio->aio_buf, aio->aio_nbytes,
 | |
|                              aio->aio_offset, (void *) aio);
 | |
|           LeaveCriticalSection (&evt_locks[slot]);
 | |
|         }
 | |
|       else
 | |
|         {
 | |
|           set_errno (ENOBUFS); /* Internal use only */
 | |
|           res = -1;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| /* Have to forward ref because of chicken v. egg situation */
 | |
| static DWORD WINAPI __attribute__ ((noreturn)) aioworker (void *);
 | |
| 
 | |
| static void
 | |
| aioinit (void)
 | |
| {
 | |
|   /* First a cheap test to speed processing after initialization completes */
 | |
|   if (aioinitialized >= 0)
 | |
|     {
 | |
|       /* Guard against multiple threads initializing at same time */
 | |
|       if (0 == InterlockedExchangeAdd (&aioinitialized, 1))
 | |
|         {
 | |
|           int       i = AIO_MAX;
 | |
|           char     *tnames = (char *) malloc (AIO_MAX * 8);
 | |
| 
 | |
|           if (!tnames)
 | |
|             api_fatal ("couldn't create aioworker tname table");
 | |
| 
 | |
|           InitializeCriticalSection (&slotcrit);
 | |
|           InitializeCriticalSection (&workcrit);
 | |
|           sem_init (&worksem, 0, 0);
 | |
|           TAILQ_INIT(&worklist);
 | |
| 
 | |
|           /* Create AIO_MAX number of aioworker threads for queued AIOs */
 | |
|           while (i--)
 | |
|             {
 | |
|               __small_sprintf (&tnames[i * 8], "aio%d", AIO_MAX - i);
 | |
|               if (!new cygthread (aioworker, NULL, &tnames[i * 8]))
 | |
|                 api_fatal ("couldn't create an aioworker thread, %E");
 | |
|             }
 | |
| 
 | |
|           /* Initialize event handles and slot locks arrays for inline AIOs */
 | |
|           for (i = 0; i < AIO_MAX; ++i)
 | |
|             {
 | |
|               /* Events are non-inheritable, auto-reset, init unset, unnamed */
 | |
|               evt_handles[i] = CreateEvent (NULL, FALSE, FALSE, NULL);
 | |
|               if (!evt_handles[i])
 | |
|                 api_fatal ("couldn't create an event, %E");
 | |
| 
 | |
|               InitializeCriticalSection (&evt_locks[i]);
 | |
|             }
 | |
| 
 | |
|           /* Create aiowaiter thread; waits for inline AIO completion events */
 | |
|           if (!new cygthread (aiowaiter, NULL, "aio"))
 | |
|             api_fatal ("couldn't create aiowaiter thread, %E");
 | |
| 
 | |
|           /* Indicate we have completed initialization */
 | |
|           InterlockedExchange (&aioinitialized, -1);
 | |
|         }
 | |
|       else
 | |
|         /* If 'aioinitialized' is greater than zero, another thread is
 | |
|          * initializing for us; wait until 'aioinitialized' goes negative
 | |
|          */
 | |
|         while (InterlockedExchangeAdd (&aioinitialized, 0) >= 0)
 | |
|           yield ();
 | |
|     }
 | |
| }
 | |
| 
 | |
| static int
 | |
| aioqueue (struct aiocb *aio)
 | |
| { /* Add an AIO to the worklist, to be serviced by a worker thread */
 | |
|   if (aioinitialized >= 0)
 | |
|     aioinit ();
 | |
| 
 | |
|   EnterCriticalSection (&workcrit);
 | |
|   TAILQ_INSERT_TAIL(&worklist, aio, aio_chain);
 | |
|   LeaveCriticalSection (&workcrit);
 | |
| 
 | |
|   debug_printf ("queued aio %p", aio);
 | |
|   sem_post (&worksem);
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| static DWORD WINAPI __attribute__ ((noreturn))
 | |
| aioworker (void *unused)
 | |
| { /* Multiple instances; called on own cygthreads; runs 'til program exits */
 | |
|   struct aiocb *aio;
 | |
| 
 | |
|   while (1)
 | |
|     {
 | |
|       /* Park here until there's work to do or a slot becomes available */
 | |
|       sem_wait (&worksem);
 | |
| 
 | |
| look4work:
 | |
|       EnterCriticalSection (&workcrit);
 | |
|       if (TAILQ_EMPTY(&worklist))
 | |
|         {
 | |
|           /* Another aioworker picked up the work already */
 | |
|           LeaveCriticalSection (&workcrit);
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
|       /* Make sure a slot is available before starting this AIO */
 | |
|       aio = TAILQ_FIRST(&worklist);
 | |
|       int slot = aiogetslot (NULL);
 | |
|       if (slot >= 0) // a slot is available
 | |
|         TAILQ_REMOVE(&worklist, aio, aio_chain);
 | |
|       LeaveCriticalSection (&workcrit);
 | |
|       if (slot < 0) // no slot is available, so worklist unchanged and we park
 | |
|         continue;
 | |
| 
 | |
|       debug_printf ("starting aio %p", aio);
 | |
|       switch (aio->aio_lio_opcode)
 | |
|         {
 | |
|           case LIO_NOP:
 | |
|             aio->aio_rbytes = 0;
 | |
|             break;
 | |
| 
 | |
|           case LIO_READ:
 | |
|             aio->aio_rbytes = asyncread (aio);
 | |
|             break;
 | |
| 
 | |
|           case LIO_WRITE:
 | |
|             aio->aio_rbytes = asyncwrite (aio);
 | |
|             break;
 | |
| 
 | |
|           default:
 | |
|             errno = EINVAL;
 | |
|             aio->aio_rbytes = -1;
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|       /* If operation still underway, let aiowaiter hear about its finish */
 | |
|       if (aio->aio_rbytes == 0 && aio->aio_nbytes != 0) // not racy
 | |
|         continue;
 | |
| 
 | |
|       /* If operation errored, save error number, else clear it */
 | |
|       if (aio->aio_rbytes == -1)
 | |
|         aio->aio_errno = get_errno ();
 | |
|       else
 | |
|         aio->aio_errno = 0;
 | |
| 
 | |
|       /* If a slot for this queued async AIO was available, but we lost out */
 | |
|       if (aio->aio_errno == ENOBUFS)
 | |
|         {
 | |
|           aio->aio_errno = EINPROGRESS;
 | |
|           aioqueue (aio); /* Re-queue the AIO */
 | |
| 
 | |
|           /* Another option would be to fail the AIO with error EAGAIN, but
 | |
|            * experience with iozone showed apps might not expect to see a
 | |
|            * deferred EAGAIN.  I.e. they should expect EAGAIN on their call to
 | |
|            * aio_read() or aio_write() but probably not expect to see EAGAIN
 | |
|            * on an aio_error() query after they'd previously seen EINPROGRESS
 | |
|            * on the initial AIO call.
 | |
|            */
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
|       /* If seeks aren't permitted on given fd, or pread|pwrite not legal */
 | |
|       if (aio->aio_errno == ESPIPE)
 | |
|         {
 | |
|           ssize_t res = 0;
 | |
|           off_t curpos;
 | |
| 
 | |
|           cygheap_fdget cfd (aio->aio_fildes);
 | |
|           if (cfd < 0)
 | |
|             {
 | |
|               res = -1;
 | |
|               goto done; /* errno has been set to EBADF */
 | |
|             }
 | |
| 
 | |
|           /* If we can get current file position, seek to aio_offset */
 | |
|           curpos = cfd->lseek (0, SEEK_CUR);
 | |
|           if (curpos < 0 || cfd->lseek (aio->aio_offset, SEEK_SET) < 0)
 | |
|             {
 | |
|               /* Can't seek */
 | |
|               res = curpos;
 | |
|               set_errno (0); /* Get rid of ESPIPE we've incurred */
 | |
|               aio->aio_errno = 0; /* Here too */
 | |
|             }
 | |
| 
 | |
|           /* Do the requested AIO operation manually, synchronously */
 | |
|           switch (aio->aio_lio_opcode)
 | |
|             {
 | |
|               case LIO_READ:
 | |
|                 /* 2nd argument to cfd->read() is passed by reference... */
 | |
|                 cfd->read ((void *) aio->aio_buf, aio->aio_nbytes);
 | |
|                 /* ...so on return it contains the number of bytes read */
 | |
|                 res = aio->aio_nbytes;
 | |
|                 break;
 | |
| 
 | |
|               case LIO_WRITE:
 | |
|                 res = cfd->write ((void *) aio->aio_buf, aio->aio_nbytes);
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|           /* If we had seeked successfully, restore original file position */
 | |
|           if (curpos >= 0)
 | |
|             if (cfd->lseek (curpos, SEEK_SET) < 0)
 | |
|               res = -1;
 | |
| 
 | |
| done:
 | |
|           /* Update AIO to reflect final result */
 | |
|           aio->aio_rbytes = res;
 | |
|           aio->aio_errno = res == -1 ? get_errno () : 0;
 | |
| 
 | |
|           /* Make like the requested async operation completed normally */
 | |
|           for (int i = 0; i < AIO_MAX; i++)
 | |
|             if (evt_aiocbs[i] == aio)
 | |
|               {
 | |
|                 SetEvent (evt_handles[i]);
 | |
|                 goto truly_done;
 | |
|               }
 | |
| 
 | |
|           /* Free up the slot we ended up not using */
 | |
|           int slot = aiorelslot (aio);
 | |
|           debug_printf ("slot %d released", slot);
 | |
|         }
 | |
| 
 | |
|       /* Send completion signal if user requested it */
 | |
|       aionotify (aio);
 | |
| 
 | |
| truly_done:
 | |
|       debug_printf ("completed aio %p", aio);
 | |
|       goto look4work;
 | |
|     }
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_cancel (int fildes, struct aiocb *aio)
 | |
| {
 | |
|   int           aiocount = 0;
 | |
|   struct aiocb *ptr;
 | |
|   siginfo_t     si = {0};
 | |
|   si.si_code = SI_ASYNCIO;
 | |
| 
 | |
|   /* Note 'aio' is allowed to be NULL here; it's used as a wildcard */
 | |
| restart:
 | |
|   EnterCriticalSection (&workcrit);
 | |
|   TAILQ_FOREACH(ptr, &worklist, aio_chain)
 | |
|     {
 | |
|       if (ptr->aio_fildes == fildes && (!aio || ptr == aio))
 | |
|         {
 | |
|           /* This queued AIO qualifies for cancellation */
 | |
|           TAILQ_REMOVE(&worklist, ptr, aio_chain);
 | |
|           LeaveCriticalSection (&workcrit);
 | |
| 
 | |
|           ptr->aio_errno = ECANCELED;
 | |
|           ptr->aio_rbytes = -1;
 | |
| 
 | |
|           /* If signal notification wanted, send AIO-canceled signal */
 | |
|           switch (ptr->aio_sigevent.sigev_notify) {
 | |
|           case SIGEV_NONE:
 | |
|             break;
 | |
| 
 | |
|           case SIGEV_SIGNAL:
 | |
|             si.si_signo = ptr->aio_sigevent.sigev_signo;
 | |
|             si.si_value = ptr->aio_sigevent.sigev_value;
 | |
|             if (si.si_signo)
 | |
|               sig_send (myself, si);
 | |
|             break;
 | |
| 
 | |
|           case SIGEV_THREAD:
 | |
|             aionotify_on_pthread (&ptr->aio_sigevent);
 | |
|             break;
 | |
|           }
 | |
| 
 | |
|           ++aiocount;
 | |
|           goto restart;
 | |
|         }
 | |
|     }
 | |
|   LeaveCriticalSection (&workcrit);
 | |
| 
 | |
|   /* Note that AIO_NOTCANCELED is not possible in this implementation.  That's
 | |
|    * because AIOs are dequeued to execute; the worklist search above won't
 | |
|    * find an AIO that's been dequeued from the worklist.
 | |
|    */
 | |
|   if (aiocount)
 | |
|     return AIO_CANCELED;
 | |
|   else
 | |
|     return AIO_ALLDONE;
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_error (const struct aiocb *aio)
 | |
| {
 | |
|   int res;
 | |
| 
 | |
|   if (!aio)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   switch (aio->aio_errno)
 | |
|     {
 | |
|       case EBUSY:   /* This state for internal use only; not visible to app */
 | |
|       case ENOBUFS: /* This state for internal use only; not visible to app */
 | |
|         res = EINPROGRESS;
 | |
|         break;
 | |
| 
 | |
|       default:
 | |
|         res = aio->aio_errno;
 | |
|     }
 | |
| 
 | |
|   return res;
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_fsync (int mode, struct aiocb *aio)
 | |
| {
 | |
|   if (!aio)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   switch (mode)
 | |
|     {
 | |
| #if defined(O_SYNC)
 | |
|       case O_SYNC:
 | |
|         aio->aio_rbytes = fsync (aio->aio_fildes);
 | |
|         break;
 | |
| 
 | |
| #if defined(O_DSYNC) && O_DSYNC != O_SYNC
 | |
|       case O_DSYNC:
 | |
|         aio->aio_rbytes = fdatasync (aio->aio_fildes);
 | |
|         break;
 | |
| #endif
 | |
| #endif
 | |
| 
 | |
|       default:
 | |
|         set_errno (EINVAL);
 | |
|         return -1;
 | |
|     }
 | |
| 
 | |
|   if (aio->aio_rbytes == -1)
 | |
|     aio->aio_errno = get_errno ();
 | |
| 
 | |
|   return aio->aio_rbytes;
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_read (struct aiocb *aio)
 | |
| {
 | |
|   ssize_t       res = 0;
 | |
| 
 | |
|   if (!aio)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
|   if (aioinitialized >= 0)
 | |
|     aioinit ();
 | |
|   if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
 | |
|     {
 | |
|       set_errno (EAGAIN);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   aio->aio_lio_opcode = LIO_READ;
 | |
|   aio->aio_errno = EINPROGRESS;
 | |
|   aio->aio_rbytes = -1;
 | |
| 
 | |
|   /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
 | |
|   if (aio->aio_sigevent.sigev_signo == 0)
 | |
|     aio->aio_sigevent.sigev_notify = SIGEV_NONE;
 | |
| 
 | |
|   /* Try to launch inline async read; only on ESPIPE/ENOBUFS is it queued */
 | |
|   pthread_testcancel ();
 | |
|   res = asyncread (aio);
 | |
| 
 | |
|   /* If async read couldn't be launched, queue the AIO for a worker thread */
 | |
|   if (res == -1)
 | |
|     switch (get_errno ()) {
 | |
|     case ESPIPE:
 | |
|       {
 | |
|         int slot = aiorelslot (aio);
 | |
|         if (slot >= 0)
 | |
|           debug_printf ("slot %d released", slot);
 | |
|       }
 | |
|       fallthrough;
 | |
| 
 | |
|     case ENOBUFS:
 | |
|       aio->aio_errno = EINPROGRESS;
 | |
|       aio->aio_rbytes = -1;
 | |
| 
 | |
|       res = aioqueue (aio);
 | |
|       break;
 | |
| 
 | |
|     default:
 | |
|       ; /* I think this is not possible */
 | |
|     }
 | |
| 
 | |
|   return res < 0 ? (int) res : 0; /* return 0 on success */
 | |
| }
 | |
| 
 | |
| ssize_t
 | |
| aio_return (struct aiocb *aio)
 | |
| {
 | |
|   if (!aio)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   switch (aio->aio_errno)
 | |
|     {
 | |
|       case EBUSY:       /* AIO is currently underway (internal state) */
 | |
|       case ENOBUFS:     /* AIO is currently underway (internal state) */
 | |
|       case EINPROGRESS: /* AIO has been queued successfully */
 | |
|         set_errno (EINPROGRESS);
 | |
|         return -1;
 | |
| 
 | |
|       case EINVAL:      /* aio_return() has already been called on this AIO */
 | |
|         set_errno (aio->aio_errno);
 | |
|         return -1;
 | |
| 
 | |
|       default:          /* AIO has completed, successfully or not */
 | |
|         ;
 | |
|     }
 | |
| 
 | |
|   /* This AIO has completed so grab any error status if present */
 | |
|   if (aio->aio_rbytes == -1)
 | |
|     set_errno (aio->aio_errno);
 | |
| 
 | |
|   /* Set this AIO's errno so later aio_return() calls on this AIO fail */
 | |
|   aio->aio_errno = EINVAL;
 | |
| 
 | |
|   return aio->aio_rbytes;
 | |
| }
 | |
| 
 | |
| static int
 | |
| aiosuspend (const struct aiocb *const aiolist[],
 | |
|          int nent, const struct timespec *timeout)
 | |
| {
 | |
|   /* Returns lowest list index of completed aios, else 'nent' if all completed.
 | |
|    * If none completed on entry, wait for interval specified by 'timeout'.
 | |
|    */
 | |
|   int       res;
 | |
|   sigset_t  sigmask;
 | |
|   siginfo_t si;
 | |
|   ULONGLONG nsecs = 0;
 | |
|   ULONGLONG time0, time1;
 | |
|   struct timespec to = {0};
 | |
| 
 | |
|   if (timeout)
 | |
|     {
 | |
|       to = *timeout;
 | |
|       if (!valid_timespec (to))
 | |
|         {
 | |
|           set_errno (EINVAL);
 | |
|           return -1;
 | |
|         }
 | |
|       nsecs = (NSPERSEC * to.tv_sec) + to.tv_nsec;
 | |
|     }
 | |
| 
 | |
| retry:
 | |
|   sigemptyset (&sigmask);
 | |
|   int aiocount = 0;
 | |
|   for (int i = 0; i < nent; ++i)
 | |
|     if (aiolist[i] && aiolist[i]->aio_liocb)
 | |
|       {
 | |
|         if (aiolist[i]->aio_errno == EINPROGRESS ||
 | |
|             aiolist[i]->aio_errno == ENOBUFS ||
 | |
|             aiolist[i]->aio_errno == EBUSY)
 | |
|           {
 | |
|             ++aiocount;
 | |
|             if (aiolist[i]->aio_sigevent.sigev_notify == SIGEV_SIGNAL ||
 | |
|                 aiolist[i]->aio_sigevent.sigev_notify == SIGEV_THREAD)
 | |
|               sigaddset (&sigmask, aiolist[i]->aio_sigevent.sigev_signo);
 | |
|           }
 | |
|         else
 | |
|           return i;
 | |
|       }
 | |
| 
 | |
|   if (aiocount == 0)
 | |
|     return nent;
 | |
| 
 | |
|   if (timeout && nsecs == 0)
 | |
|     {
 | |
|       set_errno (EAGAIN);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   time0 = get_clock (CLOCK_MONOTONIC)->nsecs ();
 | |
|   /* Note wait below is abortable even w/ empty sigmask and infinite timeout */
 | |
|   res = sigtimedwait (&sigmask, &si, timeout ? &to : NULL);
 | |
|   if (res == -1)
 | |
|     return -1; /* Return with errno set by failed sigtimedwait() */
 | |
|   time1 = get_clock (CLOCK_MONOTONIC)->nsecs ();
 | |
| 
 | |
|   /* Adjust timeout to account for time just waited */
 | |
|   time1 -= time0;
 | |
|   if (time1 > nsecs)
 | |
|     nsecs = 0; // just in case we didn't get rescheduled very quickly
 | |
|   else
 | |
|     nsecs -= time1;
 | |
|   to.tv_sec = nsecs / NSPERSEC;
 | |
|   to.tv_nsec = nsecs % NSPERSEC;
 | |
| 
 | |
|   goto retry;
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_suspend (const struct aiocb *const aiolist[],
 | |
|              int nent, const struct timespec *timeout)
 | |
| {
 | |
|   int res;
 | |
| 
 | |
|   if (nent < 0)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   pthread_testcancel ();
 | |
|   res = aiosuspend (aiolist, nent, timeout);
 | |
| 
 | |
|   /* If there was an error, or no AIOs completed before or during timeout */
 | |
|   if (res == -1)
 | |
|     return res; /* If no AIOs completed, errno has been set to EAGAIN */
 | |
| 
 | |
|   /* Else if all AIOs have completed */
 | |
|   else if (res == nent)
 | |
|     return 0;
 | |
| 
 | |
|   /* Else at least one of the AIOs completed */
 | |
|   else
 | |
|     return 0;
 | |
| }
 | |
| 
 | |
| int
 | |
| aio_write (struct aiocb *aio)
 | |
| {
 | |
|   ssize_t       res = 0;
 | |
| 
 | |
|   if (!aio)
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
|   if (aioinitialized >= 0)
 | |
|     aioinit ();
 | |
|   if (aio->aio_errno == EINPROGRESS || -1 != aiochkslot (aio))
 | |
|     {
 | |
|       set_errno (EAGAIN);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   aio->aio_lio_opcode = LIO_WRITE;
 | |
|   aio->aio_errno = EINPROGRESS;
 | |
|   aio->aio_rbytes = -1;
 | |
| 
 | |
|   /* Ensure zeroed (i.e. initialized but unused) aio_sigevent doesn't signal */
 | |
|   if (aio->aio_sigevent.sigev_signo == 0)
 | |
|     aio->aio_sigevent.sigev_notify = SIGEV_NONE;
 | |
| 
 | |
|   /* Try to launch inline async write; only on ESPIPE/ENOBUFS is it queued */
 | |
|   pthread_testcancel ();
 | |
|   res = asyncwrite (aio);
 | |
| 
 | |
|   /* If async write couldn't be launched, queue the AIO for a worker thread */
 | |
|   if (res == -1)
 | |
|     switch (get_errno ()) {
 | |
|     case ESPIPE:
 | |
|       {
 | |
|         int slot = aiorelslot (aio);
 | |
|         if (slot >= 0)
 | |
|           debug_printf ("slot %d released", slot);
 | |
|       }
 | |
|       fallthrough;
 | |
| 
 | |
|     case ENOBUFS:
 | |
|       aio->aio_errno = EINPROGRESS;
 | |
|       aio->aio_rbytes = -1;
 | |
| 
 | |
|       res = aioqueue (aio);
 | |
|       break;
 | |
| 
 | |
|     default:
 | |
|       ; /* I think this is not possible */
 | |
|     }
 | |
| 
 | |
|   return res < 0 ? (int) res : 0; /* return 0 on success */
 | |
| }
 | |
| 
 | |
| int
 | |
| lio_listio (int mode, struct aiocb *__restrict const aiolist[__restrict],
 | |
|             int nent, struct sigevent *__restrict sig)
 | |
| {
 | |
|   struct aiocb *aio;
 | |
|   struct __liocb *lio;
 | |
| 
 | |
|   pthread_testcancel ();
 | |
| 
 | |
|   if ((mode != LIO_WAIT && mode != LIO_NOWAIT) ||
 | |
|       (nent < 0 || nent > AIO_LISTIO_MAX))
 | |
|     {
 | |
|       set_errno (EINVAL);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   if (sig && nent && mode == LIO_NOWAIT)
 | |
|     {
 | |
|       lio = (struct __liocb *) malloc (sizeof (struct __liocb));
 | |
|       if (!lio)
 | |
|         {
 | |
|           set_errno (ENOMEM);
 | |
|           return -1;
 | |
|         }
 | |
| 
 | |
|       lio->lio_count = nent;
 | |
|       lio->lio_sigevent = sig;
 | |
|     }
 | |
|   else
 | |
|     lio = NULL;
 | |
| 
 | |
|   int aiocount = 0;
 | |
|   for (int i = 0; i < nent; ++i)
 | |
|     {
 | |
|       aio = (struct aiocb *) aiolist[i];
 | |
|       if (!aio)
 | |
|         {
 | |
|           if (lio)
 | |
|             InterlockedDecrement (&lio->lio_count);
 | |
|           continue;
 | |
|         }
 | |
| 
 | |
|       aio->aio_liocb = lio;
 | |
|       switch (aio->aio_lio_opcode)
 | |
|         {
 | |
|           case LIO_NOP:
 | |
|             if (lio)
 | |
|               InterlockedDecrement (&lio->lio_count);
 | |
|             continue;
 | |
| 
 | |
|           case LIO_READ:
 | |
|             aio_read (aio);
 | |
|             ++aiocount;
 | |
|             continue;
 | |
| 
 | |
|           case LIO_WRITE:
 | |
|             aio_write (aio);
 | |
|             ++aiocount;
 | |
|             continue;
 | |
| 
 | |
|           default:
 | |
|             break;
 | |
|         }
 | |
| 
 | |
|       if (lio)
 | |
|         InterlockedDecrement (&lio->lio_count);
 | |
|       aio->aio_errno = EINVAL;
 | |
|       aio->aio_rbytes = -1;
 | |
|     }
 | |
| 
 | |
|   /* mode is LIO_NOWAIT so return some kind of answer immediately */
 | |
|   if (mode == LIO_NOWAIT)
 | |
|     {
 | |
|       /* At least one AIO has been launched or queued */
 | |
|       if (aiocount)
 | |
|         return 0;
 | |
| 
 | |
|       /* No AIOs have been launched or queued */
 | |
|       set_errno (EAGAIN);
 | |
|       return -1;
 | |
|     }
 | |
| 
 | |
|   /* Else mode is LIO_WAIT so wait for all AIOs to complete or error */
 | |
|   while (nent)
 | |
|     {
 | |
|       int i = aiosuspend ((const struct aiocb *const *) aiolist, nent, NULL);
 | |
|       if (i >= nent)
 | |
|         break;
 | |
|       else
 | |
|         aiolist[i]->aio_liocb = NULL; /* Avoids repeating notify on this AIO */
 | |
|     }
 | |
| 
 | |
|   return 0;
 | |
| }
 | |
| 
 | |
| #ifdef __cplusplus
 | |
| }
 | |
| #endif
 |