@@ -161,7 +161,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
161
161
}
162
162
scMaxEachPostBytes := int (h .ln .config .GetNormalizedScMaxEachPostBytes ().To )
163
163
164
- if request .Method == "POST" && sessionId != "" {
164
+ if request .Method == "POST" && sessionId != "" { // stream-up, packet-up
165
165
seq := ""
166
166
if len (subpath ) > 1 {
167
167
seq = subpath [1 ]
@@ -173,8 +173,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
173
173
writer .WriteHeader (http .StatusBadRequest )
174
174
return
175
175
}
176
+ uploadDone := done .New ()
176
177
err = currentSession .uploadQueue .Push (Packet {
177
- Reader : request .Body ,
178
+ Reader : & httpRequestBodyReader {
179
+ requestReader : request .Body ,
180
+ uploadDone : uploadDone ,
181
+ },
178
182
})
179
183
if err != nil {
180
184
errors .LogInfoInner (context .Background (), err , "failed to upload (PushReader)" )
@@ -199,8 +203,12 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
199
203
}
200
204
}()
201
205
}
202
- <- request .Context ().Done ()
206
+ select {
207
+ case <- request .Context ().Done ():
208
+ case <- uploadDone .Wait ():
209
+ }
203
210
}
211
+ uploadDone .Close ()
204
212
return
205
213
}
206
214
@@ -243,7 +251,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
243
251
}
244
252
245
253
writer .WriteHeader (http .StatusOK )
246
- } else if request .Method == "GET" || sessionId == "" {
254
+ } else if request .Method == "GET" || sessionId == "" { // stream-down, stream-one
247
255
responseFlusher , ok := writer .(http.Flusher )
248
256
if ! ok {
249
257
panic ("expected http.ResponseWriter to be an http.Flusher" )
@@ -283,7 +291,7 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
283
291
reader : request .Body ,
284
292
remoteAddr : remoteAddr ,
285
293
}
286
- if sessionId != "" {
294
+ if sessionId != "" { // if not stream-one
287
295
conn .reader = currentSession .uploadQueue
288
296
}
289
297
@@ -302,6 +310,20 @@ func (h *requestHandler) ServeHTTP(writer http.ResponseWriter, request *http.Req
302
310
}
303
311
}
304
312
313
+ type httpRequestBodyReader struct {
314
+ requestReader io.ReadCloser
315
+ uploadDone * done.Instance
316
+ }
317
+
318
+ func (c * httpRequestBodyReader ) Read (b []byte ) (int , error ) {
319
+ return c .requestReader .Read (b )
320
+ }
321
+
322
+ func (c * httpRequestBodyReader ) Close () error {
323
+ defer c .uploadDone .Close ()
324
+ return c .requestReader .Close ()
325
+ }
326
+
305
327
type httpResponseBodyWriter struct {
306
328
sync.Mutex
307
329
responseWriter http.ResponseWriter
0 commit comments