Data pipelines for data science using Google Cloud Composer and Dataflow

We live in a world where every piece of data matters and can be used to build a digital environment where many tasks are automated and machines understand us better. Data science is the discipline that extracts knowledge from this data.

Collecting and storing data is a major challenge for industries such as e-commerce, where large volumes of data are generated every hour and many machine learning models consume it.

Many e-commerce companies use BigQuery (or a similar big data store) alongside their regular databases to hold most of their real-time data. Here I will cover the design and construction of a data pipeline from BigQuery to some other datastore (say Cassandra, Bigtable, etc.) via Dataflow pipelines.

Required Assets:

  1. Google Cloud Composer (alternatively, an Apache Airflow setup on a local machine/VM)
  2. GCS bucket for code upload (or a local machine/VM with the setup)
  3. Dataflow (the Direct Runner can be used for local testing)

High Level Flow:

[Diagram: Data Pipeline]

Google Cloud Composer (or Airflow) can be used to trigger the pipeline or schedule it on a regular basis, and it supplies the essential runtime flags and configs. It also provides additional functionality to define connections, variables, and secrets.

[Screenshot: Airflow DAG dashboard]
[Screenshot: manual trigger option in the DAG]

Dataflow processes the pipeline data in a distributed manner.

Within a Dataflow worker, we can transform the data into its final format with whatever computation is needed, and even download and run machine learning models required for the transformation.
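As a minimal sketch of that idea (the GCS path, model format and feature names below are hypothetical), a DoFn can load a pickled model once per worker in setup() and apply it to every element in process():

import pickle

import apache_beam as beam
from apache_beam.io.filesystems import FileSystems


class EnrichWithModel(beam.DoFn):
    """Hypothetical DoFn: loads a model once per worker and scores each row."""

    def setup(self):
        # Download the serialized model from GCS once per worker.
        with FileSystems.open('gs://test-123/models/model.pkl') as f:
            self.model = pickle.load(f)

    def process(self, row):
        # Derive the final feature with the model (column names are illustrative).
        row['feature_b'] = float(self.model.predict([[row['feature_a']]])[0])
        yield row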

A DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run. A DAG is defined in a Python script, which represents the DAG's structure (tasks and their dependencies) as code. Airflow libraries provide many operators to implement these tasks, e.g.:

BigQueryOperator provides functionality to read data from BigQuery tables and even write the results to another BigQuery table:

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

with open('test_query.sql', 'r') as file:
    query = file.read()

load_job = BigQueryOperator(
    dag=dag,
    task_id="load_data_from_tableA",
    sql=query,
    use_legacy_sql=False,
    # you can use another service account; just create a BQ connection in Admin > Connections
    bigquery_conn_id="test-bq",
    write_disposition="WRITE_TRUNCATE",  # either WRITE_TRUNCATE or WRITE_APPEND
    allow_large_results=True,
    destination_dataset_table="test-infra-bq.test_dataset.final_table")

In our use case we will use DataFlowPythonOperator to run a Dataflow script written in Python. Airflow also provides an equivalent Java operator if your pipeline is written in Java.

import airflow
from datetime import timedelta
from airflow import DAG
from airflow.contrib.operators.dataflow_operator import DataFlowPythonOperator

# path, cassandra_* and the slack callback functions are defined elsewhere in the DAG file.
default_args = {
    'owner': 'Mohd.Faisal',
    'start_date': airflow.utils.dates.days_ago(2),
    'depends_on_past': False,
    'email': ['mohd.faisal@linkedin.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 0,
    'retry_delay': timedelta(minutes=8),
    'on_failure_callback': task_fail_slack_alert,
    'on_success_callback': task_success_slack_alert,
    'dataflow_default_options': {
        'project': 'test-infra-bq',
        'setup_file': path + '/setup.py',
    },
}

dag = DAG('migrate_rank_daily',
          description='Test DAG',
          schedule_interval="0 20 * * *",
          default_args=default_args,
          catchup=False)

dataflow_job = DataFlowPythonOperator(
    py_file=path + '/dataflow_script.py',
    task_id='dataflow_test_task',
    options={
        'job_name': 'migrate-data-daily',
        'numWorkers': '6',
        'maxNumWorkers': '10',
        'cassandra_username': cassandra_username,
        'cassandra_password': cassandra_password,
        'cassandra_hosts': cassandra_hosts,
        'temp_location': 'gs://test-123/test-daily-pipeline/temp',
        'subnetwork': 'regions/asia-southeast1/subnetworks/app-123',
        'region': 'asia-southeast1',
        'worker_zone': 'asia-southeast1-c',
        'no_use_public_ips': None,
    },
    gcp_conn_id='bq-data-conn',
    dag=dag)

In the script above, path, cassandra_username, cassandra_password and cassandra_hosts are already stored as Airflow variables or secrets, and bq-data-conn is the BigQuery connection created on Airflow/Composer using a service account.
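A minimal sketch of how these values might be fetched inside the DAG file (the variable keys below are assumptions; the same values could also come from a secrets backend):

from airflow.models import Variable
from airflow.hooks.base_hook import BaseHook

# Hypothetical variable keys defined under Admin > Variables on Composer/Airflow.
path = Variable.get('dag_code_path')
cassandra_username = Variable.get('cassandra_username')
cassandra_password = Variable.get('cassandra_password')  # better kept as a secret
cassandra_hosts = Variable.get('cassandra_hosts')

# 'bq-data-conn' is the connection created under Admin > Connections
# with the service account key attached.
bq_connection = BaseHook.get_connection('bq-data-conn')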

Through this operator, Airflow/Composer runs the Dataflow script passed in the operator's arguments.

Dataflow script:

from __future__ import absolute_import

import argparse
import logging

import apache_beam as beam
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                   PipelineOptions,
                                                   SetupOptions,
                                                   StandardOptions,
                                                   WorkerOptions)
from cassandra.query import dict_factory
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import RoundRobinPolicy
from datetime import date


class vectToDerivatives(beam.DoFn):
    def __init__(self, username, password, host_str):
        logging.info("cassandra hosts: %s", host_str)

        self.hosts = host_str.split(",")
        self.keyspace = 'test_keyspace'
        self.port = 9042
        self.failed_ids = 0
        self.username = username
        self.password = password
        self.cluster = None
        self.session = None
        self.stmt = None
        self.query = "INSERT INTO test_cassandra_table (id,feature_a,feature_b) VALUES (?, ?, ?);"

    def setup(self):
        # Called once per DoFn instance on the worker: open the Cassandra
        # connection and prepare the insert statement.
        auth_provider = PlainTextAuthProvider(username=self.username,
                                              password=self.password)
        self.cluster = Cluster(self.hosts, auth_provider=auth_provider,
                               load_balancing_policy=RoundRobinPolicy(),
                               protocol_version=4)
        self.cluster.port = self.port
        logging.info("connecting cassandra")
        self.session = self.cluster.connect(self.keyspace)
        self.session.row_factory = dict_factory
        self.stmt = self.session.prepare(self.query)
        logging.info("cassandra connection successful")

    def process(self, msg):
        # Write one BigQuery row into Cassandra.
        qry = self.stmt.bind(
            [msg['id'], msg['feature_a'], msg['feature_b']])
        try:
            self.session.execute(qry)
        except Exception as exception:
            self.failed_ids = self.failed_ids + 1
            logging.info("failed to write id %s", msg['id'])
            logging.error(exception)
        yield "OK"


def run(argv=None):
    logging.info("Parsing dataflow flags... ")
    pipeline_options = PipelineOptions()
    pipeline_options.view_as(SetupOptions).save_main_session = True
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--cassandra_username',
        required=True,
        help='cassandra db username')
    parser.add_argument(
        '--cassandra_password',
        required=True,
        help='cassandra db password')
    parser.add_argument(
        '--cassandra_hosts',
        required=True,
        help='comma separated cassandra hosts')
    parser.add_argument(
        '--project',
        required=True,
        help='project id staging or production')
    parser.add_argument(
        '--temp_location',
        required=True,
        help='temp location')
    parser.add_argument(
        '--job_name',
        required=True,
        help='job name')

    known_args, pipeline_args = parser.parse_known_args(argv)
    today = date.today()

    logging.info("Processing Date is " + str(today))
    google_cloud_options = pipeline_options.view_as(GoogleCloudOptions)
    google_cloud_options.project = known_args.project
    google_cloud_options.job_name = known_args.job_name
    google_cloud_options.temp_location = known_args.temp_location
    # pipeline_options.view_as(StandardOptions).runner = known_args.runner

    with beam.Pipeline(argv=pipeline_args, options=pipeline_options) as p:
        # Read the rows produced by the BigQueryOperator task.
        query = "SELECT * FROM `test-infra-bq.test_dataset.final_table`"
        process_features = (p | 'read product vectors' >> beam.io.Read(
            beam.io.BigQuerySource(query=query, use_standard_sql=True)))
        logging.debug("starting transform/write")
        # Write each row to Cassandra in parallel across the workers.
        process_features | 'BQ product vectors to cassandra' >> beam.ParDo(
            vectToDerivatives(known_args.cassandra_username,
                              known_args.cassandra_password,
                              known_args.cassandra_hosts))


if __name__ == '__main__':
    logging.getLogger().setLevel(logging.INFO)
    logging.info("Starting dataflow daily pipeline ")
    try:
        run()
    except Exception:
        # Log and re-raise so the Airflow task is marked as failed
        # instead of silently swallowing the error.
        logging.exception("Dataflow pipeline failed")
        raise

beam.io.Read() reads data from the BigQuery table(s).

beam.ParDo() performs the transformation in a distributed manner by calling the process() method of the vectToDerivatives class for each row of the BigQuery result.
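As a quick local sanity check of this pattern, the same ParDo flow can be exercised with the Direct Runner on a handful of in-memory rows (the rows and the FakeWriter class below are made-up stand-ins for the Cassandra writer):

import apache_beam as beam


class FakeWriter(beam.DoFn):
    # Stand-in for vectToDerivatives: prints instead of writing to Cassandra.
    def process(self, msg):
        print(msg['id'], msg['feature_a'], msg['feature_b'])
        yield "OK"


if __name__ == '__main__':
    rows = [{'id': 1, 'feature_a': 0.1, 'feature_b': 0.2},
            {'id': 2, 'feature_a': 0.3, 'feature_b': 0.4}]
    # Without a --runner flag, Beam defaults to the Direct Runner, so this runs locally.
    with beam.Pipeline() as p:
        p | beam.Create(rows) | beam.ParDo(FakeWriter())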

The Dataflow job can be monitored in the Google Cloud console.

Advantages

  1. Pay as you use: since Dataflow workers are spawned at runtime, you pay for them only while they are in use.
  2. Dynamic allocation of Dataflow workers: spawn any number of workers with the required spec, and set a maximum number of workers.
  3. Distributed processing: many transformations can be done in a distributed way across workers within the same pipeline.
  4. Centralised logs, visualisation and monitoring in Google Cloud Composer and the Dataflow console.
  5. Besides scheduling, it supports manual triggering at any time without any additional code deployment.
  6. Resubmission of a failed task is supported instead of rerunning the whole pipeline from the start.
  7. Rich, tested libraries from Apache Beam are available for many data operations.
  8. Better handling of configurations, credentials, connections and runtime flags in Google Cloud Composer (or Airflow).

Contributor at Tokopedia Search
