Motivation
A namespace bundle may contain many topic partitions belonging to different topics.
The throughput of these topics can vary greatly: some have a very high rate/throughput while others have a very low one.
Partitions with a high rate/throughput can cause broker overload and bundle unloading.
At this point, if we split the bundle manually with the range_equally_divide or topic_count_equally_divide split algorithm, many splits may be needed before these high rate/throughput partitions are assigned to different bundles.
For convenience, this PIP calls these high-throughput topics outstanding topics and their partitions outstanding partitions.
Goal
Our goal is to make it easier to split outstanding partitions into new bundles. This PIP therefore introduces a more flexible algorithm for splitting a namespace bundle.
The main idea is to first get the hash position of every topic in the bundle. With these hash positions, it is much easier to decide where to split the bundle. We can split it into either two bundles with equal throughput or several bundles with equal throughput.
For example, take a bundle with boundaries 0x00000000 to 0x00000200 and four topics: t1, t2, t3, t4.
Step one. Get the hash position of these topics
- t1 with hashcode 10
- t2 with hashcode 20
- t3 with hashcode 80
- t4 with hashcode 90
Step two. Split the bundle
Here we have several choices, such as:
- split the bundle into two bundles with an equal number of topics, as topic_count_equally_divide does; we can split at any position between 21 ~ 80
- split the bundle into four bundles with one topic each; we can split at positions 15, 50, 85
- split based on the topics' throughput
- ...
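The choices above can be sketched in a few lines of Java. This is only an illustration under stated assumptions, not part of the proposed broker code: the class and method names are hypothetical, the throughput figures are made up, and split positions are taken as the midpoint between two adjacent topic hash positions.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, for illustration only.
class SplitPositionSketch {

    // One topic per resulting bundle: split at the midpoint between
    // every pair of adjacent hash positions (input must be sorted).
    static List<Long> midpointBoundaries(long[] sortedHashes) {
        List<Long> boundaries = new ArrayList<>();
        for (int i = 1; i < sortedHashes.length; i++) {
            boundaries.add((sortedHashes[i - 1] + sortedHashes[i]) / 2);
        }
        return boundaries;
    }

    // Two resulting bundles: pick the single midpoint that minimizes the
    // throughput difference between the left and right side.
    // throughput[i] belongs to the topic at sortedHashes[i].
    static long throughputBalancedBoundary(long[] sortedHashes, double[] throughput) {
        double total = 0;
        for (double t : throughput) {
            total += t;
        }
        double left = 0;
        double bestDiff = Double.MAX_VALUE;
        long best = -1; // stays -1 when there are fewer than two topics
        for (int i = 1; i < sortedHashes.length; i++) {
            left += throughput[i - 1];
            double diff = Math.abs(left - (total - left));
            if (diff < bestDiff) {
                bestDiff = diff;
                best = (sortedHashes[i - 1] + sortedHashes[i]) / 2;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] hashes = {10, 20, 80, 90};              // t1..t4 from the example
        System.out.println(midpointBoundaries(hashes)); // [15, 50, 85]

        double[] throughput = {100.0, 5.0, 5.0, 10.0};  // made-up values, t1 is outstanding
        System.out.println(throughputBalancedBoundary(hashes, throughput)); // 15
    }
}
```

With the made-up throughput values, splitting at 15 isolates t1 in its own bundle, which is the closest a single split can get to an even throughput distribution here.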
API Changes
We need two API changes for this PIP.
- Add a new API to get the positions for one or more topics
/**
 * Get positions for topic list in a bundle.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle
 * @param topicList topics whose hash positions are requested
 * @return hash positions for all topics in topicList
 * @throws PulsarAdminException if the request fails
 */
TopicHashPositions getTopicHashPositions(String namespace, String bundle, List<String> topicList)
        throws PulsarAdminException;
- Change the bundle split API to support splitting a bundle at one or more specified hash positions
/**
 * Split namespace bundle.
 *
 * @param namespace namespace that owns the bundle
 * @param bundle range of bundle to split
 * @param unloadSplitBundles whether to unload the new bundles after the split
 * @param splitAlgorithmName name of the split algorithm to use
 * @param splitBoundaries hash positions at which to split the bundle
 * @throws PulsarAdminException if the request fails
 */
void splitNamespaceBundle(String namespace, String bundle, boolean unloadSplitBundles,
        String splitAlgorithmName, List<Long> splitBoundaries) throws PulsarAdminException;
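To make the effect of splitBoundaries concrete, the following sketch lists the bundle ranges that would result from splitting one bundle at a sorted list of boundaries. It is an illustration only: the class and method are hypothetical, and the range strings assume the {start-boundary}_{end-boundary} format with 8-digit hexadecimal boundaries, as in the example bundle 0x00000000 to 0x00000200.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only.
class BundleRangeSketch {

    // Each boundary closes the current range and opens the next one.
    // Boundaries are assumed to be sorted and strictly inside (lower, upper).
    static List<String> rangesAfterSplit(long lower, long upper, List<Long> sortedBoundaries) {
        List<String> ranges = new ArrayList<>();
        long start = lower;
        for (long boundary : sortedBoundaries) {
            ranges.add(String.format("0x%08x_0x%08x", start, boundary));
            start = boundary;
        }
        ranges.add(String.format("0x%08x_0x%08x", start, upper));
        return ranges;
    }

    public static void main(String[] args) {
        // Splitting the example bundle at 15, 50 and 85 yields four ranges.
        for (String range : rangesAfterSplit(0x0L, 0x200L, Arrays.asList(15L, 50L, 85L))) {
            System.out.println(range);
        }
    }
}
```

N boundaries always produce N + 1 new bundles, so a single call can separate several outstanding partitions at once.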
Implementation
New API for getting topic positions
Add a new admin command GetTopicHashPositions to CmdNamespaces:
private class GetTopicHashPositions extends CliCommand {
    @Parameter(
            names = { "--bundle", "-b" },
            description = "{start-boundary}_{end-boundary} format namespace bundle",
            required = false)
    private String bundle;

    @Parameter(
            names = { "--topic-list", "-tl" },
            description = "The list of topics to get positions in this bundle",
            required = false)
    private List<String> topicList;
}
Add a new GET method getTopicHashPositions to Namespaces:
@GET
@Path("/{tenant}/{namespace}/{bundle}/topicHashPositions")
@ApiOperation(value = "Get hash positions for topics")
@ApiResponses(value = {
        @ApiResponse(code = 403, message = "Don't have admin permission"),
        @ApiResponse(code = 404, message = "Namespace does not exist")})
public TopicHashPositions getTopicHashPositions(
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("topicList") List<String> topicList) {
    validateNamespaceName(tenant, namespace);
    return internalGetTopicHashPositions(bundleRange, new ArrayList<>(topicList));
}
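As a rough picture of what a topic hash position is, the sketch below maps a topic name to an unsigned 32-bit value. This PIP does not state which hash function the broker uses, so CRC32 over the UTF-8 topic name is an assumption here, chosen because it fills the same 0x00000000 to 0xffffffff range that the bundle boundaries use. The class name and the topic name are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical helper, for illustration only.
class TopicHashSketch {

    // ASSUMPTION: CRC32 of the UTF-8 topic name; the broker's actual hash
    // function is not specified in this document.
    static long hashPosition(String topicName) {
        CRC32 crc = new CRC32();
        crc.update(topicName.getBytes(StandardCharsets.UTF_8));
        return crc.getValue(); // unsigned 32-bit value held in a long
    }

    public static void main(String[] args) {
        String topic = "persistent://tenant/ns/t1-partition-0"; // made-up topic name
        long position = hashPosition(topic);
        System.out.println(String.format("%s -> 0x%08x", topic, position));
    }
}
```

Whatever the hash function is, the positions returned by getTopicHashPositions are the values to compare against bundle boundaries when choosing where to split.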
Add support for splitting a bundle at specified hash positions
Change the admin API in CmdNamespaces to support splitting a bundle at specified hash positions (split boundaries):
private class SplitBundle extends CliCommand {
    @Parameter(names = { "--split-algorithm-name", "-san" },
            description = "Algorithm name for split "
                    + "namespace bundle. Valid options are: [range_equally_divide, topic_count_equally_divide, "
                    + "specified_positions_divide]. Use broker side config if absent",
            required = false)
    private String splitAlgorithmName;

    @Parameter(names = { "--split-boundaries", "-sb" },
            description = "Specified split boundary for bundle split, will split one bundle "
                    + "to multi bundles only works with specified_positions_divide algorithm",
            required = false)
    private List<Long> splitBoundaries;
Change the splitNamespaceBundle method of Namespaces, adding a parameter for split boundaries:
public void splitNamespaceBundle(
        @Suspended final AsyncResponse asyncResponse,
        @PathParam("tenant") String tenant,
        @PathParam("namespace") String namespace,
        @PathParam("bundle") String bundleRange,
        @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
        @QueryParam("unload") @DefaultValue("false") boolean unload,
        @QueryParam("splitAlgorithmName") String splitAlgorithmName,
        @QueryParam("splitBoundaries") List<Long> splitBoundaries) {
For code consistency, encapsulate all the parameters for bundle split in a new class BundleSplitOption:
public class BundleSplitOption {
    private NamespaceService service;
    private NamespaceBundle bundle;
    private List<Long> positions;
}
Then add a new NamespaceBundleSplitAlgorithm named SpecifiedPositionsBundleSplitAlgorithm, which validates the split boundaries and returns the final split boundaries:
public class SpecifiedPositionsBundleSplitAlgorithm implements NamespaceBundleSplitAlgorithm {
    @Override
    public CompletableFuture<List<Long>> getSplitBoundary(BundleSplitOption bundleSplitOption) {
        NamespaceService service = bundleSplitOption.getService();
        NamespaceBundle bundle = bundleSplitOption.getBundle();
        List<Long> positions = bundleSplitOption.getPositions();
        // no positions given: nothing to split
        if (positions == null || positions.isEmpty()) {
            return CompletableFuture.completedFuture(null);
        }
        // sort all positions
        Collections.sort(positions);
        return service.getOwnedTopicListForNamespaceBundle(bundle).thenCompose(topics -> {
            // a bundle with at most one topic is not split
            if (topics == null || topics.size() <= 1) {
                return CompletableFuture.completedFuture(null);
            }
            // keep only the positions strictly inside the bundle range
            List<Long> splitBoundaries = positions
                    .stream()
                    .filter(position -> position > bundle.getLowerEndpoint() && position < bundle.getUpperEndpoint())
                    .collect(Collectors.toList());
            if (splitBoundaries.isEmpty()) {
                return CompletableFuture.completedFuture(null);
            }
            return CompletableFuture.completedFuture(splitBoundaries);
        });
    }
}
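The boundary validation in the algorithm above can be tried in isolation. The sketch below reproduces only the sort-and-filter step with plain long values. The class and method names are hypothetical, and the topic lookup through NamespaceService (including the check for at most one topic) is left out.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical standalone version of the filtering step, for illustration only.
class BoundaryFilterSketch {

    // Returns the sorted positions strictly inside (lower, upper),
    // or null when there is nothing valid to split at.
    static List<Long> validBoundaries(List<Long> positions, long lower, long upper) {
        if (positions == null || positions.isEmpty()) {
            return null;
        }
        List<Long> result = positions.stream()
                .sorted()
                .filter(p -> p > lower && p < upper)
                .collect(Collectors.toList());
        return result.isEmpty() ? null : result;
    }

    public static void main(String[] args) {
        // Bundle 0x00000000 to 0x00000200: 0 sits on the lower endpoint and
        // 600 is past the upper endpoint, so both are dropped.
        List<Long> requested = Arrays.asList(85L, 600L, 15L, 0L, 50L);
        System.out.println(validBoundaries(requested, 0x0L, 0x200L)); // [15, 50, 85]
    }
}
```

Positions equal to either endpoint are rejected because splitting there would create an empty bundle.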
Also, add the new bundle split algorithm to conf/broker.conf
supportedNamespaceBundleSplitAlgorithms=range_equally_divide,topic_count_equally_divide,specified_positions_divide
Rejected Alternatives
Splitting the bundle by outstanding topic, which splits the bundle into two new bundles at a time, each containing an equal share of the outstanding partitions. This algorithm has a disadvantage: it can only deal with one outstanding topic.