feat: add streaming.StreamingDataFrame class #864
Conversation
Not sure what is causing b/354024943. Tests pass locally. Need to keep an eye on it.
bigframes/session/__init__.py
>>> sdf = bpd.read_gbq_table_streaming("bigquery-public-data.ml_datasets.penguins")
"""
from bigframes import streaming
Please make sure we raise a PreviewWarning and mark this as a preview method in the docstring.
Done
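A preview warning along the lines the reviewer requests could look like the following sketch; the `PreviewWarning` class and the function body here are illustrative stand-ins, not the actual bigframes implementation:

```python
import warnings

class PreviewWarning(UserWarning):
    """Stand-in for a library-specific preview-warning category."""

def read_gbq_table_streaming(table: str):
    """Turn a BigQuery table into a streaming frame (sketch).

    .. note::
        This feature is in preview.
    """
    warnings.warn(
        "read_gbq_table_streaming is in preview.",
        category=PreviewWarning,
        stacklevel=2,
    )
    # Placeholder for constructing the real StreamingDataFrame.
    return table
```

Using a dedicated warning category (rather than a bare `UserWarning`) lets callers filter preview warnings specifically with `warnings.filterwarnings`.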
bigframes/session/__init__.py
df = self._read_gbq_table(
    table, api_name="read_gbq_table_streaming", enable_snapshot=False
)
I think this will probably set up a default index? We should probably explicitly set a null index to ensure we don't end up putting a window function into the final SQL. Also, do we need to validate that the table is streaming-compatible? I'm guessing some table types don't work.
Done for the NULL index; the generated SQL is simpler now. Thanks.
For table types, it should be fine to let BigQuery emit errors. Do you have an idea how we could validate those? I don't see a clear way.
We fetch metadata for each table when we read into bigframes. My guess is that only "real" BQ tables work with streaming, and probably not views, external tables, etc.
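One hedged way to act on that metadata: the google-cloud-bigquery client exposes `Table.table_type` (`"TABLE"`, `"VIEW"`, `"EXTERNAL"`, ...), so a check could be factored over that string. The helper name and the choice of error type below are assumptions for illustration:

```python
def validate_streaming_source(table_type: str) -> None:
    """Reject table types that are unlikely to support streaming reads.

    `table_type` is the string reported by BigQuery table metadata
    (google.cloud.bigquery.Table.table_type), e.g. "TABLE", "VIEW",
    "EXTERNAL". Only physical tables are accepted here; the exact
    policy and error type are assumptions, not the bigframes API.
    """
    if table_type != "TABLE":
        raise NotImplementedError(
            f"Streaming is not supported for table type {table_type!r}; "
            "only physical BigQuery tables are supported."
        )
```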
bigframes/streaming/__init__.py
def __getitem__(self, *args, **kwargs):
    return _return_type_wrapper(self._df.__getitem__, StreamingDataFrame)(
        *args, **kwargs
    )
Some parameterizations here are going to return just a Series, causing issues later on.
The fix should be adding a StreamingSeries, logged in b/356201125.
bigframes/streaming/__init__.py
@property
def sql(self):
    return self._df.sql
df.sql automatically applies cached executions and isn't intended for streaming queries. We should have another path for streaming SQL that guarantees streaming compatibility.
Added a parameter to disable caching in those paths. We can refactor if we need a totally separate path.
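The cache-bypass idea can be sketched with a toy frame; the `use_cache` parameter name and the overall structure are assumptions about the shape of the change, not the actual bigframes API:

```python
class ToyFrame:
    """Toy frame whose SQL compilation can skip cached materializations."""

    def __init__(self, base_sql: str):
        self._base_sql = base_sql
        self._cached_table = None  # set once a result has been materialized

    def to_sql(self, *, use_cache: bool = True) -> str:
        # A batch query may read from the cached materialization, but a
        # streaming query must compile against the live source table.
        if use_cache and self._cached_table is not None:
            return f"SELECT * FROM `{self._cached_table}`"
        return self._base_sql
```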
bigframes/streaming/__init__.py
def __repr__(self, *args, **kwargs):
    return _return_type_wrapper(self._df.__repr__, StreamingDataFrame)(
        *args, **kwargs
    )

__repr__.__doc__ = _curate_df_doc(inspect.getdoc(dataframe.DataFrame.__repr__))
This is a streaming frame, but we want to print it out just like a static frame? Could be misleading
No, that's what _curate_df_doc() does.
bigframes/streaming/__init__.py
def __setitem__(self, *args, **kwargs):
    return _return_type_wrapper(self._df.__setitem__, StreamingDataFrame)(
        *args, **kwargs
    )
This could cause joins, which are not allowed. We should catch that before query time.
Yes, if the input is another Series from another table, it's a join. But I think it will raise an error because of the null index, so it won't be sent as a query?
Maybe this is a place where we need a different definition and implementation from the normal DataFrame. Logged in b/356201125.
bigframes/streaming/__init__.py
# Private constructor
_create_key = object()

def __init__(self, df: dataframe.DataFrame, *, create_key=0):
I would really rather not wrap the main dataframe.DataFrame object. This reduces our flexibility to modify that object, since changes could break streaming behavior by accident.
I think this is the best way, at least for the moment, to get some SDF APIs working. Where code is sharable I would like to share it instead of duplicating it. It is better to find breakage and fix it in one place than to let the implementations diverge until it is hard to find where they differ and they cause separate issues.
Now, with snapshot and caching disabled, we can see that some operations just work. I think that is decent for now. Later on, Tardis will increase SQL coverage and we will want to add more APIs; by then we should have a better idea of how divergent they need to be.
bigframes/pandas/__init__.py
@@ -598,6 +599,18 @@ def read_gbq_table(
read_gbq_table.__doc__ = inspect.getdoc(bigframes.session.Session.read_gbq_table)


def read_gbq_table_streaming(table: str) -> bigframes.streaming.StreamingDataFrame:
Could we move this to bigframes.streaming.read_gbq_table?
I think the Session method is OK for now, though I wonder if we would want a separate Session for streaming contexts? Session has a lot of configuration and implementation specific to pandas emulation.
Done
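The move amounts to a thin module-level wrapper that delegates to the ambient session. In this sketch the session class and its global instance are simplified stand-ins for the real bigframes objects:

```python
class _FakeSession:
    """Stand-in for bigframes.session.Session."""
    def read_gbq_table_streaming(self, table: str):
        return f"StreamingDataFrame({table})"  # placeholder result

# bigframes keeps a real global session; this is a stand-in for it.
_global_session = _FakeSession()

def read_gbq_table(table: str):
    """Module-level entry point (bigframes.streaming.read_gbq_table)
    delegating to the global session, per the reviewer's suggestion."""
    return _global_session.read_gbq_table_streaming(table)
```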
StreamingDataFrame implements basic creation, projection, filter, and repr operations. It delegates the operations to a member DataFrame with caching and snapshots disabled.