Streamline Your Data Ingestion with Event-Driven Serverless Loads into BigQuery from GCS using Cloud Functions

Samet Karadag
Google Cloud - Community
8 min read · Oct 13, 2023


In today’s data-driven world, the ability to efficiently ingest, transform, and analyze data in real-time is crucial for organizations to gain insights and make data-driven decisions. Google Cloud Platform (GCP) provides a powerful and flexible solution for handling data ingestion through an event-driven architecture, and in this article, we’ll walk you through how to set up event-driven loads into BigQuery using a Python-based Cloud Function.

What is Event-Driven Data Ingestion?

Event-driven data ingestion is a mechanism where data is loaded into a target system, like BigQuery, in response to specific events or triggers. These events can be changes in a storage bucket, the arrival of a new file, a message in a queue, or any other predefined event that you want to act on. Event-driven data ingestion is highly efficient, as it minimizes latency and automates the process, ensuring that data is loaded into BigQuery as soon as it’s available.

Use Case: Automatically Ingesting Data into BigQuery When a File is Uploaded to GCS

Imagine you have a Google Cloud Storage (GCS) bucket where data files are constantly being uploaded. You want to load these files into BigQuery, but the data can be in either CSV or JSON format. Furthermore, JSON files may be complex dictionaries containing lists, which need to be converted into newline-delimited JSON format before BigQuery can load them. You also want to create separate tables in BigQuery, with the table names derived from the GCS bucket names.
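
To make the JSON case concrete, here is a minimal sketch of that conversion, assuming an uploaded file shaped the way the function below expects (a top-level records key; the id and amount fields are just illustrative):

import json

# A hypothetical uploaded JSON file: a dictionary wrapping a list of records
uploaded = {
    "records": [
        {"id": 1, "amount": 10.5},
        {"id": 2, "amount": 7.25},
    ]
}

# BigQuery's JSON load format is newline-delimited: one serialized record per line
newline_delimited = "\n".join(json.dumps(record) for record in uploaded["records"])
print(newline_delimited)
# {"id": 1, "amount": 10.5}
# {"id": 2, "amount": 7.25}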

Let’s see how you can achieve this using a Google Cloud Function. Below is a Python-based Cloud Function code that demonstrates this:

import os
import json
from google.cloud import bigquery
from google.cloud import storage
import functions_framework

# Set your BigQuery dataset name (table names are derived from the bucket name)
BIGQUERY_DATASET_ID = 'ingestion'

def prepare_json_file(file_bucket, file_name):
    # Initialize GCS client
    storage_client = storage.Client()
    # /tmp is the writable filesystem inside Cloud Functions
    temp_file_name = '/tmp/' + file_bucket + '_' + file_name

    # Download the file from GCS to a local temporary file
    bucket = storage_client.bucket(file_bucket)
    blob = bucket.blob(file_name)
    blob.download_to_filename(temp_file_name)

    # Read the content of the temporary file
    with open(temp_file_name, 'r') as file:
        json_file = json.load(file)

    # Serialize each record onto its own line (newline-delimited JSON)
    records = [json.dumps(record) for record in json_file["records"]]
    newline_delimited_json = '\n'.join(records)

    # Check that the conversion produced some content
    if len(newline_delimited_json) >= 2:
        # Write the converted content back to the temporary file
        with open(temp_file_name, 'w') as file:
            file.writelines(newline_delimited_json)

        # Upload the converted file to GCS, to the preprocess_json bucket
        bucket = storage_client.bucket('preprocess_json')
        blob = bucket.blob(file_name)
        blob.upload_from_filename(temp_file_name)
        print(f"Converted '{file_name}' to newline_delimited_json format in GCS bucket '{file_bucket}'.")
    else:
        print("The file does not contain any records to convert.")

    # Clean up the local temporary file
    os.remove(temp_file_name)
    return file_name

def load_data_to_bigquery(data):
    # Get the file details from the event
    file_bucket = data['bucket']
    file_name = data['name']

    # Derive the staging table name from the bucket name (the part after the '-')
    BIGQUERY_TABLE_ID = file_bucket.split('-')[1].upper()

    # Initialize BigQuery and Storage clients
    bigquery_client = bigquery.Client()
    storage_client = storage.Client()

    # Construct the BigQuery table reference
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(BIGQUERY_TABLE_ID)

    # Define job configuration
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV if file_name.endswith('.csv') else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # If it is a JSON file, convert the array of records into newline-delimited JSON first
    if file_name.endswith('.json'):
        file_name = prepare_json_file(file_bucket, file_name)
        file_bucket = 'preprocess_json'

    # Load data from GCS to BigQuery
    uri = f"gs://{file_bucket}/{file_name}"
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)

    load_job.result()  # Wait for the job to complete

    print(f"File {file_name} loaded into BigQuery table {BIGQUERY_DATASET_ID}.{BIGQUERY_TABLE_ID}")
    return f"File {file_name} loaded into BigQuery table {BIGQUERY_DATASET_ID}.{BIGQUERY_TABLE_ID}"

# Entry point for the Cloud Function
# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def process_gcs_event(cloud_event):
    data = cloud_event.data
    bucket = data['bucket']
    file_name = data['name']
    timeCreated = data["timeCreated"]

    print(f"Data: {data}")
    print(f"Bucket: {bucket}")
    print(f"File: {file_name}")
    print(f"Created: {timeCreated}")

    return_msg = load_data_to_bigquery(data)
    return return_msg
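
For reference, the object-finalized event that triggers the function carries a payload along these lines (the bucket, object name, and values shown here are illustrative):

# Example of cloud_event.data for a GCS "object finalized" event (illustrative values)
sample_event_data = {
    "bucket": "project-sales-landing",
    "name": "orders.json",
    "contentType": "application/json",
    "size": "2048",
    "timeCreated": "2023-10-13T09:00:00.000Z",
    "updated": "2023-10-13T09:00:00.000Z",
}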

Understanding the Code

  1. The Cloud Function's job is to load a file into BigQuery as soon as it is uploaded to the GCS bucket. This enables an event-driven architecture and minimizes the latency of the load job.
  2. The Cloud Function supports both CSV and JSON files. A CSV file is loaded directly with a BigQuery load job; a JSON file, such as a dictionary containing a list of records, is first preprocessed within the function to convert it into newline-delimited JSON format.
  3. Event-Driven Triggers: The Cloud Function is triggered when a file is uploaded to a GCS bucket. It automatically detects whether the file is in CSV or JSON format and handles the conversion accordingly.
  4. Dynamic Table Naming: The function derives the BigQuery table name from the GCS bucket name, which allows tables to be created dynamically based on the source of the data. The part of the bucket name after the '-' character becomes the staging table name in the ingestion dataset (see the snippet after this list).
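
For example, with a hypothetical bucket named project-sales-landing, the staging table name is derived like this:

file_bucket = 'project-sales-landing'   # hypothetical bucket name
table_id = file_bucket.split('-')[1].upper()
print(table_id)  # SALES -> the file is loaded into ingestion.SALES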

How to Deploy the Cloud Function

Here is a sample flow for creating the Cloud Function via the GCP Console:

Cloud Function

It will open up the Eventarc trigger window:

Eventarc Trigger

Select your source bucket (the one where new file uploads should trigger a BigQuery load job) and grant the eventReceiver role to the service account that will receive the events.

If you are not using the default compute service account, make sure your service account has the necessary roles, such as 'Cloud Run Invoker', 'Service Account Token Creator', 'BigQuery Data Editor', 'BigQuery Job User', and 'Storage Object Viewer' on the source bucket.

Set your service account (the one you granted the eventReceiver role) and adjust the CPU/memory settings.

Tip: If the function doesn't do any preprocessing of data, you can assign small resources, e.g., 1 CPU / 1 GB, and set concurrency to around 10. Our example function doesn't preprocess CSV files, but it does preprocess JSON files to convert the array into newline-delimited JSON. So for JSON we should ensure the function has enough memory to hold the files.

Cloud Function advanced settings
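
If you prefer the CLI to the console, a deployment roughly along these lines should work; the function name, region, bucket, service account, and memory size below are placeholders to replace:

gcloud functions deploy gcs-to-bq-loader \
  --gen2 \
  --runtime=python311 \
  --region=us-central1 \
  --source=. \
  --entry-point=process_gcs_event \
  --memory=1024MB \
  --service-account=YOUR_SA@YOUR_PROJECT.iam.gserviceaccount.com \
  --trigger-event-filters="type=google.cloud.storage.object.v1.finalized" \
  --trigger-event-filters="bucket=YOUR_SOURCE_BUCKET"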

The requirements (requirements.txt) are:

google-cloud-bigquery==3.11.4
google-cloud-storage==2.11.0
functions-framework==3.*

Bonus: Concurrent Loads

You may have already noticed that the function above is non-deterministic and will lose data with concurrent uploads, for example when two files are uploaded at the same time, because it uses the WRITE_TRUNCATE write disposition (bigquery.WriteDisposition.WRITE_TRUNCATE).

One workaround is to change this to WRITE_APPEND so that both loads end up in the staging table.
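
That is a one-line change in the load job configuration:

job_config = bigquery.LoadJobConfig(
    autodetect=True,
    source_format=bigquery.SourceFormat.CSV if file_name.endswith('.csv') else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    # Append to the staging table instead of truncating it on every load
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)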

In this case it will be a bit tricky to curate the data after the load. Your curation or ELT job has to pick up only the newer data, by checking a load_time column or an increasing key, or by running a merge job (see the sketch below).
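
As a rough sketch of such a merge job (the curated dataset, the id key, the load_time column, and the project name are assumptions for illustration, not part of the function above):

from google.cloud import bigquery
import datetime

bigquery_client = bigquery.Client()

# Merge newly appended staging rows into a curated table,
# keyed on a hypothetical `id` column and filtered by a `load_time` watermark.
merge_sql = """
MERGE `my-project.curated.SALES` T
USING (
  SELECT * FROM `my-project.ingestion.SALES`
  WHERE load_time > @last_processed
) S
ON T.id = S.id
WHEN NOT MATCHED THEN
  INSERT ROW
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter(
            "last_processed", "TIMESTAMP",
            datetime.datetime(2023, 10, 13, tzinfo=datetime.timezone.utc),  # placeholder watermark
        ),
    ]
)
bigquery_client.query(merge_sql, job_config=job_config).result()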

A better and more scalable way is to run the curation jobs in parallel, one per load. This requires each load job to create its own table, suffixed with a timestamp (in microseconds).

Here is the updated Cloud Function, which supports concurrent uploads by creating a new table per load and setting the table expiration to 1 day:

import os
import json
from google.cloud import bigquery
from google.cloud import storage
import functions_framework
import time
import datetime
from flask import jsonify

# Set your BigQuery dataset name (table names are derived from the bucket name)
BIGQUERY_DATASET_ID = 'ingestion'

def prepare_json_file(file_bucket, file_name):
    # Initialize GCS client
    storage_client = storage.Client()
    # /tmp is the writable filesystem inside Cloud Functions
    temp_file_name = '/tmp/' + file_bucket + '_' + file_name

    # Download the file from GCS to a local temporary file
    bucket = storage_client.bucket(file_bucket)
    blob = bucket.blob(file_name)
    blob.download_to_filename(temp_file_name)

    # Read the content of the temporary file
    with open(temp_file_name, 'r') as file:
        json_file = json.load(file)

    # Serialize each record onto its own line (newline-delimited JSON)
    records = [json.dumps(record) for record in json_file["records"]]
    newline_delimited_json = '\n'.join(records)

    # Check that the conversion produced some content
    if len(newline_delimited_json) >= 2:
        # Write the converted content back to the temporary file
        with open(temp_file_name, 'w') as file:
            file.writelines(newline_delimited_json)

        # Upload the converted file to GCS, to the preprocess_json bucket
        bucket = storage_client.bucket('preprocess_json')
        blob = bucket.blob(file_name)
        blob.upload_from_filename(temp_file_name)
        print(f"Converted '{file_name}' to newline_delimited_json format in GCS bucket '{file_bucket}'.")
    else:
        print("The file does not contain any records to convert.")

    # Clean up the local temporary file
    os.remove(temp_file_name)
    return file_name


def get_table_schema(table_id):
    # Initialize BigQuery client
    bigquery_client = bigquery.Client()

    # Get the schema of the existing table
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(table_id)
    table = bigquery_client.get_table(table_ref)

    return table.schema

def create_empty_table(table_id, schema):
    # Initialize BigQuery client
    bigquery_client = bigquery.Client()

    # Construct the BigQuery table reference
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(table_id)

    # Define the schema for the new table
    table = bigquery.Table(table_ref, schema=schema)

    # Set the table to expire 1 day from now
    expiration = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1)
    table.expires = expiration

    # Create the empty table in BigQuery
    bigquery_client.create_table(table)

    print(f"Empty table {table_id} created in BigQuery.")

def load_data_to_bigquery(data):
    # Get the file details from the event
    file_bucket = data['bucket']
    file_name = data['name']

    BIGQUERY_TABLE_ID = file_bucket.split('-')[1].upper()

    # Create a new table name with a microsecond epoch timestamp
    epoch_timestamp = str(int(time.time() * 1000000))
    new_table_name = f"{BIGQUERY_TABLE_ID}_{epoch_timestamp}"

    # Initialize BigQuery and Storage clients
    bigquery_client = bigquery.Client()
    storage_client = storage.Client()

    # Construct a reference to the existing base staging table
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(BIGQUERY_TABLE_ID)

    # Get the schema from the existing base table (e.g., ingestion.DP1)
    schema = bigquery_client.get_table(table_ref).schema  # Change to the desired table

    # Construct a reference to the new per-load table
    table_ref = bigquery_client.dataset(BIGQUERY_DATASET_ID).table(new_table_name)

    # Create a new empty table with the same schema as the existing table
    create_empty_table(new_table_name, schema)

    # Define job configuration
    job_config = bigquery.LoadJobConfig(
        autodetect=True,
        source_format=bigquery.SourceFormat.CSV if file_name.endswith('.csv') else bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
    )

    # If it is a JSON file, convert the array of records into newline-delimited JSON first
    if file_name.endswith('.json'):
        file_name = prepare_json_file(file_bucket, file_name)
        file_bucket = 'preprocess_json'

    # Load data from GCS to BigQuery
    uri = f"gs://{file_bucket}/{file_name}"
    load_job = bigquery_client.load_table_from_uri(uri, table_ref, job_config=job_config)

    load_job.result()  # Wait for the job to complete

    print(f"File {file_name} loaded into BigQuery table {BIGQUERY_DATASET_ID}.{new_table_name}")
    return new_table_name

# Entry point for the Cloud Function
# Triggered by a change in a storage bucket
@functions_framework.cloud_event
def process_gcs_event(cloud_event):
    data = cloud_event.data
    bucket = data['bucket']
    file_name = data['name']
    timeCreated = data["timeCreated"]

    print(f"Data: {data}")
    print(f"Bucket: {bucket}")
    print(f"File: {file_name}")
    print(f"Created: {timeCreated}")

    return_msg = load_data_to_bigquery(data)

    return jsonify({'table_name': return_msg})
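
With per-load tables, a downstream curation job can pick up only the loads it has not processed yet, for example with a wildcard table over the timestamped names. This is a rough sketch under assumptions: the project name, the SALES prefix, and the watermark suffix below are illustrative placeholders.

from google.cloud import bigquery

bigquery_client = bigquery.Client()

# Read only the per-load tables (SALES_<epoch in microseconds>) created after the last processed load
sql = """
SELECT *
FROM `my-project.ingestion.SALES_*`
WHERE _TABLE_SUFFIX > @last_suffix
"""

job_config = bigquery.QueryJobConfig(
    query_parameters=[
        bigquery.ScalarQueryParameter("last_suffix", "STRING", "1697184000000000"),  # placeholder watermark
    ]
)
rows = bigquery_client.query(sql, job_config=job_config).result()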

I hope you find this useful. Please let me know your thoughts in the comments.
