How to Run Batch Data Jobs with GCP Batch and Cloud Workflows

Samet Karadag
Google Cloud - Community
Jun 15, 2023 · 7 min read


There are instances where batch data jobs need to run for extended periods. Some of the reasons may be API rate limits or interactions with multiple systems.

While serverless options like Cloud Run and Cloud Functions have duration limitations, and Spark serverless may not be the best fit for sequential batch jobs, Google Cloud Platform (GCP) provides a powerful combination of GCP Batch and Cloud Workflows to overcome these challenges.

In this post, we will explore how to use GCP Batch and Cloud Workflows together to run sequential batch data jobs that last for many hours.

If your job only takes an hour or a day, Cloud Run -> Create Job may be all you need: task timeouts of up to 60 minutes are GA, and up to 1440 minutes (24 hours) is available as a preview feature.
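For example, a Cloud Run job with an extended task timeout can be created from the command line. This is only a sketch with placeholder names; check the flags and the current timeout limits for your gcloud version:

gcloud run jobs create news-api-ingester-job \
  --image=europe-west4-docker.pkg.dev/YOUR_PROJECT_ID/YOUR_REPO/news-api-ingester:latest \
  --region=europe-west4 \
  --task-timeout=60m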

Understanding the Architecture: GCP Batch is a managed service that allows you to run batch jobs on GCP infrastructure, while Cloud Workflows provides a way to define, orchestrate, and manage workflows in a serverless manner. Combining the two, we can create a workflow that submits batch jobs and handles retries in case of failures.

Step 1: Containerize Your Batch Job: To start, build your Python batch job and containerize it so that it can be provided to GCP Batch.

If you don’t have a Python batch job at hand, here is a sample one that you can play with:

import os

import requests
from google.cloud import bigquery

# Optional: set this if you want to use a downloaded service account key,
# e.g. when testing locally or with docker-compose. On GCP, the attached
# service account is used automatically and this line can be removed.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "./your_key.json"

# Step 1: Call News API and ingest data into BigQuery
def call_news_api_and_ingest():
    # Call the News API to retrieve the top US headlines
    api_key = 'YOUR_NEWS_API_KEY'
    url = 'https://newsapi.org/v2/top-headlines?country=us&apiKey=' + api_key
    response = requests.get(url)
    news_data = response.json()

    # Prepare the data for ingestion into BigQuery
    news_articles = news_data['articles']
    rows_to_insert = []
    for article in news_articles:
        rows_to_insert.append(
            {
                'title': article['title'],
                'description': article['description'],
                'url': article['url']
            }
        )

    # Set up BigQuery client
    project_id = 'YOUR_PROJECT_ID'
    dataset_id = 'YOUR_DATASET_ID'
    table_id = 'YOUR_TABLE_ID'
    client = bigquery.Client(project=project_id)

    # Ingest data into BigQuery (the table must already exist with
    # title, description and url columns)
    table = client.get_table(f"{project_id}.{dataset_id}.{table_id}")
    errors = client.insert_rows(table, rows_to_insert)

    if errors:
        raise Exception(f'Error inserting rows into BigQuery: {errors}')
    else:
        print('Data successfully ingested into BigQuery.')


if __name__ == '__main__':
    call_news_api_and_ingest()

The code above fetches top headlines from NewsAPI and ingests them into a BigQuery table.
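If you want a quick local test before containerizing (assuming you have filled in the API key, project, dataset, and table placeholders, and have a key file at ./your_key.json or are otherwise authenticated), you can run it directly:

pip install requests google-cloud-bigquery
python3 your-code.py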

Next, you need to containerize your batch job and push the image to Artifact Registry.

Write your Dockerfile:

FROM python:3.9

WORKDIR /script

COPY requirements.txt .

RUN pip install -r requirements.txt

COPY your-code.py .
# Optional: only needed if you authenticate with a downloaded service account key
COPY your_key.json .

ENTRYPOINT [ "python3", "your-code.py" ]
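The Dockerfile copies a requirements.txt that is not shown above. Assuming you only use the libraries imported by the sample script, a minimal one can be generated like this:

cat > requirements.txt <<EOF
requests
google-cloud-bigquery
EOF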

Here is a sample shell script that builds and pushes the image:

#!/bin/bash

# Set up Artifact Registry variables
PROJECT_ID="YOUR_PROJECT_ID"
REPO_NAME="YOUR_ARTIFACT_REGISTRY_NAME"
IMAGE_NAME="news-api-ingester"
TAG="latest"

# Build the Docker image
docker build -t "europe-west4-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${TAG}" .

# Authenticate Docker with Artifact Registry
gcloud auth configure-docker europe-west4-docker.pkg.dev

# Push the Docker image to Artifact Registry
docker push "europe-west4-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${TAG}"

echo "Containerization and image upload completed."

If you want to test your job manually, you can submit it from the GCP Console -> Compute -> Batch page:

  • Find Batch in the menu or search for it
  • Create a new job
  • Fill in the details and your image URL
  • Define parallelism and the resources needed to run your job

Here is the equivalent command line:

gcloud beta batch jobs submit job-livs47yr --location europe-west4 --config - <<EOD
{
  "name": "projects/sentiment-forecast/locations/europe-west4/jobs/job-livs47yr",
  "taskGroups": [
    {
      "taskCount": "1",
      "parallelism": "1",
      "taskSpec": {
        "computeResource": {
          "cpuMilli": "1000",
          "memoryMib": "512"
        },
        "runnables": [
          {
            "container": {
              "imageUri": "europe-west4-docker.pkg.dev/${PROJECT_ID}/${REPO_NAME}/${IMAGE_NAME}:${TAG}",
              "entrypoint": "",
              "volumes": []
            }
          }
        ],
        "volumes": []
      }
    }
  ],
  "allocationPolicy": {
    "instances": [
      {
        "policy": {
          "provisioningModel": "STANDARD",
          "machineType": "e2-medium"
        }
      }
    ],
    "location": {
      "allowedLocations": [
        "zones/europe-west4-a"
      ]
    }
  },
  "logsPolicy": {
    "destination": "CLOUD_LOGGING"
  }
}
EOD
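After submission, you can check on the job from the command line as well (standard gcloud Batch commands; the job name matches the one submitted above):

# List Batch jobs in the region
gcloud batch jobs list --location=europe-west4

# Inspect the state of the submitted job
gcloud batch jobs describe job-livs47yr --location=europe-west4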

Step 2: Write the Workflow YAML: To get started, create a new Cloud Workflows YAML file. Let’s name it sequential-batch-job.yaml. Copy the provided sample Cloud Workflows YAML and save it in the file.

Here is a sample Cloud Workflows YAML that submits your batch job with automatic retries:

Below is the legacy approach, which submits a batch job, then monitors it and waits for completion. Skip to the update after the code to see how this is done with the new Batch connector for Workflows.

main:
  params: [input]
  steps:
    - init:
        assign:
          - projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
          - region: "europe-west4"
          - batchApi: "batch.googleapis.com/v1"
          - batchApiUrl: ${"https://" + batchApi + "/projects/" + projectId + "/locations/" + region + "/jobs"}
          - imageUri: "europe-west4-docker.pkg.dev/sentiment-forecast/batch/related-tweets:latest"
          - jobId: ${"job-tweet-filter" + string(int(sys.now()))}
          - retryCount: 0 # Add a retry count variable
    - createAndRunBatchJob:
        call: http.post
        args:
          url: ${batchApiUrl}
          query:
            job_id: ${jobId}
          headers:
            Content-Type: application/json
          auth:
            type: OAuth2
          body:
            taskGroups:
              taskSpec:
                runnables:
                  - container:
                      imageUri: ${imageUri}
              # Run 1 task on 1 VM
              taskCount: 1
              parallelism: 1
            logsPolicy:
              destination: CLOUD_LOGGING
        result: createAndRunBatchJobResponse
    - getJob:
        call: http.get
        args:
          url: ${batchApiUrl + "/" + jobId}
          auth:
            type: OAuth2
        result: getJobResult
    - logState:
        call: sys.log
        args:
          data: ${"Current job state " + getJobResult.body.status.state}
    - checkState:
        switch:
          - condition: ${getJobResult.body.status.state == "SUCCEEDED"}
            next: returnResult
          - condition: ${getJobResult.body.status.state == "FAILED" and retryCount < 10} # Add condition for retry
            next: retryJob
          - condition: ${getJobResult.body.status.state == "FAILED" and retryCount >= 10} # Add condition for failure after retries
            next: failExecution
        next: sleep
    - retryJob:
        assign:
          - retryCount: ${retryCount + 1} # Increment the retry count
        next: sleepforretry
    - sleep:
        call: sys.sleep
        args:
          seconds: 600
        next: getJob
    - sleepforretry:
        call: sys.sleep
        args:
          seconds: 600
        next: createAndRunBatchJob
    - returnResult:
        return:
          jobId: ${jobId}
    - failExecution:
        raise:
          message: ${"The underlying batch job " + jobId + " failed"}

[UPDATE]: As of June 2023 there is a Batch connector for Workflows, which makes it more seamless to run the batch job and wait for completion. Check this code to see how it is used. (Thanks Mete Atamel and Guillaume Laforge for the reference!)

Step 3: Customizing the Workflow YAML: Within the YAML file, there are placeholders that need to be replaced with your project-specific information. Update the following values:

  • projectId: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")} resolves to the project the workflow runs in; replace it with a literal project ID only if you want to target a different project.
  • region: Update the region to the one where you want the batch jobs to run.
  • imageUri: Replace europe-west4-docker.pkg.dev/sentiment-forecast/batch/related-tweets:latest with the URI of your batch job image.
  • retryCount: To change the maximum number of retry attempts, update the 10 in the checkState conditions to your preferred value.

Step 4: Understanding the Workflow Steps: The workflow YAML contains the following main steps:

  • init: Initializes variables used throughout the workflow.
  • createAndRunBatchJob: Submits the batch job using an HTTP POST request to the GCP Batch API.
  • getJob: Retrieves the current state of the submitted batch job.
  • logState: Logs the current state of the batch job.
  • checkState: Checks the state of the batch job and determines the next step based on the condition.
  • retryJob: Increments the retry count and moves to sleepforretry, which waits and then resubmits the job.
  • sleep: Delays the workflow for a specified duration (600 seconds in the sample) before checking the job status again.
  • returnResult: Returns the job ID if the job succeeds.
  • failExecution: Raises an exception if the job fails after the maximum number of retries.

Here is the flow diagram of our workflow:

Workflow diagram

Step 5: Saving and Deploying the Workflow: Save the modified YAML file and deploy the workflow using the following command:

gcloud workflows deploy sequential-batch-job --source=sequential-batch-job.yaml

or via console:

  • Search for Workflows in the top bar
  • Create a new workflow
  • Fill in your preferences
  • Optionally, add Cloud Scheduler or a Pub/Sub message as the trigger and create a new Cloud Scheduler job (a sample gcloud command follows this list)
  • Paste your YAML
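If you go the Cloud Scheduler route, the scheduled trigger can also be created from the command line. This is only a sketch: the schedule, service account, and names are placeholders, and the service account needs permission to create Workflows executions:

gcloud scheduler jobs create http trigger-sequential-batch-job \
  --location=europe-west4 \
  --schedule="0 2 * * *" \
  --http-method=POST \
  --uri="https://workflowexecutions.googleapis.com/v1/projects/YOUR_PROJECT_ID/locations/europe-west4/workflows/sequential-batch-job/executions" \
  --oauth-service-account-email=YOUR_SERVICE_ACCOUNT@YOUR_PROJECT_ID.iam.gserviceaccount.com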

Step 6: Monitoring the Workflow: Once the workflow is deployed, you can monitor its progress using the Cloud Workflows Console or the command-line interface. You can also integrate with Cloud Monitoring to set up alerts and notifications based on the workflow’s execution status.
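From the CLI, you can start an execution and then follow it with the standard Workflows commands, for example (assuming the workflow name and region used in this post):

# Start an execution and wait for the result
gcloud workflows run sequential-batch-job --location=europe-west4

# List recent executions and inspect one of them
gcloud workflows executions list sequential-batch-job --location=europe-west4
gcloud workflows executions describe EXECUTION_ID --workflow=sequential-batch-job --location=europe-west4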

Conclusion: By combining GCP Batch and Cloud Workflows, you can effectively run long-running batch data jobs that involve interactions with multiple systems or are subject to API rate limits. This tutorial demonstrated how to create a workflow that submits batch jobs and handles retries automatically. Using Python for coding convenience, you can extend this approach to meet your specific requirements and process large volumes of data efficiently.

Don’t forget to make your batch job idempotent to ensure retryable and reliable processing.

Happy data engineering!
