-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathdbschemegen.py
executable file
·162 lines (140 loc) · 6.27 KB
/
dbschemegen.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
"""
dbscheme file generation
`generate(opts, renderer)` will generate a `dbscheme` file out of a `yml` schema file.
Each final class in the schema file will get a corresponding defining DB table with the id and single properties as
columns.
Moreover:
* single properties in non-final classes will also trigger generation of a table with an id reference and all single
properties as columns
* each optional property will trigger generation of a table with an id reference and the property value as columns
* each repeated property will trigger generation of a table with an id reference, an `int` index and the property value
as columns
The type hierarchy will be translated to corresponding `union` declarations.
"""
import typing
import inflection
from misc.codegen.lib import schema
from misc.codegen.loaders import schemaloader
from misc.codegen.lib.dbscheme import *
log = logging.getLogger(__name__)
class Error(Exception):
pass
def dbtype(typename: str, add_or_none_except: typing.Optional[str] = None) -> str:
""" translate a type to a dbscheme counterpart, using `@lower_underscore` format for classes.
For class types, appends an underscore followed by `null` if provided
"""
if typename[0].isupper():
underscored = inflection.underscore(typename)
if add_or_none_except is not None and typename != add_or_none_except:
suffix = "_or_none"
else:
suffix = ""
return f"@{underscored}{suffix}"
return typename
def cls_to_dbscheme(cls: schema.Class, lookup: typing.Dict[str, schema.Class], add_or_none_except: typing.Optional[str] = None):
""" Yield all dbscheme entities needed to model class `cls` """
if cls.synth:
return
if cls.derived:
yield Union(dbtype(cls.name), (dbtype(c) for c in cls.derived if not lookup[c].synth))
dir = pathlib.Path(cls.group) if cls.group else None
# output a table specific to a class only if it is a leaf class or it has 1-to-1 properties
# Leaf classes need a table to bind the `@` ids
# 1-to-1 properties are added to a class specific table
# in other cases, separate tables are used for the properties, and a class specific table is unneeded
if not cls.derived or any(f.is_single and not f.synth for f in cls.properties):
binding = not cls.derived
keyset = KeySet(["id"]) if cls.derived else None
yield Table(
keyset=keyset,
name=inflection.tableize(cls.name),
columns=[
Column("id", type=dbtype(cls.name), binding=binding),
] + [
Column(f.name, dbtype(f.type, add_or_none_except))
for f in cls.properties if f.is_single and not f.synth
],
dir=dir,
)
# use property-specific tables for 1-to-many and 1-to-at-most-1 properties
for f in cls.properties:
overridden_table_name = f.pragmas.get("ql_db_table_name")
if f.synth:
continue
if f.is_unordered:
yield Table(
name=overridden_table_name or inflection.tableize(f"{cls.name}_{f.name}"),
columns=[
Column("id", type=dbtype(cls.name)),
Column(inflection.singularize(f.name), dbtype(f.type, add_or_none_except)),
],
dir=dir,
)
elif f.is_repeated:
yield Table(
keyset=KeySet(["id", "index"]),
name=overridden_table_name or inflection.tableize(f"{cls.name}_{f.name}"),
columns=[
Column("id", type=dbtype(cls.name)),
Column("index", type="int"),
Column(inflection.singularize(f.name), dbtype(f.type, add_or_none_except)),
],
dir=dir,
)
elif f.is_optional:
yield Table(
keyset=KeySet(["id"]),
name=overridden_table_name or inflection.tableize(f"{cls.name}_{f.name}"),
columns=[
Column("id", type=dbtype(cls.name)),
Column(f.name, dbtype(f.type, add_or_none_except)),
],
dir=dir,
)
elif f.is_predicate:
yield Table(
keyset=KeySet(["id"]),
name=overridden_table_name or inflection.underscore(f"{cls.name}_{f.name}"),
columns=[
Column("id", type=dbtype(cls.name)),
],
dir=dir,
)
def check_name_conflicts(decls: list[Table | Union]):
names = set()
for decl in decls:
match decl:
case Table(name=name):
if name in names:
raise Error(f"Duplicate table name: {
name}, you can use `@ql.db_table_name` on a property to resolve this")
names.add(name)
def get_declarations(data: schema.Schema):
add_or_none_except = data.root_class.name if data.null else None
declarations = [d for cls in data.classes.values() if not cls.imported for d in cls_to_dbscheme(cls,
data.classes, add_or_none_except)]
if data.null:
property_classes = {
prop.type for cls in data.classes.values() for prop in cls.properties
if cls.name != data.null and prop.type and prop.type[0].isupper()
}
declarations += [
Union(dbtype(t, data.null), [dbtype(t), dbtype(data.null)]) for t in sorted(property_classes)
]
check_name_conflicts(declarations)
return declarations
def get_includes(data: schema.Schema, include_dir: pathlib.Path, root_dir: pathlib.Path):
includes = []
for inc in data.includes:
inc = include_dir / inc
with open(inc) as inclusion:
includes.append(SchemeInclude(src=inc.relative_to(root_dir), data=inclusion.read()))
return includes
def generate(opts, renderer):
input = opts.schema
out = opts.dbscheme
data = schemaloader.load_file(input)
dbscheme = Scheme(src=input.name,
includes=get_includes(data, include_dir=input.parent, root_dir=input.parent),
declarations=get_declarations(data))
renderer.render(dbscheme, out)