Skip to content

feat: add ml.preprocessing.KBinsDiscretizer #81

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 9 commits into from
Oct 4, 2023
13 changes: 10 additions & 3 deletions bigframes/ml/compose.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.KBinsDiscretizer,
preprocessing.LabelEncoder,
]

Expand Down Expand Up @@ -91,18 +92,24 @@ def transformers_(

return result

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(
self,
columns: List[str],
X: bpd.DataFrame,
) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns (List[str]):
a list of column names to transform
X (bpd.DataFrame):
The Dataframe with training data.

Returns:
a list of tuples of (sql_expression, output_name)"""
return [
transformer._compile_to_sql([column])[0]
transformer._compile_to_sql([column], X=X)[0]
for column in columns
for _, transformer, target_column in self.transformers_
if column == target_column
Expand All @@ -115,7 +122,7 @@ def fit(
) -> ColumnTransformer:
(X,) = utils.convert_to_dataframe(X)

compiled_transforms = self._compile_to_sql(X.columns.tolist())
compiled_transforms = self._compile_to_sql(X.columns.tolist(), X)
transform_sqls = [transform_sql for transform_sql, _ in compiled_transforms]

self._bqml_model = self._bqml_model_factory.create_model(
Expand Down
12 changes: 11 additions & 1 deletion bigframes/ml/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(self, steps: List[Tuple[str, base.BaseEstimator]]):
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.KBinsDiscretizer,
preprocessing.LabelEncoder,
),
):
Expand Down Expand Up @@ -93,7 +94,7 @@ def fit(
) -> Pipeline:
(X,) = utils.convert_to_dataframe(X)

compiled_transforms = self._transform._compile_to_sql(X.columns.tolist())
compiled_transforms = self._transform._compile_to_sql(X.columns.tolist(), X=X)
transform_sqls = [transform_sql for transform_sql, _ in compiled_transforms]

if y is not None:
Expand Down Expand Up @@ -151,6 +152,7 @@ def _extract_as_column_transformer(
preprocessing.StandardScaler,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.KBinsDiscretizer,
preprocessing.LabelEncoder,
],
Union[str, List[str]],
Expand Down Expand Up @@ -190,6 +192,13 @@ def _extract_as_column_transformer(
*preprocessing.MinMaxScaler._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.BUCKETIZE"):
transformers.append(
(
"k_bins_discretizer",
*preprocessing.KBinsDiscretizer._parse_from_sql(transform_sql),
)
)
elif transform_sql.startswith("ML.LABEL_ENCODER"):
transformers.append(
(
Expand All @@ -213,6 +222,7 @@ def _merge_column_transformer(
preprocessing.OneHotEncoder,
preprocessing.MaxAbsScaler,
preprocessing.MinMaxScaler,
preprocessing.KBinsDiscretizer,
preprocessing.LabelEncoder,
]:
"""Try to merge the column transformer to a simple transformer."""
Expand Down
152 changes: 142 additions & 10 deletions bigframes/ml/preprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from bigframes.ml import base, core, globals, utils
import bigframes.pandas as bpd
import third_party.bigframes_vendored.sklearn.preprocessing._data
import third_party.bigframes_vendored.sklearn.preprocessing._discretization
import third_party.bigframes_vendored.sklearn.preprocessing._encoder
import third_party.bigframes_vendored.sklearn.preprocessing._label

Expand All @@ -44,12 +45,15 @@ def __init__(self):
def __eq__(self, other: Any) -> bool:
return type(other) is StandardScaler and self._bqml_model == other._bqml_model

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns: a list of column names to transform
columns:
a list of column names to transform.
X (default None):
Ignored.

Returns: a list of tuples of (sql_expression, output_name)"""
return [
Expand Down Expand Up @@ -124,12 +128,15 @@ def __init__(self):
def __eq__(self, other: Any) -> bool:
return type(other) is MaxAbsScaler and self._bqml_model == other._bqml_model

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns: a list of column names to transform
columns:
a list of column names to transform.
X (default None):
Ignored.

Returns: a list of tuples of (sql_expression, output_name)"""
return [
Expand Down Expand Up @@ -204,12 +211,15 @@ def __init__(self):
def __eq__(self, other: Any) -> bool:
return type(other) is MinMaxScaler and self._bqml_model == other._bqml_model

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns: a list of column names to transform
columns:
a list of column names to transform.
X (default None):
Ignored.

Returns: a list of tuples of (sql_expression, output_name)"""
return [
Expand Down Expand Up @@ -267,6 +277,124 @@ def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
)


class KBinsDiscretizer(
base.Transformer,
third_party.bigframes_vendored.sklearn.preprocessing._discretization.KBinsDiscretizer,
):
__doc__ = (
third_party.bigframes_vendored.sklearn.preprocessing._discretization.KBinsDiscretizer.__doc__
)

def __init__(
self,
n_bins: int = 5,
strategy: Literal["uniform", "quantile"] = "quantile",
):
if strategy != "uniform":
raise NotImplementedError(
f"Only strategy = 'uniform' is supported now, input is {strategy}."
)
if n_bins < 2:
raise ValueError(
f"n_bins has to be larger than or equal to 2, input is {n_bins}."
)
self.n_bins = n_bins
self.strategy = strategy
self._bqml_model: Optional[core.BqmlModel] = None
self._bqml_model_factory = globals.bqml_model_factory()
self._base_sql_generator = globals.base_sql_generator()

# TODO(garrettwu): implement __hash__
def __eq__(self, other: Any) -> bool:
return (
type(other) is KBinsDiscretizer
and self.n_bins == other.n_bins
and self._bqml_model == other._bqml_model
)

def _compile_to_sql(
self,
columns: List[str],
X: bpd.DataFrame,
) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns:
a list of column names to transform
X:
The Dataframe with training data.

Returns: a list of tuples of (sql_expression, output_name)"""
array_split_points = {}
if self.strategy == "uniform":
for column in columns:
min_value = X[column].min()
max_value = X[column].max()
bin_size = (max_value - min_value) / self.n_bins
array_split_points[column] = [
min_value + i * bin_size for i in range(self.n_bins - 1)
]

return [
(
self._base_sql_generator.ml_bucketize(
column, array_split_points[column], f"kbinsdiscretizer_{column}"
),
f"kbinsdiscretizer_{column}",
)
for column in columns
]

@classmethod
def _parse_from_sql(cls, sql: str) -> tuple[KBinsDiscretizer, str]:
"""Parse SQL to tuple(KBinsDiscretizer, column_label).

Args:
sql: SQL string of format "ML.BUCKETIZE({col_label}, array_split_points, FALSE) OVER()"

Returns:
tuple(KBinsDiscretizer, column_label)"""
s = sql[sql.find("(") + 1 : sql.find(")")]
array_split_points = s[s.find("[") + 1 : s.find("]")]
col_label = s[: s.find(",")]
n_bins = array_split_points.count(",") + 2
return cls(n_bins, "uniform"), col_label

def fit(
self,
X: Union[bpd.DataFrame, bpd.Series],
y=None, # ignored
) -> KBinsDiscretizer:
(X,) = utils.convert_to_dataframe(X)

compiled_transforms = self._compile_to_sql(X.columns.tolist(), X)
transform_sqls = [transform_sql for transform_sql, _ in compiled_transforms]

self._bqml_model = self._bqml_model_factory.create_model(
X,
options={"model_type": "transform_only"},
transforms=transform_sqls,
)

# The schema of TRANSFORM output is not available in the model API, so save it during fitting
self._output_names = [name for _, name in compiled_transforms]
return self

def transform(self, X: Union[bpd.DataFrame, bpd.Series]) -> bpd.DataFrame:
if not self._bqml_model:
raise RuntimeError("Must be fitted before transform")

(X,) = utils.convert_to_dataframe(X)

df = self._bqml_model.transform(X)
return typing.cast(
bpd.DataFrame,
df[self._output_names],
)


class OneHotEncoder(
base.Transformer,
third_party.bigframes_vendored.sklearn.preprocessing._encoder.OneHotEncoder,
Expand Down Expand Up @@ -308,13 +436,15 @@ def __eq__(self, other: Any) -> bool:
and self.max_categories == other.max_categories
)

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns:
a list of column names to transform
a list of column names to transform.
X (default None):
Ignored.

Returns: a list of tuples of (sql_expression, output_name)"""

Expand Down Expand Up @@ -432,13 +562,15 @@ def __eq__(self, other: Any) -> bool:
and self.max_categories == other.max_categories
)

def _compile_to_sql(self, columns: List[str]) -> List[Tuple[str, str]]:
def _compile_to_sql(self, columns: List[str], X=None) -> List[Tuple[str, str]]:
"""Compile this transformer to a list of SQL expressions that can be included in
a BQML TRANSFORM clause

Args:
columns:
a list of column names to transform
a list of column names to transform.
X (default None):
Ignored.

Returns: a list of tuples of (sql_expression, output_name)"""

Expand Down
9 changes: 9 additions & 0 deletions bigframes/ml/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,15 @@ def ml_min_max_scaler(self, numeric_expr_sql: str, name: str) -> str:
"""Encode ML.MIN_MAX_SCALER for BQML"""
return f"""ML.MIN_MAX_SCALER({numeric_expr_sql}) OVER() AS {name}"""

def ml_bucketize(
self,
numeric_expr_sql: str,
array_split_points: Iterable[Union[int, float]],
name: str,
) -> str:
"""Encode ML.MIN_MAX_SCALER for BQML"""
return f"""ML.BUCKETIZE({numeric_expr_sql}, {array_split_points}, FALSE) AS {name}"""

def ml_one_hot_encoder(
self,
numeric_expr_sql: str,
Expand Down
Loading