From 7f6481cd69238f9609b363fdad21466e9cc8d686 Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Wed, 17 Dec 2025 18:49:44 +0000
Subject: [PATCH 1/4] feat: Use write api for automatic data uploads

---
 bigframes/session/bq_caching_executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index 736dbf7be1..dea6271bd9 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -594,7 +594,7 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
         # Might be better as a queue and a worker thread
         with self._upload_lock:
             if local_table not in self.cache._uploaded_local_data:
-                uploaded = self.loader.load_data(
+                uploaded = self.loader.write_data(
                     local_table, bigframes.core.guid.generate_guid()
                 )
                 self.cache.cache_remote_replacement(local_table, uploaded)

From 40323d861dbfb5b6787a0abc14cdd9101505eb1f Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Fri, 19 Dec 2025 20:38:38 +0000
Subject: [PATCH 2/4] use write api as default for all uploads

---
 bigframes/session/__init__.py | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 3cb9d2bb68..c8ec877f2c 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -976,8 +976,7 @@ def read_pandas(
                 quota and your data cannot be embedded in SQL due to size or
                 data type limitations.
             * "bigquery_write":
-                [Preview] Use the BigQuery Storage Write API. This feature
-                is in public preview.
+                Use the BigQuery Storage Write API.
 
         Returns:
             An equivalent bigframes.pandas.(DataFrame/Series/Index) object
@@ -1026,7 +1025,7 @@ def _read_pandas(
         mem_usage = pandas_dataframe.memory_usage(deep=True).sum()
         if write_engine == "default":
            write_engine = (
-                "bigquery_load"
+                "bigquery_write"
                if mem_usage > bigframes.constants.MAX_INLINE_BYTES
                else "bigquery_inline"
            )

From 599c404ad8ae82c369970e63139dad9fec02cf3b Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Mon, 12 Jan 2026 23:38:41 +0000
Subject: [PATCH 3/4] add config option for default write engine

---
 bigframes/_config/compute_options.py        | 13 ++++++++++++-
 bigframes/session/bq_caching_executor.py    | 13 ++++++++++---
 tests/system/small/test_large_local_data.py | 20 ++++++++++++++++++++
 3 files changed, 42 insertions(+), 4 deletions(-)

diff --git a/bigframes/_config/compute_options.py b/bigframes/_config/compute_options.py
index 7810ee897f..b7a49ede8b 100644
--- a/bigframes/_config/compute_options.py
+++ b/bigframes/_config/compute_options.py
@@ -15,7 +15,7 @@
 """Options for displaying objects."""
 
 import dataclasses
-from typing import Any, Dict, Optional
+from typing import Any, Dict, Literal, Optional
 
 
 @dataclasses.dataclass
@@ -140,6 +140,17 @@ class ComputeOptions:
         int | None: Number of rows, if set.
     """
 
+    default_write_engine: Literal["bigquery_load", "bigquery_write"] = "bigquery_write"
+    """
+    Sets the default write engine for uploading local data to BigQuery.
+
+    The two options are "bigquery_load" and "bigquery_write". "bigquery_write" is generally
+    preferred as it is faster, but "bigquery_load" may be used if the BigQuery Storage Write API is unavailable.
+
+    Returns:
+        str: "bigquery_load" or "bigquery_write"
+    """
+
     semantic_ops_confirmation_threshold: Optional[int] = 0
     """
    Deprecated. 
diff --git a/bigframes/session/bq_caching_executor.py b/bigframes/session/bq_caching_executor.py
index d031580162..151f99840d 100644
--- a/bigframes/session/bq_caching_executor.py
+++ b/bigframes/session/bq_caching_executor.py
@@ -598,9 +598,16 @@ def _upload_local_data(self, local_table: local_data.ManagedArrowTable):
         # Might be better as a queue and a worker thread
         with self._upload_lock:
             if local_table not in self.cache._uploaded_local_data:
-                uploaded = self.loader.write_data(
-                    local_table, bigframes.core.guid.generate_guid()
-                )
+                engine = bigframes.options.compute.default_write_engine
+                if engine == "bigquery_load":
+                    uploaded = self.loader.load_data(
+                        local_table, bigframes.core.guid.generate_guid()
+                    )
+                else:
+                    assert engine == "bigquery_write"
+                    uploaded = self.loader.write_data(
+                        local_table, bigframes.core.guid.generate_guid()
+                    )
                 self.cache.cache_remote_replacement(local_table, uploaded)
 
     def _execute_plan_gbq(

diff --git a/tests/system/small/test_large_local_data.py b/tests/system/small/test_large_local_data.py
index 39885ea853..6f475a9218 100644
--- a/tests/system/small/test_large_local_data.py
+++ b/tests/system/small/test_large_local_data.py
@@ -23,6 +23,26 @@
 large_dataframe.index = large_dataframe.index.astype("Int64")
 
 
+@pytest.mark.parametrize(
+    ("default_write_engine",),
+    [
+        pytest.param("bigquery_load"),
+        pytest.param("bigquery_write"),
+    ],
+)
+def test_read_pandas_config_default_engine(
+    session: bigframes.Session, default_write_engine
+):
+    pytest.importorskip("pandas", minversion="2.0.0")
+    with bigframes.option_context(
+        "compute.default_write_engine",
+        default_write_engine,
+    ):
+        bf_df = session.read_pandas(large_dataframe)
+
+    assert_frame_equal(large_dataframe, bf_df.to_pandas())
+
+
 def test_read_pandas_defer_noop(session: bigframes.Session):
     pytest.importorskip("pandas", minversion="2.0.0")
     bf_df = session.read_pandas(large_dataframe, write_engine="_deferred")

From e86724820c6dd673bf03b776fac173dd4b83161b Mon Sep 17 00:00:00 2001
From: Trevor Bergeron
Date: Wed, 14 Jan 2026 19:19:18 +0000
Subject: [PATCH 4/4] fix config app and status bar test

---
 bigframes/session/__init__.py           | 2 +-
 tests/system/small/test_progress_bar.py | 7 ++++++-
 2 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/bigframes/session/__init__.py b/bigframes/session/__init__.py
index 8c52013cbd..f208f0cb50 100644
--- a/bigframes/session/__init__.py
+++ b/bigframes/session/__init__.py
@@ -1029,7 +1029,7 @@ def _read_pandas(
         mem_usage = pandas_dataframe.memory_usage(deep=True).sum()
         if write_engine == "default":
             write_engine = (
-                "bigquery_write"
+                bigframes.options.compute.default_write_engine
                 if mem_usage > bigframes.constants.MAX_INLINE_BYTES
                 else "bigquery_inline"
             )

diff --git a/tests/system/small/test_progress_bar.py b/tests/system/small/test_progress_bar.py
index d726bfde2c..83ea3b876f 100644
--- a/tests/system/small/test_progress_bar.py
+++ b/tests/system/small/test_progress_bar.py
@@ -87,13 +87,18 @@ def test_progress_bar_extract_jobs(
 
 def test_progress_bar_load_jobs(
     session: bf.Session, penguins_pandas_df_default_index: pd.DataFrame, capsys
 ):
+    # repeat the DF to be big enough to trigger the load job.
     df = penguins_pandas_df_default_index
     while len(df) < MAX_INLINE_DF_BYTES:
         df = pd.DataFrame(np.repeat(df.values, 2, axis=0))
 
+    # The default write engine is usually streaming, which creates no job, so force a load job.
     with bf.option_context(
-        "display.progress_bar", "terminal"
+        "display.progress_bar",
+        "terminal",
+        "compute.default_write_engine",
+        "bigquery_load",
     ), tempfile.TemporaryDirectory() as dir:
         path = dir + "/test_read_csv_progress_bar*.csv"
         df.to_csv(path, index=False)
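
A minimal usage sketch of the option this series adds, not taken from the
patches themselves: the sample DataFrame and its size are hypothetical, while
compute.default_write_engine, bigframes.option_context, and read_pandas are
the names exercised in the diffs and tests above.

    import pandas as pd

    import bigframes
    import bigframes.pandas as bpd

    # A frame assumed large enough that read_pandas picks an upload engine
    # instead of inlining the data in SQL.
    df = pd.DataFrame({"x": range(500_000)})

    # Process-wide default: fall back to load jobs, e.g. where the
    # BigQuery Storage Write API cannot be used.
    bigframes.options.compute.default_write_engine = "bigquery_load"
    bf_df = bpd.read_pandas(df)

    # Or scope the setting to a block, as the new system test does:
    with bigframes.option_context("compute.default_write_engine", "bigquery_write"):
        bf_df2 = bpd.read_pandas(df)

With no option set, the series leaves "bigquery_write" as the default for
uploads above the inline threshold, so only callers who need load-job behavior
(such as the progress-bar test above) have to opt in.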