Tag Archives: Dataflow

Introducing TensorFlow Recorder

When training computer vision machine learning models, data loading can often be a performance bottleneck, causing your GPU or TPU resources to be underutilized while waiting for data to be loaded into the model. Storing your dataset in the efficient TensorFlow Record (TFRecord) format is a great way to solve this problem, but creating TFRecords unfortunately often requires a great deal of complex code.

Last week we open sourced the TensorFlow Recorder project (also known as TFRecorder), which makes it possible for data scientists, data engineers, and AI/ML engineers to create image-based TFRecords with just a few lines of code. Using TFRecords is incredibly important for creating efficient TensorFlow ML pipelines, but until now they haven’t been easy to create. Before TFRecorder, creating TFRecords at scale meant writing a data pipeline that parsed your structured data, loaded images from storage, and serialized the results into the TFRecord format. TFRecorder allows you to write TFRecords directly from a Pandas dataframe or CSV without writing any complicated code.

You can see an example of TFRecorder below, but first let’s talk about some of the specific advantages of TFRecords.

How TFRecords Can Help

Using the TFRecord file format lets you store your data in sets of files, each containing a sequence of protocol buffers serialized as binary records that can be read very efficiently. This helps reduce the data loading bottleneck mentioned above.

Data loading performance can be further improved by combining the TFRecord format with prefetching and parallel interleave, as in the sketch below. Prefetching reduces the time of each training step by fetching the data for the next step while your model is still executing the current one. Parallel interleave lets you read from multiple TFRecord shards (pieces of a TFRecord file) and apply preprocessing to those interleaved data streams. This reduces the latency required to read a training batch and is especially helpful when reading data over the network.
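Here’s a minimal tf.data sketch of both techniques; the file pattern, feature spec, and batch size are illustrative assumptions rather than anything TFRecorder prescribes:

import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

def parse_example(record):
    # Hypothetical feature spec; adjust to match your TFRecords.
    features = {
        'image': tf.io.FixedLenFeature([], tf.string),
        'label': tf.io.FixedLenFeature([], tf.int64),
    }
    return tf.io.parse_single_example(record, features)

# Parallel interleave: read several TFRecord shards at once.
files = tf.data.Dataset.list_files('gs://my-bucket/train-*.tfrecord')
dataset = files.interleave(
    tf.data.TFRecordDataset, cycle_length=4, num_parallel_calls=AUTOTUNE)

# Prefetching: overlap loading the next batch with training on the current one.
dataset = (dataset
           .map(parse_example, num_parallel_calls=AUTOTUNE)
           .batch(32)
           .prefetch(AUTOTUNE))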

Using TensorFlow Recorder

Creating a TFRecord using TFRecorder requires only a few lines of code. Here’s how it works.
import pandas as pd
import tfrecorder

df = pd.read_csv(...)
# Convert the dataframe to TFRecords (the output path is a placeholder).
df.tensorflow.to_tfr(output_dir='gs://my/bucket')

TFRecorder currently expects data to be in the same format as Google AutoML Vision.

This format is a pandas dataframe or CSV with three columns, split, image_uri, and label, where:

  • split can take on the values TRAIN, VALIDATION, and TEST.
  • image_uri specifies a local or Google Cloud Storage location for the image file.
  • label can be either a text-based label that will be integerized, or an integer.
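
For illustration, a CSV in this format might look like the following (the paths and labels here are made up):

split,image_uri,label
TRAIN,gs://my-bucket/images/cat1.jpg,cat
VALIDATION,gs://my-bucket/images/cat2.jpg,cat
TEST,gs://my-bucket/images/dog1.jpg,dog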
In the future, we hope to extend TensorFlow Recorder to work with data in any format.

While this example would work well for converting a few thousand images into TFRecords, it wouldn’t scale well if you have millions of images. To scale up to huge datasets, TensorFlow Recorder provides connectivity with Google Cloud Dataflow, which is a serverless Apache Beam pipeline runner. Scaling up to Dataflow requires only a little bit more configuration, sketched below.
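
As a hedged sketch, the Dataflow option might look like the following; the bucket, project, and region are placeholders, and the exact parameter names may differ in the version you install:

df.tensorflow.to_tfr(
    output_dir='gs://my/bucket',
    runner='DataflowRunner',   # run on Cloud Dataflow instead of locally
    project='my-project',
    region='us-central1')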

What’s next?

We’d love for you to try out TensorFlow Recorder. You can get it from GitHub or simply pip install tfrecorder. TensorFlow Recorder is very new and we’d greatly appreciate your feedback, suggestions, and pull requests.

By Mike Bernico and Carlos Ezequiel, Google Cloud AI Engineers

Apache Beam graduates to a top-level project

Please join me in extending a hearty digital “Huzzah!” to the Apache Beam community: as announced today, Apache Beam is an official graduate of the Apache Incubator and is now a full-fledged, top-level Apache project. This achievement is a direct reflection of the hard work the community has invested in transforming Beam into an open, professional and community-driven project.

Eleven months ago, Google and a number of partners donated a giant pile of code to the Apache Software Foundation, thus forming the incubating Beam project. The bulk of this code made up the Google Cloud Dataflow SDK: the libraries that developers used to write streaming and batch pipelines that ran on any supported execution engine. At the time, the main supported engine was Google’s Cloud Dataflow service (with support for Apache Spark and Apache Flink in development); as of today there are five officially supported runners. Though there were many motivations behind the creation of Apache Beam, the one at the heart of everything was a desire to build an open and thriving community and ecosystem around this powerful model for data processing that so many of us at Google spent years refining. But taking a project with over a decade of engineering momentum behind it from within a single company and opening it to the world is no small feat. That’s why I feel today’s announcement is so meaningful.

With that context in mind, let’s look at some statistics squirreled away in the graduation maturity model assessment:

  • Out of the ~22 large modules in the codebase, at least 10 modules have been developed from scratch by the community, with little to no contribution from Google.
  • Since September, no single organization has had more than ~50% of the unique contributors per month.
  • The majority of new committers added during incubation came from outside Google.

And for good measure, here’s a quote from the Vice President of the Apache Incubator, lifted from the public Apache incubator general discussions list where Beam’s graduation was first proposed:

“In my day job as well as part of my work at Apache, I have been very impressed at the way that Google really understands how to work with open source communities like Apache. The Apache Beam project is a great example of this and is a great example of how to build a community.” -- Ted Dunning, Vice President of Apache Incubator

The point I’m trying to make here is this: while Google’s commitment to Apache Beam remains as strong as it always has been, everyone involved (both within Google and without) has done an excellent job of building an open source project that’s truly open in the best sense of the word.

This is what makes open source software amazing: people coming together to build great, practical systems for everyone to use because the work is exciting, useful and relevant. This is the core reason I was so excited about us creating Apache Beam in the first place, the reason I’m proud to have played some small part in that journey, and the reason I’m so grateful for all the work the community has invested in making the project a reality.

Naturally, graduation is only one milestone in the lifetime of the project, and we have many more ahead of us, but becoming a top-level project is an indication that Apache Beam now has a development community that is ready for prime time.

That means we’re ready to continue pushing forward the state of the art in stream and batch processing. We’re ready to bring the promise of portability to programmatic data processing, much in the way SQL has done so for declarative data analysis. We’re ready to build the things that never would have gotten built had this project stayed confined within the walls of Google. And last but perhaps not least, we’re ready to recoup the vast quantities of text space previously consumed by the mandatory “(incubating)” moniker accompanying all of our initial mentions of Apache Beam!

But seriously, whatever your motivation, please consider joining us along the way. We have an exciting road ahead.

By Tyler Akidau, Apache Beam PMC and Staff Software Engineer at Google

Docker + Dataflow = happier workflows

When I first saw the Google Cloud Dataflow monitoring UI -- with its visual flow execution graph that updates as your job runs, and convenient links to the log messages -- the idea came to me. What if I could take that UI, and use it for something it was never built for? Could it be connected with open source projects aimed at promoting reproducible scientific analysis, like Common Workflow Language (CWL) or Workflow Definition Language (WDL)?
[Screenshot of a Dockerflow workflow for DNA sequence analysis.]

In scientific computing, it’s really common to submit jobs to a local high-performance computing (HPC) cluster. There are tools to do that in the cloud, like Elasticluster and Starcluster. They replicate the local way of doing things, which means they require a bunch of infrastructure setup and management that the university IT department would otherwise do. Even after you’re set up, you still have to ssh into the cluster to do anything. And then there are a million different choices for workflow managers, each unsatisfactory in its own special way.

By day, I’m a product manager. I hadn’t done any serious coding in a few years. But I figured it shouldn’t be that hard to create a proof-of-concept, just to show that the Apache Beam API that Dataflow implements can be used for running scientific workflows. Now, Dataflow was created for a different purpose, namely, to support scalable data-parallel processing, like transforming giant data sets, or computing summary statistics, or indexing web pages. To use Dataflow for scientific workflows would require wrapping up shell steps that launch VMs, run some code, and shuttle data back and forth from an object store. It should be easy, right?

It wasn’t so bad. Over the weekend, I downloaded the Dataflow SDK, ran the wordcount examples, and started modifying. I had a “Hello, world” proof-of-concept in a day.

To really run scientific workflows would require more, of course. Varying VM shapes, a way to pass parameters from one step to the next, graph definition, scattering and gathering, retries. So I shifted into prototyping mode.

I created a new GitHub project called Dockerflow. With Dockerflow, workflows can be defined in YAML files. They can also be written in pretty compact Java code. You can run a batch of workflows at once by providing a CSV file with one row per workflow to define the parameters.

Dataflow and Docker complement each other nicely:

  • Dataflow provides a fully managed service with a nice monitoring interface, retries, graph optimization and other niceties.
  • Docker provides portability of the tools themselves, and there's a large library of packaged tools already available as Docker images.

While Dockerflow supports a simple YAML workflow definition, a similar approach could be taken to implement a runner for one of the open standards like CWL or WDL.

To get a sense of working with Dockerflow, here’s “Hello, World” written in YAML:

defn:
  name: HelloWorkflow
steps:
- defn:
    name: Hello
    inputParameters:
    - name: message
      defaultValue: Hello, World!
    docker:
      imageName: ubuntu
      cmd: echo $message

And here’s the same example written in Java:

public class HelloWorkflow implements WorkflowDefn {
  public Workflow createWorkflow(String[] args) throws IOException {
    Task hello = TaskBuilder.named("Hello")
        .input("message", "Hello, World!")
        .docker("ubuntu")
        .script("echo $message")
        .build();
    return TaskBuilder.named("HelloWorkflow").steps(hello).args(args).build();
  }
}
Dockerflow is just a prototype at this stage, though it can run real workflows and includes many nice features, like dry runs, resuming failed runs from mid-workflow, and, of course, the nice UI. It uses Cloud Dataflow in a way that was never intended -- to run scientific batch workflows rather than large-scale data-parallel workloads. I wish I’d written it in Python rather than Java, but the Dataflow Python SDK wasn’t quite as mature when I started.

Which is all to say, it’s been a great 20% project, and the future really depends on whether it solves a problem people have, and if others are interested in improving on it. We welcome your contributions and comments! How do you run and monitor scientific workflows today?

By Jonathan Bingham, Google Genomics and Verily Life Sciences

Dataflow and open source – proposal to join the Apache Incubator

Imagine if every time you upgrade your servers you had to learn a new programming framework and rewrite all your applications. That might sound crazy, but it’s what happens with big data pipelines.

It wasn't long ago that Apache Hadoop MapReduce was the obvious engine for all things big data; then Apache Spark came along, and more recently Apache Flink, a streaming-native engine. Unlike upgrading hardware, adopting these more modern engines has generally required rewriting pipelines to adopt engine-specific APIs, often with different implementations for streaming and batch scenarios. This can mean throwing away user code that had just been hardened enough to be considered (mostly) bug-free and replacing it with immature new code. All of this just because the data pipelines needed to scale better, or have lower latency, or run more cheaply, or complete faster.

Adjusting such aspects should not require throwing away well-tested business logic. You should be able to move your application or data pipeline to the appropriate engine, or to the appropriate environment (e.g., from on-prem to cloud) while keeping the business logic intact. But, to do this, two conditions need to be met. First, you need a portable SDK, which can produce programs that can execute on one of many pluggable execution environments. Second, that SDK has to expose a programming model whose semantics are focused on your workload and not on the capabilities of the underlying engine. For example, MapReduce as a programming model doesn’t fit the bill (even though MapReduce as an execution method might be appropriate in some cases) because it cannot productively express low-latency computations.

Google designed Dataflow specifically to address both of these issues. The Dataflow Java SDK has been architected to support pluggable “runners” to connect to execution engines, of which four currently exist: data Artisans created one for Apache Flink, Cloudera created one for Apache Spark, and Google implemented a single-node local execution runner as well as one for Google’s hosted Cloud Dataflow service.

That portability is possible because the Dataflow programming model is focused on real-life streaming semantics, like real event time (as opposed to the time at which the event arrives), and real sessions (as opposed to whatever arbitrary boundary the batch cycle imposes). This allows Dataflow programs to execute in either batch or stream mode as needed, and to switch from one pluggable execution engine to the other without needing to be rewritten.
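
As a concrete, hedged illustration, here is roughly what event-time sessions look like in today’s Apache Beam Python SDK, the open-source descendant of the Dataflow SDK; the users, timestamps, and 60-second gap are made up:

import apache_beam as beam
from apache_beam import window

# Events as (user, event-time in seconds); values are illustrative.
events = [('alice', 10), ('alice', 30), ('alice', 500), ('bob', 20)]

with beam.Pipeline() as p:
    (p
     | beam.Create(events)
     # Stamp each element with its real event time, not its arrival time.
     | beam.Map(lambda e: window.TimestampedValue((e[0], 1), e[1]))
     # Group into per-user sessions that close after a 60-second gap.
     | beam.WindowInto(window.Sessions(60))
     | beam.CombinePerKey(sum)
     | beam.Map(print))

Here ('alice', 10) and ('alice', 30) land in one session while ('alice', 500) starts another, regardless of when the elements actually arrive.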

Today we’re taking another step in this collaboration. Along with participants from Cloudera, data Artisans, Talend, Cask and PayPal, we sent a proposal for Dataflow to become an Apache Software Foundation (ASF) Incubator project. In this proposal the Dataflow model, Java SDK, and runners will be bundled into one incubating project with the Python SDK joining the project in the future. We believe this proposal is a step towards the ability to define one data pipeline for multiple processing needs, without tradeoffs, which can be run in a number of runtimes, on-premise, in the cloud, or locally. Google Cloud Dataflow will remain as a “no-ops” managed service to execute Dataflow pipelines quickly and cost-effectively in Google Cloud Platform.

With Dataflow, you can write one portable data pipeline, which can be used for either batch or stream, and executed in a number of runtimes including Flink, Spark, Google Cloud Dataflow or the local direct pipeline.

We're excited to propose Dataflow as an Apache Incubator project because we believe the Dataflow model, SDK and runners offer a number of unique features in the open-source data space.

  • Pipeline first, runtime second: With the Dataflow model and SDKs, you focus first on defining your data pipelines, not how they'll run or the characteristics of the particular runner executing them.
  • Portability: Data pipelines are portable across a number of runtime engines. You can choose a runtime based on any number of considerations, such as performance, cost or scalability.
  • Unified model: Batch and streaming are integrated into a unified model with powerful semantics, such as windowing, ordering and triggering.
  • Development tooling: The Dataflow SDK contains the tools you need to create portable data pipelines quickly and easily using open-source languages, libraries and tools.

To understand the power of the Dataflow model, we recommend this article on the O’Reilly Radar: The World Beyond Batch: Streaming 102.

We're grateful to the Apache Software Foundation and community for their consideration of the Dataflow proposal and look forward to actively participating in open development of Dataflow.

- Posted by Frances Perry (Software Engineer) and James Malone (Product Manager)

Build a mobile gaming analytics platform

Popular mobile games can attract millions of players and generate terabytes of game-related data in a short burst of time. This places extraordinary pressure on the infrastructure powering these games and requires scalable data analytics services to provide timely, actionable insights in a cost-effective way.

To address these needs, a growing number of successful gaming companies use Google’s web-scale analytics services to create personalized experiences for their players. They use telemetry and smart instrumentation to gain insight into how players engage with the game and to answer questions like: At what game level are players stuck? What virtual goods did they buy? And what's the best way to tailor the game to appeal to both casual and hardcore players?

A new reference architecture describes how you can collect, archive and analyze vast amounts of gaming telemetry data using Google Cloud Platform’s data analytics products. The architecture demonstrates two patterns for analyzing mobile game events:

  • Batch processing: This pattern helps you process game logs and other large files in a fast, parallelized manner. For example, leading mobile gaming company DeNA moved to BigQuery from Hadoop to get faster query responses for their log file analytics pipeline. In this GDC Lightning Talk video they explain the speed benefits of Google’s analytics tools and how the team was able to process large gaming datasets without the need to manage any infrastructure.
  • Real-time processing: Use this pattern when you want to understand what's happening in the game right now. Cloud Pub/Sub and Cloud Dataflow provide a fully managed way to perform a number of data-processing tasks like data cleansing and fraud detection in real time. For example, you can flag a player whose hit points fall outside the valid range. Real-time processing is also a great way to continuously update dashboards of key game metrics, like how many active users are currently logged in or which in-game items are most popular.

Some Cloud Dataflow features are especially useful in a mobile context, since messages may be delayed from the source due to mobile Internet connection issues or batteries running out. Cloud Dataflow's built-in session windowing functionality and triggers aggregate events based on the actual time they occurred (event time) as opposed to the time they're processed, so you can still group events together by user session even if there's a delay from the source. A sketch of this follows below.
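
As a hedged sketch (not code from the reference architecture; the event data, session gap, and lateness bound are invented), handling late-arriving events with sessions and triggers might look like this in the Apache Beam Python SDK:

import apache_beam as beam
from apache_beam import window
from apache_beam.transforms.trigger import (
    AccumulationMode, AfterCount, AfterWatermark)

with beam.Pipeline() as p:
    (p
     # The real pipeline would read from Cloud Pub/Sub; a static
     # collection keeps this sketch self-contained.
     | beam.Create([('player1', 100), ('player1', 160), ('player2', 40)])
     | beam.Map(lambda e: window.TimestampedValue(e, e[1]))
     # Sessions closed by a 10-minute gap: emit a result at the
     # watermark, then again for each late event within one hour.
     | beam.WindowInto(
         window.Sessions(10 * 60),
         trigger=AfterWatermark(late=AfterCount(1)),
         accumulation_mode=AccumulationMode.ACCUMULATING,
         allowed_lateness=60 * 60)
     | beam.combiners.Count.PerKey()
     | beam.Map(print))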

But why choose between one pattern and the other? A key benefit of this architecture is that you can write your data processing pipeline once and execute it in either batch or streaming mode without modifying your codebase. So if you start processing your logs in batch mode, you can easily move to real-time processing in the future. This is an advantage of the high-level Cloud Dataflow model that was released as open source by Google.

Cloud Dataflow loads the processed data into one or more BigQuery tables. BigQuery is built for very large scale and allows you to run aggregation queries against petabyte-scale datasets with fast response times. This is great for interactive analysis and data exploration, such as using a simple BigQuery SQL query in Google Cloud Datalab to dynamically create a Daily Active Users (DAU) graph, as sketched below.
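
As a hedged illustration (the project, dataset, and column names are invented), such a DAU query could be run from Python with the BigQuery client library:

from google.cloud import bigquery

# Count distinct daily users from a hypothetical game-events table.
client = bigquery.Client()
query = """
SELECT DATE(event_time) AS day, COUNT(DISTINCT user_id) AS dau
FROM `my-project.game_events.events`
GROUP BY day
ORDER BY day
"""
for row in client.query(query).result():
    print(row.day, row.dau)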

And what about player engagement and in-game dynamics? Another BigQuery example in the reference architecture charts the ten toughest game bosses. It looks like boss10 killed players more than 75% of the time, much more than the next toughest. Perhaps it would make sense to lower the strength of this boss? Or maybe give the player some more powerful weapons? The choice is yours, but with this reference architecture you'll see the results of your changes straight away. Review the new reference architecture to jumpstart your data-driven quest to engage your players and make your games more successful, contact us, or sign up for a free trial of Google Cloud Platform to get started.

- Posted by Oyvind Roti, Solutions Architect

Meeting the challenge of financial data transformation

Today’s guest post comes from Salvatore Sferrazza and Sebastian Just from FIS Global, an international provider of financial services and technology solutions. Salvatore and Sebastian tell us how Google Cloud Dataflow transforms fluctuating, large-scale financial services data so that it can be accurately captured and moved across systems.

Much software development in the capital markets (and enterprise systems in general) revolves around the transformation, enrichment and movement of data from one system to another. The unpredictable nature of financial market data volumes, often driven by volatility, exacerbates the pain of scaling and posting data when and where it’s needed for daily trade reconciliation, settlement and regulatory reporting. The implications of technology missteps within such crucial business processes range from missed business opportunities to undesired risk exposure to regulatory non-compliance. These activities must be relentlessly predictable, repeatable and measurable to yield maximum value to stakeholders.

While developers rely on the Extract, Transform and Load (ETL) activities that are so crucial to processing data, they now face limits in terms of the speed and efficiency of ETL as the volume of transactions grows faster than they can process it. As shortened settlement durations and the Consolidated Audit Trail (CAT) loom on the horizon, financial services institutions need simple, fast and powerful approaches to quickly scale and ultimately mitigate time-sensitive risks and operational costs.

Traditionally, developers have considered the activities around ETL data an unglamorous yet necessary dimension of building software products for encapsulating functions that are core to every tier of computing. So when data-driven enterprises are tasked with harvesting insights from massive data sets, it’s quite likely that ETL, in one form or another, is lurking nearby. But in today’s world, data can come from anywhere and in any format, creating a series of labor, time and intellectual challenges. While there may be hundreds of ways to solve the problem, few provide the efficiency and effectiveness sorely needed in our “big data” world -- until recently.

The Google Cloud Dataflow service and its associated software development kit (SDK) provide a series of powerful tools for a myriad of data transformation duties. Designed to perform data processing tasks of any size in a managed services environment, Google Cloud Dataflow simplifies the mechanics of large-scale transformation and supports both batch and stream processing using the same programming model. In our latest white paper, we introduce some of the main concepts behind building and running applications that use Dataflow, then get “hands on” with a job to transform and ingest options market symbol data before storing the transformations within a Google BigQuery data set.
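
As a rough, hedged sketch of that kind of job (the white paper itself works in Java against the Dataflow SDK; here we use today’s Apache Beam Python SDK, and the bucket, table, and record layout are invented):

import apache_beam as beam

def parse_symbol(line):
    # Hypothetical record layout: symbol,price
    symbol, price = line.split(',')
    return {'symbol': symbol, 'price': float(price)}

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText('gs://my-bucket/options-symbols.csv')
     | beam.Map(parse_symbol)
     | beam.io.WriteToBigQuery(
         'my-project:market_data.symbols',
         schema='symbol:STRING,price:FLOAT'))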

In short, Google Cloud Dataflow allows you to focus on data processing tasks and not cluster management. Rather than asking you to guess the right cluster size, Dataflow automatically scales up or down horizontally as much as needed for your exact processing requirements. This includes scaling all the way down to zero when there is no work, so you’re never paying for an idle cluster. Dataflow also alleviates the pain of writing ETL jobs by standardizing the process of implementing application requirements. As a result, you’ll be able to focus on the data transformations you need to make rather than on the processing mechanics themselves. This not only provides greater flexibility, lower latency and enhanced control of ETL jobs; it offers built-in cost management and ties together other useful Google Cloud services. Beyond common ETL, Dataflow pipelines may also include inline computation ranging from simple counting to highly complex, multi-step analysis. In our experience with the service so far, it can potentially remove much of the work from engineers within financial institutions and regulatory organizations, while providing elasticity to the entire process and ensuring accuracy, scale, performance and cost efficiency.

As market volatility and reporting requirements drive the need for accuracy, low latency and risk reduction, transforming and interpreting market data in a big data world is imperative to trading efficiency and accessibility. Every second counts. With a more cost-effective, real-time and scalable method of processing an ever-increasing volume of data, financial institutions will be able to address specific requirements and volumes at hand while keeping up with the demands of a rapidly evolving global financial system. We hope our experience, as captured in the technical white paper, will prove useful to others in their quest for a more effective way to process data.

Please see this paper’s GitHub page for the complete and buildable project source code.

- Posted by Salvatore Sferrazza, Principal at FIS and Sebastian Just, Manager at FIS

Processing logs at scale using Cloud Dataflow

Logs generated by applications and services can provide an immense amount of information about how your deployment is running and the experiences your users are having as they interact with the products and services. But as deployments grow more complex, gleaning insights from this data becomes more challenging. Logs come from an increasing number of sources, so they can be hard to collate and query for useful information. And building, operating and maintaining your own infrastructure to analyze log data at scale requires extensive expertise in running distributed systems and storage. Today, we’re introducing a new solution paper and reference implementation that will show how you can process logs from multiple sources and extract meaningful information by using Google Cloud Platform and Google Cloud Dataflow.

Log processing typically involves some combination of the following activities:

  • Configuring applications and services
  • Collecting and capturing log files
  • Storing and managing log data
  • Processing and extracting data
  • Persisting insights

Each of those components has its own scaling and management challenges, often requiring different approaches at different times. These sorts of challenges can slow down the generation of meaningful, actionable information from your log data.

Cloud Platform provides a number of services that can help you to address these challenges. You can use Cloud Logging to collect logs from applications and services, and then store them in Google Cloud Storage buckets or stream them to Pub/Sub topics. Dataflow can read from Cloud Storage or Pub/Sub (among many other sources), process the log data, extract and transform metadata and compute aggregations. You can persist the output from Dataflow in BigQuery, where it can be analyzed or reviewed anytime. These mechanisms are offered as managed services, meaning they can scale when needed. That also means that you don't need to worry about provisioning resources up front.
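
To make the batch path concrete, here is a hedged sketch (not the reference implementation’s actual code; the log layout, field names, and table are invented) of a Dataflow job in the Apache Beam Python SDK that reads exported log files and aggregates a latency field per endpoint:

import json

import apache_beam as beam

def extract(line):
    # Hypothetical exported-log JSON layout.
    entry = json.loads(line)
    return (entry['requestUrl'], float(entry['latencySeconds']))

with beam.Pipeline() as p:
    (p
     | beam.io.ReadFromText('gs://my-bucket/exported-logs/*.json')
     | beam.Map(extract)
     | beam.combiners.Mean.PerKey()
     | beam.Map(lambda kv: {'endpoint': kv[0], 'mean_latency': kv[1]})
     | beam.io.WriteToBigQuery(
         'my-project:logs.latency',
         schema='endpoint:STRING,mean_latency:FLOAT'))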

The solution paper and reference implementation describe how you can use Dataflow to process log data from multiple sources and persist findings directly in BigQuery. You’ll learn how to configure Cloud Logging to collect logs from applications running in Container Engine, how to export those logs to Cloud Storage, and how to execute the Dataflow processing job. In addition, the solution shows you how to reconfigure Cloud Logging to use Pub/Sub to stream data directly to Dataflow, so you can process logs in real-time.

Check out the Processing Logs at Scale using Cloud Dataflow solution to learn how to combine logging, storage, processing and persistence into a scalable log processing approach. Then take a look at the reference implementation tutorial on GitHub to deploy a complete end-to-end working example. Feedback is welcome and appreciated; comment here, submit a pull request, create an issue, or find me on Twitter @crcsmnky and let me know how I can help.

- Posted by Sandeep Parikh, Google Solutions Architect