Skip to content

Fix parallel CLI execution #112

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Fix parallel CLI execution
When running the CLI in parallel, you can't use `||` on outputs because
that will potentially not evaluate the RHS of the expression. Instead,
we'll use `|` to combine outputs.
  • Loading branch information
kddnewton committed Jul 15, 2022
commit bfb4a8424a3d73a70f56aff1cc147c777593ec96
50 changes: 25 additions & 25 deletions lib/syntax_tree/cli.rb
Original file line number Diff line number Diff line change
Expand Up @@ -315,23 +315,7 @@ def run(argv)

# At the end, we're going to return whether or not this worker ever
# encountered an error.
errored =
with_workers(queue) do |item|
action.run(item)
false
rescue Parser::ParseError => error
warn("Error: #{error.message}")
highlight_error(error, item.source)
true
rescue Check::UnformattedError, Debug::NonIdempotentFormatError
true
rescue StandardError => error
warn(error.message)
warn(error.backtrace)
true
end

if errored
if process_queue(queue, action)
action.failure
1
else
Expand All @@ -342,13 +326,11 @@ def run(argv)

private

def with_workers(queue)
# If the queue is just 1 item, then we're not going to bother going
# through the whole ceremony of parallelizing the work.
return yield queue.shift if queue.size == 1

# Processes each item in the queue with the given action. Returns whether
# or not any errors were encountered.
def process_queue(queue, action)
workers =
Etc.nprocessors.times.map do
[Etc.nprocessors, queue.size].min.times.map do
Thread.new do
# Propagate errors in the worker threads up to the parent thread.
Thread.current.abort_on_exception = true
Expand All @@ -360,15 +342,33 @@ def with_workers(queue)

# While there is still work left to do, shift off the queue and
# process the item.
(errored ||= yield queue.shift) until queue.empty?
until queue.empty?
item = queue.shift
errored |=
begin
action.run(item)
false
rescue Parser::ParseError => error
warn("Error: #{error.message}")
highlight_error(error, item.source)
true
rescue Check::UnformattedError,
Debug::NonIdempotentFormatError
true
rescue StandardError => error
warn(error.message)
warn(error.backtrace)
true
end
end

# At the end, we're going to return whether or not this worker
# ever encountered an error.
errored
end
end

workers.inject(false) { |accum, thread| accum || thread.value }
workers.map(&:value).inject(:|)
end

# Highlights a snippet from a source and parse error.
Expand Down