Nearly every big data processing framework offers SDKs in multiple languages, and those SDKs often differ in the features and transformations they support. It is a challenge for SDK developers to keep the quality and performance of the software consistent across the programming languages they want to support. Similarly, data engineers need to make sure they pick the right programming language SDK for their use case.
Apache Beam introduced a concept called the Portability Framework to address this challenge, allowing data engineers to mix and match functionality provided by SDKs in different languages within a single data pipeline.
In this blog post, I will cover the following topics:
- Why does Portability matter?
- Beam’s Approach to Portability
- Multi-Language Pipeline
- Dataflow Runner V2
If you are not familiar with Apache Beam, I suggest starting with the overview of Apache Beam.
Why does Portability matter?
There are multiple technical and non-technical reasons, but at a high level:
- You should be able to run your pipeline on multiple execution engines or runners. You might want to run it on Google Cloud Dataflow if your cloud provider of choice is Google, or use Flink on AWS and on-premises.
- Different groups of data engineers and data scientists might want to use different programming languages. One group might favour Java over Python or vice versa, and to stay productive you should not need to rewrite the same transform in multiple languages.
- Some programming languages have better support for certain kinds of libraries, e.g. Python has very strong support for machine learning libraries such as TensorFlow and PyTorch. You should not need to rewrite your Java pipelines to make use of these libraries.
- And finally, you want to improve the speed of software delivery, and that means reusing code written by various groups of developers in different programming languages, whether within your organisation or available as open source.
In summary, portability should improve collaboration across development groups, improve developer velocity and reduce overall cost of developing and maintaining pipelines.
In this blog post, I will try to describe how Beam deals with portability from the viewpoint of three personas:
- Engineers building a new runner who want to make it SDK independent
- Data engineers who would like to make their transforms available across multiple languages
- Data engineers who would like to use transforms from multiple languages in their pipeline.
Beam supports portability in two ways:
You develop a pipeline once and you can run it anywhere using one of the supported runners.
The currently supported runners include:
Flink, Spark, Google Cloud Dataflow, Twister2, Hazelcast Jet, Nemo and others. In addition, Beam also provides a DirectRunner, which is very useful for pipeline development.
And second, it supports SDKs in multiple programming languages for developing pipelines. The most common and feature-rich is Java, followed by Python, SQL and Go. Additionally, a Scala API called Scio has been developed by engineers at Spotify.
One way of supporting multiple language SDKs across a set of runtime engines (runners) would be to implement every transformation in every supported language SDK and on every supported runner.
For example, if you have a set of transformations such as BigQuery IO or Kafka IO, the developer of each transformation would need to implement it in Java and Python and Go and so on…
And then every runner would have to support those transformations for each SDK.
Assuming you have m transformations which you want to support in n different programming languages and on p runners, you end up with m × n × p combinations, and very quickly it becomes an intractable problem.
The following diagram shows the complexity.
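To get a feel for the scale of the problem, here is a rough back-of-the-envelope sketch in Python with purely hypothetical counts (the numbers are illustrative, not taken from Beam):

# Hypothetical counts, purely for illustration.
m_transforms = 20  # transforms such as BigQuery IO, Kafka IO, ...
n_languages = 3    # e.g. Java, Python, Go
p_runners = 5      # e.g. Dataflow, Flink, Spark, ...

# Without portability: every transform re-implemented in every language,
# and every runner has to support each of those implementations.
combinations = m_transforms * n_languages * p_runners
print(combinations)  # 300

# With the portability framework the effort is roughly additive instead:
# one implementation per transform, one SDK harness per language,
# one Fn/Runner API implementation per runner.
components = m_transforms + n_languages + p_runners
print(components)  # 28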
Maximilian Michels’s talk on this topic is quite comprehensive and worth a watch.
Beam’s Approach to Portability
As mentioned above, I will cover the multi-language pipeline from the viewpoint of three different personas. The first is the group of engineers building a new runner who want to make it SDK independent. For them the Runner Authoring Guide contains the necessary information, but at a high level Beam has introduced two API layers which allow them to create an SDK-independent runner that can execute transforms written in any of the supported languages. The two specific APIs are the Fn API and the Runner API.
The Runner API is an SDK-independent schema and RPC interface for launching a pipeline and checking the status of a job. The job of the Fn API is to provide the capability to invoke user-defined functions (UDFs) from users’ pipelines.
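You do not normally program against these APIs directly; they surface as the portable job submission path. Here is a minimal sketch of submitting a pipeline to a portable runner, assuming a job server (for example the Flink one) is already running locally on port 8099; the endpoint and environment settings are illustrative:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# job_endpoint points at a job server that speaks the Runner API;
# environment_type controls how the runner starts the SDK harness
# that speaks the Fn API (DOCKER, LOOPBACK, ...).
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
])

with beam.Pipeline(options=options) as pipeline:
  (pipeline
   | beam.Create(['portable', 'beam'])
   | beam.Map(print))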
The second group of engineers are those developing transformations in one language (e.g. Java) who want to use the portability framework to make their transformations available to the SDK of another language (e.g. Python). Currently there are a few such transformations, and you can find them listed as “via X-language” on the Apache Beam Built-in I/O Connectors page. One such example is KafkaIO, which is written in Java but available in Python.
To make a Java transformation available to the Python SDK, you have two options:
- Use the Python JavaExternalTransform API to access Java transforms directly; the Java transform class should have a public constructor or a public static method to construct the transform, and public builder methods to configure it (a minimal sketch follows).
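Here is a minimal sketch of this first option; the Java class name, constructor argument and builder method below are made up for illustration, and the Java class would need to be available on the classpath for the pipeline to actually run:

import apache_beam as beam
from apache_beam.transforms.external import JavaExternalTransform

# 'com.example.MyJavaTransform', its constructor argument and the builder
# method 'withConfig' are hypothetical placeholders; constructor arguments
# are passed by calling the proxy, builder methods via attribute access.
my_java_transform = JavaExternalTransform(
    'com.example.MyJavaTransform')('constructor-arg').withConfig('some-value')

with beam.Pipeline() as pipeline:
  (pipeline
   | beam.Create(['a', 'b', 'c'])
   | my_java_transform
   | beam.Map(print))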
Example – https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/kafka.py
The following two classes, which wrap the Java transform using ExternalTransform, are worth exploring to see the implementation details.
class ReadFromKafka(ExternalTransform):
  """
  An external PTransform which reads from Kafka and returns a KV pair for
  each item in the specified Kafka topics. If no Kafka Deserializer for
  key/value is provided, then the data will be returned as a raw byte array.
  """

class WriteToKafka(ExternalTransform):
  """
  An external PTransform which writes KV data to a specified Kafka topic.
  If no Kafka Serializer for key/value is provided, then key/value are
  assumed to be byte arrays.
  """
Alternatively, you can implement two interfaces: ExternalTransformBuilder and ExternalTransformRegistrar.
The ExternalTransformBuilder constructs the Java transform based on the configuration passed to the pipeline, and the ExternalTransformRegistrar registers it with the expansion service.
The transform should have a unique Uniform Resource Name (URN), which is used to register it with the expansion service.
The expansion service plays a key role: it creates and injects the appropriate language-specific pipeline fragments into the pipeline. Note that expansion services are language dependent. In most cases the default expansion service should suffice.
You will need to use a Beam schema to map constructor and method parameters between the Python and Java code.
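Putting these pieces together on the Python side, a wrapper built this way looks roughly like the sketch below; the URN, configuration fields and expansion service address are hypothetical, and the kafka.py module linked above shows a real implementation:

import typing

from apache_beam.transforms.external import ExternalTransform
from apache_beam.transforms.external import NamedTupleBasedPayloadBuilder

# Beam schema describing the configuration passed to the Java builder.
MyReadConfig = typing.NamedTuple(
    'MyReadConfig',
    [('topic', str), ('bootstrap_servers', str)])

class MyExternalRead(ExternalTransform):
  # URN under which the Java transform is registered with the expansion service.
  URN = 'beam:external:my_company:my_read:v1'

  def __init__(self, topic, bootstrap_servers, expansion_service='localhost:8097'):
    super().__init__(
        self.URN,
        NamedTupleBasedPayloadBuilder(
            MyReadConfig(topic=topic, bootstrap_servers=bootstrap_servers)),
        expansion_service)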
If we take a step back and look at what is actually happening here: code written in one programming language is being used together with code from another programming language.
Let’s have a quick look at the execution sequence of a multi-language pipeline.
In this case, we have a pipeline in Python which is using a set of transformations to read from and write to a Kafka topic.
- The Kafka IO is written in Java but made available to be used in the Python SDK.
- The expansion service injects serialised objects for the Kafka IO into the pipeline (you can also point the transform at an expansion service you run yourself, as shown in the sketch after this list).
- When a runner executes the pipeline job, it makes use of language specific SDK harnesses to execute the language specific piece of code.
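For Kafka IO the Python SDK can start the bundled Java expansion service automatically, but you can also point the transform at an expansion service you run yourself. A small sketch, with an illustrative address:

from apache_beam.io.kafka import ReadFromKafka

# Assumes an expansion service has been started separately and is listening
# on this address; if omitted, the default expansion service is used.
read = ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my_topic'],
    expansion_service='localhost:8097')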
And lastly, from the perspective of a data engineer who wants to use a transform in a multi-language pipeline, it is relatively simple.
In fact, to use a Java transform in your Python pipeline, all you need to do is ensure that Java is installed in your development environment, and then you can use the transform just as you would use any other transform from a Python package.
In this case, the ReadFromKafka and WriteToKafka transforms are available in the apache_beam.io.kafka module and, as you can see in this code, using them is similar to using any other Python transform.
An example code is available at Github – https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/kafkataxi/kafka_taxi.py
…
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.kafka import WriteToKafka
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
…
…
ride_col = (
    pipeline
    | ReadFromKafka(
        consumer_config={'bootstrap.servers': bootstrap_servers},
        topics=[topic],
        with_metadata=with_metadata)
    …
    …
    | WriteToKafka(
        producer_config={'bootstrap.servers': bootstrap_servers},
        topic=topic))
Executing a multi-language pipeline is no different from executing a single-language pipeline. For example, you can run the kafka_taxi pipeline as shown below, which is the same as running any other pipeline developed with the Python SDK.
python3 -m venv env
source env/bin/activate
pip3 install 'apache-beam[gcp]'
export PROJECT="$(gcloud config get-value project)"
export TEMP_LOCATION="gs://demo_bucket_belgium/temp"
export REGION="europe-west1"
export JOB_NAME="kafka-taxi-`date +%Y%m%d-%H%M%S`"
export NUM_WORKERS="5"
python3 -m apache_beam.examples.kafkataxi.kafka_taxi \
--runner DataflowRunner \
--temp_location $TEMP_LOCATION \
--project $PROJECT \
--region $REGION \
--num_workers $NUM_WORKERS \
--job_name $JOB_NAME \
--subnetwork https://www.googleapis.com/compute/v1/projects/$PROJECT/regions/europe-west1/subnetworks/da-vpc-01 \
--service_account_email=xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx \
--bootstrap_servers $BOOTSTRAP_SERVER
As mentioned above, to leverage the portability framework and execute a multi-language pipeline, a runner needs to support the SDK harness. Currently the Dataflow, Flink and Spark runners support it.
The Dataflow implementation is part of Dataflow Runner V2 for Python, and it is described in this blog post.
I hope this blog post helped you understand multi-language pipeline concepts from the perspective of three different personas. The documentation and sample code links should also help expedite the development of a multi-language pipeline.