...

Source file src/internal/poll/splice_linux.go

Documentation: internal/poll

     1  // Copyright 2018 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package poll
     6  
     7  import (
     8  	"internal/syscall/unix"
     9  	"runtime"
    10  	"sync"
    11  	"syscall"
    12  	"unsafe"
    13  )
    14  
    15  const (
    16  	// spliceNonblock doesn't make the splice itself necessarily nonblocking
    17  	// (because the actual file descriptors that are spliced from/to may block
    18  	// unless they have the O_NONBLOCK flag set), but it makes the splice pipe
    19  	// operations nonblocking.
    20  	spliceNonblock = 0x2
    21  
    22  	// maxSpliceSize is the maximum amount of data Splice asks
    23  	// the kernel to move in a single call to splice(2).
    24  	// We use 1MB as Splice writes data through a pipe, and 1MB is the default maximum pipe buffer size,
    25  	// which is determined by /proc/sys/fs/pipe-max-size.
    26  	maxSpliceSize = 1 << 20
    27  )
    28  
    29  // Splice transfers at most remain bytes of data from src to dst, using the
    30  // splice system call to minimize copies of data from and to userspace.
    31  //
    32  // Splice gets a pipe buffer from the pool or creates a new one if needed, to serve as a buffer for the data transfer.
    33  // src and dst must both be stream-oriented sockets.
    34  func Splice(dst, src *FD, remain int64) (written int64, handled bool, err error) {
    35  	p, err := getPipe()
    36  	if err != nil {
    37  		return 0, false, err
    38  	}
    39  	defer putPipe(p)
    40  	var inPipe, n int
    41  	for err == nil && remain > 0 {
    42  		max := maxSpliceSize
    43  		if int64(max) > remain {
    44  			max = int(remain)
    45  		}
    46  		inPipe, err = spliceDrain(p.wfd, src, max)
    47  		// The operation is considered handled if splice returns no
    48  		// error, or an error other than EINVAL. An EINVAL means the
    49  		// kernel does not support splice for the socket type of src.
    50  		// The failed syscall does not consume any data so it is safe
    51  		// to fall back to a generic copy.
    52  		//
    53  		// spliceDrain should never return EAGAIN, so if err != nil,
    54  		// Splice cannot continue.
    55  		//
    56  		// If inPipe == 0 && err == nil, src is at EOF, and the
    57  		// transfer is complete.
    58  		handled = handled || (err != syscall.EINVAL)
    59  		if err != nil || inPipe == 0 {
    60  			break
    61  		}
    62  		p.data += inPipe
    63  
    64  		n, err = splicePump(dst, p.rfd, inPipe)
    65  		if n > 0 {
    66  			written += int64(n)
    67  			remain -= int64(n)
    68  			p.data -= n
    69  		}
    70  	}
    71  	if err != nil {
    72  		return written, handled, err
    73  	}
    74  	return written, true, nil
    75  }
    76  
    77  // spliceDrain moves data from a socket to a pipe.
    78  //
    79  // Invariant: when entering spliceDrain, the pipe is empty. It is either in its
    80  // initial state, or splicePump has emptied it previously.
    81  //
    82  // Given this, spliceDrain can reasonably assume that the pipe is ready for
    83  // writing, so if splice returns EAGAIN, it must be because the socket is not
    84  // ready for reading.
    85  //
    86  // If spliceDrain returns (0, nil), src is at EOF.
    87  func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
    88  	if err := sock.readLock(); err != nil {
    89  		return 0, err
    90  	}
    91  	defer sock.readUnlock()
    92  	if err := sock.pd.prepareRead(sock.isFile); err != nil {
    93  		return 0, err
    94  	}
    95  	for {
    96  		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
    97  		// because it could return EAGAIN ceaselessly when the write end of the pipe is full,
    98  		// but this shouldn't be a concern here, since the pipe buffer must be sufficient for
    99  		// this data transmission on the basis of the workflow in Splice.
   100  		n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
   101  		if err == syscall.EINTR {
   102  			continue
   103  		}
   104  		if err != syscall.EAGAIN {
   105  			return n, err
   106  		}
   107  		if sock.pd.pollable() {
   108  			if err := sock.pd.waitRead(sock.isFile); err != nil {
   109  				return n, err
   110  			}
   111  		}
   112  	}
   113  }
   114  
   115  // splicePump moves all the buffered data from a pipe to a socket.
   116  //
   117  // Invariant: when entering splicePump, there are exactly inPipe
   118  // bytes of data in the pipe, from a previous call to spliceDrain.
   119  //
   120  // By analogy to the condition from spliceDrain, splicePump
   121  // only needs to poll the socket for readiness, if splice returns
   122  // EAGAIN.
   123  //
   124  // If splicePump cannot move all the data in a single call to
   125  // splice(2), it loops over the buffered data until it has written
   126  // all of it to the socket. This behavior is similar to the Write
   127  // step of an io.Copy in userspace.
   128  func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
   129  	if err := sock.writeLock(); err != nil {
   130  		return 0, err
   131  	}
   132  	defer sock.writeUnlock()
   133  	if err := sock.pd.prepareWrite(sock.isFile); err != nil {
   134  		return 0, err
   135  	}
   136  	written := 0
   137  	for inPipe > 0 {
   138  		// In theory calling splice(2) with SPLICE_F_NONBLOCK could end up an infinite loop here,
   139  		// because it could return EAGAIN ceaselessly when the read end of the pipe is empty,
   140  		// but this shouldn't be a concern here, since the pipe buffer must contain inPipe size of
   141  		// data on the basis of the workflow in Splice.
   142  		n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
   143  		if err == syscall.EINTR {
   144  			continue
   145  		}
   146  		// Here, the condition n == 0 && err == nil should never be
   147  		// observed, since Splice controls the write side of the pipe.
   148  		if n > 0 {
   149  			inPipe -= n
   150  			written += n
   151  			continue
   152  		}
   153  		if err != syscall.EAGAIN {
   154  			return written, err
   155  		}
   156  		if sock.pd.pollable() {
   157  			if err := sock.pd.waitWrite(sock.isFile); err != nil {
   158  				return written, err
   159  			}
   160  		}
   161  	}
   162  	return written, nil
   163  }
   164  
   165  // splice wraps the splice system call. Since the current implementation
   166  // only uses splice on sockets and pipes, the offset arguments are unused.
   167  // splice returns int instead of int64, because callers never ask it to
   168  // move more data in a single call than can fit in an int32.
   169  func splice(out int, in int, max int, flags int) (int, error) {
   170  	n, err := syscall.Splice(in, nil, out, nil, max, flags)
   171  	return int(n), err
   172  }
   173  
   174  type splicePipeFields struct {
   175  	rfd  int
   176  	wfd  int
   177  	data int
   178  }
   179  
   180  type splicePipe struct {
   181  	splicePipeFields
   182  
   183  	// We want to use a finalizer, so ensure that the size is
   184  	// large enough to not use the tiny allocator.
   185  	_ [24 - unsafe.Sizeof(splicePipeFields{})%24]byte
   186  }
   187  
   188  // splicePipePool caches pipes to avoid high-frequency construction and destruction of pipe buffers.
   189  // The garbage collector will free all pipes in the sync.Pool periodically, thus we need to set up
   190  // a finalizer for each pipe to close its file descriptors before the actual GC.
   191  var splicePipePool = sync.Pool{New: newPoolPipe}
   192  
   193  func newPoolPipe() any {
   194  	// Discard the error which occurred during the creation of pipe buffer,
   195  	// redirecting the data transmission to the conventional way utilizing read() + write() as a fallback.
   196  	p := newPipe()
   197  	if p == nil {
   198  		return nil
   199  	}
   200  	runtime.SetFinalizer(p, destroyPipe)
   201  	return p
   202  }
   203  
   204  // getPipe tries to acquire a pipe buffer from the pool or create a new one with newPipe() if it gets nil from the cache.
   205  func getPipe() (*splicePipe, error) {
   206  	v := splicePipePool.Get()
   207  	if v == nil {
   208  		return nil, syscall.EINVAL
   209  	}
   210  	return v.(*splicePipe), nil
   211  }
   212  
   213  func putPipe(p *splicePipe) {
   214  	// If there is still data left in the pipe,
   215  	// then close and discard it instead of putting it back into the pool.
   216  	if p.data != 0 {
   217  		runtime.SetFinalizer(p, nil)
   218  		destroyPipe(p)
   219  		return
   220  	}
   221  	splicePipePool.Put(p)
   222  }
   223  
   224  // newPipe sets up a pipe for a splice operation.
   225  func newPipe() *splicePipe {
   226  	var fds [2]int
   227  	if err := syscall.Pipe2(fds[:], syscall.O_CLOEXEC|syscall.O_NONBLOCK); err != nil {
   228  		return nil
   229  	}
   230  
   231  	// Splice will loop writing maxSpliceSize bytes from the source to the pipe,
   232  	// and then write those bytes from the pipe to the destination.
   233  	// Set the pipe buffer size to maxSpliceSize to optimize that.
   234  	// Ignore errors here, as a smaller buffer size will work,
   235  	// although it will require more system calls.
   236  	unix.Fcntl(fds[0], syscall.F_SETPIPE_SZ, maxSpliceSize)
   237  
   238  	return &splicePipe{splicePipeFields: splicePipeFields{rfd: fds[0], wfd: fds[1]}}
   239  }
   240  
   241  // destroyPipe destroys a pipe.
   242  func destroyPipe(p *splicePipe) {
   243  	CloseFunc(p.rfd)
   244  	CloseFunc(p.wfd)
   245  }
   246  

View as plain text