Source file
src/runtime/select.go
Documentation: runtime
1
2
3
4
5 package runtime
6
7
8
9 import (
10 "internal/abi"
11 "unsafe"
12 )
13
14 const debugSelect = false
15
16
17
18
19 type scase struct {
20 c *hchan
21 elem unsafe.Pointer
22 }
23
24 var (
25 chansendpc = abi.FuncPCABIInternal(chansend)
26 chanrecvpc = abi.FuncPCABIInternal(chanrecv)
27 )
28
29 func selectsetpc(pc *uintptr) {
30 *pc = getcallerpc()
31 }
32
33 func sellock(scases []scase, lockorder []uint16) {
34 var c *hchan
35 for _, o := range lockorder {
36 c0 := scases[o].c
37 if c0 != c {
38 c = c0
39 lock(&c.lock)
40 }
41 }
42 }
43
44 func selunlock(scases []scase, lockorder []uint16) {
45
46
47
48
49
50
51
52
53 for i := len(lockorder) - 1; i >= 0; i-- {
54 c := scases[lockorder[i]].c
55 if i > 0 && c == scases[lockorder[i-1]].c {
56 continue
57 }
58 unlock(&c.lock)
59 }
60 }
61
62 func selparkcommit(gp *g, _ unsafe.Pointer) bool {
63
64
65
66
67
68 gp.activeStackChans = true
69
70
71
72 gp.parkingOnChan.Store(false)
73
74
75
76
77
78
79
80
81
82
83 var lastc *hchan
84 for sg := gp.waiting; sg != nil; sg = sg.waitlink {
85 if sg.c != lastc && lastc != nil {
86
87
88
89
90
91
92 unlock(&lastc.lock)
93 }
94 lastc = sg.c
95 }
96 if lastc != nil {
97 unlock(&lastc.lock)
98 }
99 return true
100 }
101
102 func block() {
103 gopark(nil, nil, waitReasonSelectNoCases, traceBlockForever, 1)
104 }
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121 func selectgo(cas0 *scase, order0 *uint16, pc0 *uintptr, nsends, nrecvs int, block bool) (int, bool) {
122 if debugSelect {
123 print("select: cas0=", cas0, "\n")
124 }
125
126
127
128 cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
129 order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))
130
131 ncases := nsends + nrecvs
132 scases := cas1[:ncases:ncases]
133 pollorder := order1[:ncases:ncases]
134 lockorder := order1[ncases:][:ncases:ncases]
135
136
137
138
139
140 var pcs []uintptr
141 if raceenabled && pc0 != nil {
142 pc1 := (*[1 << 16]uintptr)(unsafe.Pointer(pc0))
143 pcs = pc1[:ncases:ncases]
144 }
145 casePC := func(casi int) uintptr {
146 if pcs == nil {
147 return 0
148 }
149 return pcs[casi]
150 }
151
152 var t0 int64
153 if blockprofilerate > 0 {
154 t0 = cputicks()
155 }
156
157
158
159
160
161
162
163
164
165
166 norder := 0
167 for i := range scases {
168 cas := &scases[i]
169
170
171 if cas.c == nil {
172 cas.elem = nil
173 continue
174 }
175
176 if cas.c.timer != nil {
177 cas.c.timer.maybeRunChan()
178 }
179
180 j := cheaprandn(uint32(norder + 1))
181 pollorder[norder] = pollorder[j]
182 pollorder[j] = uint16(i)
183 norder++
184 }
185 pollorder = pollorder[:norder]
186 lockorder = lockorder[:norder]
187
188
189
190 for i := range lockorder {
191 j := i
192
193 c := scases[pollorder[i]].c
194 for j > 0 && scases[lockorder[(j-1)/2]].c.sortkey() < c.sortkey() {
195 k := (j - 1) / 2
196 lockorder[j] = lockorder[k]
197 j = k
198 }
199 lockorder[j] = pollorder[i]
200 }
201 for i := len(lockorder) - 1; i >= 0; i-- {
202 o := lockorder[i]
203 c := scases[o].c
204 lockorder[i] = lockorder[0]
205 j := 0
206 for {
207 k := j*2 + 1
208 if k >= i {
209 break
210 }
211 if k+1 < i && scases[lockorder[k]].c.sortkey() < scases[lockorder[k+1]].c.sortkey() {
212 k++
213 }
214 if c.sortkey() < scases[lockorder[k]].c.sortkey() {
215 lockorder[j] = lockorder[k]
216 j = k
217 continue
218 }
219 break
220 }
221 lockorder[j] = o
222 }
223
224 if debugSelect {
225 for i := 0; i+1 < len(lockorder); i++ {
226 if scases[lockorder[i]].c.sortkey() > scases[lockorder[i+1]].c.sortkey() {
227 print("i=", i, " x=", lockorder[i], " y=", lockorder[i+1], "\n")
228 throw("select: broken sort")
229 }
230 }
231 }
232
233
234 sellock(scases, lockorder)
235
236 var (
237 gp *g
238 sg *sudog
239 c *hchan
240 k *scase
241 sglist *sudog
242 sgnext *sudog
243 qp unsafe.Pointer
244 nextp **sudog
245 )
246
247
248 var casi int
249 var cas *scase
250 var caseSuccess bool
251 var caseReleaseTime int64 = -1
252 var recvOK bool
253 for _, casei := range pollorder {
254 casi = int(casei)
255 cas = &scases[casi]
256 c = cas.c
257
258 if casi >= nsends {
259 sg = c.sendq.dequeue()
260 if sg != nil {
261 goto recv
262 }
263 if c.qcount > 0 {
264 goto bufrecv
265 }
266 if c.closed != 0 {
267 goto rclose
268 }
269 } else {
270 if raceenabled {
271 racereadpc(c.raceaddr(), casePC(casi), chansendpc)
272 }
273 if c.closed != 0 {
274 goto sclose
275 }
276 sg = c.recvq.dequeue()
277 if sg != nil {
278 goto send
279 }
280 if c.qcount < c.dataqsiz {
281 goto bufsend
282 }
283 }
284 }
285
286 if !block {
287 selunlock(scases, lockorder)
288 casi = -1
289 goto retc
290 }
291
292
293 gp = getg()
294 if gp.waiting != nil {
295 throw("gp.waiting != nil")
296 }
297 nextp = &gp.waiting
298 for _, casei := range lockorder {
299 casi = int(casei)
300 cas = &scases[casi]
301 c = cas.c
302 sg := acquireSudog()
303 sg.g = gp
304 sg.isSelect = true
305
306
307 sg.elem = cas.elem
308 sg.releasetime = 0
309 if t0 != 0 {
310 sg.releasetime = -1
311 }
312 sg.c = c
313
314 *nextp = sg
315 nextp = &sg.waitlink
316
317 if casi < nsends {
318 c.sendq.enqueue(sg)
319 } else {
320 c.recvq.enqueue(sg)
321 }
322
323 if c.timer != nil {
324 blockTimerChan(c)
325 }
326 }
327
328
329 gp.param = nil
330
331
332
333
334 gp.parkingOnChan.Store(true)
335 gopark(selparkcommit, nil, waitReasonSelect, traceBlockSelect, 1)
336 gp.activeStackChans = false
337
338 sellock(scases, lockorder)
339
340 gp.selectDone.Store(0)
341 sg = (*sudog)(gp.param)
342 gp.param = nil
343
344
345
346
347
348 casi = -1
349 cas = nil
350 caseSuccess = false
351 sglist = gp.waiting
352
353 for sg1 := gp.waiting; sg1 != nil; sg1 = sg1.waitlink {
354 sg1.isSelect = false
355 sg1.elem = nil
356 sg1.c = nil
357 }
358 gp.waiting = nil
359
360 for _, casei := range lockorder {
361 k = &scases[casei]
362 if k.c.timer != nil {
363 unblockTimerChan(k.c)
364 }
365 if sg == sglist {
366
367 casi = int(casei)
368 cas = k
369 caseSuccess = sglist.success
370 if sglist.releasetime > 0 {
371 caseReleaseTime = sglist.releasetime
372 }
373 } else {
374 c = k.c
375 if int(casei) < nsends {
376 c.sendq.dequeueSudoG(sglist)
377 } else {
378 c.recvq.dequeueSudoG(sglist)
379 }
380 }
381 sgnext = sglist.waitlink
382 sglist.waitlink = nil
383 releaseSudog(sglist)
384 sglist = sgnext
385 }
386
387 if cas == nil {
388 throw("selectgo: bad wakeup")
389 }
390
391 c = cas.c
392
393 if debugSelect {
394 print("wait-return: cas0=", cas0, " c=", c, " cas=", cas, " send=", casi < nsends, "\n")
395 }
396
397 if casi < nsends {
398 if !caseSuccess {
399 goto sclose
400 }
401 } else {
402 recvOK = caseSuccess
403 }
404
405 if raceenabled {
406 if casi < nsends {
407 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
408 } else if cas.elem != nil {
409 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
410 }
411 }
412 if msanenabled {
413 if casi < nsends {
414 msanread(cas.elem, c.elemtype.Size_)
415 } else if cas.elem != nil {
416 msanwrite(cas.elem, c.elemtype.Size_)
417 }
418 }
419 if asanenabled {
420 if casi < nsends {
421 asanread(cas.elem, c.elemtype.Size_)
422 } else if cas.elem != nil {
423 asanwrite(cas.elem, c.elemtype.Size_)
424 }
425 }
426
427 selunlock(scases, lockorder)
428 goto retc
429
430 bufrecv:
431
432 if raceenabled {
433 if cas.elem != nil {
434 raceWriteObjectPC(c.elemtype, cas.elem, casePC(casi), chanrecvpc)
435 }
436 racenotify(c, c.recvx, nil)
437 }
438 if msanenabled && cas.elem != nil {
439 msanwrite(cas.elem, c.elemtype.Size_)
440 }
441 if asanenabled && cas.elem != nil {
442 asanwrite(cas.elem, c.elemtype.Size_)
443 }
444 recvOK = true
445 qp = chanbuf(c, c.recvx)
446 if cas.elem != nil {
447 typedmemmove(c.elemtype, cas.elem, qp)
448 }
449 typedmemclr(c.elemtype, qp)
450 c.recvx++
451 if c.recvx == c.dataqsiz {
452 c.recvx = 0
453 }
454 c.qcount--
455 selunlock(scases, lockorder)
456 goto retc
457
458 bufsend:
459
460 if raceenabled {
461 racenotify(c, c.sendx, nil)
462 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
463 }
464 if msanenabled {
465 msanread(cas.elem, c.elemtype.Size_)
466 }
467 if asanenabled {
468 asanread(cas.elem, c.elemtype.Size_)
469 }
470 typedmemmove(c.elemtype, chanbuf(c, c.sendx), cas.elem)
471 c.sendx++
472 if c.sendx == c.dataqsiz {
473 c.sendx = 0
474 }
475 c.qcount++
476 selunlock(scases, lockorder)
477 goto retc
478
479 recv:
480
481 recv(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
482 if debugSelect {
483 print("syncrecv: cas0=", cas0, " c=", c, "\n")
484 }
485 recvOK = true
486 goto retc
487
488 rclose:
489
490 selunlock(scases, lockorder)
491 recvOK = false
492 if cas.elem != nil {
493 typedmemclr(c.elemtype, cas.elem)
494 }
495 if raceenabled {
496 raceacquire(c.raceaddr())
497 }
498 goto retc
499
500 send:
501
502 if raceenabled {
503 raceReadObjectPC(c.elemtype, cas.elem, casePC(casi), chansendpc)
504 }
505 if msanenabled {
506 msanread(cas.elem, c.elemtype.Size_)
507 }
508 if asanenabled {
509 asanread(cas.elem, c.elemtype.Size_)
510 }
511 send(c, sg, cas.elem, func() { selunlock(scases, lockorder) }, 2)
512 if debugSelect {
513 print("syncsend: cas0=", cas0, " c=", c, "\n")
514 }
515 goto retc
516
517 retc:
518 if caseReleaseTime > 0 {
519 blockevent(caseReleaseTime-t0, 1)
520 }
521 return casi, recvOK
522
523 sclose:
524
525 selunlock(scases, lockorder)
526 panic(plainError("send on closed channel"))
527 }
528
529 func (c *hchan) sortkey() uintptr {
530 return uintptr(unsafe.Pointer(c))
531 }
532
533
534
535 type runtimeSelect struct {
536 dir selectDir
537 typ unsafe.Pointer
538 ch *hchan
539 val unsafe.Pointer
540 }
541
542
543 type selectDir int
544
545 const (
546 _ selectDir = iota
547 selectSend
548 selectRecv
549 selectDefault
550 )
551
552
553 func reflect_rselect(cases []runtimeSelect) (int, bool) {
554 if len(cases) == 0 {
555 block()
556 }
557 sel := make([]scase, len(cases))
558 orig := make([]int, len(cases))
559 nsends, nrecvs := 0, 0
560 dflt := -1
561 for i, rc := range cases {
562 var j int
563 switch rc.dir {
564 case selectDefault:
565 dflt = i
566 continue
567 case selectSend:
568 j = nsends
569 nsends++
570 case selectRecv:
571 nrecvs++
572 j = len(cases) - nrecvs
573 }
574
575 sel[j] = scase{c: rc.ch, elem: rc.val}
576 orig[j] = i
577 }
578
579
580 if nsends+nrecvs == 0 {
581 return dflt, false
582 }
583
584
585 if nsends+nrecvs < len(cases) {
586 copy(sel[nsends:], sel[len(cases)-nrecvs:])
587 copy(orig[nsends:], orig[len(cases)-nrecvs:])
588 }
589
590 order := make([]uint16, 2*(nsends+nrecvs))
591 var pc0 *uintptr
592 if raceenabled {
593 pcs := make([]uintptr, nsends+nrecvs)
594 for i := range pcs {
595 selectsetpc(&pcs[i])
596 }
597 pc0 = &pcs[0]
598 }
599
600 chosen, recvOK := selectgo(&sel[0], &order[0], pc0, nsends, nrecvs, dflt == -1)
601
602
603 if chosen < 0 {
604 chosen = dflt
605 } else {
606 chosen = orig[chosen]
607 }
608 return chosen, recvOK
609 }
610
611 func (q *waitq) dequeueSudoG(sgp *sudog) {
612 x := sgp.prev
613 y := sgp.next
614 if x != nil {
615 if y != nil {
616
617 x.next = y
618 y.prev = x
619 sgp.next = nil
620 sgp.prev = nil
621 return
622 }
623
624 x.next = nil
625 q.last = x
626 sgp.prev = nil
627 return
628 }
629 if y != nil {
630
631 y.prev = nil
632 q.first = y
633 sgp.next = nil
634 return
635 }
636
637
638
639 if q.first == sgp {
640 q.first = nil
641 q.last = nil
642 }
643 }
644
View as plain text