Consuming a REST API from a Google Cloud Dataflow Flex Template Job
Dataflow templates are a powerful tool that lets you package your pipelines on Google Cloud and run them from the Google Cloud Console, the gcloud command-line tool, or REST API calls. In this exercise, we will consume a REST API from a Flex Template job.
I- Preparation of the development environment
1- Install Python
2- Install the latest Python SDK: python -m pip install apache-beam
3- Install the GCP extras for the Python SDK: python -m pip install "apache-beam[gcp]"
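As a quick optional check, the following minimal Python sketch confirms that the SDK and its GCP extras were installed correctly:

# Sanity check: the Apache Beam SDK should be importable and report its version.
import apache_beam as beam
print(beam.__version__)

# Importing a GCP-specific module confirms that the [gcp] extras are available.
from apache_beam.io.gcp import gcsio  # noqa: F401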
In the Google Cloud project, create a service account and generate keys to handle authentication. The service account must have the following roles:
- Cloud Build Service Account
- Dataflow Admin
- Dataflow Worker
- Service Account User
- Storage Object Admin
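Once the key file has been downloaded, a quick way to confirm that it authenticates is to point GOOGLE_APPLICATION_CREDENTIALS at it and load the default credentials. This is a minimal sketch; the key path is a placeholder:

import os
import google.auth

# Placeholder path: point this at the JSON key downloaded for the service account.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = '/path/to/service-account-key.json'

# google.auth.default() picks up the key and returns the associated project.
credentials, project_id = google.auth.default()
print('Authenticated against project:', project_id)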
II- Construction of the pipeline
For the purposes of this exercise, the credentials and endpoints are hard-coded, but in a real project they must be parameterizable:
import requests
import apache_beam as beam

# DoFn that obtains an OAuth2 access token and then calls the REST API.
class CallApiFn(beam.DoFn):
    def process(self, element, **kwargs):
        # Get access token
        payload = {'grant_type': 'password', 'client_id': 'xxxxx', 'client_secret': 'xxxxx',
                   'username': 'user@domain.co', 'password': 'xxxxxx'}
        response = requests.post('https://endpoint/services/oauth2/token', data=payload)
        json_response = response.json()
        token = json_response['access_token']
        # Call the API with the bearer token
        auth_headers = {'Authorization': 'Bearer ' + str(token)}
        response = requests.get('https://endpoint/method', headers=auth_headers)
        # Emit the response body as a single element
        return [response.text]
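To give an idea of how this DoFn fits into dataflow.py, here is a minimal sketch that wires it into a pipeline. The class name CallApiFn, the single dummy element and the print step are assumptions for illustration; a real job would read its work items from a source and write the responses to a sink:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None):
    options = PipelineOptions(argv)
    with beam.Pipeline(options=options) as p:
        (
            p
            | 'Start' >> beam.Create([None])          # one dummy element triggers one API call
            | 'Call REST API' >> beam.ParDo(CallApiFn())
            | 'Log response' >> beam.Map(print)
        )

if __name__ == '__main__':
    run()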
III- Dockerfile setup
A Dockerfile is provided to build the image used to launch the Flex Template jobs:
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base

ARG WORKDIR=/dataflow/template
RUN mkdir -p ${WORKDIR}
WORKDIR ${WORKDIR}

COPY requirements.txt .
COPY dataflow.py .

ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/dataflow.py"
RUN pip install -U apache-beam[gcp]==2.32.0
It is recommended to install apache-beam[gcp] in the Docker image rather than listing it in the requirements file, to avoid time-out errors while the dependencies are being pulled.
The requirements.txt file must contain all the project dependencies; for this exercise we have only included requests (requests==2.26.0).
IV- Metadata file configuration
Dataflow offers the option to configure a JSON metadata file whose purpose is to define and validate the inputs expected by the pipeline. In this particular case the parameters array could be left empty; however, we include placeholder entries in the example because parameters will probably be required as development progresses.
{
  "name": "Test Connection",
  "description": "Test Connection",
  "parameters": [
    { "name": "<Param1Name>", "label": "<Param1Label>", "helpText": "<Param1HelpText>" },
    { "name": "<Param2Name>", "label": "<Param2Label>", "helpText": "<Param2HelpText>" }
  ]
}
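When parameters are declared in the metadata file, the pipeline typically exposes matching options so that the values passed at launch time reach the code. This is also how the credentials hard-coded in section II could be made parameterizable. A minimal sketch, assuming hypothetical parameter names api_endpoint and client_id that would have to match the "name" fields in the JSON above:

from apache_beam.options.pipeline_options import PipelineOptions

class ConnectionOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        # These option names must match the "name" fields declared in the metadata file.
        parser.add_argument('--api_endpoint', help='Base URL of the REST API')
        parser.add_argument('--client_id', help='OAuth2 client id')

# In the pipeline code: options = PipelineOptions(argv).view_as(ConnectionOptions)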
V- Execution of the build with Cloud Build
The build steps should be defined as follows in cloudbuild.yaml:
steps:
- name: gcr.io/cloud-builders/gcloud
  id: Build Docker Image
  args: ['builds', 'submit', '--tag', 'gcr.io/${_PROJECT_ID}/${_IMAGE_NAME}:${_IMAGE_TAG}', '.']
  waitFor: ['-']
- name: gcr.io/cloud-builders/gcloud
  args: ['beta',
         'dataflow',
         'flex-template',
         'build',
         'gs://${_BUCKET_NAME}/templates/connection_template.json',
         '--image=gcr.io/${_PROJECT_ID}/${_IMAGE_NAME}:${_IMAGE_TAG}',
         '--sdk-language=PYTHON',
         '--metadata-file=pipeline_metadata.json'
  ]
  waitFor: ['Build Docker Image']
Then run the build:
gcloud builds submit --config=cloudbuild.yaml --project=project-name
Now it is possible to run the job:
gcloud beta dataflow flex-template run $JOB_NAME \
  --template-file-gcs-location=gs://$BUCKET_NAME/templates/connection_template.json \
  --parameters experiments=disable_flex_template_entrypoint_overrride \
  --parameters staging_location=gs://$BUCKET_NAME/staging \
  --parameters temp_location=gs://$BUCKET_NAME/temp \
  --parameters service_account_email=$SERVICE_ACCOUNT_NAME@$PROJECT_ID.iam.gserviceaccount.com \
  --region=$REGION \
  --project=$PROJECT_ID
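As mentioned in the introduction, the same template can also be launched with a REST API call (the projects.locations.flexTemplates.launch method of the Dataflow API, v1b3). Here is a minimal sketch using the google-api-python-client library, which is an assumption and not part of this project's requirements; the job name, bucket, project, and region values are placeholders:

from googleapiclient.discovery import build

# Build a client for the Dataflow REST API (v1b3).
dataflow = build('dataflow', 'v1b3')

body = {
    'launchParameter': {
        'jobName': 'test-connection-job',
        'containerSpecGcsPath': 'gs://BUCKET_NAME/templates/connection_template.json',
        'parameters': {
            'staging_location': 'gs://BUCKET_NAME/staging',
            'temp_location': 'gs://BUCKET_NAME/temp',
        },
    }
}

response = (
    dataflow.projects()
    .locations()
    .flexTemplates()
    .launch(projectId='PROJECT_ID', location='REGION', body=body)
    .execute()
)
print(response['job']['id'])  # the response contains the created job's metadata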
Successful execution can then be verified in the Dataflow user interface.
