
Source file src/sync/pool.go

// Copyright 2013 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package sync

import (
	"internal/race"
	"runtime"
	"sync/atomic"
	"unsafe"
)

// A Pool is a set of temporary objects that may be individually saved and
// retrieved.
//
// Any item stored in the Pool may be removed automatically at any time without
// notification. If the Pool holds the only reference when this happens, the
// item might be deallocated.
//
// A Pool is safe for use by multiple goroutines simultaneously.
//
// Pool's purpose is to cache allocated but unused items for later reuse,
// relieving pressure on the garbage collector. That is, it makes it easy to
// build efficient, thread-safe free lists. However, it is not suitable for all
// free lists.
//
// An appropriate use of a Pool is to manage a group of temporary items
// silently shared among and potentially reused by concurrent independent
// clients of a package. Pool provides a way to amortize allocation overhead
// across many clients.
//
// An example of good use of a Pool is in the fmt package, which maintains a
// dynamically-sized store of temporary output buffers. The store scales under
// load (when many goroutines are actively printing) and shrinks when
// quiescent.
//
// On the other hand, a free list maintained as part of a short-lived object is
// not a suitable use for a Pool, since the overhead does not amortize well in
// that scenario. It is more efficient to have such objects implement their own
// free list.
//
// A Pool must not be copied after first use.
//
// In the terminology of [the Go memory model], a call to Put(x) “synchronizes before”
// a call to [Pool.Get] returning that same value x.
// Similarly, a call to New returning x “synchronizes before”
// a call to Get returning that same value x.
//
// [the Go memory model]: https://go.dev/ref/mem
type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local array from the previous cycle
	victimSize uintptr        // size of the victim array

	// New optionally specifies a function to generate
	// a value when Get would otherwise return nil.
	// It may not be changed concurrently with calls to Get.
	New func() any
}
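
// A minimal usage sketch (editor's addition, not part of this file): a
// package-level pool with New set so that Get never returns nil. The element
// type *bytes.Buffer is illustrative only.
//
//	var bufPool = sync.Pool{
//		New: func() any {
//			// New should generally return pointer types: a pointer fits in
//			// the any value without an extra allocation on Put/Get.
//			return new(bytes.Buffer)
//		},
//	}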

// Local per-P Pool appendix.
type poolLocalInternal struct {
	private any       // Can be used only by the respective P.
	shared  poolChain // Local P can pushHead/popHead; any P can popTail.
}

type poolLocal struct {
	poolLocalInternal

	// Prevents false sharing on widespread platforms with
	// 128 mod (cache line size) = 0.
	pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}
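
// The invariant can be checked at compile time (editor's sketch, not in the
// original file): the assignment below only type-checks when
// unsafe.Sizeof(poolLocal{}) is an exact multiple of 128 bytes, i.e. when no
// two poolLocals ever share a 64- or 128-byte cache line.
//
//	var _ [0]byte = [unsafe.Sizeof(poolLocal{}) % 128]byte{}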

// from runtime
//
//go:linkname runtime_randn runtime.randn
func runtime_randn(n uint32) uint32

var poolRaceHash [128]uint64

// poolRaceAddr returns an address to use as the synchronization point
// for race detector logic. We don't use the actual pointer stored in x
// directly, for fear of conflicting with other synchronization on that address.
// Instead, we hash the pointer to get an index into poolRaceHash.
// See discussion on golang.org/cl/31589.
func poolRaceAddr(x any) unsafe.Pointer {
	ptr := uintptr((*[2]unsafe.Pointer)(unsafe.Pointer(&x))[1])
	h := uint32((uint64(uint32(ptr)) * 0x85ebca6b) >> 16)
	return unsafe.Pointer(&poolRaceHash[h%uint32(len(poolRaceHash))])
}

// Put adds x to the pool.
func (p *Pool) Put(x any) {
	if x == nil {
		return
	}
	if race.Enabled {
		if runtime_randn(4) == 0 {
			// Randomly drop x on floor.
			return
		}
		race.ReleaseMerge(poolRaceAddr(x))
		race.Disable()
	}
	l, _ := p.pin()
	if l.private == nil {
		l.private = x
	} else {
		l.shared.pushHead(x)
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
	}
}
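
// A typical Put-side pattern (editor's sketch, continuing the hypothetical
// bufPool above): reset the object before returning it so the next Get sees a
// clean value, and keep no reference to it after Put.
//
//	func release(b *bytes.Buffer) {
//		b.Reset() // drop contents but keep the allocated capacity
//		bufPool.Put(b)
//	}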

// Get selects an arbitrary item from the [Pool], removes it from the
// Pool, and returns it to the caller.
// Get may choose to ignore the pool and treat it as empty.
// Callers should not assume any relation between values passed to [Pool.Put] and
// the values returned by Get.
//
// If Get would otherwise return nil and p.New is non-nil, Get returns
// the result of calling p.New.
func (p *Pool) Get() any {
	if race.Enabled {
		race.Disable()
	}
	l, pid := p.pin()
	x := l.private
	l.private = nil
	if x == nil {
		// Try to pop the head of the local shard. We prefer
		// the head over the tail for temporal locality of
		// reuse.
		x, _ = l.shared.popHead()
		if x == nil {
			x = p.getSlow(pid)
		}
	}
	runtime_procUnpin()
	if race.Enabled {
		race.Enable()
		if x != nil {
			race.Acquire(poolRaceAddr(x))
		}
	}
	if x == nil && p.New != nil {
		x = p.New()
	}
	return x
}
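
// The matching Get-side pattern (editor's sketch; bufPool as above): values
// come back as any, so callers type-assert, and with New set Get never
// returns nil. A deferred Put scopes the buffer to the function.
//
//	func writePair(w io.Writer, key, val string) error {
//		b := bufPool.Get().(*bytes.Buffer)
//		defer func() {
//			b.Reset()
//			bufPool.Put(b)
//		}()
//		b.WriteString(key)
//		b.WriteByte('=')
//		b.WriteString(val)
//		_, err := w.Write(b.Bytes())
//		return err
//	}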

func (p *Pool) getSlow(pid int) any {
	// See the comment in pin regarding ordering of the loads.
	size := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	locals := p.local                            // load-consume
	// Try to steal one element from other procs.
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i+1)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Try the victim cache. We do this after attempting to steal
	// from all primary caches because we want objects in the
	// victim cache to age out if at all possible.
	size = atomic.LoadUintptr(&p.victimSize)
	if uintptr(pid) >= size {
		return nil
	}
	locals = p.victim
	l := indexLocal(locals, pid)
	if x := l.private; x != nil {
		l.private = nil
		return x
	}
	for i := 0; i < int(size); i++ {
		l := indexLocal(locals, (pid+i)%int(size))
		if x, _ := l.shared.popTail(); x != nil {
			return x
		}
	}

	// Mark the victim cache as empty so future gets don't bother
	// with it.
	atomic.StoreUintptr(&p.victimSize, 0)

	return nil
}
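
// Putting pin, Get, and getSlow together, Get's overall search order is
// (editor's outline; stealOtherTails and victimLookup are illustrative names,
// not real functions):
//
//	x = l.private              // 1. this P's private slot
//	x, _ = l.shared.popHead()  // 2. head of this P's own shard
//	x = stealOtherTails(pid)   // 3. getSlow: tails of the other Ps' shards
//	x = victimLookup(pid)      // 4. getSlow: previous cycle's victim cache
//	x = p.New()                // 5. fall back to New, if non-nil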

// pin pins the current goroutine to P, disables preemption and
// returns the poolLocal pool for the P and the P's id.
// Caller must call runtime_procUnpin() when done with the pool.
func (p *Pool) pin() (*poolLocal, int) {
	// Check whether p is nil to get a panic.
	// Otherwise the nil dereference happens while the m is pinned,
	// causing a fatal error rather than a panic.
	if p == nil {
		panic("nil Pool")
	}

	pid := runtime_procPin()
	// In pinSlow we store to local and then to localSize, here we load in opposite order.
	// Since we've disabled preemption, GC cannot happen in between.
	// Thus here we must observe local at least as large as localSize.
	// We can observe a newer/larger local, which is fine (we must observe its zero-initialized-ness).
	s := runtime_LoadAcquintptr(&p.localSize) // load-acquire
	l := p.local                              // load-consume
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	return p.pinSlow()
}

func (p *Pool) pinSlow() (*poolLocal, int) {
	// Retry under the mutex.
	// Cannot lock the mutex while pinned.
	runtime_procUnpin()
	allPoolsMu.Lock()
	defer allPoolsMu.Unlock()
	pid := runtime_procPin()
	// poolCleanup won't be called while we are pinned.
	s := p.localSize
	l := p.local
	if uintptr(pid) < s {
		return indexLocal(l, pid), pid
	}
	if p.local == nil {
		allPools = append(allPools, p)
	}
	// If GOMAXPROCS changes between GCs, we re-allocate the array and lose the old one.
	size := runtime.GOMAXPROCS(0)
	local := make([]poolLocal, size)
	atomic.StorePointer(&p.local, unsafe.Pointer(&local[0])) // store-release
	runtime_StoreReluintptr(&p.localSize, uintptr(size))     // store-release
	return &local[pid], pid
}
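
// The publication pattern above, in miniature (editor's sketch using plain
// sync/atomic instead of the runtime's linknamed intrinsics): the writer
// publishes the array before its size; the reader loads the size before the
// array, so a reader that observes a nonzero size also observes the array.
//
//	var arr atomic.Pointer[[8]int]
//	var n atomic.Uintptr
//
//	// writer
//	a := new([8]int)
//	arr.Store(a) // publish the array first...
//	n.Store(8)   // ...then its size (plays the store-release role)
//
//	// reader
//	if n.Load() > 0 { // plays the load-acquire role
//		_ = arr.Load()[0] // safe: the array was published before the size
//	}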

// poolCleanup should be an internal detail,
// but widely used packages access it using linkname.
// Notable members of the hall of shame include:
//   - github.com/bytedance/gopkg
//   - github.com/songzhibin97/gkit
//
// Do not remove or change the type signature.
// See go.dev/issue/67401.
//
//go:linkname poolCleanup
func poolCleanup() {
	// This function is called with the world stopped, at the beginning of a garbage collection.
	// It must not allocate and probably should not call any runtime functions.

	// Because the world is stopped, no pool user can be in a
	// pinned section (in effect, this has all Ps pinned).

	// Drop victim caches from all pools.
	for _, p := range oldPools {
		p.victim = nil
		p.victimSize = 0
	}

	// Move primary cache to victim cache.
	for _, p := range allPools {
		p.victim = p.local
		p.victimSize = p.localSize
		p.local = nil
		p.localSize = 0
	}

	// The pools with non-empty primary caches now have non-empty
	// victim caches and no pools have primary caches.
	oldPools, allPools = allPools, nil
}
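
// The victim mechanism gives an idle object a lifetime of roughly two GC
// cycles: one GC demotes it to the victim cache, the next drops it. A
// demonstration (editor's sketch; Pool makes no guarantees, so the commented
// results are typical rather than promised):
//
//	package main
//
//	import (
//		"fmt"
//		"runtime"
//		"sync"
//	)
//
//	func main() {
//		p := &sync.Pool{}
//		p.Put("x")
//		runtime.GC()         // poolCleanup: primary cache becomes the victim cache
//		fmt.Println(p.Get()) // usually "x", served from the victim cache
//		p.Put("x")
//		runtime.GC()
//		runtime.GC()         // the second cleanup drops the victim cache
//		fmt.Println(p.Get()) // usually <nil>: the value has been released
//	}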

var (
	allPoolsMu Mutex

	// allPools is the set of pools that have non-empty primary
	// caches. Protected by either 1) allPoolsMu and pinning or 2)
	// STW.
	allPools []*Pool

	// oldPools is the set of pools that may have non-empty victim
	// caches. Protected by STW.
	oldPools []*Pool
)

func init() {
	runtime_registerPoolCleanup(poolCleanup)
}

func indexLocal(l unsafe.Pointer, i int) *poolLocal {
	lp := unsafe.Pointer(uintptr(l) + uintptr(i)*unsafe.Sizeof(poolLocal{}))
	return (*poolLocal)(lp)
}
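
// Since Go 1.17 the same arithmetic can be written with unsafe.Add; an
// equivalent formulation (editor's sketch, not the code used here):
//
//	func indexLocal(l unsafe.Pointer, i int) *poolLocal {
//		return (*poolLocal)(unsafe.Add(l, uintptr(i)*unsafe.Sizeof(poolLocal{})))
//	}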

// Implemented in runtime.
func runtime_registerPoolCleanup(cleanup func())
func runtime_procPin() int
func runtime_procUnpin()

// The below are implemented in internal/runtime/atomic and the
// compiler also knows to intrinsify the symbol we linkname into this
// package.

//go:linkname runtime_LoadAcquintptr internal/runtime/atomic.LoadAcquintptr
func runtime_LoadAcquintptr(ptr *uintptr) uintptr

//go:linkname runtime_StoreReluintptr internal/runtime/atomic.StoreReluintptr
func runtime_StoreReluintptr(ptr *uintptr, val uintptr) uintptr
