Batch load data using the Storage Write API

This document describes how to use the BigQuery Storage Write API to batch load data into BigQuery.

In batch-load scenarios, an application writes data and commits it as a single atomic transaction. When using the Storage Write API to batch load data, create one or more streams of the pending type. The pending type supports stream-level transactions: records are buffered in a pending state until you commit the stream.

For batch workloads, also consider using the Apache Spark SQL connector for BigQuery on Dataproc, which writes through the Storage Write API, rather than writing custom Storage Write API code.

The Storage Write API is well-suited to a data pipeline architecture. A main process creates a number of streams. For each stream, it assigns a worker thread or a separate process to write a portion of the batch data. Each worker creates a connection to its stream, writes data, and finalizes its stream when it's done. After all of the workers signal successful completion to the main process, the main process commits the data. If a worker fails, its assigned portion of the data does not show up in the final results, and the worker's entire job can be safely retried. In a more sophisticated pipeline, workers checkpoint their progress by reporting the last offset written to the main process. This approach can result in a robust pipeline that is resilient to failures.
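
The following is a minimal Python sketch of that coordination pattern. It uses an in-memory stand-in for pending streams. FakePendingStream, run_worker, and run_pipeline are hypothetical names and are not part of any BigQuery client library. In a real pipeline, a worker would call AppendRows and FinalizeWriteStream, and the main process would call BatchCommitWriteStreams. The sketch shows two properties the paragraph relies on. First, a failed worker's partial rows never reach the table, because its stream is never committed. Second, each worker checkpoints the last offset it wrote.

```python
from concurrent.futures import ThreadPoolExecutor


class FakePendingStream:
    """Hypothetical in-memory stand-in for a pending-type write stream."""

    def __init__(self, name):
        self.name = name
        self.rows = []  # buffered rows; invisible until the main process commits
        self.finalized = False

    def append(self, batch, offset):
        if self.finalized:
            raise RuntimeError("STREAM_FINALIZED")
        if offset != len(self.rows):
            # Explicit offsets must match the current end of the stream.
            raise ValueError(f"offset {offset} != stream length {len(self.rows)}")
        self.rows.extend(batch)
        return len(self.rows)  # next offset to write

    def finalize(self):
        self.finalized = True
        return len(self.rows)


def run_worker(stream, chunk, batch_size, checkpoints, crash=False):
    """Write one portion of the data in batches, then finalize the stream."""
    offset = 0
    for start in range(0, len(chunk), batch_size):
        offset = stream.append(chunk[start:start + batch_size], offset)
        # Checkpoint: report the last offset written to the main process.
        checkpoints[stream.name] = offset
        if crash:
            raise ConnectionError("simulated worker failure")
    return stream.finalize()


def run_pipeline(data, num_workers=3, batch_size=2, crash_once=(), max_attempts=2):
    """Main process: fan out portions, retry failed workers, commit at the end."""
    chunks = [data[i::num_workers] for i in range(num_workers)]
    checkpoints = {}  # each worker writes only its own keys

    def write_portion(i):
        for attempt in range(max_attempts):
            # A retry gets a fresh stream; the failed stream is never committed.
            stream = FakePendingStream(f"stream-{i}-attempt-{attempt}")
            try:
                run_worker(stream, chunks[i], batch_size, checkpoints,
                           crash=(i in crash_once and attempt == 0))
                return stream
            except ConnectionError:
                if attempt == max_attempts - 1:
                    raise

    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        # list() re-raises the error of any worker that exhausted its retries,
        # so nothing is committed unless every portion succeeded.
        finished = list(pool.map(write_portion, range(num_workers)))

    # "Commit": only now do the buffered rows become visible in the table.
    table = [row for stream in finished for row in stream.rows]
    return table, checkpoints


table, checkpoints = run_pipeline(list(range(10)), crash_once={1})
print(sorted(table))
print(checkpoints)
```

In this run, worker 1 crashes after its first batch and is retried on a new stream. The two rows it had already written to the abandoned stream do not appear twice in the table.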

Batch load data using pending type

To use the pending type, the application does the following:

  1. Call CreateWriteStream to create one or more streams of the pending type.
  2. For each stream, call AppendRows in a loop to write batches of records.
  3. For each stream, call FinalizeWriteStream. After you call this method, you cannot write any more rows to the stream. If you call AppendRows after calling FinalizeWriteStream, it returns a StorageError with StorageErrorCode.STREAM_FINALIZED in the google.rpc.Status error. For more information about the google.rpc.Status error model, see Errors.
  4. Call BatchCommitWriteStreams to commit the streams. After you call this method, the data becomes available for reading. If there is an error committing any of the streams, the error is returned in the stream_errors field of the BatchCommitWriteStreamsResponse.
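
The four steps above can be modeled as a small state machine. The following Python sketch is a toy model, not a client-library class. PendingStreamModel and StreamError are hypothetical. The error-code strings mirror StorageErrorCode names, but it is an assumption that the service uses INVALID_STREAM_STATE and STREAM_ALREADY_COMMITTED for the commit cases shown here.

```python
class StreamError(Exception):
    """Hypothetical error carrying a code named after a StorageErrorCode value."""

    def __init__(self, code):
        super().__init__(code)
        self.code = code


class PendingStreamModel:
    """Toy model of one pending-type stream; not a client-library class."""

    def __init__(self):
        self._buffer = []       # rows written with AppendRows, still pending
        self.visible_rows = []  # rows a reader could see
        self.state = "PENDING"

    def append_rows(self, rows):
        # Step 2: AppendRows may be called any number of times before finalizing.
        if self.state != "PENDING":
            raise StreamError("STREAM_FINALIZED")
        self._buffer.extend(rows)

    def finalize(self):
        # Step 3: FinalizeWriteStream seals the stream and reports its row count.
        self.state = "FINALIZED"
        return len(self._buffer)

    def commit(self):
        # Step 4: BatchCommitWriteStreams makes the buffered rows readable.
        if self.state == "COMMITTED":
            raise StreamError("STREAM_ALREADY_COMMITTED")  # assumed code
        if self.state != "FINALIZED":
            raise StreamError("INVALID_STREAM_STATE")  # assumed code
        self.visible_rows = list(self._buffer)
        self.state = "COMMITTED"


stream = PendingStreamModel()  # Step 1: CreateWriteStream with the pending type
stream.append_rows(["Alice", "Bob"])
stream.append_rows(["Charles"])
print(stream.visible_rows)  # prints [] because nothing is visible before the commit
print(stream.finalize())    # prints 3
try:
    stream.append_rows(["Dave"])
except StreamError as err:
    print(err.code)         # prints STREAM_FINALIZED
stream.commit()
print(stream.visible_rows)  # prints ['Alice', 'Bob', 'Charles']
```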

Committing is an atomic operation, and you can commit multiple streams at once. A stream can only be committed once, so if the commit operation fails, it is safe to retry it. Until you commit a stream, the data is pending and not visible to reads.
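
The following Python sketch models those commit semantics under two assumptions. First, any per-stream error causes the whole request to commit nothing, which is what atomicity implies. Second, committing an already-committed stream reports an error instead of adding the rows again. Streams are plain dicts, and batch_commit is a hypothetical helper, not a client-library call. The error-code strings mirror StorageErrorCode names, but which code the service returns in each case is an assumption.

```python
def batch_commit(table, streams):
    """Hypothetical all-or-nothing commit; returns (stream name, error code) pairs."""
    errors = []
    for s in streams:
        if s["state"] == "COMMITTED":
            # A stream can only be committed once; its rows are not added again.
            errors.append((s["name"], "STREAM_ALREADY_COMMITTED"))
        elif s["state"] != "FINALIZED":
            errors.append((s["name"], "INVALID_STREAM_STATE"))
    if errors:
        # Atomic: if any stream cannot be committed, none of them are.
        return errors
    for s in streams:
        table.extend(s["rows"])
        s["state"] = "COMMITTED"
    return []


table = []
a = {"name": "a", "state": "FINALIZED", "rows": [1, 2]}
b = {"name": "b", "state": "PENDING", "rows": [3]}
print(batch_commit(table, [a, b]), table)  # b is not finalized: nothing is committed
b["state"] = "FINALIZED"
print(batch_commit(table, [a, b]), table)  # retrying after the failure succeeds
print(batch_commit(table, [a, b]), table)  # a repeated retry cannot duplicate rows
```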

After the stream is finalized and before it is committed, the data can remain in the buffer for up to 4 hours. Pending streams must be committed within 24 hours. There is a quota limit on the total size of the pending stream buffer.
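
When both time limits apply, the effective deadline is whichever limit expires first. The following Python sketch makes that arithmetic concrete. It assumes that the 24-hour limit is measured from stream creation and that the 4-hour limit is measured from finalization. commit_deadline is a hypothetical helper.

```python
from datetime import datetime, timedelta, timezone

# Limits quoted in the text above.
FINALIZED_BUFFER_LIMIT = timedelta(hours=4)  # finalized but not yet committed
PENDING_COMMIT_LIMIT = timedelta(hours=24)   # assumed to count from stream creation


def commit_deadline(created_at, finalized_at=None):
    """Return the latest time the stream can still be committed."""
    deadline = created_at + PENDING_COMMIT_LIMIT
    if finalized_at is not None:
        # Both limits apply, so the earlier one wins.
        deadline = min(deadline, finalized_at + FINALIZED_BUFFER_LIMIT)
    return deadline


created = datetime(2024, 1, 1, 0, 0, tzinfo=timezone.utc)
print(commit_deadline(created))                                 # 24-hour limit only
print(commit_deadline(created, created + timedelta(hours=1)))   # 4-hour limit is earlier
print(commit_deadline(created, created + timedelta(hours=22)))  # 24-hour limit is earlier
```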

The following code shows how to write data using the pending type:

C#

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery C# API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

using Google.Api.Gax.Grpc;
using Google.Cloud.BigQuery.Storage.V1;
using Google.Protobuf;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using static Google.Cloud.BigQuery.Storage.V1.AppendRowsRequest.Types;

public class AppendRowsPendingSample
{
    /// <summary>
    /// This code sample demonstrates how to write records in pending mode.
    /// Create a write stream, write some sample data, and commit the stream to append the rows.
    /// The CustomerRecord proto used in the sample can be seen in Resources folder and generated C# is placed in Data folder in
    /// https://github.com/GoogleCloudPlatform/dotnet-docs-samples/tree/main/bigquery-storage/api/BigQueryStorage.Samples
    /// </summary>
    public async Task AppendRowsPendingAsync(string projectId, string datasetId, string tableId)
    {
        BigQueryWriteClient bigQueryWriteClient = await BigQueryWriteClient.CreateAsync();
        // Initialize a write stream for the specified table.
        // When creating the stream, choose the type. Use the Pending type to wait
        // until the stream is committed before it is visible. See:
        // https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
        WriteStream stream = new WriteStream { Type = WriteStream.Types.Type.Pending };
        TableName tableName = TableName.FromProjectDatasetTable(projectId, datasetId, tableId);

        stream = await bigQueryWriteClient.CreateWriteStreamAsync(tableName, stream);

        // Initialize streaming call, retrieving the stream object
        BigQueryWriteClient.AppendRowsStream rowAppender = bigQueryWriteClient.AppendRows();

        // Sending requests and retrieving responses can be arbitrarily interleaved.
        // Exact sequence will depend on client/server behavior.
        // Create task to do something with responses from server.
        Task appendResultsHandlerTask = Task.Run(async () =>
        {
            AsyncResponseStream<AppendRowsResponse> appendRowResults = rowAppender.GetResponseStream();
            while (await appendRowResults.MoveNextAsync())
            {
                AppendRowsResponse responseItem = appendRowResults.Current;
                // Do something with responses.
                Console.WriteLine($"Appending rows resulted in: {responseItem.AppendResult}");
            }
            // The response stream has completed.
        });

        // List of records to be appended in the table.
        List<CustomerRecord> records = new List<CustomerRecord>
        {
            new CustomerRecord { CustomerNumber = 1, CustomerName = "Alice" },
            new CustomerRecord { CustomerNumber = 2, CustomerName = "Bob" }
        };

        // Create a batch of row data by appending serialized bytes to the
        // SerializedRows repeated field.
        ProtoData protoData = new ProtoData
        {
            WriterSchema = new ProtoSchema { ProtoDescriptor = CustomerRecord.Descriptor.ToProto() },
            Rows = new ProtoRows { SerializedRows = { records.Select(r => r.ToByteString()) } }
        };

        // Initialize the append row request.
        AppendRowsRequest appendRowRequest = new AppendRowsRequest
        {
            WriteStreamAsWriteStreamName = stream.WriteStreamName,
            ProtoRows = protoData
        };

        // Stream a request to the server.
        await rowAppender.WriteAsync(appendRowRequest);

        // Append a second batch of data.
        protoData = new ProtoData
        {
            Rows = new ProtoRows { SerializedRows = { new CustomerRecord { CustomerNumber = 3, CustomerName = "Charles" }.ToByteString() } }
        };

        // Since this is the second request, you only need to include the row data.
        // The name of the stream and protocol buffers descriptor is only needed in
        // the first request.
        appendRowRequest = new AppendRowsRequest
        {
            // If Offset is not present, the write is performed at the current end of stream.
            ProtoRows = protoData
        };

        await rowAppender.WriteAsync(appendRowRequest);

        // Complete writing requests to the stream.
        await rowAppender.WriteCompleteAsync();
        // Await the handler. This will complete once all server responses have been processed.
        await appendResultsHandlerTask;

        // A Pending type stream must be "finalized" before being committed. No new
        // records can be written to the stream after this method has been called.
        await bigQueryWriteClient.FinalizeWriteStreamAsync(stream.Name);
        BatchCommitWriteStreamsRequest batchCommitWriteStreamsRequest = new BatchCommitWriteStreamsRequest
        {
            Parent = tableName.ToString(),
            WriteStreams = { stream.Name }
        };

        BatchCommitWriteStreamsResponse batchCommitWriteStreamsResponse =
            await bigQueryWriteClient.BatchCommitWriteStreamsAsync(batchCommitWriteStreamsRequest);
        if (batchCommitWriteStreamsResponse.StreamErrors?.Count > 0)
        {
            // Handle errors here.
            Console.WriteLine("Error committing write streams. Individual errors:");
            foreach (StorageError error in batchCommitWriteStreamsResponse.StreamErrors)
            {
                Console.WriteLine(error.ErrorMessage);
            }
        }
        else
        {
            Console.WriteLine($"Writes to stream {stream.Name} have been committed.");
        }
    }
}

Go

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Go API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import (
	"context"
	"fmt"
	"io"
	"math/rand"
	"time"

	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
	"cloud.google.com/go/bigquery/storage/managedwriter"
	"cloud.google.com/go/bigquery/storage/managedwriter/adapt"
	"github.com/GoogleCloudPlatform/golang-samples/bigquery/snippets/managedwriter/exampleproto"
	"google.golang.org/protobuf/proto"
)

// generateExampleMessages generates a slice of serialized protobuf messages using a statically defined
// and compiled protocol buffer file, and returns the binary serialized representation.
func generateExampleMessages(numMessages int) ([][]byte, error) {
	msgs := make([][]byte, numMessages)
	random := rand.New(rand.NewSource(time.Now().UnixNano()))
	for i := 0; i < numMessages; i++ {
		// Our example data embeds an array of structs, so we'll construct that first.
		// Pick the list length once so that every element of the slice is populated.
		n := int(random.Int63n(5) + 1)
		sList := make([]*exampleproto.SampleStruct, n)
		for j := range sList {
			sList[j] = &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			}
		}

		m := &exampleproto.SampleData{
			BoolCol:    proto.Bool(true),
			BytesCol:   []byte("some bytes"),
			Float64Col: proto.Float64(3.14),
			Int64Col:   proto.Int64(123),
			StringCol:  proto.String("example string value"),

			// These types require special encoding/formatting to transmit.

			// DATE values are number of days since the Unix epoch.
			DateCol: proto.Int32(int32(time.Now().UnixNano() / 86400000000000)),

			// DATETIME uses the literal format.
			DatetimeCol: proto.String("2022-01-01 12:13:14.000000"),

			// GEOGRAPHY uses Well-Known-Text (WKT) format.
			GeographyCol: proto.String("POINT(-122.350220 47.649154)"),

			// NUMERIC and BIGNUMERIC can be passed as string, or more efficiently
			// using a packed byte representation.
			NumericCol:    proto.String("99999999999999999999999999999.999999999"),
			BignumericCol: proto.String("578960446186580977117854925043439539266.34992332820282019728792003956564819967"),

			// TIME also uses literal format.
			TimeCol: proto.String("12:13:14.000000"),

			// TIMESTAMP uses microseconds since Unix epoch.
			TimestampCol: proto.Int64(time.Now().UnixNano() / 1000),

			// Int64List is an array of INT64 types.
			Int64List: []int64{2, 4, 6, 8},

			// This is a required field, and thus must be present.
			RowNum: proto.Int64(23),

			// StructCol is a single nested message.
			StructCol: &exampleproto.SampleStruct{
				SubIntCol: proto.Int64(random.Int63()),
			},

			// StructList is a repeated array of a nested message.
			StructList: sList,
		}

		b, err := proto.Marshal(m)
		if err != nil {
			return nil, fmt.Errorf("error generating message %d: %w", i, err)
		}
		msgs[i] = b
	}
	return msgs, nil
}

// appendToPendingStream demonstrates using the managedwriter package to write some example data
// to a pending stream, and then committing it to a table.
func appendToPendingStream(w io.Writer, projectID, datasetID, tableID string) error {
	// projectID := "myproject"
	// datasetID := "mydataset"
	// tableID := "mytable"

	ctx := context.Background()
	// Instantiate a managedwriter client to handle interactions with the service.
	client, err := managedwriter.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("managedwriter.NewClient: %w", err)
	}
	// Close the client when we exit the function.
	defer client.Close()

	// Create a new pending stream. We'll use the stream name to construct a writer.
	pendingStream, err := client.CreateWriteStream(ctx, &storagepb.CreateWriteStreamRequest{
		Parent: fmt.Sprintf("projects/%s/datasets/%s/tables/%s", projectID, datasetID, tableID),
		WriteStream: &storagepb.WriteStream{
			Type: storagepb.WriteStream_PENDING,
		},
	})
	if err != nil {
		return fmt.Errorf("CreateWriteStream: %w", err)
	}

	// We need to communicate the descriptor of the protocol buffer message we're using, which
	// is analogous to the "schema" for the message. Both SampleData and SampleStruct are
	// two distinct messages in the compiled proto file, so we'll use adapt.NormalizeDescriptor
	// to unify them into a single self-contained descriptor representation.
	m := &exampleproto.SampleData{}
	descriptorProto, err := adapt.NormalizeDescriptor(m.ProtoReflect().Descriptor())
	if err != nil {
		return fmt.Errorf("NormalizeDescriptor: %w", err)
	}

	// Instantiate a ManagedStream, which manages low level details like connection state and provides
	// additional features like a future-like callback for appends, etc. NewManagedStream can also create
	// the stream on your behalf, but in this example we're being explicit about stream creation.
	managedStream, err := client.NewManagedStream(ctx, managedwriter.WithStreamName(pendingStream.GetName()),
		managedwriter.WithSchemaDescriptor(descriptorProto))
	if err != nil {
		return fmt.Errorf("NewManagedStream: %w", err)
	}
	defer managedStream.Close()

	// First, we'll append a single row.
	rows, err := generateExampleMessages(1)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}

	// We'll keep track of the current offset in the stream with curOffset.
	var curOffset int64
	// We can append data asynchronously, so we'll check our appends at the end.
	var results []*managedwriter.AppendResult

	result, err := managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(0))
	if err != nil {
		return fmt.Errorf("AppendRows first call error: %w", err)
	}
	results = append(results, result)

	// Advance our current offset.
	curOffset = curOffset + 1

	// This time, we'll append three more rows in a single request.
	rows, err = generateExampleMessages(3)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows second call error: %w", err)
	}
	results = append(results, result)

	// Advance our offset again.
	curOffset = curOffset + 3

	// Finally, we'll append two more rows.
	rows, err = generateExampleMessages(2)
	if err != nil {
		return fmt.Errorf("generateExampleMessages: %w", err)
	}
	result, err = managedStream.AppendRows(ctx, rows, managedwriter.WithOffset(curOffset))
	if err != nil {
		return fmt.Errorf("AppendRows third call error: %w", err)
	}
	results = append(results, result)

	// Now, we'll check that our batch of three appends all completed successfully.
	// Monitoring the results could also be done out of band via a goroutine.
	for k, v := range results {
		// GetResult blocks until we receive a response from the API.
		recvOffset, err := v.GetResult(ctx)
		if err != nil {
			return fmt.Errorf("append %d returned error: %w", k, err)
		}
		fmt.Fprintf(w, "Successfully appended data at offset %d.\n", recvOffset)
	}

	// We're now done appending to this stream. We now mark pending stream finalized, which blocks
	// further appends.
	rowCount, err := managedStream.Finalize(ctx)
	if err != nil {
		return fmt.Errorf("error during Finalize: %w", err)
	}

	fmt.Fprintf(w, "Stream %s finalized with %d rows.\n", managedStream.StreamName(), rowCount)

	// To commit the data to the table, we need to run a batch commit. You can commit several streams
	// atomically as a group, but in this instance we'll only commit the single stream.
	req := &storagepb.BatchCommitWriteStreamsRequest{
		Parent:       managedwriter.TableParentFromStreamName(managedStream.StreamName()),
		WriteStreams: []string{managedStream.StreamName()},
	}

	resp, err := client.BatchCommitWriteStreams(ctx, req)
	if err != nil {
		return fmt.Errorf("client.BatchCommit: %w", err)
	}
	if len(resp.GetStreamErrors()) > 0 {
		return fmt.Errorf("stream errors present: %v", resp.GetStreamErrors())
	}

	fmt.Fprintf(w, "Table data committed at %s\n", resp.GetCommitTime().AsTime().Format(time.RFC3339Nano))
	return nil
}

Java

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.Exceptions.StorageException;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.StorageError;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Phaser;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
import org.threeten.bp.Duration;

public class WritePendingStream {

  public static void runWritePendingStream()
      throws DescriptorValidationException, InterruptedException, IOException {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "MY_PROJECT_ID";
    String datasetName = "MY_DATASET_NAME";
    String tableName = "MY_TABLE_NAME";

    writePendingStream(projectId, datasetName, tableName);
  }

  public static void writePendingStream(String projectId, String datasetName, String tableName)
      throws DescriptorValidationException, InterruptedException, IOException {
    BigQueryWriteClient client = BigQueryWriteClient.create();
    TableName parentTable = TableName.of(projectId, datasetName, tableName);

    DataWriter writer = new DataWriter();
    // One time initialization.
    writer.initialize(parentTable, client);

    try {
      // Write two batches of fake data to the stream, each with 10 JSON records. Data may be
      // batched up to the maximum request size:
      // https://cloud.google.com/bigquery/quotas#write-api-limits
      long offset = 0;
      for (int i = 0; i < 2; i++) {
        // Create a JSON object that is compatible with the table schema.
        JSONArray jsonArr = new JSONArray();
        for (int j = 0; j < 10; j++) {
          JSONObject record = new JSONObject();
          record.put("col1", String.format("batch-record %03d-%03d", i, j));
          jsonArr.put(record);
        }
        writer.append(jsonArr, offset);
        offset += jsonArr.length();
      }
    } catch (ExecutionException e) {
      // If the wrapped exception is a StatusRuntimeException, check the state of the operation.
      // If the state is INTERNAL, CANCELLED, or ABORTED, you can retry. For more information, see:
      // https://grpc.github.io/grpc-java/javadoc/io/grpc/StatusRuntimeException.html
      System.out.println("Failed to append records. \n" + e);
    }

    // Final cleanup for the stream.
    writer.cleanup(client);
    System.out.println("Appended records successfully.");

    // Once all streams are done, if all writes were successful, commit all of them in one request.
    // This example only has the one stream. If any streams failed, their workload may be
    // retried on a new stream, and then only the successful stream should be included in the
    // commit.
    BatchCommitWriteStreamsRequest commitRequest =
        BatchCommitWriteStreamsRequest.newBuilder()
            .setParent(parentTable.toString())
            .addWriteStreams(writer.getStreamName())
            .build();
    BatchCommitWriteStreamsResponse commitResponse = client.batchCommitWriteStreams(commitRequest);
    // If the response does not have a commit time, it means the commit operation failed.
    if (!commitResponse.hasCommitTime()) {
      for (StorageError err : commitResponse.getStreamErrorsList()) {
        System.out.println(err.getErrorMessage());
      }
      throw new RuntimeException("Error committing the streams");
    }
    System.out.println("Appended and committed records successfully.");
  }

  // A simple wrapper object showing how the stateful stream writer should be used.
  private static class DataWriter {

    private JsonStreamWriter streamWriter;
    // Track the number of in-flight requests to wait for all responses before shutting down.
    private final Phaser inflightRequestCount = new Phaser(1);

    private final Object lock = new Object();

    @GuardedBy("lock")
    private RuntimeException error = null;

    void initialize(TableName parentTable, BigQueryWriteClient client)
        throws IOException, DescriptorValidationException, InterruptedException {
      // Initialize a write stream for the specified table.
      // For more information on WriteStream.Type, see:
      // https://googleapis.dev/java/google-cloud-bigquerystorage/latest/com/google/cloud/bigquery/storage/v1/WriteStream.Type.html
      WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build();

      // Configure in-stream automatic retry settings.
      // Error codes that are immediately retried:
      // * ABORTED, UNAVAILABLE, CANCELLED, INTERNAL, DEADLINE_EXCEEDED
      // Error codes that are retried with exponential backoff:
      // * RESOURCE_EXHAUSTED
      RetrySettings retrySettings =
          RetrySettings.newBuilder()
              .setInitialRetryDelay(Duration.ofMillis(500))
              .setRetryDelayMultiplier(1.1)
              .setMaxAttempts(5)
              .setMaxRetryDelay(Duration.ofMinutes(1))
              .build();

      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(parentTable.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = client.createWriteStream(createWriteStreamRequest);

      // Use the JSON stream writer to send records in JSON format.
      // For more information about JsonStreamWriter, see:
      // https://cloud.google.com/java/docs/reference/google-cloud-bigquerystorage/latest/com.google.cloud.bigquery.storage.v1.JsonStreamWriter
      streamWriter =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .setRetrySettings(retrySettings)
              .build();
    }

    public void append(JSONArray data, long offset)
        throws DescriptorValidationException, IOException, ExecutionException {
      synchronized (this.lock) {
        // If earlier appends have failed, we need to reset before continuing.
        if (this.error != null) {
          throw this.error;
        }
      }
      // Increase the count of in-flight requests before sending, so that a callback that
      // runs immediately cannot deregister before the matching register call.
      inflightRequestCount.register();
      // Append asynchronously for increased throughput.
      ApiFuture<AppendRowsResponse> future = streamWriter.append(data, offset);
      ApiFutures.addCallback(
          future, new AppendCompleteCallback(this), MoreExecutors.directExecutor());
    }

    public void cleanup(BigQueryWriteClient client) {
      // Wait for all in-flight requests to complete.
      inflightRequestCount.arriveAndAwaitAdvance();

      // Close the connection to the server.
      streamWriter.close();

      // Verify that no error occurred in the stream.
      synchronized (this.lock) {
        if (this.error != null) {
          throw this.error;
        }
      }

      // Finalize the stream.
      FinalizeWriteStreamResponse finalizeResponse =
          client.finalizeWriteStream(streamWriter.getStreamName());
      System.out.println("Rows written: " + finalizeResponse.getRowCount());
    }

    public String getStreamName() {
      return streamWriter.getStreamName();
    }

    static class AppendCompleteCallback implements ApiFutureCallback<AppendRowsResponse> {

      private final DataWriter parent;

      public AppendCompleteCallback(DataWriter parent) {
        this.parent = parent;
      }

      public void onSuccess(AppendRowsResponse response) {
        System.out.format("Append %d success\n", response.getAppendResult().getOffset().getValue());
        done();
      }

      public void onFailure(Throwable throwable) {
        synchronized (this.parent.lock) {
          if (this.parent.error == null) {
            StorageException storageException = Exceptions.toStorageException(throwable);
            this.parent.error =
                (storageException != null) ? storageException : new RuntimeException(throwable);
          }
        }
        System.out.format("Error: %s\n", throwable.toString());
        done();
      }

      private void done() {
        // Reduce the count of in-flight requests.
        this.parent.inflightRequestCount.arriveAndDeregister();
      }
    }
  }
}

Node.js

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Node.js API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

const {adapt, managedwriter} = require('@google-cloud/bigquery-storage');
const {WriterClient, Writer} = managedwriter;

const customer_record_pb = require('./customer_record_pb.js');
const {CustomerRecord} = customer_record_pb;

const protobufjs = require('protobufjs');
require('protobufjs/ext/descriptor');

async function appendRowsPending() {
  /**
   * If you make updates to the customer_record.proto protocol buffers definition,
   * run:
   *   pbjs customer_record.proto -t static-module -w commonjs -o customer_record_pb.js
   *   pbjs customer_record.proto -t json --keep-case -o customer_record.json
   * from the /samples directory to generate the customer_record module.
   */

  // So that BigQuery knows how to parse the serialized_rows, create a
  // protocol buffer representation of your message descriptor.
  const root = protobufjs.loadSync('./customer_record.json');
  const descriptor = root.lookupType('CustomerRecord').toDescriptor('proto2');
  const protoDescriptor = adapt.normalizeDescriptor(descriptor).toJSON();

  /**
   * TODO(developer): Replace these values before running the sample.
   */
  const projectId = 'my_project';
  const datasetId = 'my_dataset';
  const tableId = 'my_table';

  const destinationTable = `projects/${projectId}/datasets/${datasetId}/tables/${tableId}`;
  const streamType = managedwriter.PendingStream;
  const writeClient = new WriterClient({projectId});

  try {
    const writeStream = await writeClient.createWriteStreamFullResponse({
      streamType,
      destinationTable,
    });
    const streamId = writeStream.name;
    console.log(`Stream created: ${streamId}`);

    const connection = await writeClient.createStreamConnection({
      streamId,
    });

    const writer = new Writer({
      connection,
      protoDescriptor,
    });

    let serializedRows = [];
    const pendingWrites = [];

    // Row 1
    let row = {
      rowNum: 1,
      customerName: 'Octavia',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Row 2
    row = {
      rowNum: 2,
      customerName: 'Turing',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Set an offset to allow resuming this stream if the connection breaks.
    // Keep track of which requests the server has acknowledged and resume the
    // stream at the first non-acknowledged message. If the server has already
    // processed a message with that offset, it will return an ALREADY_EXISTS
    // error, which can be safely ignored.

    // The first request must always have an offset of 0.
    let offsetValue = 0;

    // Send batch.
    let pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    serializedRows = [];

    // Row 3
    row = {
      rowNum: 3,
      customerName: 'Bell',
    };
    serializedRows.push(CustomerRecord.encode(row).finish());

    // Offset must equal the number of rows that were previously sent.
    offsetValue = 2;

    // Send batch.
    pw = writer.appendRows({serializedRows}, offsetValue);
    pendingWrites.push(pw);

    const results = await Promise.all(
      pendingWrites.map(pw => pw.getResult())
    );
    console.log('Write results:', results);

    const {rowCount} = await connection.finalize();
    console.log(`Row count: ${rowCount}`);

    const response = await writeClient.batchCommitWriteStream({
      parent: destinationTable,
      writeStreams: [streamId],
    });

    console.log(response);
  } catch (err) {
    console.log(err);
  } finally {
    writeClient.close();
  }
}

Python

This example shows a simple record with two fields. For a longer example that shows how to send different data types, including STRUCT types, see the append_rows_proto2 sample on GitHub.

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Python API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

"""
This code sample demonstrates how to write records in pending mode
using the low-level generated client for Python.
"""

from google.protobuf import descriptor_pb2

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types, writer

# If you update the customer_record.proto protocol buffer definition, run:
#
#   protoc --python_out=. customer_record.proto
#
# from the samples/snippets directory to generate the customer_record_pb2.py module.
from . import customer_record_pb2


def create_row_data(row_num: int, name: str):
    row = customer_record_pb2.CustomerRecord()
    row.row_num = row_num
    row.customer_name = name
    return row.SerializeToString()


def append_rows_pending(project_id: str, dataset_id: str, table_id: str):
    """Create a write stream, write some sample data, and commit the stream."""
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)
    write_stream = types.WriteStream()

    # When creating the stream, choose the type. Use the PENDING type to wait
    # until the stream is committed before it is visible. See:
    # https://cloud.google.com/bigquery/docs/reference/storage/rpc/google.cloud.bigquery.storage.v1#google.cloud.bigquery.storage.v1.WriteStream.Type
    write_stream.type_ = types.WriteStream.Type.PENDING
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )
    stream_name = write_stream.name

    # Create a template with fields needed for the first request.
    request_template = types.AppendRowsRequest()

    # The initial request must contain the stream name.
    request_template.write_stream = stream_name

    # So that BigQuery knows how to parse the serialized_rows, generate a
    # protocol buffer representation of your message descriptor.
    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    customer_record_pb2.CustomerRecord.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    # Some stream types support an unbounded number of requests. Construct an
    # AppendRowsStream to send an arbitrary number of requests to a stream.
    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Create a batch of row data by appending proto2 serialized bytes to the
    # serialized_rows repeated field.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(1, "Alice"))
    proto_rows.serialized_rows.append(create_row_data(2, "Bob"))

    # Set an offset to allow resuming this stream if the connection breaks.
    # Keep track of which requests the server has acknowledged and resume the
    # stream at the first non-acknowledged message. If the server has already
    # processed a message with that offset, it will return an ALREADY_EXISTS
    # error, which can be safely ignored.
    #
    # The first request must always have an offset of 0.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    response_future_1 = append_rows_stream.send(request)

    # Send another batch.
    proto_rows = types.ProtoRows()
    proto_rows.serialized_rows.append(create_row_data(3, "Charles"))

    # Since this is the second request, you only need to include the row data.
    # The name of the stream and protocol buffers DESCRIPTOR is only needed in
    # the first request.
    request = types.AppendRowsRequest()
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # Offset must equal the number of rows that were previously sent.
    request.offset = 2

    response_future_2 = append_rows_stream.send(request)

    print(response_future_1.result())
    print(response_future_2.result())

    # Shutdown background threads and close the streaming connection.
    append_rows_stream.close()

    # A PENDING type stream must be "finalized" before being committed. No new
    # records can be written to the stream after this method has been called.
    write_client.finalize_write_stream(name=write_stream.name)

    # Commit the stream you created earlier.
    batch_commit_write_streams_request = types.BatchCommitWriteStreamsRequest()
    batch_commit_write_streams_request.parent = parent
    batch_commit_write_streams_request.write_streams = [write_stream.name]
    response = write_client.batch_commit_write_streams(
        batch_commit_write_streams_request
    )

    # Per-stream commit errors are reported in the stream_errors field of the response.
    if response.stream_errors:
        for error in response.stream_errors:
            print(error.error_message)
        raise RuntimeError("Error committing the write streams.")

    print(f"Writes to stream: '{write_stream.name}' have been committed.")
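
The offset comments in the Python and Node.js samples describe two bookkeeping rules. First, each request's offset equals the number of rows sent in earlier requests. Second, after a reconnect you resume at the first request the server has not acknowledged. The following Python sketch shows both rules. batch_offsets and resume_point are hypothetical helpers, and the sketch assumes that acknowledgements are tracked by the offset of each request.

```python
def batch_offsets(batch_sizes):
    """Offset for each request: the number of rows sent in earlier requests."""
    offsets, total = [], 0
    for size in batch_sizes:
        offsets.append(total)
        total += size
    return offsets


def resume_point(batch_sizes, acked_offsets):
    """Index of the first request the server has not acknowledged.

    Everything from this index onward is resent after a reconnect. If a resent
    offset was in fact already processed, the server answers with ALREADY_EXISTS,
    which the sample comments say can be safely ignored.
    """
    for index, offset in enumerate(batch_offsets(batch_sizes)):
        if offset not in acked_offsets:
            return index
    return len(batch_sizes)  # every request was acknowledged


sizes = [2, 1]  # the two requests in the sample: Alice and Bob, then Charles
print(batch_offsets(sizes))      # prints [0, 2], matching request.offset in the sample
print(resume_point(sizes, {0}))  # prints 1: resend only the second request
```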

This code example depends on a compiled protocol module, customer_record_pb2.py. To create the compiled module, execute protoc --python_out=. customer_record.proto, where protoc is the protocol buffer compiler. The customer_record.proto file defines the format of the messages used in the Python example.

// The BigQuery Storage API expects protocol buffer data to be encoded in the
// proto2 wire format. This allows it to disambiguate missing optional fields
// from default values without the need for wrapper types.
syntax = "proto2";

// Define a message type representing the rows in your table. The message
// cannot contain fields which are not present in the table.
message CustomerRecord {

  optional string customer_name = 1;

  // Use the required keyword for client-side validation of required fields.
  required int64 row_num = 2;
}
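
The proto2 comment above is easier to see at the byte level. The following stdlib-only Python sketch hand-encodes a CustomerRecord in the protocol buffer wire format. encode_varint and encode_customer_record are hypothetical helpers for illustration only. Real code should call SerializeToString() on the generated customer_record_pb2.CustomerRecord class, which should produce the same bytes for this message because fields are written in field-number order. If the optional customer_name is left unset, field 1 is omitted entirely, but an empty string is still encoded. This is how proto2 distinguishes a missing field from a default value.

```python
def encode_varint(value):
    """Encode an int64 as a protobuf base-128 varint."""
    if value < 0:
        # Negative int64 values are sent as their 64-bit two's complement.
        value += 1 << 64
    out = bytearray()
    while True:
        byte = value & 0x7F
        value >>= 7
        if value:
            out.append(byte | 0x80)  # more bytes follow
        else:
            out.append(byte)
            return bytes(out)


def encode_customer_record(row_num, customer_name=None):
    """Hypothetical hand encoder for the CustomerRecord message above."""
    out = bytearray()
    if customer_name is not None:
        data = customer_name.encode("utf-8")
        out += encode_varint((1 << 3) | 2)  # field 1, wire type 2 (length-delimited)
        out += encode_varint(len(data))
        out += data
    out += encode_varint((2 << 3) | 0)      # field 2, wire type 0 (varint)
    out += encode_varint(row_num)
    return bytes(out)


print(encode_customer_record(1, "Alice"))  # field 1, then field 2
print(encode_customer_record(2))           # customer_name omitted entirely
print(encode_customer_record(2, ""))       # an empty string is still encoded
```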