This module contains example pipelines that use the Beam DataFrame API.
You must have apache-beam>=2.30.0 installed to run these pipelines, because the apache_beam.examples.dataframe module was added in that release. The DataFrame API also requires a compatible pandas version; see the documentation for details.
Wordcount is the "Hello World" of data analytics systems, so of course we had to implement it for the Beam DataFrame API! See wordcount.py for the implementation. Note that it demonstrates how to integrate the DataFrame API with a larger Beam pipeline by using Beam Schemas in conjunction with to_dataframe and to_pcollection.
To run the pipeline locally:
    python -m apache_beam.examples.dataframe.wordcount \
      --input gs://dataflow-samples/shakespeare/kinglear.txt \
      --output counts
This will produce files like counts-XXXXX-of-YYYYY with contents like:

    KING: 243
    LEAR: 236
    DRAMATIS: 1
    PERSONAE: 1
    king: 65
    of: 447
    Britain: 2
    OF: 15
    FRANCE: 10
    DUKE: 3
    ...
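Because the output is split across several shard files, it can be handy to combine them after a local run. The following is a minimal sketch using only the Python standard library. The helper name `merge_count_shards` is hypothetical, and the sketch assumes every line has the `word: count` form shown in the sample above.

```python
import glob
from collections import Counter


def merge_count_shards(output_prefix):
    """Combine every `<prefix>-XXXXX-of-YYYYY` shard into one Counter."""
    totals = Counter()
    # glob.escape keeps special characters in the prefix from acting as wildcards.
    pattern = f"{glob.escape(output_prefix)}-*-of-*"
    for path in sorted(glob.glob(pattern)):
        with open(path, encoding="utf-8") as shard:
            for line in shard:
                # Assumed line format: "<word>: <count>", as in the sample output.
                word, sep, count = line.strip().rpartition(": ")
                if not sep:
                    continue  # Skip blank or unexpected lines.
                totals[word] += int(count)
    return totals
```

After running the wordcount command above, `merge_count_shards("counts").most_common(10)` would list the ten most frequent words across all shards.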
taxiride.py contains implementations of two DataFrame pipelines that process the well-known NYC Taxi dataset. These pipelines don't use any Beam primitives. Instead, they build end-to-end pipelines using the DataFrame API by leveraging DataFrame IOs.
The location_id_agg pipeline does a grouped aggregation on the drop-off location ID. The borough_enrich pipeline extends this example by joining the zone lookup table to find the borough where each drop-off occurred, and then aggregates per borough.
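To make the two computations concrete, here is a plain-Python sketch of what each pipeline computes. It deliberately uses neither Beam nor pandas, so it is an illustration of the semantics rather than the code in taxiride.py. The function names are hypothetical, the input rows are toy data, and the `LocationID` key of the lookup table is an assumption; `DOLocationID`, `passenger_count`, and `Borough` are the column names that appear in the sample outputs.

```python
from collections import defaultdict


def aggregate_by_location(rides):
    """Sum passenger_count per drop-off location ID (the grouped aggregation)."""
    totals = defaultdict(int)
    for ride in rides:
        totals[ride["DOLocationID"]] += ride["passenger_count"]
    return dict(totals)


def aggregate_by_borough(rides, zones):
    """Join rides to the zone lookup on location ID, then sum per borough."""
    # Assumed lookup layout: one row per zone with LocationID and Borough.
    borough_of = {zone["LocationID"]: zone["Borough"] for zone in zones}
    totals = defaultdict(int)
    for ride in rides:
        borough = borough_of.get(ride["DOLocationID"])
        if borough is None:
            continue  # No matching zone: this sketch skips the ride.
        totals[borough] += ride["passenger_count"]
    return dict(totals)


# Toy rows for illustration only, not taken from the real dataset.
rides = [
    {"DOLocationID": 1, "passenger_count": 2},
    {"DOLocationID": 4, "passenger_count": 1},
    {"DOLocationID": 4, "passenger_count": 3},
    {"DOLocationID": 7, "passenger_count": 1},
]
zones = [
    {"LocationID": 1, "Borough": "EWR"},
    {"LocationID": 4, "Borough": "Manhattan"},
    {"LocationID": 7, "Borough": "Queens"},
]

print(aggregate_by_location(rides))        # {1: 2, 4: 4, 7: 1}
print(aggregate_by_borough(rides, zones))  # {'EWR': 2, 'Manhattan': 4, 'Queens': 1}
```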
Some snapshots of NYC taxi data have been staged in gs://apache-beam-samples for use with these example pipelines:
- gs://apache-beam-samples/nyc_taxi/2017/yellow_tripdata_2017-*.csv: CSV files containing taxi ride data for each month of 2017 (similar directories exist for 2018 and 2019).
- gs://apache-beam-samples/nyc_taxi/misc/sample.csv: A sample of 1 million records from the beginning of 2019. At ~85 MiB this is a manageable size for processing locally.
- gs://apache-beam-samples/nyc_taxi/misc/taxi+_zone_lookup.csv: Lookup table with information about Zone IDs. Used by the borough_enrich pipeline.
To run the aggregation pipeline locally, use the following command:
    python -m apache_beam.examples.dataframe.taxiride \
      --pipeline location_id_agg \
      --input gs://apache-beam-samples/nyc_taxi/misc/sample.csv \
      --output aggregation.csv

This will write the output to files like aggregation.csv-XXXXX-of-YYYYY with contents like:

    DOLocationID,passenger_count
    1,3852
    3,130
    4,7725
    5,24
    6,37
    7,7429
    8,24
    9,180
    10,938
    ...
To run the enrich pipeline locally, use the command:
    python -m apache_beam.examples.dataframe.taxiride \
      --pipeline borough_enrich \
      --input gs://apache-beam-samples/nyc_taxi/misc/sample.csv \
      --output enrich.csv

This will write the output to files like enrich.csv-XXXXX-of-YYYYY with contents like:

    Borough,passenger_count
    Bronx,13645
    Brooklyn,70654
    EWR,3852
    Manhattan,1417124
    Queens,81138
    Staten Island,531
    Unknown,28527
flight_delays.py contains an implementation of a pipeline that processes the flight on-time data from bigquery-samples.airline_ontime_data.flights. It uses a conventional Beam pipeline to read from BigQuery, apply a 24-hour rolling window, and define a Beam schema for the data. It then converts to DataFrames to perform a complex aggregation using GroupBy.apply and writes the result out with to_csv. Note that the DataFrame computation respects the 24-hour window applied above, and results are partitioned into separate files per day.
To run the pipeline locally:
    python -m apache_beam.examples.dataframe.flight_delays \
      --start_date 2012-12-24 \
      --end_date 2012-12-25 \
      --output gs://<bucket>/<dir>/delays.csv \
      --project <gcp-project> \
      --temp_location gs://<bucket>/<dir>
Note that a GCP project and temp_location are required for reading from BigQuery.
This will produce files like gs://<bucket>/<dir>/delays.csv-2012-12-24T00:00:00-2012-12-25T00:00:00-XXXXX-of-YYYYY with contents tracking average delays per airline on that day, for example:

    airline,departure_delay,arrival_delay
    EV,10.01901901901902,4.431431431431432
    HA,-1.0829015544041452,0.010362694300518135
    UA,19.142555438225976,11.07180570221753
    VX,62.755102040816325,62.61224489795919
    WN,12.074298711144806,6.717968157695224
    ...
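Since each output file name encodes the window it covers, downstream scripts may want to recover the window bounds from the name. The sketch below does this with the standard library only. The helper `parse_windowed_shard` is hypothetical, and the name layout (`<prefix>-<window start>-<window end>-<shard>-of-<total>`, with numeric shard fields) is an assumption inferred from the single example above rather than a documented guarantee.

```python
import re
from datetime import datetime

# Timestamp shape seen in the example file name, e.g. 2012-12-24T00:00:00.
_TS = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"
_SHARD_NAME = re.compile(
    rf"^(?P<prefix>.+)-(?P<start>{_TS})-(?P<end>{_TS})"
    r"-(?P<shard>\d+)-of-(?P<total>\d+)$"
)


def parse_windowed_shard(name):
    """Split a windowed shard file name into prefix, window bounds, and shard index."""
    match = _SHARD_NAME.match(name)
    if match is None:
        raise ValueError(f"not a windowed shard name: {name!r}")
    return {
        "prefix": match["prefix"],
        "window_start": datetime.fromisoformat(match["start"]),
        "window_end": datetime.fromisoformat(match["end"]),
        "shard": int(match["shard"]),
        "num_shards": int(match["total"]),
    }
```

For a file from the run above, `window_end - window_start` would come out as one day, matching the 24-hour window.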