Skip to content

Commit 7197814

Browse files
authored
Merge 8835b50 into 3cf020a
2 parents 3cf020a + 8835b50 commit 7197814

File tree

4 files changed

+312
-1
lines changed

4 files changed

+312
-1
lines changed

firebase-common/src/main/java/com/google/firebase/concurrent/ExecutorsRegistrar.java

-1
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import java.util.concurrent.ScheduledExecutorService;
3333
import java.util.concurrent.ThreadFactory;
3434

35-
/** @hide */
3635
@SuppressLint("ThreadPoolCreation")
3736
public class ExecutorsRegistrar implements ComponentRegistrar {
3837
private static final Lazy<ScheduledExecutorService> BG_EXECUTOR =
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2022 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package com.google.firebase.concurrent;
16+
17+
import java.util.concurrent.Executor;
18+
19+
/** Provides commonly useful executors. */
20+
public class FirebaseExecutors {
21+
private FirebaseExecutors() {}
22+
23+
/**
24+
* Creates a sequential executor.
25+
*
26+
* <p>Executes tasks sequentially and provides memory synchronization guarantees for any mutations
27+
* of shared state.
28+
*
29+
* <p>For details see:
30+
* https://guava.dev/releases/31.1-jre/api/docs/com/google/common/util/concurrent/MoreExecutors.html#newSequentialExecutor(java.util.concurrent.Executor)
31+
*/
32+
public static Executor newSequentialExecutor(Executor delegate) {
33+
return new SequentialExecutor(delegate);
34+
}
35+
36+
/** Returns a direct executor. */
37+
public static Executor directExecutor() {
38+
return DirectExecutor.INSTANCE;
39+
}
40+
41+
private enum DirectExecutor implements Executor {
42+
INSTANCE;
43+
44+
@Override
45+
public void execute(Runnable command) {
46+
command.run();
47+
}
48+
}
49+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
package com.google.firebase.concurrent;
2+
3+
import static com.google.android.gms.common.internal.Preconditions.checkNotNull;
4+
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.IDLE;
5+
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUED;
6+
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.QUEUING;
7+
import static com.google.firebase.concurrent.SequentialExecutor.WorkerRunningState.RUNNING;
8+
import static java.lang.System.identityHashCode;
9+
10+
import androidx.annotation.GuardedBy;
11+
import java.util.ArrayDeque;
12+
import java.util.Deque;
13+
import java.util.concurrent.Executor;
14+
import java.util.concurrent.RejectedExecutionException;
15+
import java.util.logging.Level;
16+
import java.util.logging.Logger;
17+
import javax.annotation.CheckForNull;
18+
19+
/**
20+
* Executor ensuring that all Runnables submitted are executed in order, using the provided
21+
* Executor, and sequentially such that no two will ever be running at the same time.
22+
*
23+
* <p>Tasks submitted to {@link #execute(Runnable)} are executed in FIFO order.
24+
*
25+
* <p>The execution of tasks is done by one thread as long as there are tasks left in the queue.
26+
* When a task is {@linkplain Thread#interrupt interrupted}, execution of subsequent tasks
27+
* continues. See {@link QueueWorker#workOnQueue} for details.
28+
*
29+
* <p>{@code RuntimeException}s thrown by tasks are simply logged and the executor keeps trucking.
30+
* If an {@code Error} is thrown, the error will propagate and execution will stop until it is
31+
* restarted by a call to {@link #execute}.
32+
*/
33+
final class SequentialExecutor implements Executor {
34+
private static final Logger log = Logger.getLogger(SequentialExecutor.class.getName());
35+
36+
enum WorkerRunningState {
37+
/** Runnable is not running and not queued for execution */
38+
IDLE,
39+
/** Runnable is not running, but is being queued for execution */
40+
QUEUING,
41+
/** runnable has been submitted but has not yet begun execution */
42+
QUEUED,
43+
RUNNING,
44+
}
45+
46+
/** Underlying executor that all submitted Runnable objects are run on. */
47+
private final Executor executor;
48+
49+
@GuardedBy("queue")
50+
private final Deque<Runnable> queue = new ArrayDeque<>();
51+
52+
/** see {@link WorkerRunningState} */
53+
@GuardedBy("queue")
54+
private WorkerRunningState workerRunningState = IDLE;
55+
56+
/**
57+
* This counter prevents an ABA issue where a thread may successfully schedule the worker, the
58+
* worker runs and exhausts the queue, another thread enqueues a task and fails to schedule the
59+
* worker, and then the first thread's call to delegate.execute() returns. Without this counter,
60+
* it would observe the QUEUING state and set it to QUEUED, and the worker would never be
61+
* scheduled again for future submissions.
62+
*/
63+
@GuardedBy("queue")
64+
private long workerRunCount = 0;
65+
66+
private final QueueWorker worker = new QueueWorker();
67+
68+
SequentialExecutor(Executor executor) {
69+
this.executor = checkNotNull(executor);
70+
}
71+
72+
/**
73+
* Adds a task to the queue and makes sure a worker thread is running.
74+
*
75+
* <p>If this method throws, e.g. a {@code RejectedExecutionException} from the delegate executor,
76+
* execution of tasks will stop until a call to this method is made.
77+
*/
78+
@Override
79+
public void execute(Runnable task) {
80+
checkNotNull(task);
81+
Runnable submittedTask;
82+
long oldRunCount;
83+
synchronized (queue) {
84+
// If the worker is already running (or execute() on the delegate returned successfully, and
85+
// the worker has yet to start) then we don't need to start the worker.
86+
if (workerRunningState == RUNNING || workerRunningState == QUEUED) {
87+
queue.add(task);
88+
return;
89+
}
90+
91+
oldRunCount = workerRunCount;
92+
93+
// If the worker is not yet running, the delegate Executor might reject our attempt to start
94+
// it. To preserve FIFO order and failure atomicity of rejected execution when the same
95+
// Runnable is executed more than once, allocate a wrapper that we know is safe to remove by
96+
// object identity.
97+
// A data structure that returned a removal handle from add() would allow eliminating this
98+
// allocation.
99+
submittedTask =
100+
new Runnable() {
101+
@Override
102+
public void run() {
103+
task.run();
104+
}
105+
106+
@Override
107+
public String toString() {
108+
return task.toString();
109+
}
110+
};
111+
queue.add(submittedTask);
112+
workerRunningState = QUEUING;
113+
}
114+
115+
try {
116+
executor.execute(worker);
117+
} catch (RuntimeException | Error t) {
118+
synchronized (queue) {
119+
boolean removed =
120+
(workerRunningState == IDLE || workerRunningState == QUEUING)
121+
&& queue.removeLastOccurrence(submittedTask);
122+
// If the delegate is directExecutor(), the submitted runnable could have thrown a REE. But
123+
// that's handled by the log check that catches RuntimeExceptions in the queue worker.
124+
if (!(t instanceof RejectedExecutionException) || removed) {
125+
throw t;
126+
}
127+
}
128+
return;
129+
}
130+
131+
/*
132+
* This is an unsynchronized read! After the read, the function returns immediately or acquires
133+
* the lock to check again. Since an IDLE state was observed inside the preceding synchronized
134+
* block, and reference field assignment is atomic, this may save reacquiring the lock when
135+
* another thread or the worker task has cleared the count and set the state.
136+
*
137+
* <p>When {@link #executor} is a directExecutor(), the value written to
138+
* {@code workerRunningState} will be available synchronously, and behaviour will be
139+
* deterministic.
140+
*/
141+
@SuppressWarnings("GuardedBy")
142+
boolean alreadyMarkedQueued = workerRunningState != QUEUING;
143+
if (alreadyMarkedQueued) {
144+
return;
145+
}
146+
synchronized (queue) {
147+
if (workerRunCount == oldRunCount && workerRunningState == QUEUING) {
148+
workerRunningState = QUEUED;
149+
}
150+
}
151+
}
152+
153+
/** Worker that runs tasks from {@link #queue} until it is empty. */
154+
private final class QueueWorker implements Runnable {
155+
@CheckForNull Runnable task;
156+
157+
@Override
158+
public void run() {
159+
try {
160+
workOnQueue();
161+
} catch (Error e) {
162+
synchronized (queue) {
163+
workerRunningState = IDLE;
164+
}
165+
throw e;
166+
// The execution of a task has ended abnormally.
167+
// We could have tasks left in the queue, so should perhaps try to restart a worker,
168+
// but then the Error will get delayed if we are using a direct (same thread) executor.
169+
}
170+
}
171+
172+
/**
173+
* Continues executing tasks from {@link #queue} until it is empty.
174+
*
175+
* <p>The thread's interrupt bit is cleared before execution of each task.
176+
*
177+
* <p>If the Thread in use is interrupted before or during execution of the tasks in {@link
178+
* #queue}, the Executor will complete its tasks, and then restore the interruption. This means
179+
* that once the Thread returns to the Executor that this Executor composes, the interruption
180+
* will still be present. If the composed Executor is an ExecutorService, it can respond to
181+
* shutdown() by returning tasks queued on that Thread after {@link #worker} drains the queue.
182+
*/
183+
private void workOnQueue() {
184+
boolean interruptedDuringTask = false;
185+
boolean hasSetRunning = false;
186+
try {
187+
while (true) {
188+
synchronized (queue) {
189+
// Choose whether this thread will run or not after acquiring the lock on the first
190+
// iteration
191+
if (!hasSetRunning) {
192+
if (workerRunningState == RUNNING) {
193+
// Don't want to have two workers pulling from the queue.
194+
return;
195+
} else {
196+
// Increment the run counter to avoid the ABA problem of a submitter marking the
197+
// thread as QUEUED after it already ran and exhausted the queue before returning
198+
// from execute().
199+
workerRunCount++;
200+
workerRunningState = RUNNING;
201+
hasSetRunning = true;
202+
}
203+
}
204+
task = queue.poll();
205+
if (task == null) {
206+
workerRunningState = IDLE;
207+
return;
208+
}
209+
}
210+
// Remove the interrupt bit before each task. The interrupt is for the "current task" when
211+
// it is sent, so subsequent tasks in the queue should not be caused to be interrupted
212+
// by a previous one in the queue being interrupted.
213+
interruptedDuringTask |= Thread.interrupted();
214+
try {
215+
task.run();
216+
} catch (RuntimeException e) {
217+
log.log(Level.SEVERE, "Exception while executing runnable " + task, e);
218+
} finally {
219+
task = null;
220+
}
221+
}
222+
} finally {
223+
// Ensure that if the thread was interrupted at all while processing the task queue, it
224+
// is returned to the delegate Executor interrupted so that it may handle the
225+
// interruption if it likes.
226+
if (interruptedDuringTask) {
227+
Thread.currentThread().interrupt();
228+
}
229+
}
230+
}
231+
232+
@SuppressWarnings("GuardedBy")
233+
@Override
234+
public String toString() {
235+
Runnable currentlyRunning = task;
236+
if (currentlyRunning != null) {
237+
return "SequentialExecutorWorker{running=" + currentlyRunning + "}";
238+
}
239+
return "SequentialExecutorWorker{state=" + workerRunningState + "}";
240+
}
241+
}
242+
243+
@Override
244+
public String toString() {
245+
return "SequentialExecutor@" + identityHashCode(this) + "{" + executor + "}";
246+
}
247+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright 2018 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/** @hide */
16+
package com.google.firebase.concurrent;

0 commit comments

Comments
 (0)