Cygwin: pipe: Stop counting reader and read all available data.

- By guarding read with read_mtx, no more than one ReadFile can
  be called simultaneously. So couting read handles is no longer
  necessary.
- Make raw_read code as similar as possible to raw_write code.
This commit is contained in:
Takashi Yano 2021-09-06 20:12:16 +09:00 committed by Corinna Vinschen
parent 085fc12948
commit fadbedd9ca
1 changed files with 60 additions and 40 deletions

View File

@ -221,12 +221,10 @@ fhandler_pipe::get_proc_fd_name (char *buf)
void __reg3 void __reg3
fhandler_pipe::raw_read (void *ptr, size_t& len) fhandler_pipe::raw_read (void *ptr, size_t& len)
{ {
NTSTATUS status; size_t nbytes = 0;
NTSTATUS status = STATUS_SUCCESS;
IO_STATUS_BLOCK io; IO_STATUS_BLOCK io;
HANDLE evt = NULL; HANDLE evt = NULL;
DWORD waitret = WAIT_OBJECT_0;
bool keep_looping = false;
size_t orig_len = len;
if (!len) if (!len)
return; return;
@ -239,43 +237,59 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
return; return;
} }
do DWORD timeout = is_nonblocking () ? 0 : INFINITE;
DWORD waitret = cygwait (read_mtx, timeout);
switch (waitret)
{ {
len = orig_len; case WAIT_OBJECT_0:
keep_looping = false; break;
case WAIT_TIMEOUT:
set_errno (EAGAIN);
len = (size_t) -1;
return;
default:
set_errno (EINTR);
len = (size_t) -1;
return;
}
while (nbytes < len)
{
ULONG_PTR nbytes_now = 0;
size_t left = len - nbytes;
ULONG len1 = (ULONG) left;
waitret = WAIT_OBJECT_0;
if (evt) if (evt)
ResetEvent (evt); ResetEvent (evt);
if (!is_nonblocking ()) if (!is_nonblocking ())
{ {
FILE_PIPE_LOCAL_INFORMATION fpli; FILE_PIPE_LOCAL_INFORMATION fpli;
ULONG reader_count;
ULONG max_len = 64;
WaitForSingleObject (read_mtx, INFINITE); /* If the pipe is empty, don't request more bytes than pipe
buffer size - 1. Pending read lowers WriteQuotaAvailable on
/* If the pipe is empty, don't request more bytes than half the the write side and thus affects select's ability to return
pipe buffer size. Every pending read lowers WriteQuotaAvailable more or less reliable info whether a write succeeds or not. */
on the write side and thus affects select's ability to return ULONG chunk = max_atomic_write - 1;
more or less reliable info whether a write succeeds or not.
Let the size of the request depend on the number of readers
at the time. */
status = NtQueryInformationFile (get_handle (), &io, status = NtQueryInformationFile (get_handle (), &io,
&fpli, sizeof (fpli), &fpli, sizeof (fpli),
FilePipeLocalInformation); FilePipeLocalInformation);
if (NT_SUCCESS (status) && fpli.ReadDataAvailable == 0) if (NT_SUCCESS (status))
{ {
reader_count = get_obj_handle_count (get_handle ()); if (fpli.ReadDataAvailable > 0)
if (reader_count < 10) chunk = left;
max_len = fpli.InboundQuota / (2 * reader_count); else if (nbytes != 0)
if (len > max_len) break;
len = max_len; else
chunk = fpli.InboundQuota - 1;
} }
else if (nbytes != 0)
break;
if (len1 > chunk)
len1 = chunk;
} }
status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr, status = NtReadFile (get_handle (), evt, NULL, NULL, &io, ptr,
len, NULL, NULL); len1, NULL, NULL);
if (!is_nonblocking ())
ReleaseMutex (read_mtx);
if (evt && status == STATUS_PENDING) if (evt && status == STATUS_PENDING)
{ {
waitret = cygwait (evt); waitret = cygwait (evt);
@ -292,13 +306,13 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
set_errno (EBADF); set_errno (EBADF);
else else
__seterrno (); __seterrno ();
len = (size_t) -1; nbytes = (size_t) -1;
} }
else if (NT_SUCCESS (status)) else if (NT_SUCCESS (status))
{ {
len = io.Information; nbytes_now = io.Information;
if (len == 0) ptr = ((char *) ptr) + nbytes_now;
keep_looping = true; nbytes += nbytes_now;
} }
else else
{ {
@ -308,40 +322,46 @@ fhandler_pipe::raw_read (void *ptr, size_t& len)
case STATUS_END_OF_FILE: case STATUS_END_OF_FILE:
case STATUS_PIPE_BROKEN: case STATUS_PIPE_BROKEN:
/* This is really EOF. */ /* This is really EOF. */
len = 0;
break; break;
case STATUS_MORE_ENTRIES: case STATUS_MORE_ENTRIES:
case STATUS_BUFFER_OVERFLOW: case STATUS_BUFFER_OVERFLOW:
/* `io.Information' is supposedly valid. */ /* `io.Information' is supposedly valid. */
len = io.Information; nbytes_now = io.Information;
if (len == 0) ptr = ((char *) ptr) + nbytes_now;
keep_looping = true; nbytes += nbytes_now;
break; break;
case STATUS_PIPE_LISTENING: case STATUS_PIPE_LISTENING:
case STATUS_PIPE_EMPTY: case STATUS_PIPE_EMPTY:
if (nbytes != 0)
break;
if (is_nonblocking ()) if (is_nonblocking ())
{ {
set_errno (EAGAIN); set_errno (EAGAIN);
len = (size_t) -1; nbytes = (size_t) -1;
break; break;
} }
fallthrough; fallthrough;
default: default:
__seterrno_from_nt_status (status); __seterrno_from_nt_status (status);
len = (size_t) -1; nbytes = (size_t) -1;
break; break;
} }
} }
} while (keep_looping);
if (nbytes_now == 0)
break;
}
ReleaseMutex (read_mtx);
if (evt) if (evt)
CloseHandle (evt); CloseHandle (evt);
if (status == STATUS_THREAD_SIGNALED) if (status == STATUS_THREAD_SIGNALED && nbytes == 0)
{ {
set_errno (EINTR); set_errno (EINTR);
len = (size_t) -1; nbytes = (size_t) -1;
} }
else if (status == STATUS_THREAD_CANCELED) else if (status == STATUS_THREAD_CANCELED)
pthread::static_cancel_self (); pthread::static_cancel_self ();
len = nbytes;
} }
ssize_t __reg3 ssize_t __reg3