...
Source file
src/internal/poll/splice_linux_test.go
1
2
3
4
5 package poll_test
6
7 import (
8 "internal/poll"
9 "runtime"
10 "sync"
11 "sync/atomic"
12 "testing"
13 "time"
14 )
15
16 var closeHook atomic.Value
17
18 func init() {
19 closeFunc := poll.CloseFunc
20 poll.CloseFunc = func(fd int) (err error) {
21 if v := closeHook.Load(); v != nil {
22 if hook := v.(func(int)); hook != nil {
23 hook(fd)
24 }
25 }
26 return closeFunc(fd)
27 }
28 }
29
30 func TestSplicePipePool(t *testing.T) {
31 const N = 64
32 var (
33 p *poll.SplicePipe
34 ps []*poll.SplicePipe
35 allFDs []int
36 pendingFDs sync.Map
37 err error
38 )
39
40 closeHook.Store(func(fd int) { pendingFDs.Delete(fd) })
41 t.Cleanup(func() { closeHook.Store((func(int))(nil)) })
42
43 for i := 0; i < N; i++ {
44 p, err = poll.GetPipe()
45 if err != nil {
46 t.Skipf("failed to create pipe due to error(%v), skip this test", err)
47 }
48 _, pwfd := poll.GetPipeFds(p)
49 allFDs = append(allFDs, pwfd)
50 pendingFDs.Store(pwfd, struct{}{})
51 ps = append(ps, p)
52 }
53 for _, p = range ps {
54 poll.PutPipe(p)
55 }
56 ps = nil
57 p = nil
58
59
60 timeout := 5 * time.Minute
61 if deadline, ok := t.Deadline(); ok {
62 timeout = deadline.Sub(time.Now())
63 timeout -= timeout / 10
64 }
65 expiredTime := time.NewTimer(timeout)
66 defer expiredTime.Stop()
67
68
69
70 for {
71 runtime.GC()
72 time.Sleep(10 * time.Millisecond)
73
74
75 var leakedFDs []int
76 pendingFDs.Range(func(k, v any) bool {
77 leakedFDs = append(leakedFDs, k.(int))
78 return true
79 })
80 if len(leakedFDs) == 0 {
81 break
82 }
83
84 select {
85 case <-expiredTime.C:
86 t.Logf("all descriptors: %v", allFDs)
87 t.Fatalf("leaked descriptors: %v", leakedFDs)
88 default:
89 }
90 }
91 }
92
93 func BenchmarkSplicePipe(b *testing.B) {
94 b.Run("SplicePipeWithPool", func(b *testing.B) {
95 for i := 0; i < b.N; i++ {
96 p, err := poll.GetPipe()
97 if err != nil {
98 continue
99 }
100 poll.PutPipe(p)
101 }
102 })
103 b.Run("SplicePipeWithoutPool", func(b *testing.B) {
104 for i := 0; i < b.N; i++ {
105 p := poll.NewPipe()
106 if p == nil {
107 b.Skip("newPipe returned nil")
108 }
109 poll.DestroyPipe(p)
110 }
111 })
112 }
113
114 func BenchmarkSplicePipePoolParallel(b *testing.B) {
115 b.RunParallel(func(pb *testing.PB) {
116 for pb.Next() {
117 p, err := poll.GetPipe()
118 if err != nil {
119 continue
120 }
121 poll.PutPipe(p)
122 }
123 })
124 }
125
126 func BenchmarkSplicePipeNativeParallel(b *testing.B) {
127 b.RunParallel(func(pb *testing.PB) {
128 for pb.Next() {
129 p := poll.NewPipe()
130 if p == nil {
131 b.Skip("newPipe returned nil")
132 }
133 poll.DestroyPipe(p)
134 }
135 })
136 }
137
View as plain text