From 10c8faccc2dae2a8177ff30ab16d67413df9f536 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Tue, 9 Sep 2025 22:28:58 -0400 Subject: [PATCH 1/2] fix(storage): free buffers in Bidi Reader (#12839) Fixes a bug where in the gRPC Bidi Reader code path, allocated buffers were not being freed back to the buffer pool after the messages were read in some cases. Also adds an emulator test to check for leaks in the codec code for Reader and MultiRangeDownloader --- storage/client_test.go | 161 +++++++++++++++++++++++++++++++++++++++++ storage/grpc_client.go | 6 ++ 2 files changed, 167 insertions(+) diff --git a/storage/client_test.go b/storage/client_test.go index ebfc523daba6..189bc31c7392 100644 --- a/storage/client_test.go +++ b/storage/client_test.go @@ -29,6 +29,7 @@ import ( "slices" "strconv" "strings" + "sync" "testing" "time" @@ -43,6 +44,7 @@ import ( "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/codes" + expgrpc "google.golang.org/grpc/experimental" "google.golang.org/grpc/mem" "google.golang.org/grpc/status" "google.golang.org/protobuf/proto" @@ -2825,6 +2827,165 @@ func TestWriterChunkRetryDeadlineEmulated(t *testing.T) { }) } +// Used to test gRPC buffer pool allocs and frees. +// See https://pkg.go.dev/google.golang.org/grpc/mem +type testBufferPool struct { + allocs int64 + frees int64 + sync.Mutex // mutex needed becuase Get/Put can be called in parallel. +} + +func (bp *testBufferPool) Get(length int) *[]byte { + bp.Lock() + bp.allocs += int64(length) + bp.Unlock() + return mem.DefaultBufferPool().Get(length) +} + +func (bp *testBufferPool) Put(b *[]byte) { + if b != nil { + bp.Lock() + bp.frees += int64(len(*b)) + bp.Unlock() + } + mem.DefaultBufferPool().Put(b) +} + +func (bp *testBufferPool) getAllocsAndFrees() (int64, int64) { + bp.Lock() + defer bp.Unlock() + return bp.allocs, bp.frees +} + +// Test that successful downloads using Reader and MultiRangeDownloader free +// all of their allocated buffers. +func TestReadCodecLeaksEmulated(t *testing.T) { + checkEmulatorEnvironment(t) + ctx := context.Background() + var bp testBufferPool + client, err := NewGRPCClient(ctx, option.WithGRPCDialOption(expgrpc.WithBufferPool(&bp)), experimental.WithZonalBucketAPIs()) + if err != nil { + t.Fatalf("NewGRPCClient: %v", err) + } + var ( + contents = randomBytes9MiB + prefix = time.Now().Nanosecond() + bucketName = fmt.Sprintf("bucket-%d", prefix) + objName = fmt.Sprintf("%d-object", prefix) + bkt = client.Bucket(bucketName) + obj = bkt.Object(objName) + ) + + // Upload object. + if err := bkt.Create(ctx, "project", nil); err != nil { + t.Fatalf("creating bucket: %v", err) + } + w := obj.NewWriter(ctx) + if _, err := io.Copy(w, bytes.NewReader(contents)); err != nil { + t.Fatalf("uploading object: %v", err) + } + if err := w.Close(); err != nil { + t.Fatalf("closing writer: %v", err) + } + if bp.allocs != bp.frees { + t.Errorf("upload: alloc'd bytes %v not equal to freed bytes %v", bp.allocs, bp.frees) + } + + // Test multiple download paths. + testCases := []struct { + name string + downloadFunc func(obj *ObjectHandle) ([]byte, error) + }{ + { + name: "Reader.Read", + downloadFunc: func(obj *ObjectHandle) ([]byte, error) { + r, err := obj.NewReader(ctx) + defer r.Close() + if err != nil { + return nil, err + } + gotContents, err := io.ReadAll(r) + return gotContents, err + }, + }, + { + name: "Reader.WriteTo", + downloadFunc: func(obj *ObjectHandle) ([]byte, error) { + r, err := obj.NewReader(ctx) + defer r.Close() + if err != nil { + return nil, err + } + buf := bytes.NewBuffer([]byte{}) + _, err = r.WriteTo(buf) + return buf.Bytes(), err + }, + }, + { + name: "MultiRangeDownloader 3MiB ranges", + downloadFunc: func(obj *ObjectHandle) ([]byte, error) { + mrd, err := obj.NewMultiRangeDownloader(ctx) + var bufs []*bytes.Buffer + var currOff int64 + var increment int64 = 3 * MiB + for range 3 { + buf := bytes.NewBuffer([]byte{}) + mrd.Add(buf, currOff, increment, func(int64, int64, error) {}) + bufs = append(bufs, buf) + currOff += increment + } + mrd.Wait() + if err := mrd.Close(); err != nil { + return nil, err + } + var b []byte + for _, buf := range bufs { + b = append(b, buf.Bytes()...) + } + return b, err + }}, + { + name: "MultiRangeDownloader 256k ranges", + downloadFunc: func(obj *ObjectHandle) ([]byte, error) { + mrd, err := obj.NewMultiRangeDownloader(ctx) + var bufs []*bytes.Buffer + var currOff int64 + var increment int64 = 256 * 1024 + for range 36 { + buf := bytes.NewBuffer([]byte{}) + mrd.Add(buf, currOff, increment, func(int64, int64, error) {}) + bufs = append(bufs, buf) + currOff += increment + } + mrd.Wait() + if err := mrd.Close(); err != nil { + return nil, err + } + var b []byte + for _, buf := range bufs { + b = append(b, buf.Bytes()...) + } + return b, err + }}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + gotContents, err := tc.downloadFunc(obj) + if err != nil { + t.Fatalf("downloading content: %v", err) + } + if !bytes.Equal(gotContents, contents) { + t.Errorf("downloaded bytes did not match; got %v bytes, want %v", len(gotContents), len(contents)) + } + allocs, frees := bp.getAllocsAndFrees() + if allocs != frees { + t.Errorf("download: alloc'd bytes %v not equal to freed bytes %v", allocs, frees) + } + }) + } +} + // createRetryTest creates a bucket in the emulator and sets up a test using the // Retry Test API for the given instructions. This is intended for emulator tests // of retry behavior that are not covered by conformance tests. diff --git a/storage/grpc_client.go b/storage/grpc_client.go index 5850bcc1c080..2ba4d5bbeb88 100644 --- a/storage/grpc_client.go +++ b/storage/grpc_client.go @@ -2028,6 +2028,10 @@ func (r *gRPCReader) Read(p []byte) (int, error) { n, found := r.currMsg.readAndUpdateCRC(p, 1, func(b []byte) { r.updateCRC(b) }) + // If we are done reading the current msg, free buffers. + if r.currMsg.done { + r.currMsg.databufs.Free() + } // If data for our readID was found, we can update `seen` and return. if found { @@ -2080,6 +2084,8 @@ func (r *gRPCReader) WriteTo(w io.Writer) (int64, error) { r.updateCRC(b) }) r.seen += written + // We have processed the message, so free the buffer + r.currMsg.databufs.Free() if err != nil { return r.seen - alreadySeen, err } From bda509de8efbccadb53d2c346c881ca266179c53 Mon Sep 17 00:00:00 2001 From: Chris Cotter Date: Fri, 12 Sep 2025 17:20:02 -0400 Subject: [PATCH 2/2] chore(release-storage-1.56.2): release storage 1.56.2 (#12873) :robot: I have created a release *beep* *boop* --- ## [1.56.2](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.56.2) (2025-09-12) ### Bug Fixes * **storage:** Free buffers in Bidi Reader ([#12839](https://github.com/googleapis/google-cloud-go/issues/12839)) ([10c8fac](https://github.com/googleapis/google-cloud-go/commit/10c8faccc2dae2a8177ff30ab16d67413df9f536)) --- This PR was generated with [Release Please](https://github.com/googleapis/release-please). See [documentation](https://github.com/googleapis/release-please#release-please). --- .release-please-manifest-individual.json | 2 +- storage/CHANGES.md | 7 +++++++ storage/internal/version.go | 2 +- 3 files changed, 9 insertions(+), 2 deletions(-) diff --git a/.release-please-manifest-individual.json b/.release-please-manifest-individual.json index 80f5ed86d0e9..628a2924775f 100644 --- a/.release-please-manifest-individual.json +++ b/.release-please-manifest-individual.json @@ -13,6 +13,6 @@ "pubsub/v2": "2.0.0", "pubsublite": "1.8.2", "spanner": "1.84.1", - "storage": "1.56.1", + "storage": "1.56.2", "vertexai": "0.15.0" } diff --git a/storage/CHANGES.md b/storage/CHANGES.md index adb9cc85371c..f91215387528 100644 --- a/storage/CHANGES.md +++ b/storage/CHANGES.md @@ -1,6 +1,13 @@ # Changes +## [1.56.2](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.1...storage/v1.56.2) (2025-09-12) + + +### Bug Fixes + +* **storage:** Free buffers in Bidi Reader ([#12839](https://github.com/googleapis/google-cloud-go/issues/12839)) ([10c8fac](https://github.com/googleapis/google-cloud-go/commit/10c8faccc2dae2a8177ff30ab16d67413df9f536)) + ## [1.56.1](https://github.com/googleapis/google-cloud-go/compare/storage/v1.56.0...storage/v1.56.1) (2025-08-19) diff --git a/storage/internal/version.go b/storage/internal/version.go index 54603c25c471..c69bbfcc8905 100644 --- a/storage/internal/version.go +++ b/storage/internal/version.go @@ -15,4 +15,4 @@ package internal // Version is the current tagged release of the library. -const Version = "1.56.1" +const Version = "1.56.2"