...

Source file src/runtime/netpoll_aix.go

Documentation: runtime

     1  // Copyright 2018 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package runtime
     6  
     7  import (
     8  	"internal/runtime/atomic"
     9  	"unsafe"
    10  )
    11  
    12  // This is based on the former libgo/runtime/netpoll_select.c implementation
    13  // except that it uses poll instead of select and is written in Go.
    14  // It's also based on Solaris implementation for the arming mechanisms
    15  
    16  //go:cgo_import_dynamic libc_poll poll "libc.a/shr_64.o"
    17  //go:linkname libc_poll libc_poll
    18  
    19  var libc_poll libFunc
    20  
    21  //go:nosplit
    22  func poll(pfds *pollfd, npfds uintptr, timeout uintptr) (int32, int32) {
    23  	r, err := syscall3(&libc_poll, uintptr(unsafe.Pointer(pfds)), npfds, timeout)
    24  	return int32(r), int32(err)
    25  }
    26  
    27  // pollfd represents the poll structure for AIX operating system.
    28  type pollfd struct {
    29  	fd      int32
    30  	events  int16
    31  	revents int16
    32  }
    33  
    34  const _POLLIN = 0x0001
    35  const _POLLOUT = 0x0002
    36  const _POLLHUP = 0x2000
    37  const _POLLERR = 0x4000
    38  
    39  var (
    40  	pfds           []pollfd
    41  	pds            []*pollDesc
    42  	mtxpoll        mutex
    43  	mtxset         mutex
    44  	rdwake         int32
    45  	wrwake         int32
    46  	pendingUpdates int32
    47  
    48  	netpollWakeSig atomic.Uint32 // used to avoid duplicate calls of netpollBreak
    49  )
    50  
    51  func netpollinit() {
    52  	// Create the pipe we use to wakeup poll.
    53  	r, w, errno := nonblockingPipe()
    54  	if errno != 0 {
    55  		throw("netpollinit: failed to create pipe")
    56  	}
    57  	rdwake = r
    58  	wrwake = w
    59  
    60  	// Pre-allocate array of pollfd structures for poll.
    61  	pfds = make([]pollfd, 1, 128)
    62  
    63  	// Poll the read side of the pipe.
    64  	pfds[0].fd = rdwake
    65  	pfds[0].events = _POLLIN
    66  
    67  	pds = make([]*pollDesc, 1, 128)
    68  	pds[0] = nil
    69  }
    70  
    71  func netpollIsPollDescriptor(fd uintptr) bool {
    72  	return fd == uintptr(rdwake) || fd == uintptr(wrwake)
    73  }
    74  
    75  // netpollwakeup writes on wrwake to wakeup poll before any changes.
    76  func netpollwakeup() {
    77  	if pendingUpdates == 0 {
    78  		pendingUpdates = 1
    79  		b := [1]byte{0}
    80  		write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
    81  	}
    82  }
    83  
    84  func netpollopen(fd uintptr, pd *pollDesc) int32 {
    85  	lock(&mtxpoll)
    86  	netpollwakeup()
    87  
    88  	lock(&mtxset)
    89  	unlock(&mtxpoll)
    90  
    91  	// We don't worry about pd.fdseq here,
    92  	// as mtxset protects us from stale pollDescs.
    93  
    94  	pd.user = uint32(len(pfds))
    95  	pfds = append(pfds, pollfd{fd: int32(fd)})
    96  	pds = append(pds, pd)
    97  	unlock(&mtxset)
    98  	return 0
    99  }
   100  
   101  func netpollclose(fd uintptr) int32 {
   102  	lock(&mtxpoll)
   103  	netpollwakeup()
   104  
   105  	lock(&mtxset)
   106  	unlock(&mtxpoll)
   107  
   108  	for i := 0; i < len(pfds); i++ {
   109  		if pfds[i].fd == int32(fd) {
   110  			pfds[i] = pfds[len(pfds)-1]
   111  			pfds = pfds[:len(pfds)-1]
   112  
   113  			pds[i] = pds[len(pds)-1]
   114  			pds[i].user = uint32(i)
   115  			pds = pds[:len(pds)-1]
   116  			break
   117  		}
   118  	}
   119  	unlock(&mtxset)
   120  	return 0
   121  }
   122  
   123  func netpollarm(pd *pollDesc, mode int) {
   124  	lock(&mtxpoll)
   125  	netpollwakeup()
   126  
   127  	lock(&mtxset)
   128  	unlock(&mtxpoll)
   129  
   130  	switch mode {
   131  	case 'r':
   132  		pfds[pd.user].events |= _POLLIN
   133  	case 'w':
   134  		pfds[pd.user].events |= _POLLOUT
   135  	}
   136  	unlock(&mtxset)
   137  }
   138  
   139  // netpollBreak interrupts a poll.
   140  func netpollBreak() {
   141  	// Failing to cas indicates there is an in-flight wakeup, so we're done here.
   142  	if !netpollWakeSig.CompareAndSwap(0, 1) {
   143  		return
   144  	}
   145  
   146  	b := [1]byte{0}
   147  	write(uintptr(wrwake), unsafe.Pointer(&b[0]), 1)
   148  }
   149  
   150  // netpoll checks for ready network connections.
   151  // Returns list of goroutines that become runnable.
   152  // delay < 0: blocks indefinitely
   153  // delay == 0: does not block, just polls
   154  // delay > 0: block for up to that many nanoseconds
   155  //
   156  //go:nowritebarrierrec
   157  func netpoll(delay int64) (gList, int32) {
   158  	var timeout uintptr
   159  	if delay < 0 {
   160  		timeout = ^uintptr(0)
   161  	} else if delay == 0 {
   162  		// TODO: call poll with timeout == 0
   163  		return gList{}, 0
   164  	} else if delay < 1e6 {
   165  		timeout = 1
   166  	} else if delay < 1e15 {
   167  		timeout = uintptr(delay / 1e6)
   168  	} else {
   169  		// An arbitrary cap on how long to wait for a timer.
   170  		// 1e9 ms == ~11.5 days.
   171  		timeout = 1e9
   172  	}
   173  retry:
   174  	lock(&mtxpoll)
   175  	lock(&mtxset)
   176  	pendingUpdates = 0
   177  	unlock(&mtxpoll)
   178  
   179  	n, e := poll(&pfds[0], uintptr(len(pfds)), timeout)
   180  	if n < 0 {
   181  		if e != _EINTR {
   182  			println("errno=", e, " len(pfds)=", len(pfds))
   183  			throw("poll failed")
   184  		}
   185  		unlock(&mtxset)
   186  		// If a timed sleep was interrupted, just return to
   187  		// recalculate how long we should sleep now.
   188  		if timeout > 0 {
   189  			return gList{}, 0
   190  		}
   191  		goto retry
   192  	}
   193  	// Check if some descriptors need to be changed
   194  	if n != 0 && pfds[0].revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
   195  		if delay != 0 {
   196  			// A netpollwakeup could be picked up by a
   197  			// non-blocking poll. Only clear the wakeup
   198  			// if blocking.
   199  			var b [1]byte
   200  			for read(rdwake, unsafe.Pointer(&b[0]), 1) == 1 {
   201  			}
   202  			netpollWakeSig.Store(0)
   203  		}
   204  		// Still look at the other fds even if the mode may have
   205  		// changed, as netpollBreak might have been called.
   206  		n--
   207  	}
   208  	var toRun gList
   209  	delta := int32(0)
   210  	for i := 1; i < len(pfds) && n > 0; i++ {
   211  		pfd := &pfds[i]
   212  
   213  		var mode int32
   214  		if pfd.revents&(_POLLIN|_POLLHUP|_POLLERR) != 0 {
   215  			mode += 'r'
   216  			pfd.events &= ^_POLLIN
   217  		}
   218  		if pfd.revents&(_POLLOUT|_POLLHUP|_POLLERR) != 0 {
   219  			mode += 'w'
   220  			pfd.events &= ^_POLLOUT
   221  		}
   222  		if mode != 0 {
   223  			pds[i].setEventErr(pfd.revents == _POLLERR, 0)
   224  			delta += netpollready(&toRun, pds[i], mode)
   225  			n--
   226  		}
   227  	}
   228  	unlock(&mtxset)
   229  	return toRun, delta
   230  }
   231  

View as plain text