samelamin / spark-bigquery

Google BigQuery support for Spark, Structured Streaming, SQL, and DataFrames with easy Databricks integration.




This Spark module allows saving a DataFrame as a BigQuery table.

The project was inspired by spotify/spark-bigquery, but there are several differences and enhancements:

  • Use of the Structured Streaming API

  • Use within Pyspark

  • Saving via Decorators

  • Allow saving to partitioned tables

  • Easy integration with Databricks

  • Use of Standard SQL

  • Use of Time-Ingested Partition Columns

  • Run Data Manipulation Language (DML) queries

  • Update schemas on writes using setSchemaUpdateOptions

  • JSON is used as an intermediate format instead of Avro. This allows having fields on different levels named the same:

  {
    "obj": {
      "data": {
        "data": {}
      }
    }
  }

  • DataFrame's schema is automatically adapted to a legal one:

    1. Illegal characters are replaced with _
    2. Field names are converted to lower case to avoid ambiguity
    3. Duplicate field names are given a numeric suffix (_1, _2, etc.)
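
The three renaming rules above can be sketched in plain Python. This is an illustration of the rules as described, not the connector's implementation: the function name legalize_field_names is hypothetical, and the sketch assumes that "illegal" means anything other than letters, digits, and underscores, and that the first occurrence of a name keeps it without a suffix. It works on one level of field names at a time, which is why the same name can still appear at different nesting levels, as in the JSON example.

```python
import re
from typing import Dict, List, Set

# Assumption: legal BigQuery field names contain only letters, digits and underscores.
_ILLEGAL_CHARS = re.compile(r"[^A-Za-z0-9_]")


def legalize_field_names(names: List[str]) -> List[str]:
    """Apply the three renaming rules to one level of field names (hypothetical helper)."""
    next_suffix: Dict[str, int] = {}
    used: Set[str] = set()
    result: List[str] = []
    for name in names:
        # Rules 1 and 2: replace illegal characters with "_" and lower-case the name.
        base = _ILLEGAL_CHARS.sub("_", name).lower()
        candidate = base
        suffix = next_suffix.get(base, 0)
        # Rule 3: append _1, _2, ... until the name is unique at this level.
        while candidate in used:
            suffix += 1
            candidate = f"{base}_{suffix}"
        next_suffix[base] = suffix
        used.add(candidate)
        result.append(candidate)
    return result


if __name__ == "__main__":
    print(legalize_field_names(["User-Name", "user name", "ID", "id"]))
    # ['user_name', 'user_name_1', 'id', 'id_1']
```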


I created a container that launches Zeppelin with Spark and the connector for ease of use and quick startup. You can find it here


Including spark-bigquery into your project





To use it in a local SBT console, first add the package as a dependency, then set up your project details:

resolvers += Opts.resolver.sonatypeReleases

libraryDependencies += "com.github.samelamin" %% "spark-bigquery" % "0.2.6"
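
import com.samelamin.spark.bigquery._

// Set up GCP credentials
sqlContext.setGcpJsonKeyFile("<JSON_KEY_FILE>")

// Set up BigQuery project and bucket
sqlContext.setBigQueryProjectId("<BILLING_PROJECT>")
sqlContext.setBigQueryGcsBucket("<GCS_BUCKET>")

// Set up BigQuery dataset location, default is US
sqlContext.setBigQueryDatasetLocation("<DATASET_LOCATION>")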

Structured Streaming from S3/HDFS to BigQuery

S3, Blob Storage, and HDFS are the de facto storage technologies in the cloud. This package lets you stream any data added to them into a BigQuery table of your choice.

import com.samelamin.spark.bigquery._

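val df = spark.readStream.json("s3a://bucket")

df.writeStream
      .option("checkpointLocation", "s3a://checkpoint/dir")
      .option("tableReferenceSink", "my-project:my_dataset.my_table")
      .format("com.samelamin.spark.bigquery")
      .start()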

Structured Streaming from BigQuery Table

You can use this connector to stream from a BigQuery table. The connector uses a timestamp column to track offsets.

import com.samelamin.spark.bigquery._

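val df = spark
      .readStream
      .option("tableReferenceSource", "my-project:my_dataset.my_table")
      .format("com.samelamin.spark.bigquery")
      .load()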
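
The connector's own offset logic is not shown here, but the general idea of timestamp-based offsets can be modelled in a few lines: each micro-batch takes the rows whose timestamp is later than the last committed offset, then advances the offset to the largest timestamp it saw. The sketch below is an assumption about the technique, not the connector's code, and next_batch is a hypothetical name. It also shows the main caveat of this approach: a row that arrives late, with a timestamp at or before the committed offset, is never picked up.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Any]


def next_batch(
    rows: List[Row],
    last_offset: Optional[datetime],
    ts_column: str = "bq_load_timestamp",
) -> Tuple[List[Row], Optional[datetime]]:
    """Return the rows newer than last_offset and the offset to commit next."""
    new_rows = [r for r in rows if last_offset is None or r[ts_column] > last_offset]
    if not new_rows:
        # Nothing new since the last batch: keep the previous offset.
        return [], last_offset
    return new_rows, max(r[ts_column] for r in new_rows)


if __name__ == "__main__":
    table = [
        {"id": 1, "bq_load_timestamp": datetime(2017, 1, 1, 10, 0)},
        {"id": 2, "bq_load_timestamp": datetime(2017, 1, 1, 10, 5)},
    ]
    batch, offset = next_batch(table, None)  # first batch: rows 1 and 2
    table.append({"id": 3, "bq_load_timestamp": datetime(2017, 1, 1, 10, 10)})
    table.append({"id": 4, "bq_load_timestamp": datetime(2017, 1, 1, 9, 0)})  # late row
    batch, offset = next_batch(table, offset)  # second batch: only row 3
    print([r["id"] for r in batch])
    # [3]
```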

You can also specify a custom timestamp column:

import com.samelamin.spark.bigquery._


You can also specify a custom Time Ingested Partition column:

import com.samelamin.spark.bigquery._


Saving DataFrame using BigQuery Hadoop writer API

By default, any table created by this connector has a timestamp column, bq_load_timestamp, whose value is the current timestamp at load time.

import com.samelamin.spark.bigquery._

val df = ...
df.saveAsBigQueryTable("project-id:dataset-id.table-name")

You can also save to a single day partition through a table decorator, by saving to dataset-id.table-name$YYYYMMDD.
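
A day-partition decorator is the table name followed by $ and the partition date in YYYYMMDD form. A minimal, hypothetical helper that builds such a name from a Python date (the function name and the placeholder table name are illustrative only):

```python
from datetime import date


def partition_decorator(table: str, day: date) -> str:
    """Append a day-partition decorator ($YYYYMMDD) to a table name."""
    if "$" in table:
        raise ValueError(f"table already has a decorator: {table}")
    return f"{table}${day.strftime('%Y%m%d')}"


if __name__ == "__main__":
    print(partition_decorator("dataset-id.table-name", date(2017, 3, 9)))
    # dataset-id.table-name$20170309
```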

Saving DataFrame using Pyspark

from pyspark.sql import SparkSession

BQ_PROJECT_ID = "projectId"
DATASET_ID = "datasetId"
TABLE_NAME = "tableName"

KEY_FILE = "/path/to/service_account.json" # When not on GCP
STAGING_BUCKET = "gcs-bucket"              # Intermediate JSON files
DATASET_LOCATION = "US"                    # Location for dataset creation

# Start session and reference the JVM package via py4j for convenience
session = SparkSession.builder.getOrCreate()
bigquery = session._sc._jvm.com.samelamin.spark.bigquery

# Prepare the bigquery context
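bq = bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
bq.setGcpJsonKeyFile(KEY_FILE)
bq.setBigQueryProjectId(BQ_PROJECT_ID)
bq.setBigQueryGcsBucket(STAGING_BUCKET)
bq.setBigQueryDatasetLocation(DATASET_LOCATION)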

# Extract and Transform a dataframe
# df =

# Load into a table or table partition
bqDF = bigquery.BigQueryDataFrame(df._jdf)
bqDF.saveAsBigQueryTable(
    "{0}:{1}.{2}".format(BQ_PROJECT_ID, DATASET_ID, TABLE_NAME),
    False, # Day partitioned when created
    0,     # Partition expiration when created
    # ...followed by the write and create disposition arguments
)
Submit with:

pyspark --packages com.github.samelamin:spark-bigquery_2.11:0.2.6


gcloud dataproc jobs submit pyspark --properties spark.jars.packages=com.github.samelamin:spark-bigquery_2.11:0.2.6
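
The _2.11 in the package coordinate is the Scala binary version of the build; in the SBT snippet, the %% operator appends that suffix to the artifact name automatically. If you need to assemble the coordinate for another version, a hypothetical helper looks like this (whether a given Scala and connector version combination was actually published should be checked on Maven Central):

```python
def spark_package_coordinate(
    scala_binary_version: str,
    version: str,
    group: str = "com.github.samelamin",
    artifact: str = "spark-bigquery",
) -> str:
    """Build a group:artifact_scalaVersion:version coordinate for --packages."""
    return f"{group}:{artifact}_{scala_binary_version}:{version}"


if __name__ == "__main__":
    coord = spark_package_coordinate("2.11", "0.2.6")
    print(f"pyspark --packages {coord}")
    # pyspark --packages com.github.samelamin:spark-bigquery_2.11:0.2.6
```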

Reading DataFrame From BigQuery

import com.samelamin.spark.bigquery._
val sqlContext = spark.sqlContext


val df = sqlContext.read.format("com.samelamin.spark.bigquery").option("tableReferenceSource","bigquery-public-data:samples.shakespeare").load()
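
Table references in these examples use the project:dataset.table form, for example bigquery-public-data:samples.shakespeare. As an illustration only, the hypothetical helpers below build and split such strings; they are not part of the connector, and the parser assumes the dataset and table names contain no colon or dot (it splits on the last colon, then the first dot).

```python
from typing import NamedTuple


class TableRef(NamedTuple):
    project: str
    dataset: str
    table: str


def format_table_ref(ref: TableRef) -> str:
    """Build a 'project:dataset.table' string."""
    return "{0}:{1}.{2}".format(ref.project, ref.dataset, ref.table)


def parse_table_ref(text: str) -> TableRef:
    """Split a 'project:dataset.table' string into its three parts."""
    project, sep, rest = text.rpartition(":")
    dataset, dot, table = rest.partition(".")
    if not (sep and dot and project and dataset and table):
        raise ValueError(f"expected project:dataset.table, got {text!r}")
    return TableRef(project, dataset, table)


if __name__ == "__main__":
    ref = parse_table_ref("bigquery-public-data:samples.shakespeare")
    print(ref.project, ref.dataset, ref.table)
    print(format_table_ref(ref))
```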

Reading DataFrame From BigQuery in Pyspark

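from pyspark.sql import DataFrame

bq = session._sc._jvm.com.samelamin.spark.bigquery.BigQuerySQLContext(session._wrapped._jsqlContext)
df = DataFrame(bq.bigQuerySelect("SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"), session._wrapped)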

Running DML Queries

import com.samelamin.spark.bigquery._

// Run a DML statement (Standard SQL)
sqlContext.runDMLQuery("UPDATE dataset-id.table-name SET test_col = new_value WHERE test_col = old_value")

Please note that DML queries must be written in Standard SQL.
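
Legacy SQL references a table as [project:dataset.table], while Standard SQL uses project.dataset.table, quoted with backticks when a part contains characters such as hyphens. A naive, hypothetical rewrite of the table references in a query is sketched below; it only handles the table-reference syntax (not other legacy-only functions or syntax) and does not understand brackets inside string literals.

```python
import re

# Matches a legacy SQL table reference such as [bigquery-public-data:samples.shakespeare]
_LEGACY_REF = re.compile(r"\[([^\[\]:]+):([^\[\].]+)\.([^\[\]]+)\]")


def to_standard_sql_refs(query: str) -> str:
    """Rewrite [project:dataset.table] references as `project.dataset.table`."""
    return _LEGACY_REF.sub(lambda m: "`{0}.{1}.{2}`".format(*m.groups()), query)


if __name__ == "__main__":
    legacy = "SELECT word, word_count FROM [bigquery-public-data:samples.shakespeare]"
    print(to_standard_sql_refs(legacy))
    # SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`
```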

Update Schemas

You can also allow saving a DataFrame to update the table's schema:

import com.samelamin.spark.bigquery._
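
BigQuery distinguishes two kinds of schema update on write, selected through its schemaUpdateOptions setting: ALLOW_FIELD_ADDITION (add a new field) and ALLOW_FIELD_RELAXATION (relax a REQUIRED field to NULLABLE). The sketch below uses a hypothetical flat name-to-mode schema model and a hypothetical needed_schema_update_options function to show which of the two an incoming DataFrame schema would need. It ignores nested fields and columns missing from the incoming schema, and it is not how the connector itself decides.

```python
from typing import Dict, Set

# Field name -> mode ("NULLABLE", "REQUIRED" or "REPEATED"); a flat, hypothetical schema model.
Schema = Dict[str, str]


def needed_schema_update_options(current: Schema, incoming: Schema) -> Set[str]:
    """Return the schema update options an append of `incoming` would need."""
    options: Set[str] = set()
    for name, mode in incoming.items():
        old_mode = current.get(name)
        if old_mode is None:
            # New field: it may be added, but not as REQUIRED.
            if mode == "REQUIRED":
                raise ValueError(f"new field {name!r} must not be REQUIRED")
            options.add("ALLOW_FIELD_ADDITION")
        elif old_mode == "REQUIRED" and mode == "NULLABLE":
            options.add("ALLOW_FIELD_RELAXATION")
        elif old_mode != mode:
            raise ValueError(f"field {name!r}: {old_mode} -> {mode} is not a supported update")
    return options


if __name__ == "__main__":
    table = {"id": "REQUIRED", "name": "NULLABLE"}
    df_schema = {"id": "NULLABLE", "name": "NULLABLE", "email": "NULLABLE"}
    print(sorted(needed_schema_update_options(table, df_schema)))
    # ['ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION']
```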


Notes on using this API:

  • Structured Streaming needs a partitioned table which is created by default when writing a stream
  • Structured Streaming needs a timestamp column where offsets are retrieved from; by default, all tables are created with a bq_load_timestamp column with a default value of the current timestamp.
  • For use with Databricks please follow this guide


Copyright 2016 samelamin.

Licensed under the Apache License, Version 2.0: