7 March 2018

Database Development in Hadoop – Hortonworks

Introduction

The advancement of technology in recent years and the huge amount of data generated by electronic devices motivated the emergence of the concept of Big Data as we know it today. To process and store data at a Big Data scale, it is necessary to parallelize the infrastructure, since isolated servers will eventually be limited in capacity. This distributed infrastructure must be able to process and store data in a synchronized way. To take advantage of such infrastructure, technologies were created that allow these requirements to be met. One of these technologies is Hadoop, which, put very simply, is a framework built on a distributed file system (HDFS - Hadoop Distributed File System).

This was the initial motivation for the appearance of Hadoop; however, today, the use of Hadoop does not have to be directly associated with the Big Data concept. These platforms are scalable by design, so organizations can start by provisioning small Hadoop environments and increase their capacity as needed, even without working with data at a Big Data scale. In any case, the tooling ecosystem now available around Hadoop has matured considerably and allows the implementation of the different types of projects that have been the basis of organizations' Business Intelligence initiatives: from so-called Data Lakes and Enterprise Data Warehouses to dimensional and analytical models with different loading and refresh patterns, from real-time, interactive data updating to the more traditional ones with refresh intervals of 24 hours or more. On the other hand, I think Hadoop should not be seen as a substitute for other traditional data storage and processing technologies, such as relational database engines, but rather as an addition to existing solutions to meet some of the new challenges that organizations face today, unlike in the past.

Regarding the history, motivations and potential of Big Data and Hadoop, I recommend reading another article on our blog.

Although Hadoop is an open source technology, there are companies in the market today that provide stable and supported Hadoop distributions, which makes it relatively easy to install and configure these environments.

Having invested in this Big Data area in recent years, BI4ALL currently has ongoing teams and projects dedicated to developing data integration and data repository solutions that support BI on top of Hadoop environments. These projects have been developed on the Hadoop distributions of Cloudera and Hortonworks, which are currently the top two Hadoop vendors worldwide.

As with other Business Intelligence platform and tool vendors with whom BI4ALL collaborates, there has been an investment in the certifications that these two vendors have in place, with the goal of also establishing partnerships with these companies. This process, along with the project experience gained in recent years, makes BI4ALL a company capable of providing specialist consulting services in this very specific technological area.

Along with other colleagues in our organization, I have been part of one of these project teams working with Hadoop, more specifically with the Hortonworks distribution, and I want to present, using practical examples, some of the tools that exist in the ecosystem and are used in our projects. In this way, I hope to motivate our customers, and everyone who enjoys database development, to consider the use of these technologies for that purpose.

The remainder of this article is a practical example that explores the three essential steps needed to implement a solution on the Hortonworks Data Platform:

1 - Data Ingestion
2 - Data Transformation
3 - Data Analysis

To perform these exercises, I used the Hortonworks virtual test machine (sandbox) that is available in Azure, together with a sample database that can be created in the MySQL engine included in the sandbox. This database, called classicmodels, contains sample order information, and the entities used in this exercise are customers, products, orders and orderdetails. The goal is to create a dimensional model in Hive (a SQL engine on top of Hadoop HDFS) for analyzing order data. All the artifacts used in this exercise are available for download at the end of the article.
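To set up the source data, the MySQL script that creates the classicmodels database can be loaded in the sandbox using the MySQL command-line client. A minimal sketch follows; the file name is only an assumption, since the actual script is part of the downloadable artifacts:

mysql -u root -p < classicmodels.sql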

1 - Data Ingestion

In the Hadoop ecosystem provided by Hortonworks there are two essential tools available for ingesting data into HDFS: Sqoop and Flume. Sqoop is used to import data from relational databases such as SQL Server, Oracle or MySQL. If the source systems in your projects are operational systems based on relational databases, this should be the preferred method of data ingestion. Therefore, in this exercise I explore this tool to import the sample data installed in the MySQL engine in the sandbox. To invoke the Sqoop process, we execute a command in the command-line tool on one of the Hadoop nodes in the cluster that has the Sqoop client installed:

sqoop import --connect jdbc:mysql://localhost/classicmodels --username root --password hadoop --table customers --target-dir classicmodels/ingest/customers --split-by customerNumber --fields-terminated-by '\t' --null-string '' --null-non-string '' --driver com.mysql.jdbc.Driver

This Sqoop command imports the customers table from the classicmodels database of the MySQL instance. Some of the main parameters are:

• The database connection is defined by configuring the connect, username and password parameters.
• The HDFS target directory is specified in the target-dir parameter.
• The column used to split the data import into multiple work units is specified in the split-by parameter.
• To specify a column delimiter in the destination file, which is different from the default value, the fields-terminated-by parameter needs to be provided.

After all the Sqoop commands are executed to import the necessary tables into HDFS, the result is as follows:

Figure 1 – The root directory of the solution is the classicmodels/ folder of the current user in HDFS, and all the data resulting from the ingestion processes is stored in separate folders under the ingest/ directory created by the Sqoop commands.
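As a quick sanity check, the ingestion directories can also be listed with the HDFS client (a minimal example, assuming the imports were run as the sandbox user maria_dev, as in the rest of this exercise):

hdfs dfs -ls /user/maria_dev/classicmodels/ingest
hdfs dfs -ls /user/maria_dev/classicmodels/ingest/customers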

Note: The result of the import process for the products table consists of only one file (instead of four for the remaining imports). This is because the table does not have a numerical field that can be used to split the work, so the import was forced to use a single unit of work by specifying the num-mappers parameter with the value 1.
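For reference, the products import could be launched with a command along these lines (a sketch only; the exact commands used are part of the downloadable artifacts):

sqoop import --connect jdbc:mysql://localhost/classicmodels --username root --password hadoop --table products --target-dir classicmodels/ingest/products --num-mappers 1 --fields-terminated-by '\t' --null-string '' --null-non-string '' --driver com.mysql.jdbc.Driver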

Before proceeding to the next step, we can create the Hive database used to implement the solution, including the external tables used to inspect the content of each of the imports using the SQL language. The following example shows the creation of the external table used to query the data of the customers entity:

CREATE DATABASE IF NOT EXISTS classicmodels
LOCATION '/user/maria_dev/classicmodels/';

USE classicmodels;

CREATE EXTERNAL TABLE IF NOT EXISTS customers (
customerNumber INT,
customerName STRING,
contactLastName STRING,
contactFirstName STRING,
phone STRING,
addressLine1 STRING,
addressLine2 STRING,
city STRING,
state STRING,
postalCode STRING,
country STRING,
salesRepEmployeeNumber STRING,
creditLimit STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/maria_dev/classicmodels/ingest/customers/';

This is part of the larger DDL script used to create the database and all the Hive tables used in this exercise. To execute this script, the hive command is used on the command line of a node in the Hadoop cluster that has the Hive client installed:

hive -f classicmodels.hql

This table, as well as the entire database structure, is now available in the Hive instance. Hive also has a shell program, called Beeline, that allows SQL queries to be executed interactively:

beeline -u jdbc:hive2://localhost:10000/classicmodels -n maria_dev

After the DDL script from the previous step has been executed, and once inside the Beeline shell, it is possible to verify that the database and tables were created:

Figure 2 – The show databases and show tables commands are used to check the database and tables created in Hive, including the external tables that allow access to the imported data using the SQL language.
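For readers following along without the screenshots, the same checks can be run non-interactively by passing the statements to Beeline with the -e option (an illustrative example only):

beeline -u jdbc:hive2://localhost:10000/classicmodels -n maria_dev -e "SHOW DATABASES;"
beeline -u jdbc:hive2://localhost:10000/classicmodels -n maria_dev -e "SHOW TABLES;"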

 

Figure 3 – Example of data retrieved from an external table in Hive using the SQL language.
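Any of the external tables can be queried in the same way; for example (an illustrative query, not necessarily the one shown in the figure):

beeline -u jdbc:hive2://localhost:10000/classicmodels -n maria_dev -e "SELECT customerNumber, customerName, country FROM customers LIMIT 5;"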

 

2 - Data Transformation

One of the main tools for data transformation in Hortonworks' Hadoop ecosystem is Pig, whose syntax and reserved words have some similarities to SQL, which makes the more basic operations quite simple to learn.
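Note that the Pig programs below store their results into managed Hive tables (DimCustomer, FactOrder and the remaining dimensions), which must already exist; in this exercise they are created by the DDL script mentioned in the previous section. As an illustration only, and assuming an ORC storage format, the DimCustomer table could be defined along these lines (the actual definitions are part of the downloadable artifacts):

hive -e "CREATE TABLE IF NOT EXISTS classicmodels.DimCustomer (idcustomer INT, codcustomer STRING, dsccustomername STRING, dsccity STRING, dscstate STRING, dsccountry STRING) STORED AS ORC;"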

Following are two Pig programs that are used to process and load the data into a dimension table and into the fact table of the model (DimCustomer and FactOrder):

--use user-defined functions already available in the Hortonworks Data Platform
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
DEFINE Over org.apache.pig.piggybank.evaluation.Over('id:int');

--load customers data imported via sqoop
a = LOAD '/user/maria_dev/classicmodels/ingest/customers/' USING PigStorage();

--select fields required for customer dimension
b = FOREACH a GENERATE
(chararray)$0 AS customerNumber,
(chararray)$1 AS customerName,
(chararray)$7 AS city,
(chararray)$8 AS state,
(chararray)$10 AS country;

--use Over (a window function in Pig), similar to the SQL OVER clause, to calculate the dimension surrogate key.
c = GROUP b ALL;
d = FOREACH c {
d1 = ORDER b BY customerNumber;
GENERATE flatten(Stitch(d1, Over(d1,'row_number')));
}

e = FOREACH d GENERATE
stitched::id AS idcustomer,
stitched::customerNumber AS codcustomer,
stitched::customerName AS dsccustomername,
stitched::city AS dsccity,
stitched::state AS dscstate,
stitched::country AS dsccountry;

--create a static record for the dimension unknown member by limiting the initial relation to one record.
f = LIMIT b 1;

g = FOREACH f GENERATE
(int)'-1' AS idcustomer,
(chararray)'Unknown' AS codcustomer,
(chararray)'Unknown' AS dsccustomername,
(chararray)'Unknown' AS dsccity,
(chararray)'Unknown' AS dscstate,
(chararray)'Unknown' AS dsccountry;

h = UNION e, g;

--Stores the result in DimCustomer table in Hive.
STORE h INTO 'classicmodels.DimCustomer' USING org.apache.hive.hcatalog.pig.HCatStorer();

--load orders data imported via sqoop.
orders_a = LOAD '/user/maria_dev/classicmodels/ingest/orders/' USING PigStorage();

--load orderdetails data imported via sqoop.
orderdetails_a = LOAD '/user/maria_dev/classicmodels/ingest/orderdetails/' USING PigStorage();

--load dimCustomer data already processed in Hive.
dimcustomer = LOAD 'classicmodels.DimCustomer' USING org.apache.hive.hcatalog.pig.HCatLoader();

--load dimProduct data already processed in Hive.
dimproduct = LOAD 'classicmodels.DimProduct' USING org.apache.hive.hcatalog.pig.HCatLoader();

--load dimOrderStatus data already processed in Hive.
dimorderstatus = LOAD 'classicmodels.DimOrderStatus' USING org.apache.hive.hcatalog.pig.HCatLoader();

--select fields required from orders data.
orders_b = FOREACH orders_a GENERATE
(int)$0 AS orderNumber,
(datetime)$1 AS orderDate,
(datetime)$2 AS requiredDate,
(datetime)$3 AS shippedDate,
(chararray)$4 AS status,
(chararray)$6 AS customerNumber;

--select fields required from order details data.
orderdetails_b = FOREACH orderdetails_a GENERATE
(int)$0 AS orderNumber,
(chararray)$1 AS productCode,
(int)$2 AS quantityOrdered,
(bigdecimal)$3 AS priceEach,
(int)$4 AS orderLineNumber;

--look up ids from the customer, product and status dimensions using left outer joins on all dimensions.
c = JOIN orders_b BY orderNumber, orderdetails_b BY orderNumber;
d = JOIN c BY orders_b::customerNumber LEFT OUTER, dimcustomer BY codcustomer;
e = JOIN d BY c::orders_b::status LEFT OUTER, dimorderstatus BY codorderstatus;
f = JOIN e BY d::c::orderdetails_b::productCode LEFT OUTER, dimproduct BY codproduct;

--generate all fields required for the FactOrder table, including dimension ids, and calculate the date ids for each of the dates. Also calculate the total order line amount based on quantity and unit price.
g = FOREACH f GENERATE
e::d::c::orders_b::orderNumber AS numorder,
e::d::c::orderdetails_b::orderLineNumber AS numorderline,
e::dimorderstatus::idorderstatus AS idorderstatus,
CASE WHEN e::d::c::orders_b::orderDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::orderDate) * 10000 + GetMonth(e::d::c::orders_b::orderDate) * 100 + GetDay(e::d::c::orders_b::orderDate)
END AS idorderdate,
CASE WHEN e::d::c::orders_b::requiredDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::requiredDate) * 10000 + GetMonth(e::d::c::orders_b::requiredDate) * 100 + GetDay(e::d::c::orders_b::requiredDate)
END AS idrequireddate,
CASE WHEN e::d::c::orders_b::shippedDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::shippedDate) * 10000 + GetMonth(e::d::c::orders_b::shippedDate) * 100 + GetDay(e::d::c::orders_b::shippedDate)
END AS idshippeddate,
e::d::dimcustomer::idcustomer AS idcustomer,
dimproduct::idproduct AS idproduct,
e::d::c::orderdetails_b::quantityOrdered AS qtyorder,
e::d::c::orderdetails_b::priceEach AS amtorder,
e::d::c::orderdetails_b::quantityOrdered * e::d::c::orderdetails_b::priceEach AS amttotalorder;

--stores the result in FactOrder table in Hive.
STORE g INTO 'classicmodels.FactOrder' USING org.apache.hive.hcatalog.pig.HCatStorer();

Again, these Pig programs can be run on one of the nodes of the Hadoop cluster where the Pig client tool is installed, by executing the following on the command line:

pig -f classicmodels_dimCustomer.pig -useHCatalog -exectype tez
pig -f classicmodels_factOrder.pig -useHCatalog -exectype tez

Notes:

• Each Pig program loads one table of the model. The programs that process the dimension tables are executed first, and only then is the program that processes the fact table executed, since it depends on the data previously loaded into the dimensions (see the sketch after these notes).
• The useHCatalog parameter must be passed when executing the Pig client so that Pig can access HCatalog, the repository holding the metadata associated with the Hive instance. Without this parameter, reading data from and writing data to Hive tables from Pig is not possible.
• The exectype parameter with the tez value makes the Pig programs run as Tez jobs. Tez is a faster framework than the traditional MapReduce data processing framework in Hadoop: a single job is executed, instead of the multiple jobs launched by MapReduce, and intermediate results are kept in memory rather than written to disk as MapReduce does. Tez is available by default in the Hortonworks distribution; however, Pig programs do not use it by default, hence the need to specify this parameter explicitly when executing the programs. On the other hand, all SQL queries in Hive run on Tez jobs by default, which offers great performance and enables the interactive execution of SQL queries.
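As an illustration of this execution order, the full load could be launched as follows (a sketch only: apart from classicmodels_dimCustomer.pig and classicmodels_factOrder.pig, the script names are assumed to follow the same naming convention, and the actual scripts are part of the downloadable artifacts):

pig -f classicmodels_dimCustomer.pig -useHCatalog -exectype tez
pig -f classicmodels_dimProduct.pig -useHCatalog -exectype tez
pig -f classicmodels_dimOrderStatus.pig -useHCatalog -exectype tez
pig -f classicmodels_dimDate.pig -useHCatalog -exectype tez
pig -f classicmodels_factOrder.pig -useHCatalog -exectype tez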

3 - Data Analysis

The last step of the exercise is the analysis of the data in the dimensional model that was implemented and loaded using Pig and Hive. As already mentioned in this article, Hive's main benefit is its SQL interpreter, which makes it the main tool for exploring the data in this solution implemented in Hadoop.

The following example shows a SQL query executed through Beeline that returns the quantity of products ordered in the third quarter of 2004, distributed by the countries of origin of the respective customers:

SELECT c.dsccountry, SUM(f.qtyorder) AS qtyorder2004q3
FROM factorder f
INNER JOIN dimcustomer c
ON f.idcustomer = c.idcustomer
INNER JOIN dimdate d
ON f.idorderdate = d.iddate
WHERE d.numyearquarter = 20043
GROUP BY c.dsccountry
ORDER BY qtyorder2004q3;

Figure 4 – Result returned by the SQL query developed to answer the intended data analysis question.

 

Other self-service data discovery tools can be used to access the Hive database. In this exercise, I turned to Microsoft's Power BI Desktop. Hortonworks makes an ODBC connector available on its website, which must be installed and configured on the client PCs. This connector is used by Power BI Desktop to connect and extract the data into the client application, where it can be explored and even transformed differently by each user if necessary.

Figure 5 – The ODBC connector returns the metadata from the Hive databases allowing the Power BI Desktop user to select the desired tables from the classicmodels database.

 

Figure 6 – After selecting the tables and importing the data, Power BI Desktop automatically builds the star model based on table relationships inferred from the column names and data types.

 

Figure 7 – A sample report in Power BI, developed using only a drag-and-drop approach, that allows the quantity and value of orders to be explored across the different dimensions.

 

Conclusion

This article presented the three essential steps for creating a data solution in Hadoop, and in particular on the Hortonworks Data Platform. It is a simple case using the referenced tools and does not accurately reproduce a production-ready implementation; that will depend on the functional and non-functional requirements of each solution, but it will most likely be implemented using these same tools, or others available in the Hortonworks Hadoop ecosystem (or another Hadoop distribution), in a more or less customized way. I highlight the following tools that can and should be used in real projects if needed:

Oozie: workflow and scheduling
Flume: real-time data ingestion
Ranger: security and authorization
Spark: data processing, data science and machine learning
