1
2
3
4
5 package poll
6
7 import (
8 "internal/syscall/unix"
9 "runtime"
10 "sync"
11 "sync/atomic"
12 "syscall"
13 "unsafe"
14 )
15
16 const (
17
18 spliceNonblock = 0x2
19
20
21
22 maxSpliceSize = 4 << 20
23 )
24
25
26
27
28
29
30
31
32 func Splice(dst, src *FD, remain int64) (written int64, handled bool, sc string, err error) {
33 p, sc, err := getPipe()
34 if err != nil {
35 return 0, false, sc, err
36 }
37 defer putPipe(p)
38 var inPipe, n int
39 for err == nil && remain > 0 {
40 max := maxSpliceSize
41 if int64(max) > remain {
42 max = int(remain)
43 }
44 inPipe, err = spliceDrain(p.wfd, src, max)
45
46
47
48
49
50
51
52
53
54
55
56 handled = handled || (err != syscall.EINVAL)
57 if err != nil || inPipe == 0 {
58 break
59 }
60 p.data += inPipe
61
62 n, err = splicePump(dst, p.rfd, inPipe)
63 if n > 0 {
64 written += int64(n)
65 remain -= int64(n)
66 p.data -= n
67 }
68 }
69 if err != nil {
70 return written, handled, "splice", err
71 }
72 return written, true, "", nil
73 }
74
75
76
77
78
79
80
81
82
83
84
85 func spliceDrain(pipefd int, sock *FD, max int) (int, error) {
86 if err := sock.readLock(); err != nil {
87 return 0, err
88 }
89 defer sock.readUnlock()
90 if err := sock.pd.prepareRead(sock.isFile); err != nil {
91 return 0, err
92 }
93 for {
94 n, err := splice(pipefd, sock.Sysfd, max, spliceNonblock)
95 if err == syscall.EINTR {
96 continue
97 }
98 if err != syscall.EAGAIN {
99 return n, err
100 }
101 if err := sock.pd.waitRead(sock.isFile); err != nil {
102 return n, err
103 }
104 }
105 }
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120 func splicePump(sock *FD, pipefd int, inPipe int) (int, error) {
121 if err := sock.writeLock(); err != nil {
122 return 0, err
123 }
124 defer sock.writeUnlock()
125 if err := sock.pd.prepareWrite(sock.isFile); err != nil {
126 return 0, err
127 }
128 written := 0
129 for inPipe > 0 {
130 n, err := splice(sock.Sysfd, pipefd, inPipe, spliceNonblock)
131
132
133 if n > 0 {
134 inPipe -= n
135 written += n
136 continue
137 }
138 if err != syscall.EAGAIN {
139 return written, err
140 }
141 if err := sock.pd.waitWrite(sock.isFile); err != nil {
142 return written, err
143 }
144 }
145 return written, nil
146 }
147
148
149
150
151
152 func splice(out int, in int, max int, flags int) (int, error) {
153 n, err := syscall.Splice(in, nil, out, nil, max, flags)
154 return int(n), err
155 }
156
157 type splicePipe struct {
158 rfd int
159 wfd int
160 data int
161 }
162
163
164
165
166 var splicePipePool = sync.Pool{New: newPoolPipe}
167
168 func newPoolPipe() interface{} {
169
170
171 p := newPipe()
172 if p == nil {
173 return nil
174 }
175 runtime.SetFinalizer(p, destroyPipe)
176 return p
177 }
178
179
180
181
182
183 func getPipe() (*splicePipe, string, error) {
184 v := splicePipePool.Get()
185 if v == nil {
186 return nil, "splice", syscall.EINVAL
187 }
188 return v.(*splicePipe), "", nil
189 }
190
191 func putPipe(p *splicePipe) {
192
193
194 if p.data != 0 {
195 runtime.SetFinalizer(p, nil)
196 destroyPipe(p)
197 return
198 }
199 splicePipePool.Put(p)
200 }
201
202 var disableSplice unsafe.Pointer
203
204
205 func newPipe() (sp *splicePipe) {
206 p := (*bool)(atomic.LoadPointer(&disableSplice))
207 if p != nil && *p {
208 return nil
209 }
210
211 var fds [2]int
212
213
214
215
216 const flags = syscall.O_CLOEXEC | syscall.O_NONBLOCK
217 if err := syscall.Pipe2(fds[:], flags); err != nil {
218 return nil
219 }
220
221 sp = &splicePipe{rfd: fds[0], wfd: fds[1]}
222
223 if p == nil {
224 p = new(bool)
225 defer atomic.StorePointer(&disableSplice, unsafe.Pointer(p))
226
227
228 if _, _, errno := syscall.Syscall(unix.FcntlSyscall, uintptr(fds[0]), syscall.F_GETPIPE_SZ, 0); errno != 0 {
229 *p = true
230 destroyPipe(sp)
231 return nil
232 }
233 }
234
235 return
236 }
237
238
239 func destroyPipe(p *splicePipe) {
240 CloseFunc(p.rfd)
241 CloseFunc(p.wfd)
242 }
243
View as plain text