Best Practice - Pentaho Big Data On-Cluster Processing

Your feedback is important to us!  Email us how we can improve these documents.

On-Cluster Processing Introduction

This document covers best practices to push ETL processes to Hadoop based implementations. When working on big data implementations, it is important we have in mind concepts like data locality, distributed processing, resource assignment isolation, and processing queue among other topics.  

Pentaho Data Integration tool includes multiple functions to push work to be done on the cluster using distributed processing and data locality acknowledgment. However, it is important to understand what are the use cases and situations when the use of one method or another is recommended or not.

The items covered in this document are

  • Pentaho Deployment Architecture Recommendations
  • Current On-Cluster processing options including Advantages and Disadvantages
  • Processing Big Data Files strategies and recommendations
  • Processing recommendations

Pentaho Deployment Architecture Recommendations

Pentaho Data Integration server and client tools require full access to the cluster in order to submit activities to the cluster. In general PDI server needs to communicate with HDFS Name Node and Data Nodes, Resource Manager, Hive Server, Impala Server and Oozie based on the activities to be performed by the ETL designed process. For this reason, Data Integration Server (DI) is recommended to be installed on an EDGE node of the cluster and it is recommended to be deployed in HA mode.
Please read “Pentaho DI Server and High Availability”.

 

 

 

PDI Clients connect to the DI Server repository via the Load Balancer (Use of SSL is recommended). In the Diagram above it is important to highlight that there are PDI Clients installed in edge nodes as well that there are PDI Clients outside of the Cluster Nodes, however with Network access to the Clusters as well.

Please refer to “Big Data Integrated Authentication Best Practices” to understand how and why there are PDI Clients as Edge nodes and in which cases PDI Clients can be used by end-users/developers from outside the cluster.

 

Hadoop/PDI Integration

Pentaho MapReduce relies on Hadoop's Distributed Cache to distribute PDI's libraries and plugins across the cluster.  The first time you run a PDI application on a Hadoop cluster, PDI will upload all of its' dependent JARS to HDFS in the hdfs://your.hdfshost.com:port/opt/pentaho/mapreduce/[pdi-version-hadoop-version] target directory.  If the target HDFS directory already exists, then it will not upload anything.

This target upload HDFS directory is set in the following plugin.properties files with the following parameter: pmr.kettle.dfs.install.dir. The plugin.properties file for individual Pentaho application are:

  • DI Server: [PENTAHO_INSTALL_HOME]/server/data-integration-server/pentaho-solutions/system/kettle/plugins/pentaho-big-data-plugin/plugin.properties
  • BI Server: [PENTAHO_INSTALL_HOME]/server/biserver-ee/pentaho-solutions/system/kettle/plugins/pentaho-big-data-plugin/plugin.properties
  • Report Designer: [PENTAHO_INSTALL_HOME]/design-tools/report-designer/plugins/pentaho-big-data-plugin/plugin.properties
  • Spoon/Spoon/Pan/Kitchen: [PENTAHO_INSTALL_HOME]/design-tools/data-integration/plugins/pentaho-big-data-plugin/plugin.properties
  • Metadata Editor: [PENTAHO_INSTALL_HOME]/design-tools/metadata-editor/plugins/pentaho-big-data-plugin/plugin.properties

When setting up HDFS, be sure that that you give PDI user permission to write to the root HDFS directory.  Alternatively, you can manually create the base directory (/opt/pentaho) and give the PDI user write permission to that directory or change the pmr.kettle.dfs.install.dir parameter to a location with write permission. See the plugin.properties file for more details about the directory structure.

PDI also allows you to have multiple versions of PDI to run on the same Hadoop Cluster. This is helpful if you need to have two clients using different versions of PDI running on the same Hadoop cluster for testing PDI upgrades.

If you change PDI's dependent JARs by using a patching process or you drop in JAR replacements where the Hadoop version or the PDI version does not change, then be sure to delete the pmr.kettle.dfs.install.dir/[pdi-version-hadoop-version] directory in HDFS so the new JARs will be uploaded to HDFS during the next run of a Pentaho MR.

 

 

Current On-Cluster processing options including Advantages and Disadvantages

 

Orchestration

Pentaho Data Integration (PDI) offers multiple ways to push work to the Cluster, some of this options are agnostic of the tool, where PDI works as an orchestrator, calling Cluster processes and waiting for the response/end of the process and coordinating next steps based in the SUCCESS or FAIL result; This method does not have control or influence on how the cluster executes the called process, however, pentaho provides the cluster with user’s credentials for any resource isolation configuration that is in place for the calling user.

 

Sample Image – Orchestration of Activities [SUCCESS] [FAIL]

 

The Orchestration process runs on the server/node where PDI process was started. In other words, if ETL Job process was started in the DI Server, then even if the independent STEP actions are running on the cluster, the orchestration is happening from the DI Server. Moreover, if the Job is executed from an end-user/developer user machine, then this machine is the one orchestrating all the activities; any shutdown/kill of the application on the desktop/server machine will stop the JOB submitted. For this reason, it is recommended that long processing tasks are pushed to be executed by the DI Server.

 

Running Pentaho Transformations on the cluster

Pentaho Data Integration includes in its tool set the possibility to write PDI Transformations that need to be executed and distributed over Hadoop nodes.

 

Pentaho Map Reduce

Pentaho Map Reduce job step wraps PDI transformations following Hadoop Map Reduce architecture. The transformation are classified in Mapper Transformation, and Reducer Transformations.

The following examples shoe Standard and more common usage of the Map Reduce functionality

 

 

 

Important Recommendations with Map Reduce

In General
  • Pentaho MapReduce Job Step
    1. In the cluster tab, you typically want to check the Enable Blocking options if you want to wait for the MapReduce processing to complete. If you do not enable blocking, then it will simply start the PMR job in Hadoop and return without waiting for it to complete. Only time you don't want this checked is if you want fire off the PMR job and forget about it.
    2. Even if you specify the number of Mappers you want to run, Hadoop may ignore it. However, the number of reducers is followed. For details see: http://wiki.apache.org/hadoop/HowManyMapsAndReduces
    3. If you set the number of reducers to zero, then Hadoop will not run the shuffle/sort phase and the unsorted mapper output will simply be moved to the configured output dir.
    4. If you do not provide a reducer implementation and the number of reducers is not set or it is greater than zero, then shuffle/sort phase will be run using a default reducer (known as the Identify Reducer). This will cause the mapper output to be shuffled/sorted and output to the configured dir.
  • Hadoop Input Step: If you want to process large files of data that are on HDFS, then write Map/Reduce apps. Don't be tempted to use the Hadoop File Input step to pull the data off HDFS and process it PDI. The main reasons to use Hadoop file input step is to:
    1. Pull data off HDFS and publish to an external system (database)
    2. Data volume you are pulling from HDFS is small to medium size

 

Extending MapReduce concepts with Pentaho implementation architecture

Pentaho’s ETL core engine is distributed and used in the implementation of Hadoop Map reduce functionality. This provides end-users with all the Pentaho toolset available while running either a mapper or combiner/reducer transformation. To explain further, Hadoop framework will manage all the data locality, node assignment, and even READ input file and WRITE output file. And Pentaho engine can collect the info READ by Hadoop, use any of the existing steps while running on a mapper or reducer, and deliver the info to the Hadoop writer.

 

There is no limitation or rule that blocks a mapper or a reducer, users/developers can add full BIG FILES reads, Database Connections to external sources, Calls to WebServices, among other things; This is why the it is important to read all the Tips included in this documentation to avoid wrong usage or bad practices of the use of the tool.

On the other hand, knowing the limitations and what are the best DO and NOT DO, there are multiple interesting paths that can be achieved using MapReduce framework as a wrapper like

  • Create a process with Mapper Only: By setting the reducer to 0, you can write a mapper only transformation that can Read an input file do Rows filtering/transformations of lookups , and WRITE the result. Keep in mind there can be multiple mappers reading parts of the input file so actions like sorting, or any task that suggest you have the full file is not a good approach.
  • Mapper transformation are required to have an Hadoop INPUT and a Hadoop OUTPUT, but there is no obligation to use them; some short usecases:
    • INPUT can be a LIST of actions to be performed like WebService calls, emails, etc…, and OUTPUT can be EMPTY
    • INPUT can be EMPTY or 1 LINE file can actually produce millions of rows for the OUTPUT, where real INPUT can be anything, from self-Generated to external Database Table.
      (to do this, set Number of mappers to 1 and reducer to 0 and use apache.hadoop.mapred.lib.NLineInputFormat as InputFormat as it reads line per line and does not use data locality for Mapper executing node assignment)

 

Connecting to Databases

In general, do NOT connect to traditional databases in your mappers or reducers. You can have 1000s of mappers and/or reducers running in parallel. Having all of them hit your DB (or any other resource) at the same time can overload the DB and network. There are a limited number of connections that a DB can handle. Making DB connections in a mapper that is instantiated 1000's of times will exceed the max number of connections that a DB can handle.

If you need to perform lookup as part of your MapReduce then use alternative technologies such as distribute key/value stores (HBase, MongoDB, Cassandra, etc ...) or one of the options in the section titled Joins and Data Lookup.

Processing Big XML files

XML files have a few challenges in Big Data world, however there are multiple interesting solutions in place to do this

Please read: Processing XML and JSON in Hadoop using PDI

As described in article above, there are multiple options. Apache Mahout Project for example introduces an InputFormat that can be easily included in Pentaho libraries for use in MapReduce implementations

Please refer to this URL that explains how to build a custom InputFormat and/or OutputFormat

http://wiki.pentaho.com/display/BAD/Using+a+Custom+Input+or+Output+Format+in+Pentaho+MapReduce

Apache Mahout Project, please refer to section/code

https://github.com/apache/mahout/blob/master/integration/src/main/java/org/apache/mahout/text/wikipedia/XmlInputFormat.java

Explanation of Mahout Approach:

XML files, by definition, non splittable, and therefore not suitable for MapReduce TextInputFormat format. Apache's Mahout Project has two classes capable of processing XML files as input formats for MapReduce. The one we use is org.apache.mahout.classifier.bayes.XmlInputFormat. This InputFormat requires to be specified an open tag and close tag of interest are defined as xmlinput.start and xmlinput.end in the Pentaho MapReduce “User Defined” section .

XmlInputFormat looks for complete XML sections with start and end tags, this means that in case a file is splitted and the any of the mappers is reading a section/split of the file where the XML was partial part of the XML, XmlInputFormat will skip those records until new START tag.

 

The result of this InputFormat, is a Key/Value => Key: file Offset and Value: XML section. That is now easy to process with the Pentaho “Get data from XML” step using XML source from field

Read: http://wiki.pentaho.com/display/EAI/Get+Data+From+XML

 

Joins and Data Lookup

In general, you should not do database lookups or PDI joins inside a Mapper/Reducer. When doing lookups for data that resides inside a Hadoop cluster, it is best to have both the data sets you want to join reside in HDFS.  You have several options when using PDI and Hadoop cluster technology to join data sets inside the Hadoop cluster.  The specific solution is dependent of the amount of data that is in the data sets:

  1. PDI's Hadoop File Input with Stream Value Lookup step
    1. This solution is best when the lookup data is very small (~10s of KB of rows)
    2. Put lookup file in Hadoop's Distributed Cache
  2. PDI's HBase Input Step with Stream Value Lookup
    1. This solution is best when the lookup data is small (~100s of KB of rows)
    2. This will perform full range scans of the HBase tables, so if the HBase table is large this will be slow.
    3. HBase configuration also impacts the performance.
  3. Directly use HBase API with PDI's User Defined Java Class
    1. This solution is best when the lookup data is large and is in HBase tables (around 1 MB or less of rows, or you are simply performing single item lookups against large HBase table)
    2. You will need to use the HBase Java API in the Pentaho UDJC. You must be familiar with the HBase API features to get the best performance (Use HBasePools as static variable and try to apply as many filters in a single request to minimize the calls out to HBase).
    3. HBase configuration also impacts the performance.
  4. MapReduce Joins
    1. This solution is best when both the number of input data rows and the total number lookup data is large.
    2. Recommendation is to split the issue in two, where the JOIN is done at HIVE level, storing the result in a staging location, and rest of processing or other lookups to be done in MapReduce.
      NOTE: Do not put all the logic in the SQL Join, this is normally a big maintenance point if we add too much logic here.
    3. In case MapReduce Join is preferred, please refer to the following items
      1. You can refer to the following link as a tutorial on how to do joins of large data sets using MapReduce: 
        http://chamibuddhika.wordpress.com/2012/02/26/joins-with-map-reduce/
        Although the Hadoop Java API this link refers to is not available in Pentaho MR, it is important to understand the general method of how to do joins with the MapReduce paradigm.
      2. An implementation of doing this type of Cartesian join can be found here: http://hgovind.wordpress.com/2013/04/25/how-to-join-big-data-sets-in-using-mapreduce-and-pdi/

 

Note that the data row volumes are general guidelines and performance is also affected by the row sizes.

Write to Multiple File Destinations from MapReduce

PDI does not support Hadoop's native feature that provides an output formatter that allows you to write MapReduce output to multiple destinations.

The following are suggestion on how to write output data to multiple destinations within a MapReduce:

  1. Write multiple map/reduces to create multiple files in HDFS. This is probably the easiest way, but would be slowest because you would need to go through the entire data set multiple times.
  2. Within the Mapper/Reducer, create multiple files in HDFS and write data out to these files as needed using Pentaho Hadoop Output step. Each instance of mapper/reducer must make sure that the filename you create in HDFS is unique in the HDFS name space.

 

Compression

Since Hadoop apps deal with large data sets, you should enable compression for various stages of MapReduce. Although some CPU is used for compression, Hadoop apps will gain the following benefits by using compression:

  • MapReduce is very disk IO intensive. Using compression significantly reduces the amount of data that is stored in HDFS and intermediate processing files.
  • MapReduce is network intensive. You can improve performance by reducing the amount of data that needs to be replicated by HDFS and by minimizing the amount of intermediate files sent over the network during MapReduce.

 

Suggested compression (from most effective to least effective):

  1. Use Snappy compression codecs in conjunction with container file formats that supports splitting. Snappy by itself does not support splitting. So large input files should not be just compressed with Snappy. Use it with container file formats that support both compression and splitting (such as Sequence File, RCFile, or AVRO).
  2. If input files are compressed and do not use a container file format, make sure you use compression codec that supports splitting. BZIP2 supports splitting but is slow. Hadoop also supports indexed LZO which is relatively fast and supports splitting. However, you will need to separately install the appropriate libraries on your Hadoop cluster because the indexed LZO implementation is GPL-licensed (see http://www.devops-blog.net/hadoop/compiling-and-installing-hadoop-lzo-compression-support-module).
  3. If the compression algorithm you choose for your input files does not support splitting, then you should pre-process the file by chunking it into smaller sizes (preferably <= to HDFS block size). If you do not, then you will effectively get a single mapper to process the entire file.
  4. Store the files uncompressed.

 

The following Hadoop parameters only compress the intermediate mapper output:

  • compress.map.output
  • map.output.compression.codec

 

The final output of MapReduce job is controlled by the following Hadoop parameters:

  • output.compress
  • output.compression.codec
  • output.compression.type

 

Snappy Compression Setup

CDH4+ and many of the newer distributions ship Snappy compression as part of their distribution. Hadoop can be configured to use Snappy compression for MapReduce output. In general it is a good idea to have intermediate files compressed using Snappy. This will have the following benefit:

  • Reduce temp storage requirements
  • Significantly reduce network traffic during shuffle/sort phase of MapReduce processing.
  • Good balance of compression/decompression and speed when compared to other compression technologies.

 

To Snappy compress intermediate files generated by the mapper, the following should be set (in PDI, they are set in the User Defined tab of PMR step):

mapred.compress.map.output = true
mapred.map.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec

 

In addition, to compress the final output of the MapReduce using Snappy compression, the following should als be set (again, in PDI, they are set in the User Defined tab of PMR step):

mapred.output.compress = true
mapred.output.compression.codec = org.apache.hadoop.io.compress.SnappyCodec
mapred.output.compression.type = block

 

For client tools such as Spoon to show these options, for various Input/Output steps the Snappy libraries must be installed on the client machine and have Spoon include them as part of its configuration. Steps are detailed here:

http://wiki.pentaho.com/display/BAD/Extracting+Data+from+Snappy+Compressed+Files

 

YARN Cluster

Pentaho Data Integration Yarn Cluster, uses YARN resource manager assignment and resource isolation functions to start multiple CARTE servers inside a Hadoop cluster. It is important to understand once the Carte Servers are up and running the behavior and use/cases are exactly the same as any other manually deployed Kettle Carte Cluster. Furthermore, Carte servers can READ/WRITE from within or outside of the Hadoop cluster to from/to files, relational databases, and others.

Please read:

 

YARN Cluster best use cases in the big data world are for

  • Data Ingestion processes
  • Real-Time on going transformation
  • Assignment of full Job/Transformation to be executed in one Carte server
  • Use of Hadoop YARN as an escalation platform for PDI Transformations when data is not stored in Hadoop

For all other use cases where Hadoop data needs to be processed, it is recommended to use Hadoop cluster data locality aware processes, like Map Reduce (see above)

 

 

Important Recommendations with Yarn Cluster

User impersonation limitation

Pentaho Yarn Cluster currently runs inside YARN container and uses SHIM configuration to generate a Kerberos Ticket, therefore, it cannot run as calling user. Kerberos Authentication section in Big Data Integrated Authentication Best Practices for more details.

Appendix

 

Custom JARs

In addition to PDI's dependent JARS, Many PDI applications require third-party Java libraries to perform tasks within PDI jobs and transformations. These libraries must be included in the class path of Hadoop mappers and reducers so PDI applications can use them in the Hadoop cluster. One solution for this is to use the features detailed here (see pmr.kettle.additional.plugins configuration): http://wiki.pentaho.com/display/BAD/Pentaho+MapReduce

However, many companies lock down the installs of Pentaho software and do not give users the ability to edit/add files in the Pentaho solutions. In addition, they have multiple people/groups writing apps and each app has it's own set of libs and versions of libs. In this type of environment the best way to add custom libs is to copy all dependent JARs to Hadoop’s Distributed Cache and add the following parameters to the Pentaho Map Reduce job step (in the User Defined tab):

  • mapred.cache.files
  • mapred.job.classpath.files

Some examples here:

 

Scripting Steps

PDI offers three steps to write your own processing code. In order of the slowest to the fastest performing options:

  1. JavaScript
  2. Java Expression
  3. User Defined Java Class (UDJC)

For bet performance, it is recommended that all processing that must be done for each row should use the UDJC instead of JavaScript or Java Expression. UDJC can perform up to 10x faster than JavaScript.

 

PDI Arguments/Variables/Parameters

Do not hardcode variable and environment settings such as file paths and Hadoop cluster configuration. Use variables for such settings. This will allow you to quickly move from different environments without having to actually change the KJB/KTR. For basics info, see: http://infocenter.pentaho.com/help/topic/pdi_user_guide/topic_vars_params_args.html.

It is recommended, that you store variables at the following locations:

  1. properties - Store global variables that are common for all applications (KTR/KJB) and do not change between runs. Typically defines your execution environment through various variables (such as dev, QA, production). For examples, settings such as Hadoop cluster configuration and database configurations.
  2. Transformation variable (right click on canvas of a job or transformation and select Job or Transformation settings) - The scope of variable is at job or transformation level. These can be set either at command line or at Spoon execute GUI. They can also be passed from Jobs to Transformations.
  3. Create a config file (i.e. config.properties) - Put parameters that are application specific in this file and read them in as the first step of the job using the Set Variable entry.

 

Variables can be passed from a Pentaho job to a transformations that functions as a mapper or reducer. However, the following limitations should be noted:

  1. Variables cannot be set in a mapper/reducer and be passed back to a parent jobs.
  2. Variables cannot be set in a mapper and passed to a reducer.

 

General PDI Best Practices

In addition to the best practices detailed in the sections above, the following best practices should be noted:

  1. As with any large-scale data processing, always test your application with similar data sizes sets that you expect in your production environment. This includes data such as raw input data, lookup/join data, and output data.
  2. In addition to tuning PDI applications, performance is dependent on Hadoop technology configurations. You must tune the Hadoop cluster and any related technologies to fit your load and application scenarios.
  3. Do not install Pentaho products on NFS mounts. It will work for small loads, but it has known performance issues under heavy loads. Pentaho engineering does not test its' products with NFS technology and NFS is officially NOT supported.
  4. If you are expecting some steps to take a long time complete then you can parallelize them by starting multiple instance of the step. However, be sure you do not start multiple instances of steps that write to the same resource (such as file I/O). This type of parallelizing may affect resources that are being parallelized (such disk, CPU and memory), so you must be aware of the extra resources that will be used when parallelizing a step adjust resource requirements and step parallelization accordingly.
  5. Do not perform row level logging in production environments that use large data sets. This can significantly degrade performance.
  6. Sort Rows: When working with large data sets that are in HDFS, do not sort rows on the DI Server. Write MapReduce application to perform the sorting (Use Hadoop's shuffle/sort abilities).
  7. Do not assume that all values will flow through the same mapper. The mapper will receive a random set of values.
  8. Do code the reducer knowing that all values for the same key will flow through the same reducer. Reducer logic should presume operations for one key at a time.
  9. Do not expect a reducer to sum or count values across keys. A reducer gets all data associated with a single key.
  10. Do not expect a combiner to always run. Combiners may or may not run even if you specify one. Hadoop decides when to run Combiners.
  11. For Linux systems, increase the ulimit of system and user that PDI apps run to around 16k. If using a carte cluster that uses Repo, then you may need much more depending on how many simultaneous jobs you are running.
  12. If using Carte cluster and you have 1000s of KTR/KJB and you will be running 100+ jobs/transformation simultaneously, than do not use the Pentaho Repository. Communications between the Carte server and the Repo is very chatty and can cause performance issues under heavy loads. For this type of loads, it is recommended you use files only.
  13. Large row sizes: If the size of your rows are large (100s KB+) then you should decrease the number of simultaneous rows your transformation processes and/or increase the JVM heap space (and tune the JVM GC).
  14. When replacing JARs in a Pentaho directory, do not just rename the old JAR. You MUST delete/move it outside of any directory that is used by Pentaho to load classes. Pentaho products will load all files in directories that it reads for JARs, not just JARs.
  15. Pentaho does not have native support for performing MapReduce directly on MongoDB in a MongoDB cluster.

Sub-transformation are not supported in transformation that are used as mappers/reducers/combiners. When Spoon or DI Server creates a Hadoop job, it passes the entire transformation's XML in the Hadoop job definition. It does not recursively add all sub-transformations to the definitions. This will be added in future release of PDI.

MapReduce JVM Settings

By default, Hadoop will create a new JVM for each instance of a mapper and reducer.  You can set various JVM options (such as -Xms and –Xmx) using the following Hadoop parameters:

  • child.java.opts – applies to all task JVMs  (mappers and reducers)
  • map.child.java.opts – if present, then it is used over mapred.child.java.opts for mapper
  • reduce.child.java.opts – if present, then it is used over mapred.child.java.opts for reducer

These can all be set cluster wide in mapred-site.xml. In addition, if you want to override the system setting on a per job basis, then you can also set them in PMR under the User Defined tab.

 

Hadoop Error Handling

In addition to PDI's step level error handling, a transformation can also enable Hadoop's skip bad records feature. You can set the skip bad records options in the User Defined tab in the PMR job entry. Here's a mapping of API call and properties that can be set:

  • setAttemptsToStartSkipping() -> mapred.skip.attempts.to.start.skipping
  • setAutoIncrMapperProcCount() -> mapred.skip.map.auto.incr.proc.count
  • setAutoIncrReducerProcCount() -> mapred.skip.reduce.auto.incr.proc.count
  • setSkipOutputPath() -> mapred.skip.out.dir
  • setMapperMaxSkipRecords() -> mapred.skip.map.max.skip.records
  • setReducerMaxSkipGroups() -> mapred.skip.reduce.max.skip.groups

 

Pentaho Application Monitoring

Pentaho provides real-time performance statistic by using the "Output steps metrics".  This can be used to extract step level statistics. However, if using this in a transformation that works within Hadoop Mapper/Reducer/Combiner via the Pentaho MapReduce job step, you will only get statistics relating to a specific Mapper/Reducer/Combiner process instance.  You will not get statistics that are aggregated for all the Mapper/Combiner/Reducer instances.  You will need to have mapper or reducer store instance data to an external data store (HBase, DB, file, etc...) and aggregate this data to get a metrics for the entire Hadoop job.

 

PDI Operations Mart

For transformations that are used as mapper/reducers and Hadoop jobs that have more than a few mapper/reducer instances, do not collect the operational data using POM.  When transformations are configured to use POM, they will open a connection to the POM DB to add a record for collected stats. When this is done in a Hadoop cluster, you could have 1000's of mappers/reducers opening connection to the DB and overwhelming the DB with too many connections. An alternate way to collect stats is the following:

  1. In the transformations you want to collect stats for, use the Output Steps Metrics step. This step will generate a row of stats every second.
  2. Write the output of the Output Steps Metrics to a file in HDFS. Be sure the file name is unique for each mapper/reducer instance. Alternatively you can out the data to HBase, MongoDB or Cassandra (NOTE: Output Steps Metrics out to HBase has issues: http://jira.pentaho.com/browse/PDI-10269 ).
  3. Then write a Pentaho MR app that will aggregate the data and publish the stats you want. Alternatively, take the data and populate the POM DB.

Debugging Pentaho MapReduce

For debugging purposes, Pentaho is working on a Spoon feature to allow you to view Hadoop counters of a PMR application. Stay tuned for a new feature called Vizor (http://wiki.pentaho.com/display/BAD/Pentaho+Map+Reduce+Vizor )

 

Processing XML and JSON in Hadoop using PDI

(Extract from Hemal Govind post at https://hgovind.wordpress.com/2014/11/20/processing-xml-and-json-in-hadoop-using-pdi/ )

Hadoop can efficiently process large amounts of data by splitting large files into smaller chunks (typically 128 MB) and process each chunk in parallel.  The “splittability” of a file is central to the efficient handling of the file by MapReduce. A file format is splittable if you can start processing records anywhere in the file.  A splittable file cannot have a file structure that requires you to start reading from the first line of a file.  Therefore XML and JSON documents that span multiple lines are not splittable. You cannot just start processing an XML document in the middle because you need the starting and ending tags of the XML hierarchy. There are three possible ways to handle these types of files in Hadoop:

  1. Store the XML in HBase an column. The process the HBase table, extract the column that has the XML, and process the the XML as needed in a Mapper. The main downside of this solution is that it forces you to use HBase. If you decide to use this method, you can use PDI’s HBase Row Decoder step to get the HBase data into a Pentaho MapReduce (See: this posting to see how to use this step in PMR )
  2. Remove all line breaks from doc and create a single line that contains the entire XML/JSON data as a single record. If using the a text based input formatter for the mapper/reducer, then you can strip all the line breaks from that XML doc or JSON object so that the entire XML doc or JSON object appears as a single string record in the text file. You then put the single line in an HDFS file. You put multiple XML docs or JSON objects in separate lines in the same HDFS file.  When your mapper function receives the record, it will get the entire XML doc or JSON obj as a single line. Although I have seen this solution implemented, I do not recommend this solution because if your XML/JSON has data that requires line breaks that are not encoded then you will loose the line break formatting (i.e. you may have this issue if using CDATA XML tag).
  3. Use a custom input formatter. This is probably the most popular solution. This requires you to implement Hadoop’s Java classes for writing custom input formatter and record reader classes. The Apache Mahout projects has one of the most widely used implementations of this solution (see Mahout’sXmlInputFormat implementation).  Although this is probably the most popular method of processing XML files in MapReduce application, I find this not to be the most efficient because you effectively have to parse the XML document twice: once to get the entire doc as a single record that gets passed to your mapper, and a second time in you mapper when your mapper code will probably parse it for further processing.
  4. Store the data in binary/hex format. In this solution you would take the text data and put in a binary (byte array) format that can be stored in a single line. Then write binary format as a single line of string to an HDFS file. You put multiple “stringified” binary representations of XML/JSON text in separate lines in the same HDFS file. Then in the mapper you reverse the binary string format to the same textual string format. This will preserve all the formatting and special characters that your XML/JSON contains.

The last option is preferred method because it does not have the issues of the first few options.  The last option is also very easy to implement in PDI.  If you are writing your application in Java, you could implement it using Hadoop’s sequence file format. However, the current release of PDI cannot natively write out to a sequence file without. You could write custom Java code in PDI to write out to a Hadoop sequence file, but there is an easier way to get the same benefits of without using sequence files.

The attached sample PDI application demonstrates how to process non-splittable text formats such as XML and JSON in a MapReduce application. This example pre-processes XML docs into a hex string that can be written out to a single line. Each line can then be read into a mapper as a single record and converted back to XML and processed using PDI’s XML parsers. It uses a Java Expression PDI step to convert between XML string and its hex representation.

You can download this PDI solution here: xml_processing

You should start looking at the PDI Job implemented in master_job.kjb. This PDI job performs the following tasks:

  • Configures variable for Hadoop and various other parameters used in the application. You will need to configure the Hadoop cluster as needed in the Set Variables job entry.
  • Calls the store_binary_hdfs transformation that will read all the XML docs in the data directory, convert them to Hex strings, and write them out to a single HDFS files.
  • Runs a Pentaho MapReduce (map only) application to parse the XML and extract the wanted fields to a CSV file that is sent to the mapper output.

The sample app has been tested with the following software:

  1. Pentaho Data Integration 5.2
  2. Cloudera  CDH5.1
Have more questions? Submit a request

Comments

Powered by Zendesk