-
Notifications
You must be signed in to change notification settings - Fork 158
/
Copy pathencryption_test.py
128 lines (103 loc) · 4.15 KB
/
encryption_test.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# Copyright 2016 Google, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import base64
import os
import tempfile
import uuid
from google.api_core.exceptions import NotFound
from google.cloud import storage
from google.cloud.storage import Blob
import pytest
import storage_download_encrypted_file
import storage_generate_encryption_key
import storage_object_csek_to_cmek
import storage_rotate_encryption_key
import storage_upload_encrypted_file
BUCKET = os.environ["CLOUD_STORAGE_BUCKET"]
KMS_KEY = os.environ["MAIN_CLOUD_KMS_KEY"]
TEST_ENCRYPTION_KEY = "brtJUWneL92g5q0N2gyDSnlPSYAiIVZ/cWgjyZNeMy0="
TEST_ENCRYPTION_KEY_DECODED = base64.b64decode(TEST_ENCRYPTION_KEY)
TEST_ENCRYPTION_KEY_2 = "o4OD7SWCaPjfeEGhAY+YCgMdY9UW+OJ8mvfWD9lNtO4="
TEST_ENCRYPTION_KEY_2_DECODED = base64.b64decode(TEST_ENCRYPTION_KEY_2)
def test_generate_encryption_key(capsys):
storage_generate_encryption_key.generate_encryption_key()
out, _ = capsys.readouterr()
encoded_key = out.split(":", 1).pop().strip()
key = base64.b64decode(encoded_key)
assert len(key) == 32, "Returned key should be 32 bytes"
def test_upload_encrypted_blob():
blob_name = f"test_upload_encrypted_{uuid.uuid4().hex}"
with tempfile.NamedTemporaryFile() as source_file:
source_file.write(b"test")
storage_upload_encrypted_file.upload_encrypted_blob(
BUCKET,
source_file.name,
blob_name,
TEST_ENCRYPTION_KEY,
)
bucket = storage.Client().bucket(BUCKET)
bucket.delete_blob(blob_name)
@pytest.fixture(scope="module")
def test_blob():
"""Provides a pre-existing blob in the test bucket."""
bucket = storage.Client().bucket(BUCKET)
blob_name = f"test_blob_{uuid.uuid4().hex}"
blob = Blob(
blob_name,
bucket,
encryption_key=TEST_ENCRYPTION_KEY_DECODED,
)
content = "Hello, is it me you're looking for?"
blob.upload_from_string(content)
yield blob.name, content
# To delete an encrypted blob, you have to provide the same key
# used for the blob. When you provide a wrong key, you'll get
# NotFound.
try:
# Clean up for the case that the rotation didn't occur.
blob.delete()
except NotFound as e:
# For the case that the rotation succeeded.
print(f"Ignoring 404, detail: {e}")
blob = Blob(
blob_name,
bucket,
encryption_key=TEST_ENCRYPTION_KEY_2_DECODED
)
blob.delete()
def test_download_blob(test_blob):
test_blob_name, test_blob_content = test_blob
with tempfile.NamedTemporaryFile() as dest_file:
storage_download_encrypted_file.download_encrypted_blob(
BUCKET, test_blob_name, dest_file.name, TEST_ENCRYPTION_KEY
)
downloaded_content = dest_file.read().decode("utf-8")
assert downloaded_content == test_blob_content
def test_rotate_encryption_key(test_blob):
test_blob_name, test_blob_content = test_blob
storage_rotate_encryption_key.rotate_encryption_key(
BUCKET, test_blob_name, TEST_ENCRYPTION_KEY, TEST_ENCRYPTION_KEY_2
)
with tempfile.NamedTemporaryFile() as dest_file:
storage_download_encrypted_file.download_encrypted_blob(
BUCKET, test_blob_name, dest_file.name, TEST_ENCRYPTION_KEY_2
)
downloaded_content = dest_file.read().decode("utf-8")
assert downloaded_content == test_blob_content
def test_object_csek_to_cmek(test_blob):
test_blob_name, test_blob_content = test_blob
cmek_blob = storage_object_csek_to_cmek.object_csek_to_cmek(
BUCKET, test_blob_name, TEST_ENCRYPTION_KEY_2, KMS_KEY
)
assert cmek_blob.download_as_bytes(), test_blob_content