Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions onedal/_device_offload.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,22 +122,25 @@ def wrapper_impl(*args, **kwargs):
else:
self = None

if len(args) == 0 and len(kwargs) == 0:
# no arguments, there's nothing we can deduce from them -> just call the function
return invoke_func(self, *args, **kwargs)
if "queue" not in kwargs and "queue" in inspect.signature(func).parameters:
if usm_iface := getattr(args[0], "__sycl_usm_array_interface__", None):
kwargs["queue"] = usm_iface["syclobj"]

data = (*args, *kwargs.values())[0]
# get and set the global queue from the kwarg or data
with QM.manage_global_queue(kwargs.get("queue"), *args) as queue:
hostargs, hostkwargs = _get_host_inputs(*args, **kwargs)
if "queue" in inspect.signature(func).parameters:
# set the queue if it's expected by func
hostkwargs["queue"] = queue
result = invoke_func(self, *hostargs, **hostkwargs)
if kwargs.get("queue") is not None:
# Device path — function accepts queue, pass device data directly
result = invoke_func(self, *args, **kwargs)
else:
# Host path — sklearn function or host data, transfer to host
if len(args) == 0 and len(kwargs) == 0:
return invoke_func(self, *args, **kwargs)

if queue and hasattr(data, "__sycl_usm_array_interface__"):
return copy_to_dpnp(queue, result)
with QM.manage_global_queue(None, *args) as queue:
hostargs, hostkwargs = _get_host_inputs(*args, **kwargs)
result = invoke_func(self, *hostargs, **hostkwargs)
if queue and hasattr(args[0], "__sycl_usm_array_interface__"):
return copy_to_dpnp(queue, result)

data = (*args, *kwargs.values())[0]
if get_config().get("transform_output") in ("default", None):
input_array_api = getattr(data, "__array_namespace__", lambda: None)()
if input_array_api and not _is_numpy_namespace(input_array_api):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -83,8 +84,11 @@ def test_basic_stats_spmd_gold(dataframe, queue):
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_basic_stats_spmd_synthetic(n_samples, n_features, dataframe, queue, dtype):
def test_basic_stats_spmd_synthetic(
n_samples, n_features, dataframe, queue, dtype, array_api_dispatch
):
# Import spmd and batch algo
from onedal.basic_statistics import BasicStatistics as BasicStatistics_Batch
from sklearnex.spmd.basic_statistics import BasicStatistics as BasicStatistics_SPMD
Expand All @@ -97,7 +101,9 @@ def test_basic_stats_spmd_synthetic(n_samples, n_features, dataframe, queue, dty
)

# Ensure results of batch algo match spmd
spmd_result = BasicStatistics_SPMD().fit(local_dpt_data)
# Configure array API dispatch status for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
spmd_result = BasicStatistics_SPMD().fit(local_dpt_data)
batch_result = BasicStatistics_Batch().fit(data)

tol = 1e-5 if dtype == np.float32 else 1e-7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -253,9 +254,17 @@ def test_incremental_basic_statistics_single_option_partial_fit_spmd_gold(
@pytest.mark.parametrize("n_samples", [100, 10000])
@pytest.mark.parametrize("n_features", [10, 100])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_incremental_basic_statistics_partial_fit_spmd_synthetic(
dataframe, queue, num_blocks, weighted, n_samples, n_features, dtype
dataframe,
queue,
num_blocks,
weighted,
n_samples,
n_features,
dtype,
array_api_dispatch,
):
# Import spmd and batch algo
from sklearnex.basic_statistics import IncrementalBasicStatistics
Expand Down Expand Up @@ -295,9 +304,11 @@ def test_incremental_basic_statistics_partial_fit_spmd_synthetic(
dpt_weights = _convert_to_dataframe(
split_weights[i], sycl_queue=queue, target_df=dataframe
)
incbs_spmd.partial_fit(
local_dpt_data, sample_weight=local_dpt_weights if weighted else None
)
# Configure array API dispatch for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
incbs_spmd.partial_fit(
local_dpt_data, sample_weight=local_dpt_weights if weighted else None
)
incbs.partial_fit(dpt_data, sample_weight=dpt_weights if weighted else None)

for option in options_and_tests:
Expand Down
7 changes: 6 additions & 1 deletion sklearnex/spmd/cluster/tests/test_dbscan_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_clustering_data,
_get_local_tensor,
Expand Down Expand Up @@ -69,6 +70,7 @@ def test_dbscan_spmd_gold(dataframe, queue):
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_dbscan_spmd_synthetic(
n_samples,
Expand All @@ -78,6 +80,7 @@ def test_dbscan_spmd_synthetic(
dataframe,
queue,
dtype,
array_api_dispatch,
):
n_features, eps = n_features_and_eps
# Import spmd and batch algo
Expand All @@ -93,7 +96,9 @@ def test_dbscan_spmd_synthetic(
)

# Ensure labels from fit of batch algo matches spmd
spmd_model = DBSCAN_SPMD(eps=eps, min_samples=min_samples).fit(local_dpt_data)
# Configure array API dispatch for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
spmd_model = DBSCAN_SPMD(eps=eps, min_samples=min_samples).fit(local_dpt_data)
batch_model = DBSCAN_Batch(eps=eps, min_samples=min_samples).fit(data)

_spmd_assert_allclose(spmd_model.labels_, batch_model.labels_)
Expand Down
25 changes: 18 additions & 7 deletions sklearnex/spmd/cluster/tests/test_kmeans_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_as_numpy,
_assert_kmeans_labels_allclose,
_assert_unordered_allclose,
_generate_clustering_data,
Expand Down Expand Up @@ -108,9 +110,10 @@ def test_kmeans_spmd_gold(dataframe, queue):
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_kmeans_spmd_synthetic(
n_samples, n_features, n_clusters, dataframe, queue, dtype
n_samples, n_features, n_clusters, dataframe, queue, dtype, array_api_dispatch
):
# Import spmd and batch algo
from sklearnex.cluster import KMeans as KMeans_Batch
Expand All @@ -129,9 +132,11 @@ def test_kmeans_spmd_synthetic(
)

# Validate KMeans init
spmd_model_init = KMeans_SPMD(n_clusters=n_clusters, max_iter=1, random_state=0).fit(
local_dpt_X_train
)
# Configure array_api_dispatch for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
spmd_model_init = KMeans_SPMD(
n_clusters=n_clusters, max_iter=1, random_state=0
).fit(local_dpt_X_train)
batch_model_init = KMeans_Batch(
n_clusters=n_clusters, max_iter=1, random_state=0
).fit(X_train)
Expand All @@ -142,9 +147,13 @@ def test_kmeans_spmd_synthetic(
spmd_model = KMeans_SPMD(
n_clusters=n_clusters, init=spmd_model_init.cluster_centers_, random_state=0
)
spmd_model.fit(local_dpt_X_train)
# Configure array_api_dispatch for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
spmd_model.fit(local_dpt_X_train)
batch_model = KMeans_Batch(
n_clusters=n_clusters, init=spmd_model_init.cluster_centers_, random_state=0
n_clusters=n_clusters,
init=_as_numpy(spmd_model_init.cluster_centers_),
random_state=0,
).fit(X_train)

atol = 1e-5 if dtype == np.float32 else 1e-7
Expand All @@ -162,7 +171,9 @@ def test_kmeans_spmd_synthetic(
# assert_allclose(spmd_model.n_iter_, batch_model.n_iter_, atol=1)

# Ensure predictions of batch algo match spmd
spmd_result = spmd_model.predict(local_dpt_X_test)
# Configure array_api_dispatch for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
spmd_result = spmd_model.predict(local_dpt_X_test)
batch_result = batch_model.predict(X_test)

_assert_kmeans_labels_allclose(
Expand Down
11 changes: 7 additions & 4 deletions sklearnex/spmd/covariance/tests/test_covariance_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -85,9 +86,10 @@ def test_covariance_spmd_gold(dataframe, queue):
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_covariance_spmd_synthetic(
n_samples, n_features, assume_centered, dataframe, queue, dtype
n_samples, n_features, assume_centered, dataframe, queue, dtype, array_api_dispatch
):
# Import spmd and batch algo
from sklearnex.preview.covariance import (
Expand All @@ -103,9 +105,10 @@ def test_covariance_spmd_synthetic(
)

# Ensure results of batch algo match spmd
spmd_result = EmpiricalCovariance_SPMD(assume_centered=assume_centered).fit(
local_dpt_data
)
with config_context(array_api_dispatch=array_api_dispatch):
spmd_result = EmpiricalCovariance_SPMD(assume_centered=assume_centered).fit(
local_dpt_data
)
batch_result = EmpiricalCovariance_Batch(assume_centered=assume_centered).fit(data)

atol = 1e-5 if dtype == np.float32 else 1e-7
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -150,6 +151,7 @@ def test_incremental_covariance_partial_fit_spmd_gold(
"dataframe,queue",
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_incremental_covariance_partial_fit_spmd_synthetic(
n_samples,
Expand All @@ -159,6 +161,7 @@ def test_incremental_covariance_partial_fit_spmd_synthetic(
dataframe,
queue,
dtype,
array_api_dispatch,
):
# Import spmd and batch algo
from sklearnex.covariance import IncrementalEmpiricalCovariance
Expand All @@ -181,7 +184,9 @@ def test_incremental_covariance_partial_fit_spmd_synthetic(
local_dpt_data = _convert_to_dataframe(
split_local_data[i], sycl_queue=queue, target_df=dataframe
)
inccov_spmd.partial_fit(local_dpt_data)
# Configure array API dispatch status for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
inccov_spmd.partial_fit(local_dpt_data)

inccov.fit(dpt_data)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -218,6 +219,7 @@ def test_incremental_pca_fit_spmd_random(
@pytest.mark.parametrize("num_samples", [200, 400])
@pytest.mark.parametrize("num_features", [10, 20])
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_incremental_pca_partial_fit_spmd_random(
dataframe,
Expand All @@ -228,6 +230,7 @@ def test_incremental_pca_partial_fit_spmd_random(
num_samples,
num_features,
dtype,
array_api_dispatch,
):
# Import spmd and non-SPMD algo
from sklearnex.preview.decomposition import IncrementalPCA
Expand All @@ -252,7 +255,9 @@ def test_incremental_pca_partial_fit_spmd_random(
split_local_X[i], sycl_queue=queue, target_df=dataframe
)
dpt_X = _convert_to_dataframe(X_split[i], sycl_queue=queue, target_df=dataframe)
incpca_spmd.partial_fit(local_dpt_X)
# Configure array API dispatch status for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
incpca_spmd.partial_fit(local_dpt_X)
incpca.partial_fit(dpt_X)

for attribute in attributes_to_compare:
Expand All @@ -263,7 +268,9 @@ def test_incremental_pca_partial_fit_spmd_random(
err_msg=f"{attribute} is incorrect",
)

y_trans_spmd = incpca_spmd.transform(dpt_X_test)
# Configure array API dispatch status for spmd estimator
with config_context(array_api_dispatch=array_api_dispatch):
y_trans_spmd = incpca_spmd.transform(dpt_X_test)
y_trans = incpca.transform(dpt_X_test)

assert_allclose(_as_numpy(y_trans_spmd), _as_numpy(y_trans), atol=tol)
16 changes: 14 additions & 2 deletions sklearnex/spmd/decomposition/tests/test_pca_spmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
_convert_to_dataframe,
get_dataframes_and_queues,
)
from sklearnex import config_context
from sklearnex.tests.utils.spmd import (
_generate_statistic_data,
_get_local_tensor,
Expand Down Expand Up @@ -92,9 +93,17 @@ def test_pca_spmd_gold(dataframe, queue):
get_dataframes_and_queues(dataframe_filter_="dpnp", device_filter_="gpu"),
)
@pytest.mark.parametrize("dtype", [np.float32, np.float64])
@pytest.mark.parametrize("array_api_dispatch", [True, False])
@pytest.mark.mpi
def test_pca_spmd_synthetic(
n_samples, n_features, n_components, whiten, dataframe, queue, dtype
n_samples,
n_features,
n_components,
whiten,
dataframe,
queue,
dtype,
array_api_dispatch,
):
# TODO: Resolve issues with batch fallback and lack of support for n_rows_rank < n_cols
if n_components == "mle" or n_components == 3:
Expand All @@ -114,7 +123,10 @@ def test_pca_spmd_synthetic(
)

# Ensure results of batch algo match spmd
spmd_result = PCA_SPMD(n_components=n_components, whiten=whiten).fit(local_dpt_data)
with config_context(array_api_dispatch=array_api_dispatch):
spmd_result = PCA_SPMD(n_components=n_components, whiten=whiten).fit(
local_dpt_data
)
batch_result = PCA_Batch(n_components=n_components, whiten=whiten).fit(data)

tol = 1e-3 if dtype == np.float32 else 1e-7
Expand Down
Loading
Loading