forked from pytorch/ci-infra
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsend_scale_message.py
More file actions
92 lines (79 loc) · 2.35 KB
/
Copy pathsend_scale_message.py
File metadata and controls
92 lines (79 loc) · 2.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
import argparse
import json
import uuid
import itertools
from dataclasses import dataclass
from random import randint
import boto3
@dataclass
class ScaleQueue:
    """Per-repository settings needed to send a manual scale message.

    One instance is stored per supported repository; the fields are read when
    resolving the SQS queue and when building the message payload.
    """

    # Sent verbatim as the "installationId" field of every scale message.
    installation_id: str
    # Name used to look the SQS queue up (QueueName=...).
    queue_name: str
    # AWS region the SQS resource is created in (region_name=...).
    region: str
# Repositories this script can send scale messages for, keyed by "owner/name".
# Each row below is (repository, installation id, SQS queue name, AWS region);
# to support another repository, add a row.
SUPPORTED_SCALE_QUEUES = {
    repository: ScaleQueue(
        installation_id=installation_id,
        queue_name=queue_name,
        region=region,
    )
    for repository, installation_id, queue_name, region in (
        ("pytorch/pytorch", "51323823", "ghci-lf-queued-builds", "us-east-1"),
        ("pytorch/pytorch-canary", "51321276", "ghci-lf-c-queued-builds", "us-east-1"),
    )
}
def send_scale_message(queue, repository: str, runner_type: str, installation_id: str):
    """Build one manual scale message and send it to *queue*.

    The message is a JSON object carrying a fresh UUID4 ``id``, the event type
    ``"workflow_job"``, the repository owner and name, the installation id, the
    requested runner label and a placeholder callback URL. The serialized body
    is printed before it is sent.

    Args:
        queue: Object exposing ``send_message(MessageBody=...)``; the script
            passes the result of ``sqs.get_queue_by_name(...)``.
        repository: Repository in ``owner/name`` form; the text before the
            first ``/`` is the owner, the segment after it is the name.
        runner_type: Runner label, sent as the single ``runnerLabels`` entry.
        installation_id: Value sent as ``installationId``.

    Raises:
        IndexError: If *repository* contains no ``/``.
    """
    repo_parts = repository.split("/")
    # Key order is kept stable because the serialized text is both logged
    # and sent as-is.
    payload = {
        "id": str(uuid.uuid4()),
        "eventType": "workflow_job",
        "repositoryName": repo_parts[1],
        "repositoryOwner": repo_parts[0],
        "installationId": installation_id,
        "runnerLabels": [runner_type],
        "callbackUrl": "MANUAL SCALE EVENT",
    }
    message_body = json.dumps(payload)
    print(f"Sending message: {message_body}")
    queue.send_message(MessageBody=message_body)
def _positive_int(value: str) -> int:
    """Parse *value* as an integer and reject anything below 1."""
    try:
        number = int(value)
    except ValueError:
        raise argparse.ArgumentTypeError(f"invalid int value: {value!r}") from None
    if number < 1:
        raise argparse.ArgumentTypeError(
            f"must be a positive integer, got {number}"
        )
    return number


def parse_args() -> argparse.Namespace:
    """Parse the command line of the manual scale script.

    Returns:
        Namespace with ``repo`` (one of the ``SUPPORTED_SCALE_QUEUES`` keys),
        ``runner_type`` (free-form string) and ``scale_by`` (positive int,
        default 10).

    Raises:
        SystemExit: Raised by argparse on invalid arguments or ``--help``.
    """
    parser = argparse.ArgumentParser(
        description="Send manual scale messages to the SQS scale queue of a Github repository"
    )
    parser.add_argument(
        "repo",
        help="Repository to send scale messages for, (ex. pytorch/pytorch)",
        type=str,
        choices=SUPPORTED_SCALE_QUEUES.keys(),
    )
    parser.add_argument(
        "runner_type",
        help="Runner type to scale for",
        type=str,
    )
    parser.add_argument(
        "--scale-by",
        help="Number of scale messages to send (positive integer)",
        # A zero or negative count would make the send loop a silent no-op,
        # so reject it at parse time instead.
        type=_positive_int,
        default=10,
    )
    return parser.parse_args()
def main():
    """Send the requested number of scale messages for the chosen repository.

    Parses the command line, looks the repository up in
    ``SUPPORTED_SCALE_QUEUES``, resolves the SQS queue by name in that entry's
    region and sends ``options.scale_by`` messages to it.

    Raises:
        SystemExit: With exit status 1 when the repository has no entry in
            ``SUPPORTED_SCALE_QUEUES``.
    """
    options = parse_args()
    scale_queue_info = SUPPORTED_SCALE_QUEUES.get(options.repo)
    if scale_queue_info is None:
        # Raise SystemExit directly rather than calling the `exit()` helper:
        # that helper is injected by the `site` module for interactive use and
        # is not guaranteed to exist. A string argument is printed to stderr
        # and yields exit status 1, matching the previous status code.
        raise SystemExit(f"error: no scale queue configured for {options.repo!r}")

    sqs = boto3.resource("sqs", region_name=scale_queue_info.region)
    queue = sqs.get_queue_by_name(QueueName=scale_queue_info.queue_name)
    # One message per requested scale unit; each call generates its own id.
    for _ in range(options.scale_by):
        send_scale_message(
            queue=queue,
            repository=options.repo,
            runner_type=options.runner_type,
            installation_id=scale_queue_info.installation_id,
        )
if __name__ == "__main__":
    # Run the CLI only when executed as a script, not when imported.
    main()