utils.py
import platform
from typing import List

import tree  # pip install dm_tree

import ray
from ray.rllib.algorithms.algorithm_config import AlgorithmConfig
from ray.rllib.policy.sample_batch import MultiAgentBatch, SampleBatch
from ray.rllib.utils.actor_manager import FaultAwareApply
from ray.rllib.utils.framework import try_import_torch
from ray.rllib.utils.metrics.metrics_logger import MetricsLogger
from ray.rllib.utils.typing import EpisodeType
from ray.util.annotations import DeveloperAPI

torch, _ = try_import_torch()


@DeveloperAPI(stability="alpha")
class AggregatorActor(FaultAwareApply):
    """Runs episode lists through the ConnectorV2 pipeline and creates train batches.

    The actor should be co-located with a Learner worker. Ideally, there should be one
    or two aggregator actors per Learner worker (having even more per Learner probably
    won't help). The main process driving the RL algo can then perform the following
    execution logic:
    - query n EnvRunners to sample the environment and return n lists of episodes as
      Ray.ObjectRefs.
    - remote call the set of aggregator actors (in round-robin fashion) with these
      list[episodes] refs in async fashion.
    - gather the results asynchronously, as each actor returns refs pointing to
      ready-to-go train batches.
    - as soon as we have at least one train batch per Learner, call the LearnerGroup
      with the (already sharded) refs.
    - an aggregator actor - when receiving p refs to List[EpisodeType] - does:
      -- ray.get() the actual p lists and concatenate them into one
         List[EpisodeType].
      -- pass the list of episodes through its LearnerConnector pipeline.
      -- buffer the output batches of this pipeline until enough batches have been
         collected to create one train batch (matching the config's
         `train_batch_size_per_learner`).
      -- concatenate q batches into a train batch and return that train batch.
    - the algo main process then passes the ray.ObjectRef to the ready-to-go train
      batch to the LearnerGroup, which calls each Learner with one train batch.

    See the commented usage sketch below this class for an illustration of this
    driver-side flow.
    """

    def __init__(self, config: AlgorithmConfig, rl_module_spec):
        self.config = config

        # Set device and node.
        self._node = platform.node()
        self._device = torch.device("cpu")
        # TODO (sven): Activate this when Ray has figured out GPU pre-loading.
        # self._device = torch.device(
        #     f"cuda:{ray.get_gpu_ids()[0]}"
        #     if self.config.num_gpus_per_learner > 0
        #     else "cpu"
        # )
        self.metrics = MetricsLogger()

        # Create the RLModule.
        # TODO (sven): For now, this RLModule (its weights) never gets updated.
        #  The module is needed so that the connector knows which sub-modules are
        #  stateful (and what their initial state tensors are) and which IDs the
        #  sub-modules have (to figure out whether it's multi-agent or not).
        self._module = rl_module_spec.build()
        self._module = self._module.as_multi_rl_module()

        # Create the Learner connector pipeline.
        self._learner_connector = self.config.build_learner_connector(
            input_observation_space=None,
            input_action_space=None,
            device=self._device,
        )

    def get_batch(self, episode_refs: List[ray.ObjectRef]):
        episodes: List[EpisodeType] = []

        # It's possible that individual refs are invalid, because the EnvRunner that
        # produced a ref has crashed or its entire node went down. In this case, try
        # each ref individually and collect only the valid results.
        try:
            episodes = tree.flatten(ray.get(episode_refs))
        except ray.exceptions.OwnerDiedError:
            for ref in episode_refs:
                try:
                    episodes.extend(ray.get(ref))
                except ray.exceptions.OwnerDiedError:
                    pass

        env_steps = sum(len(e) for e in episodes)

        # If we have collected enough episodes to create a single train batch, pass
        # them through the connector at once to receive a single train batch.
        batch = self._learner_connector(
            episodes=episodes,
            rl_module=self._module,
            metrics=self.metrics,
        )
        # Convert the returned dict into a `MultiAgentBatch`.
        # TODO (sven): Try to get rid of the dependency on MultiAgentBatch (once our
        #  mini-batch iterators support splitting over a dict).
        ma_batch = MultiAgentBatch(
            policy_batches={
                pid: SampleBatch(pol_batch) for pid, pol_batch in batch.items()
            },
            env_steps=env_steps,
        )
        return ma_batch

    def get_metrics(self):
        return self.metrics.reduce()
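

# Illustrative usage sketch (not part of this module's API): the names
# `env_runners`, `agg_actors`, and `learner_group` below are hypothetical stand-ins
# for the objects the algo main process would hold. This only sketches the
# round-robin flow described in the AggregatorActor docstring above.
#
#     # Sample episodes on n EnvRunners; keep the results as Ray.ObjectRefs.
#     episode_refs = [env_runner.sample.remote() for env_runner in env_runners]
#     # Round-robin the episode-list refs over the aggregator actors (async). Refs
#     # nested inside the list arg are not auto-resolved by Ray, so each actor
#     # receives the refs themselves and ray.get()s them in `get_batch()`.
#     batch_refs = [
#         agg_actors[i % len(agg_actors)].get_batch.remote([ref])
#         for i, ref in enumerate(episode_refs)
#     ]
#     # Once at least one ready-to-go train batch per Learner is available, pass
#     # the (already sharded) refs on to the LearnerGroup / Learners.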


def _get_env_runner_bundles(config):
    return [
        {
            "CPU": config.num_cpus_per_env_runner,
            "GPU": config.num_gpus_per_env_runner,
            **config.custom_resources_per_env_runner,
        }
        for _ in range(config.num_env_runners)
    ]


def _get_offline_eval_runner_bundles(config):
    return [
        {
            "CPU": config.num_cpus_per_offline_eval_runner,
            "GPU": config.num_gpus_per_offline_eval_runner,
            **config.custom_resources_per_offline_eval_runner,
        }
        for _ in range(config.num_offline_eval_runners)
    ]


def _get_learner_bundles(config):
    try:
        from ray.rllib.extensions.algorithm_utils import _get_learner_bundles as func
        return func(config)
    except Exception:
        pass

    if config.num_learners == 0:
        if config.num_aggregator_actors_per_learner > 0:
            return [{"CPU": config.num_aggregator_actors_per_learner}]
        else:
            return []

    num_cpus_per_learner = (
        config.num_cpus_per_learner
        if config.num_cpus_per_learner != "auto"
        else 1
        if config.num_gpus_per_learner == 0
        else 0
    )
    bundles = [
        {
            "CPU": config.num_learners
            * (num_cpus_per_learner + config.num_aggregator_actors_per_learner),
            "GPU": config.num_learners * config.num_gpus_per_learner,
        }
    ]

    return bundles
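

# Worked example (hypothetical config values, for illustration only; assumes the
# optional `ray.rllib.extensions` override above is not installed): with
# `num_learners=2`, `num_cpus_per_learner="auto"`, `num_gpus_per_learner=1`, and
# `num_aggregator_actors_per_learner=2`, "auto" resolves to 0 CPUs per Learner
# (since GPUs are requested), so `_get_learner_bundles()` returns
# `[{"CPU": 2 * (0 + 2), "GPU": 2 * 1}]`, i.e. `[{"CPU": 4, "GPU": 2}]`.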


def _get_main_process_bundle(config):
    if config.num_learners == 0:
        num_cpus_per_learner = (
            config.num_cpus_per_learner
            if config.num_cpus_per_learner != "auto"
            else 1
            if config.num_gpus_per_learner == 0
            else 0
        )
        bundle = {
            "CPU": max(num_cpus_per_learner, config.num_cpus_for_main_process),
            "GPU": config.num_gpus_per_learner,
        }
    else:
        bundle = {"CPU": config.num_cpus_for_main_process, "GPU": 0}

    return bundle
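

# Worked example (hypothetical config values, for illustration only): in the
# local-Learner case (`num_learners=0`) with `num_cpus_per_learner="auto"`,
# `num_gpus_per_learner=0`, and `num_cpus_for_main_process=1`, "auto" resolves to
# 1 CPU, so `_get_main_process_bundle()` returns `{"CPU": max(1, 1), "GPU": 0}`,
# i.e. `{"CPU": 1, "GPU": 0}`. With `num_learners > 0`, the main process bundle is
# simply `{"CPU": config.num_cpus_for_main_process, "GPU": 0}`.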