-
Notifications
You must be signed in to change notification settings - Fork 796
/
Copy pathprepare_env.py
200 lines (154 loc) · 5.89 KB
/
prepare_env.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
"""Logic for installing dependencies in Pyodide.
Mostly taken from https://github.com/pydantic/pydantic.run/blob/main/src/frontend/src/prepare_env.py
"""
from __future__ import annotations as _annotations
import importlib
import logging
import re
import sys
import traceback
from collections.abc import Iterable, Iterator
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal, TypedDict
import micropip
import pyodide_js
import tomllib
from pyodide.code import find_imports
__all__ = 'prepare_env', 'dump_json'
class File(TypedDict):
name: str
content: str
active: bool
@dataclass
class Success:
dependencies: list[str] | None
kind: Literal['success'] = 'success'
@dataclass
class Error:
message: str
kind: Literal['error'] = 'error'
async def prepare_env(files: list[File]) -> Success | Error:
sys.setrecursionlimit(400)
cwd = Path.cwd()
for file in files:
(cwd / file['name']).write_text(file['content'])
active: File | None = next((f for f in files if f['active']), None)
dependencies: list[str] | None = None
if active:
python_code = active['content']
dependencies = _find_pep723_dependencies(python_code)
if dependencies is None:
dependencies = await _find_import_dependencies(python_code)
if dependencies:
dependencies = _add_extra_dependencies(dependencies)
with _micropip_logging() as logs_filename:
try:
await micropip.install(dependencies, keep_going=True)
importlib.invalidate_caches()
except Exception:
with open(logs_filename) as f:
logs = f.read()
return Error(message=f'{logs} {traceback.format_exc()}')
return Success(dependencies=dependencies)
def dump_json(value: Any) -> str | None:
from pydantic_core import to_json
if value is None:
return None
if isinstance(value, str):
return value
else:
return to_json(value, indent=2, fallback=_json_fallback).decode()
def _json_fallback(value: Any) -> Any:
tp: Any = type(value)
module = tp.__module__
if module == 'numpy':
if tp.__name__ in {'ndarray', 'matrix'}:
return value.tolist()
else:
return value.item()
elif module == 'pyodide.ffi':
return value.to_py()
else:
return repr(value)
def _add_extra_dependencies(dependencies: list[str]) -> list[str]:
"""Add extra dependencies we know some packages need.
Workaround for micropip not installing some required transitive dependencies.
See https://github.com/pyodide/micropip/issues/204
pygments seems to be required to get rich to work properly, ssl is required for FastAPI and HTTPX,
pydantic_ai requires newest typing_extensions.
"""
extras: list[str] = []
for d in dependencies:
if d.startswith(('logfire', 'rich')):
extras.append('pygments')
elif d.startswith(('fastapi', 'httpx', 'pydantic_ai')):
extras.append('ssl')
if d.startswith('pydantic_ai'):
extras.append('typing_extensions>=4.12')
if len(extras) == 3:
break
return dependencies + extras
@contextmanager
def _micropip_logging() -> Iterator[str]:
from micropip import logging as micropip_logging
micropip_logging.setup_logging()
logger = logging.getLogger('micropip')
logger.handlers.clear()
logger.setLevel(logging.INFO)
file_name = 'micropip.log'
handler = logging.FileHandler(file_name)
handler.setLevel(logging.INFO)
handler.setFormatter(logging.Formatter('%(message)s'))
logger.addHandler(handler)
try:
yield file_name
finally:
logger.removeHandler(handler)
def _find_pep723_dependencies(code: str) -> list[str] | None:
"""Extract dependencies from a script with PEP 723 metadata."""
metadata = _read_pep723_metadata(code)
dependencies: list[str] | None = metadata.get('dependencies')
if dependencies is None:
return None
else:
assert isinstance(dependencies, list), 'dependencies must be a list'
assert all(isinstance(dep, str) for dep in dependencies), 'dependencies must be a list of strings'
return dependencies
def _read_pep723_metadata(code: str) -> dict[str, Any]:
"""Read PEP 723 script metadata.
Copied from https://packaging.python.org/en/latest/specifications/inline-script-metadata/#reference-implementation
"""
name = 'script'
magic_comment_regex = r'(?m)^# /// (?P<type>[a-zA-Z0-9-]+)$\s(?P<content>(^#(| .*)$\s)+)^# ///$'
matches = list(filter(lambda m: m.group('type') == name, re.finditer(magic_comment_regex, code)))
if len(matches) > 1:
raise ValueError(f'Multiple {name} blocks found')
elif len(matches) == 1:
content = ''.join(
line[2:] if line.startswith('# ') else line[1:]
for line in matches[0].group('content').splitlines(keepends=True)
)
return tomllib.loads(content)
else:
return {}
async def _find_import_dependencies(code: str) -> list[str] | None:
"""Find dependencies in imports."""
try:
imports: list[str] = find_imports(code)
except SyntaxError:
return None
else:
return list(_find_imports_to_install(imports))
TO_PACKAGE_NAME: dict[str, str] = pyodide_js._api._import_name_to_package_name.to_py() # pyright: ignore[reportPrivateUsage]
def _find_imports_to_install(imports: list[str]) -> Iterable[str]:
"""Given a list of module names being imported, return packages that are not installed."""
for module in imports:
try:
importlib.import_module(module)
except ModuleNotFoundError:
if package_name := TO_PACKAGE_NAME.get(module):
yield package_name
elif '.' not in module:
yield module