-
Notifications
You must be signed in to change notification settings - Fork 200
/
Copy pathalias_computation.py
executable file
·144 lines (120 loc) · 4.67 KB
/
alias_computation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python3
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""OSV alias computation."""
import datetime
import logging
from google.cloud import ndb
import osv
import osv.logs
# Maximum number of bug IDs allowed in a single alias group. Computed groups
# larger than this are not created, and existing groups that grow past it are
# deleted.
ALIAS_GROUP_VULN_LIMIT = 32
# Maximum number of aliases a bug may list before it is skipped during alias
# computation, unless the bug has an AliasAllowListEntry.
VULN_ALIASES_LIMIT = 5
def _update_group(bug_ids, alias_group):
  """Updates the alias group in the datastore.

  The group is deleted when the recomputed ID list has at most one entry or
  more than ALIAS_GROUP_VULN_LIMIT entries. It is left untouched when the IDs
  are unchanged. Otherwise the new IDs are stored, `last_modified` is set to
  the current UTC time and the group is written with `put()`.

  Args:
    bug_ids: Recomputed list of bug IDs for the group.
    alias_group: The existing group entity. It must expose `bug_ids`,
      `last_modified`, `key.delete()` and `put()`.
  """
  if len(bug_ids) <= 1:
    logging.info('Deleting alias group due to too few bugs: %s', bug_ids)
    alias_group.key.delete()
    return

  if len(bug_ids) > ALIAS_GROUP_VULN_LIMIT:
    logging.info('Deleting alias group due to too many bugs: %s', bug_ids)
    alias_group.key.delete()
    return

  # Nothing changed: skip the write so `last_modified` is not bumped.
  if bug_ids == alias_group.bug_ids:
    return

  alias_group.bug_ids = bug_ids
  # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC "now"
  # and drop the tzinfo so the stored value stays a naive UTC datetime, exactly
  # as before.
  alias_group.last_modified = datetime.datetime.now(
      datetime.timezone.utc).replace(tzinfo=None)
  alias_group.put()
def _create_alias_group(bug_ids):
  """Creates a new alias group in the datastore.

  Nothing is created when `bug_ids` has at most one entry or more than
  ALIAS_GROUP_VULN_LIMIT entries; the skip is logged instead. Otherwise a new
  osv.AliasGroup holding `bug_ids` is written with `last_modified` set to the
  current UTC time.

  Args:
    bug_ids: List of bug IDs that make up the new group.
  """
  if len(bug_ids) <= 1:
    logging.info('Skipping alias group creation due to too few bugs: %s',
                 bug_ids)
    return

  if len(bug_ids) > ALIAS_GROUP_VULN_LIMIT:
    logging.info('Skipping alias group creation due to too many bugs: %s',
                 bug_ids)
    return

  new_group = osv.AliasGroup(bug_ids=bug_ids)
  # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC "now"
  # and drop the tzinfo so the stored value stays a naive UTC datetime, exactly
  # as before.
  new_group.last_modified = datetime.datetime.now(
      datetime.timezone.utc).replace(tzinfo=None)
  new_group.put()
def _compute_aliases(bug_id, visited, bug_aliases):
  """Collects the full alias group reachable from the given bug ID.

  Starting at `bug_id`, follows the links in `bug_aliases` transitively and
  gathers every ID that has not been visited yet. Each gathered ID is also
  added to `visited`, which the caller shares across calls.

  Args:
    bug_id: ID to start the traversal from.
    visited: Set of IDs already assigned to a group; mutated in place.
    bug_aliases: Mapping of ID to the set of IDs directly linked to it.

  Returns:
    A sorted list of the newly visited IDs (empty if `bug_id` was already
    visited)."""
  component = set()
  pending = [bug_id]

  while pending:
    current = pending.pop()
    if current in visited:
      continue

    visited.add(current)
    component.add(current)
    # Queue only neighbours that still need processing.
    pending.extend(
        linked for linked in bug_aliases.get(current, ())
        if linked not in visited)

  # Sorting keeps the result deterministic, which avoids unnecessary updates
  # to the groups.
  return sorted(component)
def main():
  """Recomputes every alias group stored in the datastore.

  Existing AliasGroups are refreshed (or deleted) based on the current alias
  relationships between bugs, and new AliasGroups are created for bugs that
  are not covered by any existing group."""
  # Query for all bugs that have aliases.
  # Use (> '' OR < '') instead of (!= '') / (> '') to de-duplicate results
  # and avoid datastore emulator problems, see issue #2093
  bugs_with_aliases = osv.Bug.query(
      ndb.OR(osv.Bug.aliases > '', osv.Bug.aliases < ''))
  existing_groups = osv.AliasGroup.query()
  allowed_ids = {entry.bug_id for entry in osv.AliasAllowListEntry.query()}
  denied_ids = {entry.bug_id for entry in osv.AliasDenyListEntry.query()}

  # Alias graph: maps an ID to the set of IDs it is directly linked to, either
  # through its own raw aliases or by being listed as an alias of another bug.
  alias_graph = {}

  for bug in bugs_with_aliases:
    db_id = bug.db_id
    # Deny-listed bugs never contribute their own aliases.
    if db_id in denied_ids:
      continue

    # Bugs listing too many aliases are skipped unless explicitly allowed.
    if len(bug.aliases) > VULN_ALIASES_LIMIT and db_id not in allowed_ids:
      logging.info('%s has too many listed aliases, skipping computation.',
                   db_id)
      continue

    if bug.status != osv.BugStatus.PROCESSED:
      continue

    for alias in bug.aliases:
      # Record the link in both directions.
      for source, target in ((db_id, alias), (alias, db_id)):
        alias_graph.setdefault(source, set()).add(target)

  # IDs that have already been assigned to a group during this run.
  seen = set()

  # Re-compute each existing group from its first bug ID and update it.
  for alias_group in existing_groups:
    first_id = alias_group.bug_ids[0]  # AliasGroups contain more than one bug.
    if first_id in seen:
      # This bug was already counted in a different alias group, so the two
      # groups have merged and this one is redundant.
      alias_group.key.delete()
    else:
      recomputed = _compute_aliases(first_id, seen, alias_graph)
      _update_group(recomputed, alias_group)

  # Any ID still unseen is not covered by an existing group: create new ones.
  for bug_id in alias_graph:
    if bug_id in seen:
      continue
    _create_alias_group(_compute_aliases(bug_id, seen, alias_graph))
# Script entry point: only runs when the file is executed directly.
if __name__ == '__main__':
  _ndb_client = ndb.Client()
  osv.logs.setup_gcp_logging('alias')
  # main() performs ndb queries and writes, so run it inside the client's
  # context.
  with _ndb_client.context():
    main()