Commit 239071e

feat: Add support for quota project
1 parent 2d71951 commit 239071e

8 files changed: +158 -25 lines changed

README.md

Lines changed: 23 additions & 19 deletions
@@ -197,14 +197,17 @@ Follow [this document](https://cloud.google.com/dataproc/docs/concepts/component
 * This includes the table's schema (column names and data types), partitioning information, constraints etc.
 It doesn't contain the actual table data.
 * SQL Command for Catalog Table Creation
-```java
+```sql
 CREATE TABLE sample_catalog_table
-(name STRING) // Schema Details
+(name STRING) -- Schema Details
 WITH
-('connector' = 'bigquery',
-'project' = '<bigquery_project_name>',
-'dataset' = '<bigquery_dataset_name>',
-'table' = '<bigquery_table_name>');
+(
+'connector' = 'bigquery',
+'project' = '<bigquery_project_name>',
+'dataset' = '<bigquery_dataset_name>',
+'table' = '<bigquery_table_name>',
+'quota-project-id' = '<gcp_project_name>' -- Optional, defaults to credentials or project
+);
 ```
 
 
@@ -296,7 +299,7 @@ wait for the job to complete.
 
 ### Sink Configurations
 
-The connector supports a number of options to configure the source.
+The connector supports a number of options to configure the sink.
 
 | Property | Data Type | Description |
 |----------------------------------------------|------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
@@ -437,18 +440,18 @@ sourceTable = sourceTable.select($("*"));
 
 The connector supports a number of options to configure the source.
 
-| Property | Data Type | Description |
-|----------------------------------------------|--------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
-| `projectId` | String | Google Cloud Project ID of the table. This config is required, and assumes no default value. |
-| `dataset` | String | Dataset containing the table. This config is required, and assumes no default value. |
-| `table` | String | BigQuery table name (not the full ID). This config is required, and assumes no default value. |
-| `credentialsOptions` | CredentialsOptions | Google credentials for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.<br/>**Note**: The query bounded source only uses default application credentials. |
-| `columnNames` | List&lt;String&gt; | Columns to project from the table. If unspecified, all columns are fetched. |
-| `limit` | Integer | Maximum number of rows to read from source table **per task slot**. If unspecified, all rows are fetched. |
-| `maxRecordsPerSplitFetch` | Integer | Maximum number of records to read from a split once Flink requests fetch. If unspecified, the default value used is 10000. <br/>**Note**: Configuring this number too high may cause memory pressure in the task manager, depending on the BigQuery record's size and total rows on the stream. |
-| `maxStreamCount` | Integer | Maximum read streams to open during a read session. BigQuery can return a lower number of streams than specified based on internal optimizations. If unspecified, this config is not set and BigQuery has complete control over the number of read streams created. |
-| `rowRestriction` | String | BigQuery SQL query for row filter pushdown. If unspecified, all rows are fetched. |
-| `snapshotTimeInMillis` | Long | Time (in milliseconds since epoch) for the BigQuery table snapshot to read. If unspecified, the latest snapshot is read. |
+| Property | Data Type | Description |
+|---------------------------|--------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| `projectId` | String | Google Cloud Project ID of the table. This config is required, and assumes no default value. |
+| `dataset` | String | Dataset containing the table. This config is required, and assumes no default value. |
+| `table` | String | BigQuery table name (not the full ID). This config is required, and assumes no default value. |
+| `credentialsOptions` | CredentialsOptions | Google credentials for connecting to BigQuery. This config is optional, and default behavior is to use the `GOOGLE_APPLICATION_CREDENTIALS` environment variable.<br/>**Note**: The query bounded source only uses default application credentials. |
+| `columnNames` | List&lt;String&gt; | Columns to project from the table. If unspecified, all columns are fetched. |
+| `limit` | Integer | Maximum number of rows to read from source table **per task slot**. If unspecified, all rows are fetched. |
+| `maxRecordsPerSplitFetch` | Integer | Maximum number of records to read from a split once Flink requests fetch. If unspecified, the default value used is 10000. <br/>**Note**: Configuring this number too high may cause memory pressure in the task manager, depending on the BigQuery record's size and total rows on the stream. |
+| `maxStreamCount` | Integer | Maximum read streams to open during a read session. BigQuery can return a lower number of streams than specified based on internal optimizations. If unspecified, this config is not set and BigQuery has complete control over the number of read streams created. |
+| `rowRestriction` | String | BigQuery SQL query for row filter pushdown. If unspecified, all rows are fetched. |
+| `snapshotTimeInMillis` | Long | Time (in milliseconds since epoch) for the BigQuery table snapshot to read. If unspecified, the latest snapshot is read. |
 
 
 ### Datatypes
@@ -568,6 +571,7 @@ to provide it:
 [here](https://cloud.google.com/docs/authentication/client-libraries).
 - In case the environment variable cannot be changed, the credentials file can be configured as a connector option. The
 file should reside on the same path on all the nodes of the cluster.
+- The quota project can be set in the connector options or through the credentials.
 
 ### How to fix classloader error in Flink application?
 

flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/BigQueryDynamicTableFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ public Set<ConfigOption<?>> optionalOptions() {
         additionalOptions.add(BigQueryConnectorOptions.CREDENTIALS_ACCESS_TOKEN);
         additionalOptions.add(BigQueryConnectorOptions.CREDENTIALS_FILE);
         additionalOptions.add(BigQueryConnectorOptions.CREDENTIALS_KEY);
+        additionalOptions.add(BigQueryConnectorOptions.QUOTA_PROJECT_ID);
         additionalOptions.add(BigQueryConnectorOptions.TEST_MODE);
         additionalOptions.add(BigQueryConnectorOptions.DELIVERY_GUARANTEE);
         additionalOptions.add(BigQueryConnectorOptions.SINK_PARALLELISM);
@@ -100,6 +101,7 @@ public Set<ConfigOption<?>> forwardOptions() {
         forwardOptions.add(BigQueryConnectorOptions.CREDENTIALS_ACCESS_TOKEN);
         forwardOptions.add(BigQueryConnectorOptions.CREDENTIALS_FILE);
         forwardOptions.add(BigQueryConnectorOptions.CREDENTIALS_KEY);
+        forwardOptions.add(BigQueryConnectorOptions.QUOTA_PROJECT_ID);
         forwardOptions.add(BigQueryConnectorOptions.DELIVERY_GUARANTEE);
         forwardOptions.add(BigQueryConnectorOptions.SINK_PARALLELISM);
         forwardOptions.add(BigQueryConnectorOptions.ENABLE_TABLE_CREATION);

flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryConnectorOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ private BigQueryConnectorOptions() {}
                     .noDefaultValue()
                     .withDescription("Specifies the BigQuery table name.");
 
+    /**
+     * [OPTIONAL] The quota project ID to use when connecting.
+     */
+    public static final ConfigOption<String> QUOTA_PROJECT_ID =
+            ConfigOptions.key("quota-project-id")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Specifies the BigQuery quota project to use.");
+
     /**
      * [OPTIONAL, Read Configuration] Integer value indicating the maximum number of rows/records to
      * be read from source. <br>

flink-1.17-connector-bigquery/flink-connector-bigquery/src/main/java/com/google/cloud/flink/bigquery/table/config/BigQueryTableConfigurationProvider.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,8 @@ public BigQueryConnectOptions translateBigQueryConnectOptions() {
                                         config.get(BigQueryConnectorOptions.CREDENTIALS_FILE))
                                 .setCredentialsKey(
                                         config.get(BigQueryConnectorOptions.CREDENTIALS_KEY))
+                                .setQuotaProjectId(
+                                        config.get(BigQueryConnectorOptions.QUOTA_PROJECT_ID))
                                 .build())
                 .build();
     }

flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/common/config/CredentialsOptions.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@ public abstract class CredentialsOptions implements Serializable {
     @Nullable
     public abstract String getAccessToken();
 
+    @Nullable
+    public abstract String getQuotaProjectId();
+
     /**
      * Returns the Google Credentials created given the provided configuration.
      *
@@ -60,6 +63,7 @@ public final int hashCode() {
         hash = 61 * hash + Objects.hashCode(getCredentialsFile());
         hash = 61 * hash + Objects.hashCode(getCredentialsKey());
         hash = 61 * hash + Objects.hashCode(getAccessToken());
+        hash = 61 * hash + Objects.hashCode(getQuotaProjectId());
         return hash;
     }
 
@@ -77,6 +81,7 @@ public final boolean equals(Object obj) {
         final CredentialsOptions other = (CredentialsOptions) obj;
         return Objects.equals(this.getCredentialsFile(), other.getCredentialsFile())
                 && Objects.equals(this.getCredentialsKey(), other.getCredentialsKey())
+                && Objects.equals(this.getQuotaProjectId(), other.getQuotaProjectId())
                 && Objects.equals(this.getAccessToken(), other.getAccessToken());
     }
 
@@ -117,6 +122,14 @@ public abstract static class Builder {
          */
         public abstract Builder setAccessToken(String credentialsToken);
 
+        /**
+         * Sets the BigQuery quota project ID to use.
+         *
+         * @param quotaProjectId The BigQuery project ID
+         * @return this builder's instance
+         */
+        public abstract Builder setQuotaProjectId(String quotaProjectId);
+
         /**
          * Builds a fully initialized {@link CredentialsOptions} instance.
          *
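
The `hashCode` and `equals` hunks above add the new field to both methods together. A minimal sketch of why that pairing matters, using a hypothetical two-field class (`CredentialsKeySketch` and `distinctCount` are illustrative names, not part of the connector): two option sets that differ only in quota project should compare as different, and equal ones must hash alike.

```java
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

/** Hypothetical stand-in for an options class; not the connector's CredentialsOptions. */
final class CredentialsKeySketch {
    private final String credentialsFile; // may be null
    private final String quotaProjectId; // may be null

    CredentialsKeySketch(String credentialsFile, String quotaProjectId) {
        this.credentialsFile = credentialsFile;
        this.quotaProjectId = quotaProjectId;
    }

    @Override
    public int hashCode() {
        // Every field compared in equals() is also mixed into the hash.
        // The starting value 5 is an arbitrary choice for this sketch.
        int hash = 5;
        hash = 61 * hash + Objects.hashCode(credentialsFile);
        hash = 61 * hash + Objects.hashCode(quotaProjectId);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof CredentialsKeySketch)) {
            return false;
        }
        CredentialsKeySketch other = (CredentialsKeySketch) obj;
        return Objects.equals(credentialsFile, other.credentialsFile)
                && Objects.equals(quotaProjectId, other.quotaProjectId);
    }

    /** Counts distinct entries the way a hash-based collection keyed on the options would. */
    static int distinctCount(CredentialsKeySketch... keys) {
        Set<CredentialsKeySketch> seen = new HashSet<>();
        for (CredentialsKeySketch key : keys) {
            seen.add(key);
        }
        return seen.size();
    }
}
```

If the field were compared in `equals` but left out of `hashCode`, the contract would still hold but hash distribution would suffer; the reverse omission would break the contract outright.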

flink-connector-bigquery-common/src/main/java/com/google/cloud/flink/bigquery/services/BigQueryServicesImpl.java

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
 import com.google.api.services.bigquery.Bigquery;
 import com.google.api.services.bigquery.model.Table;
 import com.google.api.services.bigquery.model.TableSchema;
+import com.google.auth.Credentials;
 import com.google.cloud.bigquery.BigQuery;
 import com.google.cloud.bigquery.BigQueryOptions;
 import com.google.cloud.bigquery.Dataset;
@@ -135,12 +136,16 @@ public void cancel() {
     /** Implementation of a BigQuery read client wrapper. */
     public static class StorageReadClientImpl implements StorageReadClient {
         private final BigQueryReadClient client;
+        private final String quotaProjectId;
 
         private StorageReadClientImpl(CredentialsOptions options) throws IOException {
+            Credentials credentials = options.getCredentials();
+            quotaProjectId = BigQueryUtils.getQuotaProjectId(options, credentials);
+
             BigQueryReadSettings.Builder settingsBuilder =
                     BigQueryReadSettings.newBuilder()
-                            .setCredentialsProvider(
-                                    FixedCredentialsProvider.create(options.getCredentials()))
+                            .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+                            .setQuotaProjectId(quotaProjectId)
                             .setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
                             .setTransportChannelProvider(
                                     BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
@@ -177,7 +182,9 @@ private StorageReadClientImpl(CredentialsOptions options) throws IOException {
 
         @Override
         public ReadSession createReadSession(CreateReadSessionRequest request) {
-            return client.createReadSession(request);
+            CreateReadSessionRequest updatedRequest =
+                    BigQueryUtils.updateWithQuotaProject(request, quotaProjectId);
+            return client.createReadSession(updatedRequest);
         }
 
         @Override
@@ -196,10 +203,12 @@ public static class StorageWriteClientImpl implements StorageWriteClient {
         private final BigQueryWriteClient client;
 
         private StorageWriteClientImpl(CredentialsOptions options) throws IOException {
+            Credentials credentials = options.getCredentials();
+            String quotaProjectId = BigQueryUtils.getQuotaProjectId(options, credentials);
             BigQueryWriteSettings.Builder settingsBuilder =
                     BigQueryWriteSettings.newBuilder()
-                            .setCredentialsProvider(
-                                    FixedCredentialsProvider.create(options.getCredentials()))
+                            .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
+                            .setQuotaProjectId(quotaProjectId)
                             .setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
                             .setTransportChannelProvider(
                                     BigQueryReadSettings.defaultGrpcTransportProviderBuilder()
@@ -320,9 +329,12 @@ public static class QueryDataClientImpl implements QueryDataClient {
         private final Bigquery bigquery;
 
         public QueryDataClientImpl(CredentialsOptions options) {
+            Credentials credentials = options.getCredentials();
+            String quotaProjectId = BigQueryUtils.getQuotaProjectId(options, credentials);
             bigQuery =
                     BigQueryOptions.newBuilder()
-                            .setCredentials(options.getCredentials())
+                            .setCredentials(credentials)
+                            .setQuotaProjectId(quotaProjectId)
                             .setHeaderProvider(USER_AGENT_HEADER_PROVIDER)
                             .build()
                             .getService();
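
The README hunk describes `quota-project-id` as optional, "defaults to credentials or project", and each client above calls `BigQueryUtils.getQuotaProjectId(options, credentials)`, whose body is not among the hunks shown here. The following is a minimal sketch of one fallback order that would match that README comment, assuming the explicit option wins, then a quota project carried by the credentials, then the table's project. `QuotaProjectResolver`, `resolve`, and its three parameters are hypothetical names for illustration and are not the connector's implementation.

```java
import java.util.Optional;

/** Hypothetical helper illustrating a fallback order; not the connector's BigQueryUtils. */
final class QuotaProjectResolver {
    private QuotaProjectResolver() {}

    /**
     * Returns the first non-blank candidate in priority order: the explicit connector option,
     * then the quota project from the credentials, then the table's own project.
     * Returns an empty Optional when none is set.
     */
    static Optional<String> resolve(
            String fromOption, String fromCredentials, String tableProject) {
        for (String candidate : new String[] {fromOption, fromCredentials, tableProject}) {
            if (candidate != null && !candidate.trim().isEmpty()) {
                return Optional.of(candidate);
            }
        }
        return Optional.empty();
    }
}
```

Resolving the value once in each constructor, as the hunks above do, keeps the same quota project on the client settings and on any later per-request adjustment.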
