...

Source file src/sync/waitgroup.go

Documentation: sync

     1  // Copyright 2011 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package sync
     6  
     7  import (
     8  	"internal/race"
     9  	"sync/atomic"
    10  	"unsafe"
    11  )
    12  
    13  // A WaitGroup waits for a collection of goroutines to finish.
    14  // The main goroutine calls Add to set the number of
    15  // goroutines to wait for. Then each of the goroutines
    16  // runs and calls Done when finished. At the same time,
    17  // Wait can be used to block until all goroutines have finished.
    18  //
    19  // A WaitGroup must not be copied after first use.
    20  //
    21  // In the terminology of the Go memory model, a call to Done
    22  // “synchronizes before” the return of any Wait call that it unblocks.
    23  type WaitGroup struct {
    24  	noCopy noCopy
    25  
    26  	state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
    27  	sema  uint32
    28  }
    29  
    30  // Add adds delta, which may be negative, to the WaitGroup counter.
    31  // If the counter becomes zero, all goroutines blocked on Wait are released.
    32  // If the counter goes negative, Add panics.
    33  //
    34  // Note that calls with a positive delta that occur when the counter is zero
    35  // must happen before a Wait. Calls with a negative delta, or calls with a
    36  // positive delta that start when the counter is greater than zero, may happen
    37  // at any time.
    38  // Typically this means the calls to Add should execute before the statement
    39  // creating the goroutine or other event to be waited for.
    40  // If a WaitGroup is reused to wait for several independent sets of events,
    41  // new Add calls must happen after all previous Wait calls have returned.
    42  // See the WaitGroup example.
    43  func (wg *WaitGroup) Add(delta int) {
    44  	if race.Enabled {
    45  		if delta < 0 {
    46  			// Synchronize decrements with Wait.
    47  			race.ReleaseMerge(unsafe.Pointer(wg))
    48  		}
    49  		race.Disable()
    50  		defer race.Enable()
    51  	}
    52  	state := wg.state.Add(uint64(delta) << 32)
    53  	v := int32(state >> 32)
    54  	w := uint32(state)
    55  	if race.Enabled && delta > 0 && v == int32(delta) {
    56  		// The first increment must be synchronized with Wait.
    57  		// Need to model this as a read, because there can be
    58  		// several concurrent wg.counter transitions from 0.
    59  		race.Read(unsafe.Pointer(&wg.sema))
    60  	}
    61  	if v < 0 {
    62  		panic("sync: negative WaitGroup counter")
    63  	}
    64  	if w != 0 && delta > 0 && v == int32(delta) {
    65  		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    66  	}
    67  	if v > 0 || w == 0 {
    68  		return
    69  	}
    70  	// This goroutine has set counter to 0 when waiters > 0.
    71  	// Now there can't be concurrent mutations of state:
    72  	// - Adds must not happen concurrently with Wait,
    73  	// - Wait does not increment waiters if it sees counter == 0.
    74  	// Still do a cheap sanity check to detect WaitGroup misuse.
    75  	if wg.state.Load() != state {
    76  		panic("sync: WaitGroup misuse: Add called concurrently with Wait")
    77  	}
    78  	// Reset waiters count to 0.
    79  	wg.state.Store(0)
    80  	for ; w != 0; w-- {
    81  		runtime_Semrelease(&wg.sema, false, 0)
    82  	}
    83  }
    84  
    85  // Done decrements the WaitGroup counter by one.
    86  func (wg *WaitGroup) Done() {
    87  	wg.Add(-1)
    88  }
    89  
    90  // Wait blocks until the WaitGroup counter is zero.
    91  func (wg *WaitGroup) Wait() {
    92  	if race.Enabled {
    93  		race.Disable()
    94  	}
    95  	for {
    96  		state := wg.state.Load()
    97  		v := int32(state >> 32)
    98  		w := uint32(state)
    99  		if v == 0 {
   100  			// Counter is 0, no need to wait.
   101  			if race.Enabled {
   102  				race.Enable()
   103  				race.Acquire(unsafe.Pointer(wg))
   104  			}
   105  			return
   106  		}
   107  		// Increment waiters count.
   108  		if wg.state.CompareAndSwap(state, state+1) {
   109  			if race.Enabled && w == 0 {
   110  				// Wait must be synchronized with the first Add.
   111  				// Need to model this is as a write to race with the read in Add.
   112  				// As a consequence, can do the write only for the first waiter,
   113  				// otherwise concurrent Waits will race with each other.
   114  				race.Write(unsafe.Pointer(&wg.sema))
   115  			}
   116  			runtime_Semacquire(&wg.sema)
   117  			if wg.state.Load() != 0 {
   118  				panic("sync: WaitGroup is reused before previous Wait has returned")
   119  			}
   120  			if race.Enabled {
   121  				race.Enable()
   122  				race.Acquire(unsafe.Pointer(wg))
   123  			}
   124  			return
   125  		}
   126  	}
   127  }
   128  

View as plain text