storage: add MultiEngineCompactionScheduler #165577
sumeerbhola wants to merge 1 commit into cockroachdb:master from
Conversation
Your pull request contains more than 1000 changes. It is strongly encouraged to split big PRs into smaller chunks. 🦉 Hoot! I am Blathers, a bot for CockroachDB. My owner is dev-inf.
Force-pushed e820a41 to 0d2e028.
```go
func (s *MultiEngineCompactionScheduler) Close() {
	if !s.opts.testingDisablePeriodicGranter {
		s.stopPeriodicGranterCh <- struct{}{}
```
Potential deadlock: When RunAsyncTask fails (line 111-118), the periodicGranter goroutine never starts, so no goroutine reads from the unbuffered stopPeriodicGranterCh. Close() unconditionally sends on this channel (line 142) before acquiring s.mu to check s.mu.closed, causing a permanent block. The comment on line 117 ("Close will be a no-op") does not match the implementation.
Suggested fix: track whether the goroutine was started and guard the send accordingly, or use close(s.stopPeriodicGranterCh) which is idiomatic for shutdown signaling and won't block without a reader.
AI Review: Potential Issue(s) Detected. An inline comment has been added. Summary: When the stopper is already quiescing, … If helpful: add …
Force-pushed 0d2e028 to cbefe2a.
Force-pushed cbefe2a to 183831f.
pav-kv
left a comment
First pass. Could you please run this PR through the readability review agent that @tbg is adding in #165700 - it was making some good suggestions in a test run, e.g.
1. [readability] tryGrantLockedAndUnlock lacks an algorithm roadmap — multi_compaction_scheduler.go:244-329

   This is the algorithmic core of the PR (85 lines with nested loops, lock release/re-acquire cycles, snapshot-based optimism for waitingState, and candidate sorting). The one-line doc says what but not how or why. A reader new to the subsystem must understand four non-obvious things simultaneously: why the outer loop re-collects candidates, why mu is released mid-loop, why only one grant per iteration, and what the snapshot pattern does.

   Suggestion: Add a brief algorithm sketch before the outer loop:

   ```go
   // Algorithm: In each iteration of the outer loop, we collect one candidate
   // per waiting engine (releasing mu to call GetWaitingCompaction), sort them
   // by priority, then try them in order until one accepts. We re-collect
   // each iteration because granting changes running counts and the picked
   // compaction may have been invalidated. The waitingState snapshot prevents
   // a concurrent TrySchedule from being silently dropped.
   ```
```go
const (
	// LogEngine is the engine used for the raft log.
	LogEngine EngineType = iota
```
I have a draft in the works that uses LogEngine and StateEngine names in this package for subsets of the Engine interface, to document/constrain how both are used, and better annotate the code [#97618].
Consider using less attractive names for this enum (likely to be used less frequently), like EngineTypeLog/EngineTypeState. I can resolve naming later though, if that draft goes forward.
```go
// granting to complete.
func (s *MultiEngineCompactionScheduler) Close() {
	if !s.opts.testingDisablePeriodicGranter {
		s.stopPeriodicGranterCh <- struct{}{}
```
AI is right about the bug in this line (deadlock possible).
Closing the channel would be more idiomatic. It can also be done unconditionally (without checking testingDisablePeriodicGranter), unlike the send.
```go
s := e.scheduler
s.mu.Lock()
defer s.mu.Unlock()
if !e.registered {
```
Can !e.registered happen? From Claude, I gather all pebble.CompactionScheduler calls are enclosed between Register and Unregister, is that right?
Might want to assert here. Also, UpdateGetAllowedWithoutPermission sidesteps this check - is it needed there for consistency?
```go
e.waiting.value = true
e.waiting.tryGetCount++
```
nit: this duplicates the code in the isGranting branch above. Worth adding a small helper to the waitingState type:

```go
func (w *waitingState) wait() {
	w.value = true
	w.tryGetCount++
}
```

```go
	return ""

case "set-allowed":
	var name string
```
Use dd helpers like the ones in cockroach/pkg/kv/kvserver/replica_lifecycle_datadriven_test.go, lines 191 to 192 at 0c68d18.
```
@@ -0,0 +1,452 @@
# Basic: both engines register, TrySchedule respects limits.

init max-concurrency=3 deprioritization=0.5 log-allowed=2 state-allowed=2
```
Mind splitting each test into a separate file?
```go
if !ok {
	t.Fatalf("unknown engine %s", name)
}
```
Use require package here and throughout.
```go
	d.log = nil
}

func TestMultiCompactionScheduler(t *testing.T) {
```
No comments, and a wall of code in this test makes it hardly reviewable. Could you make it more readable?
Sorry for lack of concrete suggestions, treat it as a challenge and please do your best: tests are code. I made a few below, and one more here: could you write a "spec" comment for the commands, like here.
The readability review agent linked in the review message might help.
Added a bunch of concrete suggestions, mainly about making the code concise.
```go
// Started implements pebble.CompactionGrantHandle.
func (e *engineState) Started() {}

// MeasureCPU implements pebble.CompactionGrantHandle.
func (e *engineState) MeasureCPU(pebble.CompactionGoroutineKind) {}

// CumulativeStats implements pebble.CompactionGrantHandle.
func (e *engineState) CumulativeStats(stats pebble.CompactionGrantHandleStats) {}
```
Are these to be implemented, or will they be no-ops? Comment accordingly:
- to be implemented -> TODO the plan
- no-ops -> comment why
```go
for i := range candidates {
	c := candidates[i]
```
nit: for _, c := range candidates
```go
// trySetWaitingToFalse sets waiting.value to false only if tryGetCount hasn't
// changed since the snapshot, meaning no new TrySchedule arrived.
func (e *engineState) trySetWaitingToFalse(snapshot waitingState) {
```
Is this prone to ABA race? E.g. someone increments tryGetCount and decrements it back - should we treat it as no change, or a change?
From a later review pass, I realized there is no ABA because tryGetCount is only incremented. See that other comment - consolidating the mini-state machine of waitingState next to its definition should make things clear (2 methods, one for each possible transition; plus a comment to solidify the contract).
```go
//
// Each engine gets an engineState that implements pebble.CompactionScheduler
// (passed to pebble.Options.CompactionScheduler) and pebble.CompactionGrantHandle.
type MultiEngineCompactionScheduler struct {
```
Worth explaining the "design" / "how" of the flow in a paragraph or two. What is the general composition and who does what/when.
```go
	value       bool
	tryGetCount int
```
What is the invariant / flow here?
Is it true that value == true iff tryGetCount > 0?
Is tryGetCount ever decremented (seems not)? So is it true that the transitions are:
1. (false, 0) -> (true, 1)
2. (true, n) -> (true, n+1)
3. (true, n) -> (false, 0)
Do we need the separate value variable then, given the first invariant?
Though from the code it seems more like:
1. (false, n) -> (true, n+1)
2. (true, n) -> (false, n)
Either way, this needs clarification, and maybe consolidation of this type's methods next to its definition to make things obvious.
```go
// compareCompactions returns a negative value if a is better (higher
// priority) than b, for use with slices.SortFunc.
// Ordering: Optional (false before true), Priority (desc), Score (desc).
func compareCompactions(a, b pebble.WaitingCompaction) int {
```
Is there no such code elsewhere? Since this is existing infrastructure, I imagine there is already code that compares these priorities to honour them.
Logically, this belongs as a method of the WaitingCompaction type. TODO to move to Pebble?
```go
// nolint:deferunlockcheck
func (s *MultiEngineCompactionScheduler) periodicGranter(quiesce <-chan struct{}) {
	ticker := time.NewTicker(100 * time.Millisecond)
```
Do we want to always be paying for this ticking? Is it possible that there isn't work to do, and we should wait?
Stepping back: why do we need this backstop mechanism? Would some dirty bit pattern (maybe a slight extension of the "poke") allow making this completely reactive?
Or is this a way to batch up more work before we execute on it?
```go
s := e.scheduler
s.mu.Lock()
defer s.mu.Unlock()
e.registered = false
```
registered can be false before Register and after Unregister. It makes it harder to reason about the current state of the engine. When we see !registered in other code, we don't know unless we employ other fields/assumptions.
Consider straightening up the state machine, so that the transitions are one-way. E.g.
-> init
-> registered
-> {running|waiting|granting} // interchangeably / some clear rules here
-> unregistered
```go
// engines. Must be called with s.mu held; returns with s.mu unlocked.
//
// nolint:deferunlockcheck
func (s *MultiEngineCompactionScheduler) tryGrantLockedAndUnlock() {
```
It seems this method is concerned about concurrency only because it can be called from Done(). Otherwise, it looks single-threaded (isGranting bool acts like a critical section).
Should this be avoidable by poking the granter loop from Done? When the previous tryGrant is done, the next one will be invoked immediately.
Or there is merit in making the Done() caller wait and do this work? Plus forcing multiple Done() callers to serialize.
I realize there might be some micro-optimization going on in Done(), but it is not clearly stated. For example, it's possible that this code attempts to avoid Go scheduling latency in a common uncontended case: jump straight to tryGrant instead of signalling another goroutine to do so. If so, there might be other ways to implement this optimization (if it is needed in the first place).
As a reviewer, and as a general principle, my preference is to start from a minimal implementation that works, and optimize in separate commits. This gives the reviewer the opportunity to trace the author's intent incrementally, and discuss optimizations (which might be premature) in a well-contained context. One big commit that "solves it all" is not review-friendly.
After a bit of reflection on this code, I think the minimal viable implementation is:
- the reactive granter goroutine, calling tryGrant in a loop
- some "dirty bit" way of signalling this goroutine that it needs to wake and do work or terminate (signals come mainly from TrySchedule/Done, plus a few more "lifecycle" places like UpdateGetAllowedWithoutPermission/Unregister/Close).
This is a "standard" approach to schedulers. For example, the raft kvserver/scheduler.go does exactly that. Arguably, it handles a higher "load" (CPU-wise) than the compaction scheduler does (tens of thousands of ranges, units of work are more short-lived and frequent, esp things like en-masse ticks; vs a few tens/hundreds of compactions/s). So if it works well, something similar/simple should work well here to start from.
```go
// as waiting so the granter picks this up. If this tryGrant misses it,
// the periodic call to tryGrant will pick it up within 100ms.
```
Should we exclude the "misses it" slow path bit by poking the granter? Then it wouldn't need to wait 100ms.
With this done, would we still need the timer in the granter loop?
pav-kv
left a comment
A round of readability nits in the datadriven test. Started to review the datadriven output.
```go
}
collectLogs := func() string {
	var lines []string
	for _, name := range []string{"log", "state"} {
```
nit: iterate the map directly, so that this line doesn't need to "know" about "log" and "state". To make it deterministic, sort the keys:
```go
for _, name := range slices.Sorted(maps.Keys(dbs)) {
	db := dbs[name]
	lines = append(lines, db.log...)
	db.reset()
}
```

```go
	if len(lines) == 0 {
		return ""
	}
	return strings.Join(lines, "\n")
```
nit: drop the if, since strings.Join will anyway return an empty string if lines is empty.
```go
var result string
if granted {
	handles[name].h = append(handles[name].h, handle)
	result = "granted"
} else {
	result = "not granted"
}
```
nit: squash this
```go
result := "not granted"
if granted {
	...
	result = "granted"
}
```

```go
if logs != "" {
	return logs + "\n"
}
```
nit: this addition of \n is copy-pasted in all callers of collectLogs() -> move it to collectLogs() definition
```go
	return ""

case "close":
	for _, name := range []string{"log", "state"} {
```
similar nit: make this line agnostic to the knowledge about "log" and "state" - iterate everything that we have in the map
```go
db.remainingScheduleAccepts = 1
if d.HasArg("accept-count") {
	d.ScanArgs(t, "accept-count", &db.remainingScheduleAccepts)
}
```
```go
db.remainingScheduleAccepts = dd.ScanArgOr(t, d, "accept-count", 1)
```

```go
stateEngine.Register(1, stateDB)

for i := 0; i < numGoroutines; i++ {
	wg.Add(1)
```
nit: move this right before the go invocation, visibly close to the corresponding wg.Done()
```go
var name string
var idx int
d.ScanArgs(t, "engine", &name)
d.ScanArgs(t, "idx", &idx)
```
nit: make the name more descriptive, e.g. handle
```
clear-waiting engine=log
----

compaction-done engine=log idx=0
```
Worth adding a comment above where idx=0 is created, so that it's easy to understand which wait this corresponds to. Is this the first try-schedule engine=log?
Better: make those commands print the handle index in the output, so that you don't have to manually add comments, e.g.: granted to handle=0.
Same applies to all commands that refer to a handle.
```go
for i := range candidates {
	c := candidates[i]
	s.mu.Unlock()
	accepted := c.engine.db.Schedule(c.engine)
```
Suppose e.waiting.tryGetCount was 3, and Schedule returns false. In this case we trySetWaitingToFalse, which then puts this engine in non-waiting state.
The Schedule comment doesn't explain the false case: does it mean the DB changed its mind about all those pending compactions, and has no more?
```go
// have no waiting compactions.
type waitingState struct {
	value       bool
	tryGetCount int
```
Slight preference to prevent unlimited growth of this var. I think there are only 3 interesting states:
00 - not waiting
01 - waiting
11 - waiting (dirty)
TrySchedule: {00,01,11} -> 11 // new request
tryGrant(1): {01,11} -> 01 // reset dirty bit
tryGrant(2): 01 -> 00 // stop waiting if no new requests
Could be a couple of bools, or int with accessors.
This should work assuming tryGrant(1),(2) can only be interleaved by a new request, which is true by design since isGranting is a critical section.
```go
wg.Add(2)
go func() {
```

```go
d.wg.Add(1)
go func() {
	defer d.wg.Done()
```
```go
if s.mu.closed || !s.hasRegisteredEnginesLocked() {
	return
}
// Wait for turn to grant.
for s.mu.isGranting {
	s.mu.isGrantingCond.Wait()
	if s.mu.closed || !s.hasRegisteredEnginesLocked() {
		return
	}
}
```
Would it be inconsequential to drop some of this fast-pathing? It optimizes for the shutdown case, and incurs a (low but non-zero, under mutex) cost in normal operation. It also distracts the reader - e.g. the closed/isGranting invariant takes some time to extract, especially since it's not documented.
Something like:

```go
// Wait for our turn.
for s.mu.isGranting {
	s.mu.isGrantingCond.Wait()
}
// Invariant: closed prevents isGranting from a false->true transition.
if s.mu.closed {
	return
}
s.mu.isGranting = true
// Checking !s.hasRegisteredEnginesLocked() also seems unnecessary,
// because the code below returns fast if all engines are unregistered.
```

```go
	pokePeriodicGranterCh: make(chan struct{}, 1),
}
s.mu.isGrantingCond = sync.NewCond(&s.mu.Mutex)
if !opts.testingDisablePeriodicGranter {
```
This bool is not needed if we decompose the NewMultiEngineCompactionScheduler method in two (which is also idiomatic):

```go
sch := NewMultiEngineCompactionScheduler(opts)
if err := sch.Start(ctx, stopper); err != nil {
	return err
}
```

Then the test code can skip calling Start if it needs to.
```go
s.mu.isGranting = false
s.mu.isGrantingCond.Broadcast()
```
Should this be a defer right after isGranting = true? For same reasons that we usually defer mu.Unlock(): less error prone to early returns (which are tempting in the loop above), panic safety.
```go
	wc       pebble.WaitingCompaction
	snapshot waitingState
}
var candidates []candidate
```
This allocates on each iteration with a candidate. Move it above the for? Could also have a local fixed-size array [2]candidate backing this slice, so that it never allocates when there are <= 2 engines.

```go
type candidate struct{ ... }
var backing [2]candidate // avoid allocations if <= 2 engines
for ... {
	candidates := backing[:0]
}
```

Force-pushed e659dea to 7b0fa12.
Add a shared CompactionScheduler that enforces a global compaction concurrency limit across multiple Pebble engines (e.g. log and state engines in a CockroachDB store). The scheduler respects per-engine soft limits from GetAllowedWithoutPermission and deprioritizes log engine compactions via a configurable score multiplier.

The implementation follows the same patterns as Pebble's ConcurrencyLimitScheduler: engineState serves as both the per-engine CompactionScheduler and the CompactionGrantHandle, a periodic granter goroutine (managed by stop.Stopper) samples for waiting work, and a waitingState trick avoids unnecessary GetWaitingCompaction calls.

Three tests are included:
- Datadriven test covering basic scheduling, global/per-engine limits, log deprioritization, tick-triggered grants, Schedule rejection fallback, and UpdateGetAllowedWithoutPermission.
- Concurrency stress test verifying no limit violations or deadlocks across both TrySchedule and Schedule (granter) paths.
- Integration test with two real Pebble instances sharing a scheduler.

Informs: cockroachdb#156778

Release note: None

Co-Authored-By: roachdev-claude <roachdev-claude-bot@cockroachlabs.com>
Force-pushed 7b0fa12 to 2f6737f.