-
Notifications
You must be signed in to change notification settings - Fork 158
/
Copy pathrequester_pays_test.py
73 lines (57 loc) · 2.56 KB
/
requester_pays_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
# Copyright 2017 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import backoff
import os
import tempfile
from google.api_core.exceptions import GoogleAPIError
from google.cloud import storage
import pytest
import storage_disable_requester_pays
import storage_download_file_requester_pays
import storage_enable_requester_pays
import storage_get_requester_pays_status
# We use a different bucket from other tests.
# The service account for the test needs to have the Billing Project Manager
# role in order to make changes on buckets with requester pays enabled.
#
# Both values are read from the environment when this module is imported; a
# missing variable raises KeyError at that point.
# BUCKET: name of the bucket every test in this module operates on.
BUCKET = os.environ["REQUESTER_PAYS_TEST_BUCKET"]
# PROJECT: project ID handed to the download sample.
# NOTE(review): presumably the project billed for the request - confirm.
PROJECT = os.environ["GOOGLE_CLOUD_PROJECT"]
@backoff.on_exception(backoff.expo, GoogleAPIError, max_time=60)
def test_enable_requester_pays(capsys):
    """Run the enable sample on BUCKET and check its confirmation message.

    The whole test is retried with exponential backoff, for up to 60 seconds,
    whenever it raises GoogleAPIError.
    """
    storage_enable_requester_pays.enable_requester_pays(BUCKET)

    captured = capsys.readouterr()
    expected = f"Requester Pays has been enabled for {BUCKET}"
    assert expected in captured.out
@backoff.on_exception(backoff.expo, GoogleAPIError, max_time=60)
def test_disable_requester_pays(capsys):
    """Run the disable sample on BUCKET and check its confirmation message.

    The whole test is retried with exponential backoff, for up to 60 seconds,
    whenever it raises GoogleAPIError.
    """
    storage_disable_requester_pays.disable_requester_pays(BUCKET)

    captured = capsys.readouterr()
    expected = f"Requester Pays has been disabled for {BUCKET}"
    assert expected in captured.out
@backoff.on_exception(backoff.expo, GoogleAPIError, max_time=60)
def test_get_requester_pays_status(capsys):
    """Check that the status sample reports BUCKET as having requester pays disabled.

    The test puts the bucket into the "disabled" state itself, so it does not
    depend on another test having run immediately before it (for example when
    it is run alone or in a different order).

    The whole test is retried with exponential backoff, for up to 60 seconds,
    whenever it raises GoogleAPIError.
    """
    # Establish the precondition the assertion below relies on.
    storage_disable_requester_pays.disable_requester_pays(BUCKET)
    # Drop the setup output so only the status sample's output is inspected.
    capsys.readouterr()

    storage_get_requester_pays_status.get_requester_pays_status(BUCKET)
    out, _ = capsys.readouterr()
    assert f"Requester Pays is disabled for {BUCKET}" in out
@pytest.fixture
def test_blob():
    """Upload a small text object to BUCKET and return its blob handle."""
    client = storage.Client()
    sigil = client.bucket(BUCKET).blob("storage_snippets_test_sigil")
    sigil.upload_from_string("Hello, is it me you're looking for?")
    return sigil
@backoff.on_exception(backoff.expo, GoogleAPIError, max_time=60)
def test_download_file_requester_pays(test_blob, capsys):
    """Download the fixture blob with the requester-pays sample and check it is non-empty.

    Args:
        test_blob: fixture providing a blob that already exists in BUCKET.
        capsys: pytest capture fixture; kept in the signature so the sample's
            output is captured, although it is not inspected here.

    The whole test is retried with exponential backoff, for up to 60 seconds,
    whenever it raises GoogleAPIError.
    """
    # Download to a path inside a temporary directory instead of the name of a
    # still-open NamedTemporaryFile: reopening such a file by name is not
    # possible on every platform (it fails on Windows).
    # NOTE(review): assumes the sample creates the destination file itself -
    # confirm against the sample.
    with tempfile.TemporaryDirectory() as dest_dir:
        dest_path = os.path.join(dest_dir, "requester_pays_download")
        storage_download_file_requester_pays.download_file_requester_pays(
            BUCKET, PROJECT, test_blob.name, dest_path
        )

        # Read through a fresh handle so we see exactly what was written to
        # the destination path.
        with open(dest_path, "rb") as downloaded:
            assert downloaded.read()