Skip to content

Commit b6574aa

Browse files
authored
fix(storage): retry errors from last recv on uploads (#9616)
* fix(storage): retry errors from last recv on uploads * comment
1 parent 26c7ce1 commit b6574aa

File tree

1 file changed

+14
-8
lines changed

1 file changed

+14
-8
lines changed

storage/grpc_client.go

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1872,6 +1872,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
18721872

18731873
// Send a request with as many bytes as possible.
18741874
// Loop until all bytes are sent.
1875+
sendBytes: // label this loop so that we can use a continue statement from a nested block
18751876
for {
18761877
bytesNotYetSent := recvd - sent
18771878
remainingDataFitsInSingleReq := bytesNotYetSent <= maxPerMessageWriteSize
@@ -1949,10 +1950,6 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
19491950
// we retry.
19501951
w.stream = nil
19511952

1952-
// Drop the stream reference as a new one will need to be created if
1953-
// we can retry the upload
1954-
w.stream = nil
1955-
19561953
// Retriable errors mean we should start over and attempt to
19571954
// resend the entire buffer via a new stream.
19581955
// If not retriable, falling through will return the error received.
@@ -1966,7 +1963,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
19661963

19671964
// Continue sending requests, opening a new stream and resending
19681965
// any bytes not yet persisted as per QueryWriteStatus
1969-
continue
1966+
continue sendBytes
19701967
}
19711968
}
19721969
if err != nil {
@@ -1981,7 +1978,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
19811978
// Not done sending data, do not attempt to commit it yet, loop around
19821979
// and send more data.
19831980
if recvd-sent > 0 {
1984-
continue
1981+
continue sendBytes
19851982
}
19861983

19871984
// The buffer has been uploaded and there is still more data to be
@@ -2012,7 +2009,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
20122009
// Drop the stream reference as a new one will need to be created.
20132010
w.stream = nil
20142011

2015-
continue
2012+
continue sendBytes
20162013
}
20172014
if err != nil {
20182015
return nil, 0, err
@@ -2022,7 +2019,7 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
20222019
// Retry if not all bytes were persisted.
20232020
writeOffset = resp.GetPersistedSize()
20242021
sent = int(writeOffset) - int(start)
2025-
continue
2022+
continue sendBytes
20262023
}
20272024
} else {
20282025
// If the object is done uploading, close the send stream to signal
@@ -2042,6 +2039,15 @@ func (w *gRPCWriter) uploadBuffer(recvd int, start int64, doneReading bool) (*st
20422039
var obj *storagepb.Object
20432040
for obj == nil {
20442041
resp, err := w.stream.Recv()
2042+
if shouldRetry(err) {
2043+
writeOffset, err = w.determineOffset(start)
2044+
if err != nil {
2045+
return nil, 0, err
2046+
}
2047+
sent = int(writeOffset) - int(start)
2048+
w.stream = nil
2049+
continue sendBytes
2050+
}
20452051
if err != nil {
20462052
return nil, 0, err
20472053
}

0 commit comments

Comments
 (0)