How to Do an Apache Beam Transform with MongoDB in Python

Apache Beam is a solid way to automate the read-transform-write process and build a robust pipeline.

For a work project, I needed to read data from one collection, apply a transform, and write the result to another collection. This is easy because there is an official MongoDB IO module with a reader and a writer. You can check out the mongodbio module here.

This is an example usage of how you read data from MongoDB.

pipeline | ReadFromMongoDB(uri='mongodb://localhost:27017',
                           db='testdb',
                           coll='input')

This is an example usage of how you write data to MongoDB.

pipeline | WriteToMongoDB(uri='mongodb://localhost:27017',
                          db='testdb',
                          coll='output',
                          batch_size=10)

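Note: When reading from or writing to MongoDB with these transforms, you will see a warning along the lines of ‘this is experimental’. It means the module comes with no backward compatibility guarantees, so its API may change in a later Beam release.

To read from MongoDB Atlas, set the bucket_auto option to True to enable the $bucketAuto MongoDB aggregation. Beam then splits the collection with $bucketAuto instead of the splitVector command, which Atlas users are not allowed to run. Usage: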

pipeline | ReadFromMongoDB(uri='mongodb+srv://user:pwd@cluster0.mongodb.net',
                           db='testdb',
                           coll='input',
                           bucket_auto=True)

Doing a Transform with the Data

In Python, you can use the Apache Beam SDK for Python and its key concepts to do a basic transform. Check out the PTransform module here.

PCollection represents a collection of data.

PTransform represents a computation that transforms PCollections. You can chain transforms together to create a pipeline that successively modifies input data.

Simple Transforms

For simple transforms, you will usually use a ParDo transform. Check out the core Beam transforms here.

ParDo considers each element in the input PCollection, performs some action on it, and emits zero, one, or multiple elements to an output PCollection.

Example usage:

import apache_beam as beam

# The DoFn to perform on each element in the input PCollection
class ComputeWordLengthFn(beam.DoFn):
    def process(self, element):
        # Emit a single output per input: the length of the element
        return [len(element)]
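To make the "zero, one, or multiple elements" behaviour concrete, here is a minimal sketch that is not taken from the Beam docs. The names extract_words, ExtractWordsFn and run_demo are made up for this illustration, and the Beam import is deferred into run_demo only so that the plain generator can be tried without Beam installed.

```python
def extract_words(line):
    """Yield each whitespace-separated word in a line.

    A blank line yields nothing, a single word yields one element,
    and a longer line yields several.
    """
    for word in line.split():
        yield word


def run_demo():
    # Imported here so extract_words stays usable without Beam installed.
    import apache_beam as beam

    class ExtractWordsFn(beam.DoFn):
        def process(self, element):
            # Returning an iterable lets ParDo emit 0..n outputs per input.
            return extract_words(element)

    with beam.Pipeline() as pipeline:
        (
            pipeline
            | 'Create lines' >> beam.Create(['', 'beam', 'read transform write'])
            | 'Split' >> beam.ParDo(ExtractWordsFn())
            | 'Print' >> beam.Map(print)
        )
```

Calling run_demo() on the default local runner should print one word per output element; the empty string contributes no output at all.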

Composite Transforms

Sometimes applying a transform to input data takes several simpler transforms (such as more than one ParDo, Combine, or GroupByKey). A transform that nests others like this is called a composite transform. In this case, you subclass PTransform. Here is an example from the docs.

# The CountWords Composite Transform inside the WordCount pipeline.
class CountWords(beam.PTransform):
    def expand(self, pcoll):
        return (
            pcoll
            # Convert lines of text into individual words.
            | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
            # Count the number of times each word occurs.
            | beam.combiners.Count.PerElement()
            # Format each word and count into a printable string.
            | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

A PTransform-derived class needs to define the expand() method, which describes how one or more PValues are created by the transform.

Now that you know how to read data, apply a transform, and write the output, you can create a pipeline.
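The snippet from the docs uses ExtractWordsFn and FormatCountsFn without showing them. As a rough, self-contained sketch of the same idea, the version below swaps the two ParDo steps for beam.FlatMap and beam.Map over plain functions. The helpers extract_words, format_count and build_count_words are my own illustrative names, not part of Beam, and the word-splitting regex is an assumption.

```python
import re


def extract_words(line):
    """Return the words in a line of text (letters, digits, apostrophes)."""
    return re.findall(r"[\w']+", line)


def format_count(word_count):
    """Turn a (word, count) pair into a printable string."""
    word, count = word_count
    return f'{word}: {count}'


def build_count_words():
    # Deferred import: the helpers above stay usable without Beam installed.
    import apache_beam as beam

    class CountWords(beam.PTransform):
        def expand(self, pcoll):
            return (
                pcoll
                # One input line becomes zero or more words.
                | 'ExtractWords' >> beam.FlatMap(extract_words)
                # Produces (word, count) pairs.
                | 'CountPerWord' >> beam.combiners.Count.PerElement()
                | 'FormatCounts' >> beam.Map(format_count)
            )

    return CountWords()
```

You would apply it like any other transform, for example lines | 'CountWords' >> build_count_words(), where lines is a PCollection of strings.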

Let’s say you want to read the data from the database, trim a field and write to another collection or the same collection using Apache Beam.

In the case of a trim transform, here is how you can achieve a simple transformation with the beam.DoFn class.

class TrimTransform(beam.DoFn):
    def process(self, element):
        element = element.strip()
        yield element

Inside your DoFn subclass, you define a process method where you provide the actual transform logic. The Beam SDKs handle extracting the elements from the input collection so you get the extracted element as a parameter.

Note: Once you output a value using yield or return, you should not modify that value in any way.
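The same part of the Beam programming guide also asks you not to modify the input element passed to process. One way to respect both rules when trimming a field of a MongoDB document is to work on a copy. The helper below is a minimal sketch under that assumption; trim_field is a hypothetical name, and it treats each element as a plain dict.

```python
def trim_field(element, field):
    """Return a copy of `element` with the string in `field` stripped.

    The input dict is never modified. Documents where the field is
    missing or not a string are returned as they are.
    """
    value = element.get(field)
    if not isinstance(value, str):
        return element
    trimmed = dict(element)  # shallow copy, so the input stays untouched
    trimmed[field] = value.strip()
    return trimmed
```

Because beam.Map forwards extra keyword arguments to the function, this can be plugged into a pipeline as beam.Map(trim_field, field='name').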

Here is an example Trim class that contains the whole pipeline:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

class Trim():
    def run(self):
        database = self.database
        collection = self.collection
        output_collection = self.output_collection
        field = self.operation['field']  # In my case this is the field I am going to trim

        # Define the transform class you will use in the pipeline
        class TrimTransform(beam.DoFn):
            def process(self, element):
                if field in element:
                    element[field] = element[field].strip()
                yield element

        # Define pipeline options
        options = PipelineOptions()
        options.view_as(StandardOptions).streaming = False

        # Create pipeline
        pipeline = beam.Pipeline(options=options)
        (
            pipeline
            | 'Read data' >> beam.io.ReadFromMongoDB(uri='mongodb://127.0.0.1', db=database, coll=collection)
            | 'Apply Transform' >> beam.ParDo(TrimTransform())
            | 'Save data' >> beam.io.WriteToMongoDB(uri='mongodb://127.0.0.1', db=database, coll=output_collection)
        )

        # Run the pipeline and block until it has finished
        result = pipeline.run()
        result.wait_until_finish()
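The Trim class above reads self.database, self.collection, self.output_collection and self.operation, but the excerpt does not show where they are set. One possible constructor, purely as an assumption about how the class might be wired up, could look like this:

```python
class Trim:
    """Holds the settings that run() reads (hypothetical constructor)."""

    def __init__(self, database, collection, output_collection, operation):
        self.database = database                    # e.g. 'testdb'
        self.collection = collection                # source collection
        self.output_collection = output_collection  # target collection
        self.operation = operation                  # e.g. {'field': 'name'}

    # The run() method shown in the article would go here.


# Example values are placeholders, not taken from a real project.
job = Trim('testdb', 'input', 'output', {'field': 'name'})
```

With run() added to the class, calling job.run() would start the pipeline with these settings.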

Conclusion

For simple transformations like trim or filter, you can use MongoDB aggregations. However, for complex transformations, Apache Beam is a better choice. The Apache Beam SDK for Python gives you access to Beam's classes and modules from Python, which makes it easy to create pipelines and to read from or write to external sources. Of course, Apache Beam can do a lot more than this. As next steps, you can explore the windowing, grouping of multiple elements, data encoding, and type safety that Apache Beam provides out of the box.
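For comparison, here is roughly what the aggregation route for the trim case could look like. This is a sketch, not a drop-in replacement for the Beam pipeline: build_trim_pipeline and run_trim are hypothetical helpers, pymongo is assumed to be installed for the second one, and documents where the field is missing or not a string may need an extra $match or $cond stage.

```python
def build_trim_pipeline(field, output_collection):
    """Build aggregation stages that trim `field` and write the results."""
    return [
        # $trim removes leading and trailing whitespace (MongoDB 4.0+).
        {'$addFields': {field: {'$trim': {'input': f'${field}'}}}},
        # $out replaces the target collection with the aggregation results.
        {'$out': output_collection},
    ]


def run_trim(uri, database, collection, field, output_collection):
    # Imported here so build_trim_pipeline can be used without pymongo.
    from pymongo import MongoClient

    client = MongoClient(uri)
    stages = build_trim_pipeline(field, output_collection)
    client[database][collection].aggregate(stages)
```

For example, run_trim('mongodb://127.0.0.1', 'testdb', 'input', 'name', 'output') would trim the name field of every document in input and store the results in output, entirely inside MongoDB.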

Hope you enjoyed it!

Software Engineer at 42 Silicon Valley | Technical Writer at ContentLab.io
