...
1
2
3
4
5 package par
6
7 import "fmt"
8
9
10
11 type Queue struct {
12 maxActive int
13 st chan queueState
14 }
15
// queueState is the mutable state of a Queue. It is passed by value through
// Queue.st, so whoever currently holds the value owns it exclusively.
type queueState struct {
	// active is the number of worker goroutines currently processing work.
	// Work is only appended to backlog while active == maxActive, and a worker
	// only exits once backlog is empty, so active is nonzero whenever
	// len(backlog) > 0.
	active int

	// backlog holds work items waiting for a worker, in the order added.
	backlog []func()

	// idle, if non-nil, is closed when active becomes 0. Add resets it to nil
	// when work starts on an idle queue, and Idle allocates it on demand.
	idle chan struct{}
}
21
22
23
24
25 func NewQueue(maxActive int) *Queue {
26 if maxActive < 1 {
27 panic(fmt.Sprintf("par.NewQueue called with nonpositive limit (%d)", maxActive))
28 }
29
30 q := &Queue{
31 maxActive: maxActive,
32 st: make(chan queueState, 1),
33 }
34 q.st <- queueState{}
35 return q
36 }
37
38
39
40
41
42 func (q *Queue) Add(f func()) {
43 st := <-q.st
44 if st.active == q.maxActive {
45 st.backlog = append(st.backlog, f)
46 q.st <- st
47 return
48 }
49 if st.active == 0 {
50
51 st.idle = nil
52 }
53 st.active++
54 q.st <- st
55
56 go func() {
57 for {
58 f()
59
60 st := <-q.st
61 if len(st.backlog) == 0 {
62 if st.active--; st.active == 0 && st.idle != nil {
63 close(st.idle)
64 }
65 q.st <- st
66 return
67 }
68 f, st.backlog = st.backlog[0], st.backlog[1:]
69 q.st <- st
70 }
71 }()
72 }
73
74
75
76 func (q *Queue) Idle() <-chan struct{} {
77 st := <-q.st
78 defer func() { q.st <- st }()
79
80 if st.idle == nil {
81 st.idle = make(chan struct{})
82 if st.active == 0 {
83 close(st.idle)
84 }
85 }
86
87 return st.idle
88 }
89
View as plain text