Example pipelines using the Beam YAML API. These examples can also be found on GitHub.
Note: The examples below are automatically tested for correctness, so they can be used as a starting point for your own pipelines.
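These YAML files are typically run with the Beam YAML entry point, for example python -m apache_beam.yaml.main --yaml_pipeline_file=pipeline.yaml. The same transform specs can also be embedded in a Python pipeline; the snippet below is a minimal sketch of that pattern (the inline spec is illustrative and is not one of the examples on this page):

import apache_beam as beam
from apache_beam.yaml.yaml_transform import YamlTransform

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(word='king'), beam.Row(word='lear')])
        # Apply a Beam YAML transform spec to an existing (schema'd) PCollection.
        | YamlTransform(
            '''
            type: chain
            transforms:
              - type: Filter
                config:
                  language: python
                  keep: word == "king"
              - type: LogForTesting
            '''))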
This example reads from a public file stored on Google Cloud. Running it requires authenticating with Google Cloud, or changing the path given to ReadFromText to point to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
This pipeline reads in a text file, counts how many times each distinct word occurs in the text, then logs a row containing each word and its count.
pipeline:
type: chain
transforms:
# Read text file into a collection of rows, each with one field, "line"
- type: ReadFromText
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
# Split line field in each row into list of words
- type: MapToFields
config:
language: python
fields:
words:
callable: |
import re
def my_mapping(row):
return re.findall(r"[A-Za-z\']+", row.line.lower())
# Explode each list of words into separate rows
- type: Explode
config:
fields: words
    # Since each word is now a distinct row, rename the field to "word"
- type: MapToFields
config:
fields:
word: words
    # Group by the distinct words in the collection and add a "count" field
    # containing the number of occurrences of each word.
- type: Combine
config:
language: python
group_by: word
combine:
count:
value: word
fn: count
# Log out results
- type: LogForTesting
# Expected:
# Row(word='king', count=311)
# Row(word='lear', count=253)
# Row(word='dramatis', count=1)
# Row(word='personae', count=1)
# Row(word='of', count=483)
# Row(word='britain', count=2)
# Row(word='france', count=32)
# Row(word='duke', count=26)
# Row(word='burgundy', count=20)
# Row(word='cornwall', count=75)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce names
config:
elements:
          - season: 'spring'
            produce: '🍓'
          - season: 'spring'
            produce: '🥕'
          - season: 'summer'
            produce: '🥕'
          - season: 'fall'
            produce: '🥕'
          - season: 'spring'
            produce: '🍆'
          - season: 'winter'
            produce: '🍆'
          - season: 'spring'
            produce: '🍅'
          - season: 'summer'
            produce: '🍅'
          - season: 'fall'
            produce: '🍅'
          - season: 'summer'
            produce: '🌽'
- type: Combine
      name: Count produce per season
config:
language: python
group_by: season
combine:
produce: count
- type: LogForTesting
# Expected:
# Row(season='spring', produce=4)
# Row(season='summer', produce=3)
# Row(season='fall', produce=2)
# Row(season='winter', produce=1)
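For comparison, the same count-per-key aggregation can be written directly against the Python SDK. This is only a rough sketch of the equivalent logic (with a trimmed-down element list), not the code Beam YAML generates:

import apache_beam as beam
from apache_beam.transforms.combiners import CountCombineFn

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            beam.Row(season='spring', produce='🍓'),
            beam.Row(season='spring', produce='🥕'),
            beam.Row(season='summer', produce='🥕'),
            beam.Row(season='winter', produce='🍆'),
        ])
        # Group by season and count the produce values in each group.
        | beam.GroupBy('season').aggregate_field(
            'produce', CountCombineFn(), 'produce')
        | beam.Map(print))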
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
name: Get max value per key
config:
language: python
group_by: produce
combine:
amount: max
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=3)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=5)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
name: Get mean value per key
config:
language: python
group_by: produce
combine:
amount: mean
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=2.5)
# Row(produce='🍆', amount=1.0)
# Row(produce='🍅', amount=4.0)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
name: Get min value per key
config:
language: python
group_by: produce
combine:
amount: min
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=2)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=3)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- recipe: 'pie'
fruit: 'raspberry'
quantity: 1
unit_price: 3.50
- recipe: 'pie'
fruit: 'blackberry'
quantity: 1
unit_price: 4.00
- recipe: 'pie'
fruit: 'blueberry'
quantity: 1
unit_price: 2.00
- recipe: 'muffin'
fruit: 'blueberry'
quantity: 2
unit_price: 2.00
- recipe: 'muffin'
fruit: 'banana'
quantity: 3
unit_price: 1.00
    # Simulate a global grouping by adding a constant key to every row
- type: MapToFields
name: Add global key
config:
append: true
language: python
fields:
global_key: '0'
- type: Combine
name: Get multiple aggregations per key
config:
language: python
group_by: global_key
combine:
min_price:
value: unit_price
fn: min
mean_price:
value: unit_price
fn: mean
max_price:
value: unit_price
fn: max
    # Remove the temporary global key
- type: MapToFields
name: Remove global key
config:
fields:
min_price: min_price
mean_price: mean_price
max_price: max_price
- type: LogForTesting
# Expected:
# Row(min_price=1.0, mean_price=2.5, max_price=4.0)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
- recipe: 'pie'
fruit: 'raspberry'
quantity: 1
unit_price: 3.50
- recipe: 'pie'
fruit: 'blackberry'
quantity: 1
unit_price: 4.00
- recipe: 'pie'
fruit: 'blueberry'
quantity: 1
unit_price: 2.00
- recipe: 'muffin'
fruit: 'blueberry'
quantity: 2
unit_price: 3.00
- recipe: 'muffin'
fruit: 'banana'
quantity: 3
unit_price: 1.00
- type: Combine
name: Sum values per key
config:
language: python
group_by: fruit
combine:
total_quantity:
value: quantity
fn: sum
mean_price:
value: unit_price
fn: mean
- type: LogForTesting
# Expected:
# Row(fruit='raspberry', total_quantity=1, mean_price=3.5)
# Row(fruit='blackberry', total_quantity=1, mean_price=4.0)
# Row(fruit='blueberry', total_quantity=3, mean_price=2.5)
# Row(fruit='banana', total_quantity=3, mean_price=1.0)
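A rough Python SDK sketch of the same idea, chaining several named aggregations over one grouping key (illustrative only, with a trimmed-down element list):

import apache_beam as beam
from apache_beam.transforms.combiners import MeanCombineFn

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            beam.Row(recipe='pie', fruit='blueberry', quantity=1, unit_price=2.00),
            beam.Row(recipe='muffin', fruit='blueberry', quantity=2, unit_price=3.00),
            beam.Row(recipe='muffin', fruit='banana', quantity=3, unit_price=1.00),
        ])
        # Group by fruit and compute two aggregations per group.
        | beam.GroupBy('fruit')
            .aggregate_field('quantity', sum, 'total_quantity')
            .aggregate_field('unit_price', MeanCombineFn(), 'mean_price')
        | beam.Map(print))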
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
      name: Sum amounts per key
config:
language: python
group_by: produce
combine:
amount: sum
- type: LogForTesting
# Expected:
# Row(produce='🥕', amount=5)
# Row(produce='🍆', amount=1)
# Row(produce='🍅', amount=12)
This is an example that illustrates how to use session windows and then extract windowing information for further processing.
pipeline:
type: chain
transforms:
# Create some fake data.
- type: Create
name: CreateVisits
config:
elements:
- user: alice
timestamp: 1
- user: alice
timestamp: 3
- user: bob
timestamp: 7
- user: bob
timestamp: 12
- user: bob
timestamp: 20
- user: alice
timestamp: 101
- user: alice
timestamp: 109
- user: alice
timestamp: 115
# Use the timestamp field as the element timestamp.
# (Typically this would be assigned by the source.)
- type: AssignTimestamps
config:
timestamp: timestamp
    # Group the data by user and, for each session window, count the number
    # of events in that session.
# See https://beam.apache.org/documentation/programming-guide/#session-windows
- type: Combine
name: SumVisitsPerUser
config:
language: python
group_by: user
combine:
visits:
value: user
fn: count
windowing:
type: sessions
gap: 10s
# Extract the implicit Beam windowing data (including what the final
# merged session values were) into explicit fields of our rows.
- type: ExtractWindowingInfo
config:
fields: [window_start, window_end, window_string]
# Drop "short" sessions (in this case, Alice's first two visits.)
- type: Filter
config:
language: python
keep: window_end - window_start > 15
# Only keep a couple of fields.
- type: MapToFields
config:
fields:
user: user
window_string: window_string
- type: LogForTesting
# Expected:
# Row(user='bob', window_string='[7.0, 30.0)')
# Row(user='alice', window_string='[101.0, 125.0)')
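The session windowing used above can also be expressed directly in the Python SDK. The following sketch (hard-coded sample data, the same assumed 10-second gap) shows the core idea of assigning timestamps, applying session windows, and counting per key:

import apache_beam as beam
from apache_beam.transforms import window
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([('alice', 1), ('alice', 3), ('bob', 7), ('bob', 12)])
        # Use the second tuple element as the element timestamp.
        | beam.MapTuple(lambda user, ts: window.TimestampedValue(user, ts))
        # Sessions close after a 10-second gap with no events.
        | beam.WindowInto(window.Sessions(10))
        # Count visits per user within each session window.
        | Count.PerElement()
        | beam.Map(print))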
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - season: 'spring'
            produce: '🍓'
          - season: 'spring'
            produce: '🥕'
          - season: 'spring'
            produce: '🍆'
          - season: 'spring'
            produce: '🍅'
          - season: 'summer'
            produce: '🥕'
          - season: 'summer'
            produce: '🍅'
          - season: 'summer'
            produce: '🌽'
          - season: 'fall'
            produce: '🥕'
          - season: 'fall'
            produce: '🍅'
          - season: 'winter'
            produce: '🍆'
- type: Combine
      name: Top 3 produce per season
config:
language: python
group_by: season
combine:
produce:
value: produce
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 3
- type: LogForTesting
# Expected:
# Row(season='spring', produce=['🥕', '🍓', '🍆'])
# Row(season='summer', produce=['🥕', '🍅', '🌽'])
# Row(season='fall', produce=['🥕', '🍅'])
# Row(season='winter', produce=['🍆'])
This example reads from a public file stored on Google Cloud. Running it requires authenticating with Google Cloud, or changing the path given to ReadFromCsv to point to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
The following example reads mock transaction data from a products.csv file (here stored at gs://apache-beam-samples/beam-yaml-blog/products.csv), performs a simple filter for "Electronics", then calculates the revenue and number of products sold for each product type.
pipeline:
transforms:
- type: ReadFromCsv
name: ReadInputFile
config:
path: gs://apache-beam-samples/beam-yaml-blog/products.csv
- type: Filter
name: FilterWithCategory
input: ReadInputFile
config:
language: python
keep: category == "Electronics"
- type: Combine
name: CountNumberSold
input: FilterWithCategory
config:
group_by: product_name
combine:
num_sold:
value: product_name
fn: count
total_revenue:
value: price
fn: sum
- type: WriteToCsv
name: WriteOutputFile
input: CountNumberSold
config:
path: output
# Expected:
# Row(product_name='Headphones', num_sold=2, total_revenue=119.98)
# Row(product_name='Monitor', num_sold=1, total_revenue=249.99)
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
config:
language: python
group_by: produce
combine:
biggest:
value: amount
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 2
- type: LogForTesting
# Expected:
# Row(produce='🥕', biggest=[3, 2])
# Row(produce='🍆', biggest=[1])
# Row(produce='🍅', biggest=[5, 4])
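The fully qualified apache_beam.transforms.combiners.TopCombineFn referenced above is the same combiner the Python SDK exposes directly; a rough standalone sketch:

import apache_beam as beam
from apache_beam.transforms.combiners import TopCombineFn

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([
            beam.Row(produce='🥕', amount=3),
            beam.Row(produce='🥕', amount=2),
            beam.Row(produce='🍅', amount=5),
        ])
        # Keep the two largest amounts per produce value.
        | beam.GroupBy('produce').aggregate_field(
            'amount', TopCombineFn(2), 'biggest')
        | beam.Map(print))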
pipeline:
type: chain
transforms:
- type: Create
name: Create produce
config:
elements:
          - produce: '🥕'
            amount: 3
          - produce: '🥕'
            amount: 2
          - produce: '🍆'
            amount: 1
          - produce: '🍅'
            amount: 4
          - produce: '🍅'
            amount: 5
          - produce: '🍅'
            amount: 3
- type: Combine
name: Smallest N values per key
config:
language: python
group_by: produce
combine:
smallest:
value: amount
fn:
type: 'apache_beam.transforms.combiners.TopCombineFn'
config:
n: 2
reverse: true
- type: LogForTesting
# Expected:
# Row(produce='🥕', smallest=[2, 3])
# Row(produce='🍆', smallest=[1])
# Row(produce='🍅', smallest=[3, 4])
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
          - produce: ['🍓Strawberry', '🥔Potato']
            season: 'spring'
          - produce: ['🥕Carrot', '🍆Eggplant', '🍅Tomato']
            season: 'summer'
- type: Explode
name: Flatten lists
config:
fields: [produce]
- type: LogForTesting
# Expected:
# Row(produce='🍓Strawberry', season='spring')
# Row(produce='🥕Carrot', season='summer')
# Row(produce='🍆Eggplant', season='summer')
# Row(produce='🍅Tomato', season='summer')
# Row(produce='🥔Potato', season='spring')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
          - 'icon': '🍓'
            'name': 'Strawberry'
            'duration': 'perennial'
          - 'icon': '🥕'
            'name': 'Carrot'
            'duration': 'biennial'
          - 'icon': '🍆'
            'name': 'Eggplant'
            'duration': 'perennial'
          - 'icon': '🍅'
            'name': 'Tomato'
            'duration': 'annual'
          - 'icon': '🥔'
            'name': 'Potato'
            'duration': 'perennial'
- type: Filter
name: Filter perennials
config:
language: python
keep:
callable: "lambda plant: plant.duration == 'perennial'"
- type: LogForTesting
# Expected:
# Row(icon='🍓', name='Strawberry', duration='perennial')
# Row(icon='🍆', name='Eggplant', duration='perennial')
# Row(icon='🥔', name='Potato', duration='perennial')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
          - '# 🍓Strawberry'
          - '# 🥕Carrot'
          - '# 🍆Eggplant'
          - '# 🍅Tomato'
          - '# 🥔Potato'
- type: MapToFields
name: Strip header
config:
language: python
fields:
element:
callable: "lambda row: row.element.strip('# \\n')"
- type: LogForTesting
# Expected:
# Row(element='🍓Strawberry')
# Row(element='🥕Carrot')
# Row(element='🍆Eggplant')
# Row(element='🍅Tomato')
# Row(element='🥔Potato')
pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}
- {sdk: Flume, year: 2010}
- {sdk: Dataflow, year: 2014}
- {sdk: Apache Beam, year: 2016}
- type: MapToFields
name: ToRoman
config:
language: python
fields:
tool_name: sdk
year:
callable: |
import roman
def convert(row):
return roman.toRoman(row.year)
dependencies:
- 'roman>=4.2'
- type: LogForTesting
# Expected:
# Row(tool_name='MapReduce', year='MMIV')
# Row(tool_name='MillWheel', year='MMVIII')
# Row(tool_name='Flume', year='MMX')
# Row(tool_name='Dataflow', year='MMXIV')
# Row(tool_name='Apache Beam', year='MMXVI')
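The dependencies block above makes the third-party roman package available to the callable at runtime. For reference, this is roughly what that package does (assuming roman>=4.2 is installed locally):

# pip install 'roman>=4.2'
import roman

print(roman.toRoman(2004))       # MMIV
print(roman.fromRoman('MMXVI'))  # 2016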
pipeline:
type: chain
transforms:
- type: Create
config:
elements:
- {sdk: MapReduce, year: 2004}
- {sdk: MillWheel, year: 2008}
- {sdk: Flume, year: 2010}
- {sdk: Dataflow, year: 2014}
- {sdk: Apache Beam, year: 2016}
- type: MapToFields
name: ToRoman
config:
language: java
fields:
tool_name: sdk
year:
callable: |
import org.apache.beam.sdk.values.Row;
import java.util.function.Function;
import com.github.chaosfirebolt.converter.RomanInteger;
public class MyFunction implements Function<Row, String> {
public String apply(Row row) {
return RomanInteger.parse(
String.valueOf(row.getInt64("year"))).toString();
}
}
dependencies:
- 'com.github.chaosfirebolt.converter:roman-numeral-converter:2.1.0'
- type: LogForTesting
# Expected:
# Row(tool_name='MapReduce', year='MMIV')
# Row(tool_name='MillWheel', year='MMVIII')
# Row(tool_name='Flume', year='MMX')
# Row(tool_name='Dataflow', year='MMXIV')
# Row(tool_name='Apache Beam', year='MMXVI')
pipeline:
type: chain
transforms:
- type: Create
name: Gardening plants
config:
elements:
          - '# 🍓Strawberry'
          - '# 🥕Carrot'
          - '# 🍆Eggplant'
          - '# 🍅Tomato'
          - '# 🥔Potato'
- type: MapToFields
name: Strip header
config:
language: python
fields:
element: "element.strip('# \\n')"
# Transform-specific hint overrides top-level hint.
resource_hints:
min_ram: 3GB
- type: LogForTesting
# Top-level resource hint.
resource_hints:
min_ram: 1GB
# Expected:
# Row(element='🍓Strawberry')
# Row(element='🥕Carrot')
# Row(element='🍆Eggplant')
# Row(element='🍅Tomato')
# Row(element='🥔Potato')
This pipeline creates a series of rows with a single 'plant' field, matches each value against a regex, filters out the entries that do not match, then logs the output.
pipeline:
type: chain
transforms:
- type: Create
name: Garden plants
config:
elements:
          - plant: '🍓, Strawberry, perennial'
          - plant: '🥕, Carrot, biennial ignoring trailing words'
          - plant: '🍆, Eggplant, perennial'
          - plant: '🍅, Tomato, annual'
          - plant: '🥔, Potato, perennial'
          - plant: '# 🍌, invalid, format'
          - plant: 'invalid, 🍉, format'
- type: MapToFields
name: Parse plants
config:
language: python
fields:
plant:
callable: |
import re
def regex_filter(row):
                match = re.match(r"(?P<icon>[^\s,]+), *(\w+), *(\w+)", row.plant)
                return match.group(0) if match else None
# Filters out None values produced by values that don't match regex
- type: Filter
config:
language: python
keep: plant
- type: LogForTesting
# Expected:
# Row(plant='🍓, Strawberry, perennial')
# Row(plant='🥕, Carrot, biennial')
# Row(plant='🍆, Eggplant, perennial')
# Row(plant='🍅, Tomato, annual')
# Row(plant='🥔, Potato, perennial')
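To see why the trailing words and the malformed rows disappear, the regex can be exercised on its own in plain Python, outside of Beam:

import re

PATTERN = r"(?P<icon>[^\s,]+), *(\w+), *(\w+)"

# Only the matched prefix is kept, so trailing words are dropped.
print(re.match(PATTERN, '🥕, Carrot, biennial ignoring trailing words').group(0))
# -> 🥕, Carrot, biennial

# Rows that start with '# ' or that put the emoji in a word position do not
# match, so the callable returns None and the Filter step removes them.
print(re.match(PATTERN, '# 🍌, invalid, format'))  # None
print(re.match(PATTERN, 'invalid, 🍉, format'))    # None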
This example reads from a public file stored on Google Cloud. Running it requires authenticating with Google Cloud, or changing the path given to ReadFromCsv to point to a local file.
To set up Application Default Credentials, see https://cloud.google.com/docs/authentication/external/set-up-adc.
The following example reads mock transaction data from a products.csv file (here stored at gs://apache-beam-samples/beam-yaml-blog/products.csv) and then performs a simple filter for "Electronics".
pipeline:
transforms:
- type: ReadFromCsv
name: ReadInputFile
config:
path: gs://apache-beam-samples/beam-yaml-blog/products.csv
- type: Filter
name: FilterWithCategory
input: ReadInputFile
config:
language: python
keep: category == "Electronics"
- type: WriteToCsv
name: WriteOutputFile
input: FilterWithCategory
config:
path: output
# Expected:
# Row(transaction_id='T0012', product_name='Headphones', category='Electronics', price=59.99)
# Row(transaction_id='T0104', product_name='Headphones', category='Electronics', price=59.99)
# Row(transaction_id='T0302', product_name='Monitor', category='Electronics', price=249.99)
The pipeline reads from the Iceberg table 'db.users.NY' on GCS, using a configured Hadoop catalog. If the table does not already exist, it can be created and populated using the iceberg_write.yaml pipeline.
Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If you run this example locally, replace '/path/to/service/account/key.json' with the path to your service account key .json file; if you run it with the Dataflow runner, omit the 'config_properties' field.
pipeline:
type: chain
transforms:
- type: ReadFromIceberg
name: ReadFromAnIcebergTable
config:
table: "db.users.NY"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
- type: LogForTesting
- type: WriteToCsv
name: OutputToCSVFile
config:
path: "gs://MY-WAREHOUSE/my-csv.csv"
# Expected:
# Row(id=3, name='Smith', email='[email protected]', zip='NY')
# Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
The pipeline uses dynamic destinations (see https://cloud.google.com/dataflow/docs/guides/managed-io#dynamic-destinations) to create and select a table destination based on field values in the incoming records.
Replace 'gs://MY-WAREHOUSE' with the correct GCS bucket name. If you run this example locally, replace '/path/to/service/account/key.json' with the path to your service account key .json file; if you run it with the Dataflow runner, omit the 'config_properties' field.
pipeline:
type: chain
transforms:
- type: Create
name: CreateSampleData
config:
elements:
- { id: 1, name: "John", email: "[email protected]", zip: "WA" }
- { id: 2, name: "Jane", email: "[email protected]", zip: "CA" }
          - { id: 3, name: "Smith", email: "[email protected]", zip: "NY" }
- { id: 4, name: "Beamberg", email: "[email protected]", zip: "NY" }
- type: LogForTesting
- type: WriteToIceberg
name: WriteToAnIcebergTable
config:
# Dynamic destinations
table: "db.users.{zip}"
catalog_name: "hadoop_catalog"
catalog_properties:
type: "hadoop"
warehouse: "gs://MY-WAREHOUSE"
# Hadoop catalog config required to run pipeline locally
# Omit if running on Dataflow
config_properties:
"fs.gs.auth.type": "SERVICE_ACCOUNT_JSON_KEYFILE"
"fs.gs.auth.service.account.json.keyfile": "/path/to/service/account/key.json"
# Expected:
# Row(id=1, name='John', email='[email protected]', zip='WA')
# Row(id=2, name='Jane', email='[email protected]', zip='CA')
# Row(id=3, name='Smith', email='[email protected]', zip='NY')
# Row(id=4, name='Beamberg', email='[email protected]', zip='NY')
A pipeline that both writes to and reads from the same Kafka topic.
pipeline:
transforms:
- type: ReadFromText
name: ReadFromGCS
config:
path: gs://dataflow-samples/shakespeare/kinglear.txt
- type: MapToFields
name: BuildKafkaRecords
input: ReadFromGCS
config:
language: python
fields:
value:
callable: |
def func(row):
return row.line.encode('utf-8')
output_type: bytes
- type: WriteToKafka
name: SendRecordsToKafka
input: BuildKafkaRecords
config:
format: "RAW"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
producer_config_updates:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
- type: ReadFromKafka
name: ReadFromMyTopic
config:
format: "RAW"
topic: "{{ TOPIC }}"
bootstrap_servers: "{{ BOOTSTRAP_SERVERS }}"
auto_offset_reset_config: earliest
consumer_config:
sasl.jaas.config: "org.apache.kafka.common.security.plain.PlainLoginModule required \
username={{ USERNAME }} \
password={{ PASSWORD }};"
security.protocol: "SASL_PLAINTEXT"
sasl.mechanism: "PLAIN"
- type: MapToFields
name: ParseKafkaRecords
input: ReadFromMyTopic
config:
language: python
fields:
text:
callable: |
def func(row):
# Kafka RAW format reads messages as bytes
# in the 'payload' field of a Row
return row.payload.decode('utf-8')
- type: LogForTesting
input: ParseKafkaRecords
# Since the pipeline both writes to and reads from a Kafka topic, we expect
# the first pipeline component to write the rows containing the `value`
# field as bytes to Kafka, and the second pipeline component to read the byte
# messages from Kafka before parsing them as strings into the new `text` field.
# Expected:
# Row(value=b'Fool\tThou shouldst not have been old till thou hadst')
# Row(value=b'\tbeen wise.')
# Row(value=b'KING LEAR\tNothing will come of nothing: speak again.')
# Row(value=b'\tNever, never, never, never, never!')
# Row(text='Fool\tThou shouldst not have been old till thou hadst')
# Row(text='\tbeen wise.')
# Row(text='KING LEAR\tNothing will come of nothing: speak again.')
# Row(text='\tNever, never, never, never, never!')
pipeline:
transforms:
# Reading data from a Spanner database. The table used here has the following columns:
# shipment_id (String), customer_id (String), shipment_date (String), shipment_cost (Float64), customer_name (String), customer_email (String)
# ReadFromSpanner transform is called using project_id, instance_id, database_id and a query
# A table with a list of columns can also be specified instead of a query
- type: ReadFromSpanner
name: ReadShipments
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
query: 'SELECT * FROM shipments'
# Filtering the data based on a specific condition
# Here, the condition is used to keep only the rows where the customer_id is 'C1'
- type: Filter
name: FilterShipments
input: ReadShipments
config:
language: python
keep: "customer_id == 'C1'"
# Mapping the data fields and applying transformations
# A new field 'shipment_cost_category' is added with a custom transformation
# A callable is defined to categorize shipment cost
- type: MapToFields
name: MapFieldsForSpanner
input: FilterShipments
config:
language: python
fields:
shipment_id: shipment_id
customer_id: customer_id
shipment_date: shipment_date
shipment_cost: shipment_cost
customer_name: customer_name
customer_email: customer_email
shipment_cost_category:
callable: |
def categorize_cost(row):
                cost = float(row.shipment_cost)
if cost < 50:
return 'Low Cost'
elif cost < 200:
return 'Medium Cost'
else:
return 'High Cost'
# Writing the transformed data to a CSV file
- type: WriteToCsv
name: WriteBig
input: MapFieldsForSpanner
config:
path: shipments.csv
# On executing the above pipeline, a new CSV file is created with the following records
# Expected:
# Row(shipment_id='S1', customer_id='C1', shipment_date='2023-05-01', shipment_cost=150.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Medium Cost')
# Row(shipment_id='S3', customer_id='C1', shipment_date='2023-05-10', shipment_cost=20.0, customer_name='Alice', customer_email='[email protected]', shipment_cost_category='Low Cost')
pipeline:
transforms:
# Step 1: Creating rows to be written to Spanner
# The element names correspond to the column names in the Spanner table
- type: Create
name: CreateRows
config:
elements:
- shipment_id: "S5"
customer_id: "C5"
shipment_date: "2023-05-09"
shipment_cost: 300.0
customer_name: "Erin"
customer_email: "[email protected]"
# Step 2: Writing the created rows to a Spanner database
# We require the project ID, instance ID, database ID and table ID to connect to Spanner
# Error handling can be specified optionally to ensure any failed operations aren't lost
# The failed data is passed on in the pipeline and can be handled
- type: WriteToSpanner
name: WriteSpanner
input: CreateRows
config:
project_id: 'apache-beam-testing'
instance_id: 'shipment-test'
database_id: 'shipment'
table_id: 'shipments'
error_handling:
output: my_error_output
# Step 3: Writing the failed records to a JSON file
- type: WriteToJson
input: WriteSpanner.my_error_output
config:
path: errors.json
# Expected:
# Row(shipment_id='S5', customer_id='C5', shipment_date='2023-05-09', shipment_cost=300.0, customer_name='Erin', customer_email='[email protected]')
pipeline:
type: chain
transforms:
    # Step 1: Create a collection of elements that need to be enriched.
    # Here we simulate sales data.
- type: Create
config:
elements:
- sale_id: 1
customer_id: 1
product_id: 1
quantity: 1
# Step 2: Enriching the data with Bigtable
    # This specific Bigtable table stores product data in the following format:
# product:product_id, product:product_name, product:product_stock
- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30
# Step 3: Logging for testing
# This is a simple way to view the enriched data
    # We could also write it to a sink such as a JSON file
- type: LogForTesting
options:
yaml_experimental_features: Enrichment
# Expected:
# Row(sale_id=1, customer_id=1, product_id=1, quantity=1, product={'product_id': '1', 'product_name': 'pixel 5', 'product_stock': '2'})
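For reference, the handler_config above maps onto the Python SDK's Bigtable enrichment handler. The following is a sketch of roughly equivalent Python, assuming the BigTableEnrichmentHandler import path and parameters shown here:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

# Parameters mirror the YAML handler_config above.
bigtable_handler = BigTableEnrichmentHandler(
    project_id='apache-beam-testing',
    instance_id='beam-test',
    table_id='bigtable-enrichment-test',
    row_key='product_id')

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create([beam.Row(sale_id=1, customer_id=1, product_id=1, quantity=1)])
        # Join each element with the matching Bigtable row keyed by product_id.
        | Enrichment(bigtable_handler)
        | beam.Map(print))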
pipeline:
transforms:
    # Step 1: Read order details from Spanner
- type: ReadFromSpanner
name: ReadOrders
config:
project_id: 'apache-beam-testing'
instance_id: 'orders-test'
database_id: 'order-database'
query: 'SELECT customer_id, product_id, order_date, order_amount FROM orders'
    # Step 2: Enrich order details with customer details from BigQuery
- type: Enrichment
name: Enriched
input: ReadOrders
config:
enrichment_handler: 'BigQuery'
handler_config:
project: "apache-beam-testing"
table_name: "apache-beam-testing.ALL_TEST.customers"
row_restriction_template: "customer_id = 1001 or customer_id = 1003"
fields: ["customer_id"]
# Step 3: Map enriched values to Beam schema
# TODO: This should be removed when schema'd enrichment is available
- type: MapToFields
name: MapEnrichedValues
input: Enriched
config:
language: python
fields:
customer_id:
callable: 'lambda x: x.customer_id'
output_type: integer
customer_name:
callable: 'lambda x: x.customer_name'
output_type: string
customer_email:
callable: 'lambda x: x.customer_email'
output_type: string
product_id:
callable: 'lambda x: x.product_id'
output_type: integer
order_date:
callable: 'lambda x: x.order_date'
output_type: string
order_amount:
callable: 'lambda x: x.order_amount'
output_type: integer
# Step 4: Filter orders with amount greater than 110
- type: Filter
name: FilterHighValueOrders
input: MapEnrichedValues
config:
keep: "order_amount > 110"
language: "python"
    # Step 5: Write the processed orders to another Spanner table
    # Note: Make sure to replace the $VARS with your values.
- type: WriteToSpanner
name: WriteProcessedOrders
input: FilterHighValueOrders
config:
project_id: '$PROJECT'
instance_id: '$INSTANCE'
database_id: '$DATABASE'
table_id: '$TABLE'
error_handling:
output: my_error_output
    # Step 6: Handle write errors by writing the failed records to a JSON file
- type: WriteToJson
name: WriteErrorsToJson
input: WriteProcessedOrders.my_error_output
config:
path: 'errors.json'
options:
yaml_experimental_features: Enrichment
# Expected:
# Row(customer_id=1001, customer_name='Alice', customer_email='[email protected]', product_id=2001, order_date='24-03-24', order_amount=150)