DoFn is the core API in Apache Beam: it specifies the function applied to each element of a PCollection, primarily through its ProcessElement method.
According to the Javadoc:
Conceptually, when a ParDo transform is executed, the elements of the input PCollection are first divided up into some number of “bundles”. These are farmed off to distributed worker machines (or run locally, if using the DirectRunner). For each bundle of input elements processing proceeds as follows:
- If required, a fresh instance of the argument DoFn is created on a worker, and the DoFn.Setup method is called on this instance. This may be through deserialization or other means. A PipelineRunner may reuse DoFn instances for multiple bundles. A DoFn that has terminated abnormally (by throwing an Exception) will never be reused.
- The DoFn’s DoFn.StartBundle method, if provided, is called to initialize it.
- The DoFn’s DoFn.ProcessElement method is called on each of the input elements in the bundle.
- The DoFn’s DoFn.FinishBundle method, if provided, is called to complete its work. After DoFn.FinishBundle is called, the framework will not again invoke DoFn.ProcessElement or DoFn.FinishBundle until a new call to DoFn.StartBundle has occurred.
- If any of DoFn.Setup, DoFn.StartBundle, DoFn.ProcessElement or DoFn.FinishBundle methods throw an exception, the DoFn.Teardown method, if provided, will be called on the DoFn instance.
- If a runner will no longer use a DoFn, the DoFn.Teardown method, if provided, will be called on the discarded instance.
Each call to any of the DoFn’s processing methods can produce zero or more output elements. All of the output elements from all of the DoFn instances are included in the output PCollection.
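The call order described above can be sketched in plain Java without any Beam dependency. This is only an illustration of the documented sequence, not how any Beam runner is actually implemented: `LifecycleFn`, `RecordingFn`, and `LifecycleSketch` are hypothetical names invented for this example, and bundles are modeled as simple lists of strings.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for a DoFn; not part of the Beam API. */
interface LifecycleFn {
    void setup();
    void startBundle();
    void processElement(String element);
    void finishBundle();
    void teardown();
}

/** Records every lifecycle call so the order can be inspected afterwards. */
class RecordingFn implements LifecycleFn {
    final List<String> calls = new ArrayList<>();

    public void setup() { calls.add("setup"); }
    public void startBundle() { calls.add("startBundle"); }
    public void processElement(String element) {
        // Simulate a processing failure on a bad element.
        if (element == null) throw new IllegalArgumentException("null element");
        calls.add("processElement:" + element);
    }
    public void finishBundle() { calls.add("finishBundle"); }
    public void teardown() { calls.add("teardown"); }
}

class LifecycleSketch {
    /** Drives one instance through the given bundles in the documented order. */
    static void run(LifecycleFn fn, List<List<String>> bundles) {
        try {
            fn.setup();                       // once per instance
            for (List<String> bundle : bundles) {
                fn.startBundle();             // once per bundle
                for (String element : bundle) {
                    fn.processElement(element);
                }
                fn.finishBundle();            // once per bundle
            }
        } finally {
            fn.teardown();                    // on failure, or when the instance is discarded
        }
    }

    public static void main(String[] args) {
        RecordingFn fn = new RecordingFn();
        run(fn, Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        System.out.println(fn.calls);
    }
}
```

In this sketch one instance handles two bundles, so setup and teardown are each recorded once while startBundle and finishBundle are recorded once per bundle. If an element fails, the exception propagates after teardown has run, and the instance is not used again.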
Here’s one example implementation:
import org.apache.beam.sdk.transforms.DoFn;

// MapCheckout and BeamComponentTracker are application classes (not shown here).
public class MapBuyerSellerLkpFn extends DoFn<MapCheckout, MapCheckout> {
    private static final long serialVersionUID = -451161312788562675L;

    // Called once per DoFn instance, before any bundle is processed.
    @Setup
    public void setup() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "setup");
    }

    // Called when the runner discards this instance.
    @Teardown
    public void close() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "close");
    }

    // Called at the start of every bundle.
    @StartBundle
    public void startBundle(Context c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "startBundle");
    }

    // Called at the end of every bundle.
    @FinishBundle
    public void finishBundle(Context c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "finishBundle");
    }

    // Overridable in the SDK version used here; later releases deprecate it and make it final.
    @Override
    public void prepareForProcessing() {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "prepareForProcessing");
    }

    // Called once for each input element.
    @ProcessElement
    public void processElement(ProcessContext c) {
        BeamComponentTracker.getInstance().reportUsage("MapBuyerSellerLkpFn", "processElement");
    }
}
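To verify the back-end execution chain, I added a tracker, BeamComponentTracker, that counts how many times each method is invoked. Successive snapshots of its counters are shown below: setup runs once, while the other methods are each invoked the same number of times.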
"MapBuyerSellerLkpFn":{"prepareForProcessing":10,"setup":1,"startBundle":10,"finishBundle":10,"processElement":10}
"MapBuyerSellerLkpFn":{"prepareForProcessing":15,"setup":1,"startBundle":15,"finishBundle":15,"processElement":15}
"MapBuyerSellerLkpFn":{"prepareForProcessing":20,"setup":1,"startBundle":20,"finishBundle":20,"processElement":20}
"MapBuyerSellerLkpFn":{"prepareForProcessing":27,"setup":1,"startBundle":27,"finishBundle":27,"processElement":27}
"MapBuyerSellerLkpFn":{"prepareForProcessing":35,"setup":1,"startBundle":35,"finishBundle":35,"processElement":35}
"MapBuyerSellerLkpFn":{"prepareForProcessing":46,"setup":1,"startBundle":46,"finishBundle":46,"processElement":46}
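The source of BeamComponentTracker is not shown in this post. A counter with the same `getInstance().reportUsage(component, method)` shape could look roughly like the sketch below. Everything in it is an assumption for illustration: the class name `ComponentTrackerSketch`, the `count` and `snapshot` helpers, and the choice of a process-wide singleton backed by concurrent maps.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

/** Hypothetical per-JVM invocation counter; not the original BeamComponentTracker. */
final class ComponentTrackerSketch {
    private static final ComponentTrackerSketch INSTANCE = new ComponentTrackerSketch();

    // component name -> (method name -> invocation count)
    private final Map<String, Map<String, LongAdder>> counts = new ConcurrentHashMap<>();

    private ComponentTrackerSketch() {}

    static ComponentTrackerSketch getInstance() {
        return INSTANCE;
    }

    /** Increments the counter for one method of one component; safe to call from many threads. */
    void reportUsage(String component, String method) {
        counts.computeIfAbsent(component, k -> new ConcurrentHashMap<>())
              .computeIfAbsent(method, k -> new LongAdder())
              .increment();
    }

    /** Returns the current count, or 0 if nothing has been reported yet. */
    long count(String component, String method) {
        Map<String, LongAdder> methods = counts.get(component);
        if (methods == null) return 0L;
        LongAdder adder = methods.get(method);
        return adder == null ? 0L : adder.sum();
    }

    /** Returns a sorted copy of the counts for one component, suitable for logging. */
    Map<String, Long> snapshot(String component) {
        Map<String, Long> out = new TreeMap<>();
        Map<String, LongAdder> methods = counts.get(component);
        if (methods != null) {
            methods.forEach((method, adder) -> out.put(method, adder.sum()));
        }
        return out;
    }
}
```

A static singleton like this counts only within a single JVM. On a distributed runner each worker process would hold its own counters, so the totals would have to be collected from every worker to see the whole picture.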