For example: myproject.mydataset.mysparkproc(num INT64). For more information, see Pass a value as an IN parameter or The OUT and INOUT parameters in this document.
CONNECTION_PROJECT_ID: the project that contains the connection to run the Spark procedure.
CONNECTION_REGION: the region that contains the connection to run the Spark procedure, for example, us.
CONNECTION_ID: the connection ID, for example, myconnection. When you view the connection details in the Google Cloud console, the connection ID is the value in the last section of the fully qualified connection ID that is shown in Connection ID, for example, projects/myproject/locations/connection_location/connections/myconnection.
RUNTIME_VERSION: the runtime version of Spark, for example, 2.2.
MAIN_PYTHON_FILE_URI: the path to a PySpark file, for example, gs://mybucket/mypysparkmain.py. Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, then add PYSPARK_CODE after LANGUAGE PYTHON AS, as shown in the example in Use inline code in this document.
PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline. The value is a string literal. If the code includes quotation marks and backslashes, those must be either escaped or represented as a raw string. For example, the code return "\n"; can be represented as one of the following:
  "return \"\\n\";". Both quotation marks and backslashes are escaped.
  """return "\\n";""". Backslashes are escaped while quotation marks are not.
  r"""return "\n";""". No escaping is needed.
MAIN_JAR_URI: the path of the JAR file that contains the main class, for example, gs://mybucket/my_main.jar.
CLASS_NAME: the fully qualified name of a class in a JAR set with the jar_uris option, for example, com.example.wordcount.
URI: the path of the JAR file that contains the class specified in CLASS_NAME, for example, gs://mybucket/mypysparkmain.jar.

For additional options that you can specify in OPTIONS, see the procedure option list.
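For example, the following is a minimal sketch that fills in the template with the example placeholder values from the list above (myproject, mydataset, mysparkproc, myconnection, and gs://mybucket/mypysparkmain.py):

CREATE OR REPLACE PROCEDURE `myproject`.mydataset.mysparkproc(num INT64)
  WITH CONNECTION `myproject.us.myconnection`
  OPTIONS (
    engine="SPARK",
    runtime_version="2.2",
    main_file_uri="gs://mybucket/mypysparkmain.py")
  LANGUAGE PYTHON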
When you create a procedure using the PySpark editor, you don't need to use the CREATE PROCEDURE statement. Instead, add your Python code directly in the PySpark editor and save or run your code.
To create a stored procedure for Spark in the PySpark editor, follow these steps:
Go to the BigQuery page.
If you want to type in the PySpark code directly, open the PySpark editor. To open the editor, click the menu next to Create SQL query, and then select Create PySpark Procedure.
To set options, click More > PySpark Options, and then do the following:
Specify the location where you want to run the PySpark code.
In the Connection field, specify the Spark connection.
In the Stored procedure invocation section, specify the dataset in which you want to store the temporary stored procedures that are generated. You can either set a specific dataset or allow for the use of a temporary dataset to invoke the PySpark code.
The temporary dataset is generated in the location specified in the preceding step. If you specify a dataset name, ensure that the dataset and the Spark connection are in the same location.
In the Parameters section, define parameters for the stored procedure. The value of the parameter is only used during in-session runs of the PySpark code, but the declaration itself is stored in the procedure.
In the Advanced options section, specify the procedure options. For a detailed list of the procedure options, see the procedure option list.
In the Properties section, add the key-value pairs to configure the job. You can use any of the key-value pairs from the Dataproc Serverless Spark properties.
In Service account settings, specify the custom service account, CMEK, staging dataset, and staging Cloud Storage folder to be used during in-session runs of the PySpark code.
Click Save.
After you create the stored procedure by using the PySpark editor, you can save the stored procedure. To do so, follow these steps:
In the Google Cloud console, go to the BigQuery page.
In the query editor, create a stored procedure for Spark using Python with the PySpark editor.
Click Save > Save procedure.
In the Save stored procedure dialog, specify the dataset name where you want to store the stored procedure and the name of the stored procedure.
Click Save.
If you only want to run the PySpark code instead of saving it as a stored procedure, you can click Run instead of Save.
The custom container provides the runtime environment for the workload's driver and executor processes. To use custom containers, use the following sample code:
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
    engine="SPARK",
    runtime_version="RUNTIME_VERSION",
    container_image="CONTAINER_IMAGE",
    main_file_uri=["MAIN_PYTHON_FILE_URI"])
  LANGUAGE PYTHON [AS PYSPARK_CODE]
Replace the following:
PROJECT_ID: the project in which you want to create the stored procedure, for example, myproject.
DATASET: the dataset in which you want to create the stored procedure, for example, mydataset.
PROCEDURE_NAME: the name of the stored procedure that you want to run in BigQuery, for example, mysparkprocedure.
PROCEDURE_ARGUMENT: a parameter to enter the input arguments. In this parameter, specify the following fields:
  ARGUMENT_MODE: the mode of the argument. Valid values include IN, OUT, and INOUT. By default, the value is IN.
  ARGUMENT_NAME: the name of the argument.
  ARGUMENT_TYPE: the type of the argument.
  For example: myproject.mydataset.mysparkproc(num INT64). For more information, see Pass a value as an IN parameter or The OUT and INOUT parameters in this document.
CONNECTION_PROJECT_ID: the project that contains the connection to run the Spark procedure.
CONNECTION_REGION: the region that contains the connection to run the Spark procedure, for example, us.
CONNECTION_ID: the connection ID, for example, myconnection. When you view the connection details in the Google Cloud console, the connection ID is the value in the last section of the fully qualified connection ID that is shown in Connection ID, for example, projects/myproject/locations/connection_location/connections/myconnection.
RUNTIME_VERSION: the runtime version of Spark, for example, 2.2.
MAIN_PYTHON_FILE_URI: the path to a PySpark file, for example, gs://mybucket/mypysparkmain.py. Alternatively, if you want to add the body of the stored procedure in the CREATE PROCEDURE statement, then add PYSPARK_CODE after LANGUAGE PYTHON AS, as shown in the example in Use inline code in this document.
PYSPARK_CODE: the definition of a PySpark application in the CREATE PROCEDURE statement if you want to pass the body of the procedure inline. The value is a string literal. If the code includes quotation marks and backslashes, those must be either escaped or represented as a raw string. For example, the code return "\n"; can be represented as one of the following:
  "return \"\\n\";". Both quotation marks and backslashes are escaped.
  """return "\\n";""". Backslashes are escaped while quotation marks are not.
  r"""return "\n";""". No escaping is needed.
CONTAINER_IMAGE: the path of the image in Artifact Registry. It must only contain libraries to use in your procedure. If not specified, the system default container image associated with the runtime version is used.

For more information about how to build a custom container image with Spark, see Build a custom container image.
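For example, the following is a minimal sketch with a hypothetical procedure name and Artifact Registry image path (the other values reuse the example placeholders above):

CREATE OR REPLACE PROCEDURE `myproject`.mydataset.my_container_proc()
  WITH CONNECTION `myproject.us.myconnection`
  OPTIONS (
    engine="SPARK",
    runtime_version="2.2",
    container_image="us-docker.pkg.dev/myproject/my-repo/my-spark-image:latest",
    main_file_uri="gs://mybucket/mypysparkmain.py")
  LANGUAGE PYTHON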
After you create a stored procedure, you can call it by using one of the following options:
Go to the BigQuery page.
In the Explorer pane, expand your project and select the stored procedure for Spark that you want to run.
In the Stored procedure info window, click Invoke stored procedure. Alternatively, you can expand the View actions option and click Invoke.
Click Run.
In the All results section, click View results.
Optional: In the Query results section, follow these steps:
If you want to view Spark driver logs, then click Execution details.
If you want to view logs in Cloud Logging, click Job information, and then in the Log field, click log.
If you want to get the Spark History Server endpoint, click Job information, and then click Spark history server.
To call a stored procedure, use the CALL PROCEDURE
statement:
In the Google Cloud console, go to the BigQuery Studio page.
In the query editor, enter the following statement:
CALL `PROJECT_ID`.DATASET.PROCEDURE_NAME()
Click Run.

For more information about how to run queries, see Run an interactive query.
Instead of using the Spark connection's service identity for data access, you can use a custom service account to access data within your Spark code.
To use a custom service account, specify the INVOKER security mode (using the EXTERNAL SECURITY INVOKER clause) when you create a Spark stored procedure, and specify the service account when you invoke the stored procedure.
If you want to access and use Spark code from Cloud Storage, you need to grant the necessary permissions to the Spark connection's service identity. You need to grant the connection's service account the storage.objects.get IAM permission or the storage.objectViewer IAM role.
Optionally, you can grant the connection's service account access to Dataproc Metastore and Dataproc Persistent History Server if you have specified them in the connection. For more information, see Grant access to the service account.
CREATE OR REPLACE PROCEDURE `PROJECT_ID`.DATASET.PROCEDURE_NAME(PROCEDURE_ARGUMENT)
  EXTERNAL SECURITY INVOKER
  WITH CONNECTION `CONNECTION_PROJECT_ID.CONNECTION_REGION.CONNECTION_ID`
  OPTIONS (
    engine="SPARK",
    runtime_version="RUNTIME_VERSION",
    main_file_uri=["MAIN_PYTHON_FILE_URI"])
  LANGUAGE PYTHON [AS PYSPARK_CODE];

SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
Optionally, you can add the following arguments to the preceding code:
SET @@spark_proc_properties.staging_bucket='BUCKET_NAME';
SET @@spark_proc_properties.staging_dataset_id='DATASET';
Replace the following:
CUSTOM_SERVICE_ACCOUNT: Required. A custom service account provided by you.
BUCKET_NAME: Optional. The Cloud Storage bucket that is used as the default Spark application file system. If this is not provided, a default Cloud Storage bucket is created in your project, and the bucket is shared by all jobs running under the same project.
DATASET: Optional. The dataset that stores the temporary data produced by invoking the procedure. The data is cleaned up after the job is completed. If this is not provided, a default temporary dataset is created for the job.

Your custom service account must have the following permissions:

To read and write to the staging bucket used as the default Spark application file system:
  storage.objects.* permissions or the roles/storage.objectAdmin IAM role on the staging bucket that you specify.
  storage.buckets.* permissions or the roles/storage.admin IAM role on the project if the staging bucket is not specified.
(Optional) To read and write data from and to BigQuery:
  bigquery.tables.* on your BigQuery tables.
  bigquery.readsessions.* on your project.
  The roles/bigquery.admin IAM role includes the previous permissions.
(Optional) To read and write data from and to Cloud Storage:
  storage.objects.* permissions or the roles/storage.objectAdmin IAM role on your Cloud Storage objects.
(Optional) To read and write to the staging dataset used for INOUT and OUT parameters:
  bigquery.tables.* permissions or the roles/bigquery.dataEditor IAM role on the staging dataset that you specify.
  bigquery.datasets.create permission or the roles/bigquery.dataEditor IAM role on the project if the staging dataset is not specified.

This section shows examples of how you can create a stored procedure for Apache Spark.
The following example shows how to create a stored procedure for Spark by using the my-project-id.us.my-connection
connection and a PySpark or a JAR file that's stored in a Cloud Storage bucket:
CREATE PROCEDURE my_bq_project.my_dataset.spark_proc()
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-pyspark-main.py")
  LANGUAGE PYTHON
Use main_file_uri
to create a stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_jar()
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2", main_file_uri="gs://my-bucket/my-scala-main.jar")
  LANGUAGE SCALA
Use main_class
to create a stored procedure:
CREATE PROCEDURE my_bq_project.my_dataset.scala_proc_with_main_class()
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2",
    main_class="com.example.wordcount", jar_uris=["gs://my-bucket/wordcount.jar"])
  LANGUAGE SCALA
The following example shows how to create a stored procedure for Spark by using the connection my-project-id.us.my-connection
and inline PySpark code:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2")
  LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()

# Load data from BigQuery.
words = spark.read.format("bigquery") \
  .option("table", "bigquery-public-data:samples.shakespeare") \
  .load()
words.createOrReplaceTempView("words")

# Perform word count.
word_count = words.select('word', 'word_count').groupBy('word').sum('word_count').withColumnRenamed("sum(word_count)", "sum_word_count")
word_count.show()
word_count.printSchema()

# Saving the data to BigQuery
word_count.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("wordcount_dataset.wordcount_output")
"""
The following examples show two methods for passing a value as an input parameter in Python:
In the PySpark code, you can obtain the input parameters of the stored procedure for Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format of BIGQUERY_PROC_PARAM.PARAMETER_NAME
, where PARAMETER_NAME
is the name of the input parameter. For example, if the name of the input parameter is var
, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var
. The input parameters are JSON encoded. In your PySpark code, you can get the input parameter value in a JSON string from the environment variable and decode it to a Python variable.
The following example shows how to get the value of an input parameter of type INT64
into your PySpark code:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64)
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2")
  LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
import os
import json

spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
sc = spark.sparkContext

# Get the input parameter num in JSON string and convert to a Python variable
num = int(json.loads(os.environ["BIGQUERY_PROC_PARAM.num"]))
"""
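For example, the following call passes the value 10 for num; inside the procedure, the driver reads it from the BIGQUERY_PROC_PARAM.num environment variable as a JSON-encoded value:

CALL my_bq_project.my_dataset.spark_proc(10);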
In the PySpark code, you can simply import a built-in library and use it to populate all types of parameters. To pass the parameters to executors, populate the parameters in a Spark driver as Python variables and pass the values to the executors. The built-in library supports most of the BigQuery data types except INTERVAL, GEOGRAPHY, NUMERIC, and BIGNUMERIC.
BigQuery data type | Python data type |
---|---|
BOOL | bool |
STRING | str |
FLOAT64 | float |
INT64 | int |
BYTES | bytes |
DATE | datetime.date |
TIMESTAMP | datetime.datetime |
TIME | datetime.time |
DATETIME | datetime.datetime |
Array | Array |
Struct | Struct |
JSON | Object |
NUMERIC | Unsupported |
BIGNUMERIC | Unsupported |
INTERVAL | Unsupported |
GEOGRAPHY | Unsupported |
The following example shows how to import the built-in library and use it to populate an input parameter of type INT64 and an input parameter of type ARRAY<STRUCT<a INT64, b STRING>> into your PySpark code:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc(num INT64, info ARRAY<STRUCT<a INT64, b STRING>>)
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2")
  LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession
from bigquery.spark.procedure import SparkProcParamContext

def check_in_param(x, num):
  return x['a'] + num

def main():
  spark = SparkSession.builder.appName("spark-bigquery-demo").getOrCreate()
  sc = spark.sparkContext
  spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

  # Get the input parameter num of type INT64
  num = spark_proc_param_context.num

  # Get the input parameter info of type ARRAY<STRUCT<a INT64, b STRING>>
  info = spark_proc_param_context.info

  # Pass the parameter to executors
  df = sc.parallelize(info)
  value = df.map(lambda x: check_in_param(x, num)).sum()

main()
"""
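As a calling sketch with hypothetical values, you could pass both parameters from GoogleSQL by using an array-of-struct literal that matches the declared ARRAY<STRUCT<a INT64, b STRING>> type:

CALL my_bq_project.my_dataset.spark_proc(
  3,
  [STRUCT(1 AS a, 'hello' AS b), STRUCT(2 AS a, 'world' AS b)]);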
In the Java or Scala code, you can obtain the input parameters of the stored procedure for Spark through environment variables in the Spark driver and executors. The name of the environment variable has the format of BIGQUERY_PROC_PARAM.PARAMETER_NAME
, where PARAMETER_NAME
is the name of the input parameter. For example, if the name of the input parameter is var, the name of the corresponding environment variable is BIGQUERY_PROC_PARAM.var
. In your Java or Scala code, you can get the input parameter value from the environment variable.
The following example shows how to get the value of an input parameter from environment variables into your Scala code:
val input_param = sys.env.get("BIGQUERY_PROC_PARAM.input_param").get
The following example shows getting input parameters from environment variables into your Java code:
String input_param = System.getenv("BIGQUERY_PROC_PARAM.input_param");
The OUT and INOUT parameters

Output parameters return the value from the Spark procedure, whereas the INOUT parameter accepts a value for the procedure and returns a value from the procedure. To use the OUT and INOUT parameters, add the OUT or INOUT keyword before the parameter name when creating the Spark procedure. In the PySpark code, you use the built-in library to return a value as an OUT or an INOUT parameter. As with input parameters, the built-in library supports most of the BigQuery data types except INTERVAL, GEOGRAPHY, NUMERIC, and BIGNUMERIC. The TIME and DATETIME type values are converted to the UTC timezone when they are returned as OUT or INOUT parameters.
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.pyspark_proc(IN int INT64, INOUT datetime DATETIME, OUT b BOOL, OUT info ARRAY<STRUCT<a INT64, b STRING>>, OUT time TIME, OUT f FLOAT64, OUT bs BYTES, OUT date DATE, OUT ts TIMESTAMP, OUT js JSON)
  WITH CONNECTION `my_bq_project.my_dataset.my_connection`
  OPTIONS(engine="SPARK", runtime_version="2.2")
  LANGUAGE PYTHON AS R"""
from pyspark.sql.session import SparkSession
import datetime
from bigquery.spark.procedure import SparkProcParamContext

spark = SparkSession.builder.appName("bigquery-pyspark-demo").getOrCreate()
spark_proc_param_context = SparkProcParamContext.getOrCreate(spark)

# Reading the IN and INOUT parameter values.
int = spark_proc_param_context.int
dt = spark_proc_param_context.datetime
print("IN parameter value: ", int, ", INOUT parameter value: ", dt)

# Returning the value of the OUT and INOUT parameters.
spark_proc_param_context.datetime = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.b = True
spark_proc_param_context.info = [{"a":2, "b":"dd"}, {"a":2, "b":"dd"}]
spark_proc_param_context.time = datetime.time(23, 20, 50, 520000)
spark_proc_param_context.f = 20.23
spark_proc_param_context.bs = b"hello"
spark_proc_param_context.date = datetime.date(1985, 4, 12)
spark_proc_param_context.ts = datetime.datetime(1970, 1, 1, 0, 20, 0, 2, tzinfo=datetime.timezone.utc)
spark_proc_param_context.js = {"name": "Alice", "age": 30}
""";
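The following is a minimal calling sketch, with hypothetical variable names, that declares script variables for the INOUT and OUT parameters, calls the procedure, and then reads the returned values:

DECLARE dt DATETIME DEFAULT DATETIME '2023-01-01 12:00:00';
DECLARE b BOOL;
DECLARE info ARRAY<STRUCT<a INT64, b STRING>>;
DECLARE t TIME;
DECLARE f FLOAT64;
DECLARE bs BYTES;
DECLARE d DATE;
DECLARE ts TIMESTAMP;
DECLARE js JSON;

-- After the call, the INOUT and OUT variables hold the values set in the PySpark code.
CALL my_bq_project.my_dataset.pyspark_proc(10, dt, b, info, t, f, bs, d, ts, js);
SELECT dt, b, info, t, f, bs, d, ts, js;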
The following example shows how to transform a Hive Metastore table and write the results to BigQuery:
CREATE OR REPLACE PROCEDURE my_bq_project.my_dataset.spark_proc()
  WITH CONNECTION `my-project-id.us.my-connection`
  OPTIONS(engine="SPARK", runtime_version="2.2")
  LANGUAGE PYTHON AS R"""
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL Dataproc Hive Metastore integration test example") \
    .enableHiveSupport() \
    .getOrCreate()

spark.sql("CREATE DATABASE IF NOT EXISTS records")
spark.sql("CREATE TABLE IF NOT EXISTS records.student (eid int, name String, score int)")
spark.sql("INSERT INTO records.student VALUES (1000000, 'AlicesChen', 10000)")

df = spark.sql("SELECT * FROM records.student")

df.write.format("bigquery") \
  .option("writeMethod", "direct") \
  .save("records_dataset.student")
"""
After you call a stored procedure for Spark, you can view the log information. To obtain the Cloud Logging filter information and the Spark History Server endpoint, use the bq show command. The filter information is available under the SparkStatistics field of the child job. To get log filters, follow these steps:
Go to the BigQuery page.
Use the bq command-line tool to list the child jobs of the stored procedure's script job:
bq ls -j --parent_job_id=$parent_job_id
To learn how to get the job ID, see View job details.
The output is similar to the following:
jobId                                            Job Type   State     Start Time        Duration
-------------------------------------------------------------------------------------------------
script_job_90fb26c32329679c139befcc638a7e71_0    query      SUCCESS   07 Sep 18:00:27   0:05:15.052000
Identify the jobId
for your stored procedure and use the bq show
command to view details of the job:
bq show --format=prettyjson --job $child_job_id
Copy the sparkStatistics
field because you need it in another step.
The output is similar to the following:
{"configuration":{...} … "statistics":{… "query":{"sparkStatistics":{"loggingInfo":{"projectId":"myproject", "resourceType":"myresource"}, "sparkJobId":"script-job-90f0", "sparkJobLocation":"us-central1"}, … }}}
For Logging, generate log filters with the SparkStatistics
fields:
resource.type = sparkStatistics.loggingInfo.resourceType
resource.labels.resource_container = sparkStatistics.loggingInfo.projectId
resource.labels.spark_job_id = sparkStatistics.sparkJobId
resource.labels.location = sparkStatistics.sparkJobLocation
The logs are written in the bigquery.googleapis.com/SparkJob monitored resource. The logs are labeled by the INFO, DRIVER, and EXECUTOR components. To filter logs from the Spark driver, add labels.component = "DRIVER" to the log filters. To filter logs from the Spark executor, add labels.component = "EXECUTOR" to the log filters.
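For example, using the sample sparkStatistics values from the earlier bq show output, a filter for Spark driver logs would look like the following:

resource.type="myresource"
resource.labels.resource_container="myproject"
resource.labels.spark_job_id="script-job-90f0"
resource.labels.location="us-central1"
labels.component="DRIVER"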
BigQuery Spark procedures use customer-managed encryption keys (CMEK) to protect your content, along with the default encryption provided by BigQuery. To use CMEK in a Spark procedure, first trigger the creation of the BigQuery encryption service account and grant the required permissions. Spark procedures also support CMEK organization policies if they are applied to your project.
If your stored procedure is using the INVOKER
security mode, your CMEK should be specified through the SQL system variable when calling the procedure. Otherwise, your CMEK can be specified through the connection associated with the stored procedure.
To specify the CMEK through the connection when you create a Spark stored procedure, use the following sample code:
bq mk --connection --connection_type='SPARK' \
  --properties='{"kms_key_name":"projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME"}' \
  --project_id=PROJECT_ID \
  --location=LOCATION \
  CONNECTION_NAME
To specify CMEK through the SQL system variable when calling the procedure, use the following sample code:
SET @@spark_proc_properties.service_account='CUSTOM_SERVICE_ACCOUNT';
SET @@spark_proc_properties.kms_key_name='projects/PROJECT_ID/locations/LOCATION/keyRings/KEY_RING_NAME/cryptoKeys/KMS_KEY_NAME';
CALL PROJECT_ID.DATASET_ID.PROCEDURE_NAME();
VPC Service Controls lets you set up a secure perimeter to guard against data exfiltration. To use VPC Service Controls with a Spark procedure for additional security, first create a service perimeter.
To fully protect your Spark procedure jobs, add the following APIs to the service perimeter:

BigQuery API (bigquery.googleapis.com)
Cloud Logging API (logging.googleapis.com)
Cloud Storage API (storage.googleapis.com), if you use Cloud Storage
Artifact Registry API (artifactregistry.googleapis.com) or Container Registry API (containerregistry.googleapis.com), if you use a custom container
Dataproc Metastore API (metastore.googleapis.com) and Cloud Run Admin API (run.googleapis.com), if you use Dataproc Metastore

Add the Spark procedure's query project into the perimeter. Add other projects that host your Spark code or data into the perimeter.
When you use a connection in your project for the first time, it takes about an extra minute to provision. To save time, you can reuse an existing Spark connection when you create a stored procedure for Spark.
When you create a Spark procedure for production use, Google recommends specifying a runtime version. For a list of supported runtime versions, see Dataproc Serverless runtime versions. We recommend using the Long-Term Support (LTS) version.
When you specify a custom container in a Spark procedure, we recommend using Artifact Registry and image streaming.
For better performance, you can specify resource allocation properties in the Spark procedure. Spark stored procedures support the same list of resource allocation properties as Dataproc Serverless.
Multi-region locations, such as EU or US, are not supported.

For information about quotas and limits, see stored procedures for Spark quotas and limits.