|
16 | 16 | # under the License. |
17 | 17 |
|
18 | 18 | import json |
19 | | -import logging |
20 | | -import os |
21 | | -import tempfile |
22 | | -import zipfile |
| 19 | +from typing import Any, Union |
23 | 20 |
|
24 | 21 |
|
25 | | -LOGGER = logging.getLogger(__name__) |
26 | | - |
27 | | - |
28 | | -def format_json(json_struct): |
29 | | - return json.dumps(json_struct, indent=4) |
30 | | - |
31 | | - |
32 | | -def dump_json(json_struct): |
| 22 | +def dump_json(json_struct: Any) -> str: |
33 | 23 | return json.dumps(json_struct) |
34 | 24 |
|
35 | 25 |
|
36 | | -def load_json(s): |
| 26 | +def load_json(s: Union[str, bytes]) -> Any: |
37 | 27 | return json.loads(s) |
38 | | - |
39 | | - |
40 | | -def unzip_to_temp_dir(zip_file_name): |
41 | | - """Unzip zipfile to a temporary directory. |
42 | | -
|
43 | | - The directory of the unzipped files is returned if success, |
44 | | - otherwise None is returned. """ |
45 | | - if not zip_file_name or not os.path.exists(zip_file_name): |
46 | | - return None |
47 | | - |
48 | | - zf = zipfile.ZipFile(zip_file_name) |
49 | | - |
50 | | - if zf.testzip(): |
51 | | - return None |
52 | | - |
53 | | - # Unzip the files into a temporary directory |
54 | | - LOGGER.info("Extracting zipped file: %s" % zip_file_name) |
55 | | - tempdir = tempfile.mkdtemp() |
56 | | - |
57 | | - try: |
58 | | - # Create directories that don't exist |
59 | | - for zip_name in zf.namelist(): |
60 | | - # We have no knowledge on the os where the zipped file was |
61 | | - # created, so we restrict to zip files with paths without |
62 | | - # character "\" and "/". |
63 | | - name = (zip_name.replace("\\", os.path.sep). |
64 | | - replace("/", os.path.sep)) |
65 | | - dest = os.path.join(tempdir, name) |
66 | | - if (name.endswith(os.path.sep) and not os.path.exists(dest)): |
67 | | - os.mkdir(dest) |
68 | | - LOGGER.debug("Directory %s created." % dest) |
69 | | - |
70 | | - # Copy files |
71 | | - for zip_name in zf.namelist(): |
72 | | - # We have no knowledge on the os where the zipped file was |
73 | | - # created, so we restrict to zip files with paths without |
74 | | - # character "\" and "/". |
75 | | - name = (zip_name.replace("\\", os.path.sep). |
76 | | - replace("/", os.path.sep)) |
77 | | - dest = os.path.join(tempdir, name) |
78 | | - if not (name.endswith(os.path.sep)): |
79 | | - LOGGER.debug("Copying file %s......" % dest) |
80 | | - outfile = open(dest, 'wb') |
81 | | - outfile.write(zf.read(zip_name)) |
82 | | - outfile.close() |
83 | | - LOGGER.debug("File %s copied." % dest) |
84 | | - |
85 | | - LOGGER.info("Unzipped file can be found at %s" % tempdir) |
86 | | - return tempdir |
87 | | - |
88 | | - except IOError as err: |
89 | | - LOGGER.error("Error in extracting webdriver.xpi: %s" % err) |
90 | | - return None |
0 commit comments