Andrew Grieve | 3dec346 | 2023-03-31 20:31:29 | [diff] [blame] | 1 | # Copyright 2023 The Chromium Authors |
| 2 | # Use of this source code is governed by a BSD-style license that can be |
| 3 | # found in the LICENSE file. |
| 4 | """Helper functions for dealing with .zip files.""" |
| 5 | |
| 6 | import os |
| 7 | import pathlib |
| 8 | import posixpath |
| 9 | import stat |
| 10 | import time |
| 11 | import zipfile |
| 12 | |
# Size in bytes of the fixed portion of a zip local file header, i.e. the part
# that precedes the variable-length filename and extra fields.
_FIXED_ZIP_HEADER_LEN = 30


def _set_alignment(zip_obj, zip_info, alignment):
  """Sets a ZipInfo's extra field such that the file will be aligned.

  Any value previously stored in |zip_info.extra| is replaced by zero padding.

  Args:
    zip_obj: The ZipFile object that is being written.
    zip_info: The ZipInfo object about to be written.
    alignment: The amount of alignment (e.g. 4, or 4*1024). Must be non-zero.
  """
  # The header stores the *encoded* filename, so measure bytes rather than
  # characters. zipfile writes names as ASCII when possible and UTF-8
  # otherwise; both have the same length as the UTF-8 encoding.
  filename_len = len(zip_info.filename.encode('utf-8'))
  header_size = _FIXED_ZIP_HEADER_LEN + filename_len
  pos = zip_obj.fp.tell() + header_size
  padding_needed = (alignment - (pos % alignment)) % alignment

  # Python writes |extra| to both the local file header and the central
  # directory's file header. Android's zipalign tool writes only to the
  # local file header, so there is more overhead in using Python to align.
  zip_info.extra = b'\0' * padding_needed
| 32 | |
| 33 | |
def _hermetic_date_time(timestamp=None):
  """Returns a (year, month, day, hour, minute, second) tuple for a ZipInfo.

  Args:
    timestamp: Seconds since the epoch, converted using UTC. When falsy
      (None or 0), a fixed date of 2001-01-01 00:00:00 is returned instead.
  """
  if timestamp:
    # The first six fields of a struct_time are exactly the ones needed.
    return tuple(time.gmtime(timestamp)[:6])
  return (2001, 1, 1, 0, 0, 0)
| 40 | |
| 41 | |
def add_to_zip_hermetic(zip_file,
                        zip_path,
                        *,
                        src_path=None,
                        data=None,
                        compress=None,
                        alignment=None,
                        timestamp=None):
  """Adds a file to the given ZipFile with a hard-coded modified time.

  Symlinks given via |src_path| are stored as symlink entries (the link target
  is the entry's content). For regular files the executable bits of |src_path|
  are carried over to the entry.

  Args:
    zip_file: ZipFile instance to add the file to.
    zip_path: Destination path within the zip file (or ZipInfo instance).
    src_path: Path of the source file. Mutually exclusive with |data|.
    data: File data (str or bytes). Mutually exclusive with |src_path|.
    compress: Whether to enable compression. Default is taken from ZipFile
      constructor.
    alignment: If set, align the data of the entry to this many bytes.
    timestamp: Unix timestamp to use as the entry's modification time. A fixed
      default date is used when not set.
  """
  assert (src_path is None) != (data is None), (
      '|src_path| and |data| are mutually exclusive.')
  if isinstance(zip_path, zipfile.ZipInfo):
    zipinfo = zip_path
    zip_path = zipinfo.filename
  else:
    zipinfo = zipfile.ZipInfo(filename=zip_path)
    zipinfo.external_attr = 0o644 << 16

  zipinfo.date_time = _hermetic_date_time(timestamp)

  if alignment:
    _set_alignment(zip_file, zipinfo, alignment)

  # Filenames can contain backslashes, but it is more likely that we've
  # forgotten to use forward slashes as a directory separator.
  assert '\\' not in zip_path, 'zip_path should not contain \\: ' + zip_path
  assert not posixpath.isabs(zip_path), 'Absolute zip path: ' + zip_path
  assert not zip_path.startswith('..'), 'Should not start with ..: ' + zip_path
  assert posixpath.normpath(zip_path) == zip_path, (
      f'Non-canonical zip_path: {zip_path} vs: {posixpath.normpath(zip_path)}')

  # getinfo() is a dictionary lookup, whereas namelist() builds a list of every
  # entry on each call, which made adding many files quadratic.
  try:
    zip_file.getinfo(zip_path)
  except KeyError:
    pass
  else:
    raise AssertionError('Tried to add a duplicate zip entry: ' + zip_path)

  if src_path and os.path.islink(src_path):
    zipinfo.external_attr |= stat.S_IFLNK << 16  # mark as a symlink
    zip_file.writestr(zipinfo, os.readlink(src_path))
    return

  if src_path:
    # Maintain the executable bit.
    st = os.stat(src_path)
    for mode in (stat.S_IXUSR, stat.S_IXGRP, stat.S_IXOTH):
      if st.st_mode & mode:
        zipinfo.external_attr |= mode << 16

    with open(src_path, 'rb') as f:
      data = f.read()

  # zipfile will deflate even when it makes the file bigger. To avoid
  # growing files, disable compression at an arbitrary cut off point.
  if len(data) < 16:
    compress = False

  # None converts to ZIP_STORED, when passed explicitly rather than the
  # default passed to the ZipFile constructor.
  compress_type = zip_file.compression
  if compress is not None:
    compress_type = zipfile.ZIP_DEFLATED if compress else zipfile.ZIP_STORED
  zip_file.writestr(zipinfo, data, compress_type)
| 113 | |
| 114 | |
def add_files_to_zip(inputs,
                     output,
                     *,
                     base_dir=None,
                     path_transform=None,
                     compress=None,
                     zip_prefix_path=None,
                     timestamp=None):
  """Writes the given files into a zip archive in a stable order.

  Args:
    inputs: A list of paths to zip, or a list of (zip_path, fs_path) tuples.
    output: Path, fileobj, or ZipFile instance to add files to. A ZipFile that
      is passed in is left open; one created here is closed before returning.
    base_dir: Prefix to strip from inputs (defaults to the current directory).
    path_transform: Called for each entry path. Returns a new zip path, or None
      to skip the file.
    compress: Whether to compress.
    zip_prefix_path: Path prepended to file path in zip file.
    timestamp: Unix timestamp to use for files in the archive.
  """
  root_dir = '.' if base_dir is None else base_dir
  # Zip files always use / as path separator.
  needs_posix_conversion = os.path.sep != posixpath.sep

  def to_entry(item):
    # Explicit (zip_path, fs_path) pairs are passed through untouched.
    if not isinstance(item, str):
      return item
    rel_path = os.path.relpath(item, root_dir)
    if needs_posix_conversion:
      rel_path = str(pathlib.Path(rel_path).as_posix())
    return (rel_path, item)

  # Sort by zip path to ensure stable zip ordering.
  entries = sorted((to_entry(item) for item in inputs),
                   key=lambda entry: entry[0])

  owns_zip = not isinstance(output, zipfile.ZipFile)
  out_zip = zipfile.ZipFile(output, 'w') if owns_zip else output

  try:
    for entry_path, fs_path in entries:
      if zip_prefix_path:
        entry_path = posixpath.join(zip_prefix_path, entry_path)
      if path_transform:
        entry_path = path_transform(entry_path)
        if entry_path is None:
          continue
      add_to_zip_hermetic(out_zip,
                          entry_path,
                          src_path=fs_path,
                          compress=compress,
                          timestamp=timestamp)
  finally:
    if owns_zip:
      out_zip.close()
| 171 | |
| 172 | |
def zip_directory(output, base_dir, **kwargs):
  """Zips every file found under |base_dir| (recursively) into |output|.

  Entry paths are made relative to |base_dir|. Extra keyword arguments are
  forwarded to add_files_to_zip().
  """
  file_paths = [
      os.path.join(dir_path, name)
      for dir_path, _, names in os.walk(base_dir)
      for name in names
  ]
  add_files_to_zip(file_paths, output, base_dir=base_dir, **kwargs)
| 181 | |
| 182 | |
def merge_zips(output, input_zips, path_transform=None, compress=None):
  """Combines all files from |input_zips| into |output|.

  Directory entries are skipped. When a destination path already exists in the
  output, the entry is skipped if its CRC matches the existing one.

  Args:
    output: Path or ZipFile instance to add files to. A ZipFile that is passed
      in is left open; one created here is closed before returning.
    input_zips: Iterable of paths to zip files to merge.
    path_transform: Called for each entry path. Returns a new zip path, or None
      to skip the file.
    compress: Overrides compression setting from origin zip entries.

  Raises:
    Exception: If the same destination path appears more than once with
      differing contents.
  """
  assert not isinstance(input_zips, str)  # Easy mistake to make.
  if isinstance(output, zipfile.ZipFile):
    out_zip = output
    out_filename = output.filename
  else:
    assert isinstance(output, str), 'Was: ' + repr(output)
    out_zip = zipfile.ZipFile(output, 'w')
    out_filename = output

  # Include paths in the existing zip here to avoid adding duplicate files.
  crc_by_name = {i.filename: (out_filename, i.CRC) for i in out_zip.infolist()}

  try:
    for in_file in input_zips:
      with zipfile.ZipFile(in_file, 'r') as in_zip:
        for info in in_zip.infolist():
          # Ignore directories. endswith() also tolerates an empty name.
          if info.filename.endswith('/'):
            continue
          if path_transform:
            dst_name = path_transform(info.filename)
            if dst_name is None:
              continue
          else:
            dst_name = info.filename

          # If there's a duplicate file, ensure contents is the same and skip
          # adding it multiple times. The CRC stored in the input zip is
          # compared directly so duplicates never need to be decompressed.
          existing = crc_by_name.get(dst_name)
          if existing is not None:
            orig_filename, orig_crc = existing
            if info.CRC == orig_crc:
              continue
            msg = f"""File appeared in multiple inputs with differing contents.
File: {dst_name}
Input1: {orig_filename}
Input2: {in_file}"""
            raise Exception(msg)

          # Only entries that will actually be written are read.
          data = in_zip.read(info)

          if compress is not None:
            compress_entry = compress
          else:
            compress_entry = info.compress_type != zipfile.ZIP_STORED
          add_to_zip_hermetic(out_zip,
                              dst_name,
                              data=data,
                              compress=compress_entry)
          crc_by_name[dst_name] = (in_file, out_zip.getinfo(dst_name).CRC)
  finally:
    if output is not out_zip:
      out_zip.close()