Consuming a REST API from a Google Cloud Dataflow Flex Template Job

Eglis Alvarez
3 min read · Sep 15, 2021

Dataflow templates are a powerful tool that allows you to package your pipelines on Google Cloud and run them from the Google Cloud Console, the gcloud command-line tool, or REST API calls. In this exercise, we will consume a REST API from a Flex Template job.

I- Preparation of the development environment

1- Install Python
2- Install the latest Apache Beam Python SDK: python -m pip install apache-beam
3- Install the SDK's GCP extras: python -m pip install "apache-beam[gcp]" (a quick sanity check follows below)
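
Before moving on, a quick sanity check (a minimal sketch, not part of the template itself) confirms that Apache Beam is importable in the local environment:

# quick_check.py - prints the locally installed Apache Beam version
import apache_beam as beam

print('Apache Beam version:', beam.__version__)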

In the Google Cloud project, create a service account and generate keys to handle authentication. The service account must have the following roles:

  • Cloud Build Service Account
  • Dataflow Admin
  • Dataflow Worker
  • Service Account User
  • Storage Object Admin

II- Construction of the pipeline

For the purposes of this exercise, the credentials and endpoints are hard-coded, but in a real project they must be passed in as parameters:

def process(self, element, **kwargs):
    # Get an OAuth2 access token
    payload = {'grant_type': 'password', 'client_id': 'xxxxx', 'client_secret': 'xxxxx',
               'username': 'user@domain.co', 'password': 'xxxxxx'}
    response = requests.post('https://endpoint/services/oauth2/token', data=payload)
    json_response = response.json()
    token = json_response['access_token']

    # Call the API with the token
    auth_headers = {'Authorization': 'Bearer ' + str(token)}
    response = requests.get('https://endpoint/method', headers=auth_headers)

    # process() must return an iterable, so yield the response as a single output element
    yield response
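
To place this snippet in context, here is a minimal sketch of how the process method could be wired into a complete pipeline in dataflow.py. The class name CallApiFn, the trigger element, and the logging step are illustrative assumptions, not part of the original code:

# dataflow.py - a minimal, illustrative sketch of the full pipeline.
import argparse
import logging

import apache_beam as beam
import requests  # used by the process() body shown above
from apache_beam.options.pipeline_options import PipelineOptions


class CallApiFn(beam.DoFn):
    def process(self, element, **kwargs):
        # ... the process() body shown above goes here ...
        yield element


def run(argv=None):
    parser = argparse.ArgumentParser()
    _, pipeline_args = parser.parse_known_args(argv)
    options = PipelineOptions(pipeline_args, save_main_session=True)

    with beam.Pipeline(options=options) as pipeline:
        (pipeline
         | 'Start' >> beam.Create(['trigger'])       # single dummy element to trigger the call
         | 'Call REST API' >> beam.ParDo(CallApiFn())
         | 'Log response' >> beam.Map(logging.info))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    run()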

III- Dockerfile setup

A Dockerfile is provided to build the image that launches the Flex Template job:

FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY dataflow.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/dataflow.py"

RUN pip install -U apache-beam[gcp]==2.32.0

It is recommended to install apache-beam[gcp] in the Docker image rather than in the requirements file, to avoid time-out errors while the dependencies are being pulled.

The requirements.txt file must contain all of the project's dependencies; for this exercise we have only included requests (requests==2.26.0).
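
With that in mind, the requirements.txt for this exercise contains a single pinned dependency:

requests==2.26.0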

IV- Metadata file configuration

Dataflow offers the option to configure a JSON metadata file whose purpose is to define and validate the inputs expected by the pipeline. This particular pipeline takes no inputs, so the parameters array could be left empty; the example keeps two placeholder entries to show how parameters would be declared, since a real pipeline will usually need them (a sketch of how such a parameter is read in the pipeline follows the example).

{
  "name": "Test Connection",
  "description": "Test Connection",
  "parameters": [
    {
      "name": "<Param1Name>",
      "label": "<Param1Label>",
      "helpText": "<Param1HelpText>"
    },
    {
      "name": "<Param2Name>",
      "label": "<Param2Label>",
      "helpText": "<Param2HelpText>"
    }
  ]
}
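
If parameters are declared in the metadata file, the pipeline must accept them as well. The following is a minimal sketch of how a declared parameter could be read inside the pipeline code; the name param1 is a placeholder standing in for <Param1Name>:

# Illustrative sketch: the argument name must match the "name" field declared
# in pipeline_metadata.json ('param1' is a placeholder).
import argparse

from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
parser.add_argument('--param1', help='Value described by <Param1HelpText>')
known_args, pipeline_args = parser.parse_known_args()

options = PipelineOptions(pipeline_args)
# known_args.param1 is now available to the pipeline, e.g. as a DoFn constructor argument.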

V- Execution of the build with Cloud Build

The build steps are defined in cloudbuild.yaml as follows:

steps:
- name: gcr.io/cloud-builders/gcloud
  id: Build Docker Image
  args: ['builds', 'submit', '--tag', 'gcr.io/${_PROJECT_ID}/${_IMAGE_NAME}:${_IMAGE_TAG}', '.']
  waitFor: ['-']
- name: gcr.io/cloud-builders/gcloud
  args: ['beta',
         'dataflow',
         'flex-template',
         'build',
         'gs://${_BUCKET_NAME}/templates/connection_template.json',
         '--image=gcr.io/${_PROJECT_ID}/${_IMAGE_NAME}:${_IMAGE_TAG}',
         '--sdk-language=PYTHON',
         '--metadata-file=pipeline_metadata.json']
  waitFor: ['Build Docker Image']

Then run the build, supplying values for the user-defined substitutions (_PROJECT_ID, _IMAGE_NAME, _IMAGE_TAG, _BUCKET_NAME) either with --substitutions or in the cloudbuild.yaml file:

gcloud builds submit --config=cloudbuild.yaml --project='project-name'

Now it is possible to run the job:

gcloud beta dataflow flex-template run $JOB_NAME \
  --template-file-gcs-location=gs://$BUCKET_NAME/templates/connection_template.json \
  --parameters experiments=disable_flex_template_entrypoint_overrride \
  --parameters staging_location=gs://$BUCKET_NAME/staging \
  --parameters temp_location=gs://$BUCKET_NAME/temp \
  --parameters service_account_email=$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --region=$REGION \
  --project=$PROJECT_ID
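
As mentioned in the introduction, templates can also be launched through REST API calls. Below is a minimal sketch of the same run performed from Python with the google-api-python-client discovery client (assumed here, not used in the original post; the project, region, bucket, and job names are placeholders, and Application Default Credentials are expected to be configured):

# launch_template.py - illustrative sketch, not part of the original pipeline code.
from googleapiclient.discovery import build

dataflow = build('dataflow', 'v1b3')

request = dataflow.projects().locations().flexTemplates().launch(
    projectId='project-name',
    location='us-central1',
    body={
        'launchParameter': {
            'jobName': 'test-connection-job',
            'containerSpecGcsPath': 'gs://BUCKET_NAME/templates/connection_template.json',
            'parameters': {
                'staging_location': 'gs://BUCKET_NAME/staging',
                'temp_location': 'gs://BUCKET_NAME/temp',
            },
        }
    },
)
print(request.execute())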

A successful run can then be verified in the Dataflow user interface.
