Skip to content

Commit 74cb1ff

Browse files
committed
[SPARK-22340][PYTHON][FOLLOW-UP] Add a better message and improve documentation for pinned thread mode
### What changes were proposed in this pull request? This PR proposes to show different warning message when the pinned thread mode is enabled: When enabled: > PYSPARK_PIN_THREAD feature is enabled. However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. > To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread. When disabled: > Currently, 'setLocalProperty' (set to local properties) with multiple threads does not properly work. > Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. > To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. > To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread. ### Why are the changes needed? Currently, it shows the same warning message regardless of PYSPARK_PIN_THREAD being set. In the warning message it says "you can set PYSPARK_PIN_THREAD to true ..." which is confusing. ### Does this PR introduce any user-facing change? Documentation and warning message as shown above. ### How was this patch tested? Manually tested. ```bash $ PYSPARK_PIN_THREAD=true ./bin/pyspark ``` ```python sc.setJobGroup("a", "b") ``` ``` .../pyspark/util.py:141: UserWarning: PYSPARK_PIN_THREAD feature is enabled. However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread. warnings.warn(msg, UserWarning) ``` ```bash $ ./bin/pyspark ``` ```python sc.setJobGroup("a", "b") ``` ``` .../pyspark/util.py:141: UserWarning: Currently, 'setJobGroup' (set to local properties) with multiple threads does not properly work. Internally threads on PVM and JVM are not synced, and JVM thread can be reused for multiple threads on PVM, which fails to isolate local properties for each thread on PVM. To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). However, note that it cannot inherit the local properties from the parent thread although it isolates each thread on PVM and JVM with its own local properties. To work around this, you should manually copy and set the local properties from the parent thread to the child thread when you create another thread. warnings.warn(msg, UserWarning) ``` Closes apache#26588 from HyukjinKwon/SPARK-22340. Authored-by: HyukjinKwon <[email protected]> Signed-off-by: HyukjinKwon <[email protected]>
1 parent 7a70670 commit 74cb1ff

File tree

2 files changed

+59
-58
lines changed

2 files changed

+59
-58
lines changed

python/pyspark/context.py

Lines changed: 30 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
from pyspark.traceback_utils import CallSite, first_spark_call
4141
from pyspark.status import StatusTracker
4242
from pyspark.profiler import ProfilerCollector, BasicProfiler
43+
from pyspark.util import _warn_pin_thread
4344

4445
if sys.version > '3':
4546
xrange = range
@@ -1008,60 +1009,41 @@ def setJobGroup(self, groupId, description, interruptOnCancel=False):
10081009
ensure that the tasks are actually stopped in a timely manner, but is off by default due
10091010
to HDFS-1208, where HDFS may respond to Thread.interrupt() by marking nodes as dead.
10101011
1011-
.. note:: Currently, setting a group ID (set to local properties) with a thread does
1012-
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
1013-
can be reused for multiple threads on PVM, which fails to isolate local properties
1014-
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
1012+
.. note:: Currently, setting a group ID (set to local properties) with multiple threads
1013+
does not properly work. Internally threads on PVM and JVM are not synced, and JVM
1014+
thread can be reused for multiple threads on PVM, which fails to isolate local
1015+
properties for each thread on PVM.
1016+
1017+
To work around this, you can set `PYSPARK_PIN_THREAD` to
10151018
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
10161019
from the parent thread although it isolates each thread on PVM and JVM with its own
1017-
local properties. To work around this, you should manually copy and set the local
1020+
local properties.
1021+
1022+
To work around this, you should manually copy and set the local
10181023
properties from the parent thread to the child thread when you create another thread.
10191024
"""
1020-
warnings.warn(
1021-
"Currently, setting a group ID (set to local properties) with a thread does "
1022-
"not properly work. "
1023-
"\n"
1024-
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
1025-
"for multiple threads on PVM, which fails to isolate local properties for each "
1026-
"thread on PVM. "
1027-
"\n"
1028-
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
1029-
"However, note that it cannot inherit the local properties from the parent thread "
1030-
"although it isolates each thread on PVM and JVM with its own local properties. "
1031-
"\n"
1032-
"To work around this, you should manually copy and set the local properties from "
1033-
"the parent thread to the child thread when you create another thread.",
1034-
UserWarning)
1025+
_warn_pin_thread("setJobGroup")
10351026
self._jsc.setJobGroup(groupId, description, interruptOnCancel)
10361027

10371028
def setLocalProperty(self, key, value):
10381029
"""
10391030
Set a local property that affects jobs submitted from this thread, such as the
10401031
Spark fair scheduler pool.
10411032
1042-
.. note:: Currently, setting a local property with a thread does
1043-
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
1033+
.. note:: Currently, setting a local property with multiple threads does not properly work.
1034+
Internally threads on PVM and JVM are not synced, and JVM thread
10441035
can be reused for multiple threads on PVM, which fails to isolate local properties
1045-
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
1036+
for each thread on PVM.
1037+
1038+
To work around this, you can set `PYSPARK_PIN_THREAD` to
10461039
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
10471040
from the parent thread although it isolates each thread on PVM and JVM with its own
1048-
local properties. To work around this, you should manually copy and set the local
1041+
local properties.
1042+
1043+
To work around this, you should manually copy and set the local
10491044
properties from the parent thread to the child thread when you create another thread.
10501045
"""
1051-
warnings.warn(
1052-
"Currently, setting a local property with a thread does not properly work. "
1053-
"\n"
1054-
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
1055-
"for multiple threads on PVM, which fails to isolate local properties for each "
1056-
"thread on PVM. "
1057-
"\n"
1058-
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
1059-
"However, note that it cannot inherit the local properties from the parent thread "
1060-
"although it isolates each thread on PVM and JVM with its own local properties. "
1061-
"\n"
1062-
"To work around this, you should manually copy and set the local properties from "
1063-
"the parent thread to the child thread when you create another thread.",
1064-
UserWarning)
1046+
_warn_pin_thread("setLocalProperty")
10651047
self._jsc.setLocalProperty(key, value)
10661048

10671049
def getLocalProperty(self, key):
@@ -1075,30 +1057,20 @@ def setJobDescription(self, value):
10751057
"""
10761058
Set a human readable description of the current job.
10771059
1078-
.. note:: Currently, setting a job description (set to local properties) with a thread does
1079-
not properly work. Internally threads on PVM and JVM are not synced, and JVM thread
1080-
can be reused for multiple threads on PVM, which fails to isolate local properties
1081-
for each thread on PVM. To work around this, you can set `PYSPARK_PIN_THREAD` to
1060+
.. note:: Currently, setting a job description (set to local properties) with multiple
1061+
threads does not properly work. Internally threads on PVM and JVM are not synced,
1062+
and JVM thread can be reused for multiple threads on PVM, which fails to isolate
1063+
local properties for each thread on PVM.
1064+
1065+
To work around this, you can set `PYSPARK_PIN_THREAD` to
10821066
`'true'` (see SPARK-22340). However, note that it cannot inherit the local properties
10831067
from the parent thread although it isolates each thread on PVM and JVM with its own
1084-
local properties. To work around this, you should manually copy and set the local
1068+
local properties.
1069+
1070+
To work around this, you should manually copy and set the local
10851071
properties from the parent thread to the child thread when you create another thread.
10861072
"""
1087-
warnings.warn(
1088-
"Currently, setting a job description (set to local properties) with a thread does "
1089-
"not properly work. "
1090-
"\n"
1091-
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
1092-
"for multiple threads on PVM, which fails to isolate local properties for each "
1093-
"thread on PVM. "
1094-
"\n"
1095-
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
1096-
"However, note that it cannot inherit the local properties from the parent thread "
1097-
"although it isolates each thread on PVM and JVM with its own local properties. "
1098-
"\n"
1099-
"To work around this, you should manually copy and set the local properties from "
1100-
"the parent thread to the child thread when you create another thread.",
1101-
UserWarning)
1073+
_warn_pin_thread("setJobDescription")
11021074
self._jsc.setJobDescription(value)
11031075

11041076
def sparkUser(self):

python/pyspark/util.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919
import re
2020
import sys
2121
import traceback
22+
import os
23+
import warnings
2224
import inspect
2325
from py4j.protocol import Py4JJavaError
2426

@@ -112,6 +114,33 @@ def wrapper(*args, **kwargs):
112114
return wrapper
113115

114116

117+
def _warn_pin_thread(name):
118+
if os.environ.get("PYSPARK_PIN_THREAD", "false").lower() == "true":
119+
msg = (
120+
"PYSPARK_PIN_THREAD feature is enabled. "
121+
"However, note that it cannot inherit the local properties from the parent thread "
122+
"although it isolates each thread on PVM and JVM with its own local properties. "
123+
"\n"
124+
"To work around this, you should manually copy and set the local properties from "
125+
"the parent thread to the child thread when you create another thread.")
126+
else:
127+
msg = (
128+
"Currently, '%s' (set to local properties) with multiple threads does "
129+
"not properly work. "
130+
"\n"
131+
"Internally threads on PVM and JVM are not synced, and JVM thread can be reused "
132+
"for multiple threads on PVM, which fails to isolate local properties for each "
133+
"thread on PVM. "
134+
"\n"
135+
"To work around this, you can set PYSPARK_PIN_THREAD to true (see SPARK-22340). "
136+
"However, note that it cannot inherit the local properties from the parent thread "
137+
"although it isolates each thread on PVM and JVM with its own local properties. "
138+
"\n"
139+
"To work around this, you should manually copy and set the local properties from "
140+
"the parent thread to the child thread when you create another thread." % name)
141+
warnings.warn(msg, UserWarning)
142+
143+
115144
def _print_missing_jar(lib_name, pkg_name, jar_name, spark_version):
116145
print("""
117146
________________________________________________________________________________________________

0 commit comments

Comments
 (0)