7 Março 2018

Desenvolvimento de bases de dados em Hadoop – Hortonworks

Introdução

O avanço da tecnologia nos últimos anos e a enorme quantidade de dados que são gerados por dispositivos eletrónicos motivou o aparecimento do conceito de Big Data que conhecemos hoje. Para processar e armazenar tantos dados à escala do Big Data é preciso paralelizar a infraestrutura uma vez que servidores isolados têm sempre capacidade limitada. Esta infra-estrutura distribuída tem de ser capaz de processar e armazenar os dados de forma sincronizada. Para tirar partido de tal infraestrutura foram criadas tecnologias que permitissem implementar estes requisitos. Uma dessas tecnologias é o Hadoop que, de forma muito simplista, não é mais que uma framework baseada num sistema de ficheiros distribuído (HDFS – Hadoop File System).

Esta foi a motivação inicial para o aparecimento do Hadoop, contudo, hoje em dia, a utilização de Hadoop não tem de estar diretamente associada ao conceito de Big Data. Uma plataforma destas é por defeito escalável pelo que hoje, as organizações podem começar por instalar pequenos ambientes de Hadoop e aumentar a capacidade à medida que as necessidades o justificarem mesmo sem trabalharem com dados à escala do Big Data. Em todo o caso, o ecossistema de ferramentas, hoje em dia disponível em torno do Hadoop, está cada vez mais evoluído e permite assim implementar diferentes tipos de projetos que servem de base às iniciativas de Business Intelligence das organizações desde a implementação dos chamados Data Lakes, Enterprise Data Warehouses até a modelos dimensionais e analíticos, com diferentes padrões de carregamento e refrescamento, desde atualização de dados em tempo real, interativa ou os mais tradicionais com intervalos de atualização de 24 horas ou superior.

Por outro lado, o Hadoop não deve ser visto como substituto para outras tecnologias mais tradicionais para armazenamento e processamento de dados, como os motores de base de dados relacionais, mas sim como um acrescento às soluções existentes com vista a dar resposta a alguns novos desafios que se começam a colocar hoje em dia e que não acontecia há anos anteriores.

Relativamente à história, motivações e potencial do Big Data e Hadoop recomendo a leitura de um outro artigo do nosso blog.

Apesar do Hadoop ser uma tecnologia open source, hoje em dia existem empresas no mercado que fornecem distribuições de Hadoop estabilizadas e suportadas, o que facilita a instalação e configuração deste tipo de ambientes prontos a serem utilizados pelas empresas.

A BI4ALL ao fazer uma aposta nesta vertente de Big Data nos últimos anos, tem atualmente equipas e projetos a decorrer que consistem no desenvolvimento de projetos de integração e armazenamento de dados de suporte ao BI em ambientes de Hadoop. Estes projetos têm sido desenvolvidos nas distribuições de Hadoop da Cloudera e Hortonworks, que são atualmente o top 2 de fornecedores de Hadoop a nível mundial.

Assim, como acontece com outros fornecedores de plataformas e ferramentas de Business Intelligence que são atualmente parceiros da BI4ALL, existe neste momento uma aposta em diferentes certificações que estes dois fornecedores têm ao dispor, com vista a fortalecer as parcerias com estas empresas. Este processo, juntamente com a experiência que decorre nos últimos anos em projetos, fazem neste momento da BI4ALL uma empresa capaz de fornecer serviços de consultoria especializados nesta área de tecnologia ainda muito específica.

Juntamente com outros colegas na nossa organização, tenho feito parte de uma dessas equipas de projeto que tem trabalhado em Hadoop, mais propriamente na distribuição da Hortonworks, e queria apresentar, através de exemplos práticos, algumas das ferramentas que existem no ecossistema e que são usadas nos nossos projetos. Espero desta forma contribuir para motivar os nossos clientes e todas as pessoas que gostam de desenvolvimento de bases de dados a considerarem a utilização destas tecnologias para o efeito.

Assim, de seguida, vou apresentar um exemplo prático que explora os 3 passos essenciais a realizar para a implementação de uma solução na plataforma de dados da Hortonworks:

1- Ingestão de dados
2- Processamento de dados
3- Análise de dados

Para realizar estes exercícios, é utilizada a máquina virtual de teste da Hortonworks (sandbox) que está disponível no Azure e uma base de dados exemplo de MySQL que pode ser criada e no motor MySQL existente na sandbox. Esta base de dados, chamada classicmodels tem informação aleatória de encomendas e as entidades usadas neste exercício são: customers, products, orders e orderdetails. O objetivo é criar um modelo dimensional para análise dos dados de encomendas em Hive (um motor de base de dados SQL que usa o HDFS como base). O código escrito neste exercício encontra-se disponível para download no final do artigo.

1 - Ingestão de dados

No ecossistema Hadoop fornecido pela Hortonworks estão disponíveis duas ferramentas essenciais que servem para a ingestão dados no HDFS, são elas o Sqoop e o Flume. Neste exercício, exploro o Sqoop que é utilizado para importar dados de bases de dados relacionais, como SQL Server, Oracle ou MySQL. Se os sistemas fontes nos nossos projetos são sistemas operacionais baseados em bases de dados relacionais, este deve ser o método de ingestão preferencial. Para invocar o processo Sqoop, recorremos a um comando executado através da linha de comandos num dos nós do cluster Hadoop que tenha instalado o cliente Sqoop:

sqoop import --connect jdbc:mysql://localhost/classicmodels --username root --password hadoop --table customers --target-dir classicmodels/ingest/customers --split-by customerNumber --fields-terminated-by '\t' --null-string '' --null-non-string '' --driver com.mysql.jdbc.Drivercom.mysql.jdbc.Driver

Esta linha de comandos Sqoop permite fazer a importação da tabela customers da base de dados classicmodels da instância MySQL. Algumas configurações obrigatórias são:

• A ligação à base de dados é feita configurando os parâmetros connect, username e password.
• A diretoria destino do HDFS é especificada no parâmetro target-dir.
• A coluna usada para dividir a importação dos dados em diversas unidades de trabalho é especificada no parâmetro split-by.
• Para passar um delimitador de coluna no ficheiro destino, que seja diferente do valor por defeito, na importação de dados é usado o parâmetro fields-terminated-by.

Depois de executados os comandos Sqoop para importar todas as tabelas necessárias no HDFS, o resultado será o seguinte:

Ilustração 1 – Diretoria raiz da solução é a pasta classicmodels/ do utilizador no HDFS e todos os dados que resultaram da ingestão de dados estão em diferentes pastas por baixo da diretoria ingest/ criadas pelos comandos Sqoop.

Nota: A importação da tabela products tem apenas um ficheiro (em vez de quatro para as restantes importações). Isto deve-se ao facto desta tabela não ter um campo numérico que possa ser usado para dividir o trabalho e sendo assim foi forçada a utilização de apenas uma unidade de trabalho especificando o parâmetro num-mappers com o valor 1.

Antes de passar ao passo seguinte, podemos desde já criar a base de dados em Hive usada para implementação da solução e tabelas externas para consultar o conteúdo de cada uma das importações recorrendo a linguagem SQL. Como exemplo, fica a criação da tabela externa para consulta dos dados da entidade customer:

CREATE DATABASE IF NOT EXISTS classicmodels
LOCATION '/user/maria_dev/classicmodels/';

USE classicmodels;

CREATE EXTERNAL TABLE IF NOT EXISTS customers (
customerNumber INT,
customerName STRING,
contactLastName STRING,
contactFirstName STRING,
phone STRING,
addressLine1 STRING,
addressLine2 STRING,
city STRING,
state STRING,
postalCode STRING,
country STRING,
salesRepEmployeeNumber STRING,
creditLimit STRING
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
LOCATION '/user/maria_dev/classicmodels/ingest/customers/';

Este excerto do script DDL faz parte do script para criação da base de dados e todas as tabelas utilizadas neste exercício. Para executar este script podemos recorrer a um comando hive executado na linha de comandos do nó do cluster Hadoop que tenha instalado o cliente Hive:

hive -f classicmodels.hql

Esta tabela bem como toda a estrutura da base de dados fica disponível na instância do Hive. O Hive também tem disponível um programa do tipo shell, denominado beeline, que permite executar consultas SQL de forma interativa:

beeline -u jdbc:hive2://localhost:10000/classicmodels -n maria_dev

Uma vez dentro do programa shell é possível verificar que a base de dados e tabelas estão criadas após a execução do script DDL:

Ilustração 2 – Os comandos show databases e show tables são usados para verificar as base de dados e tabelas criadas no Hive. Incluí as tabelas externas que permitem aceder aos dados importados recorrendo a linguagem SQL.

 

Ilustração 3 – Exemplo de uma consulta de dados a uma tabela externa do Hive recorrendo a linguagem SQL.

 

2 - Transformação de dados

Uma das principais ferramentas no ecossistema Hadoop da Hortonworks para transformação de dados é o Pig cuja sintaxe e palavras reservadas tem algumas semelhanças com o SQL o que a torna bastante simples de aprender no que diz respeito a operações mais básicas.

De seguida, são apresentados dois programas em Pig que servem para processar e carregar os dados numa tabela dimensão e na tabela factual do modelo (DimCustomer e FactOrder):

--use user defines functions already available in Hortonworks Data Platform
DEFINE Stitch org.apache.pig.piggybank.evaluation.Stitch;
DEFINE Over org.apache.pig.piggybank.evaluation.Over('id:int');

--load customers data imported via sqoop
a = LOAD '/user/maria_dev/classicmodels/ingest/customers/' USING PigStorage();

--select fields required for customer dimension
b = FOREACH a GENERATE
(chararray)$0 AS customerNumber,
(chararray)$1 AS customerName,
(chararray)$7 AS city,
(chararray)$8 AS state,
(chararray)$10 AS country;

--use Over (window function in Pig), Similar do SQL OVER clause to calculate dimension surrogate key.
c = GROUP b ALL;
d = FOREACH c {
d1 = ORDER b BY customerNumber;
GENERATE flatten(Stitch(d1, Over(d1,'row_number')));
}

e = FOREACH d GENERATE
stitched::id AS idcustomer,
stitched::customerNumber AS codcustomer,
stitched::customerName AS dsccustomername,
stitched::city AS dsccity,
stitched::state AS dscstate,
stitched::country AS dsccountry;

--creates static record for dimension unknown member limiting one record in initial relation.
f = LIMIT b 1;

g = FOREACH f GENERATE
(int)'-1' AS idcustomer,
(chararray)'Unknown' AS codcustomer,
(chararray)'Unknown' AS dsccustomername,
(chararray)'Unknown' AS dsccity,
(chararray)'Unknown' AS dscstate,
(chararray)'Unknown' AS dsccountry;

h = UNION e, g;

--Stores the result in DimCustomer table in Hive.
STORE h INTO 'classicmodels.DimCustomer' USING org.apache.hive.hcatalog.pig.HCatStorer();

--load orders data imported via sqoop.
orders_a = LOAD '/user/maria_dev/classicmodels/ingest/orders/' USING PigStorage();

--load orderdetails data imported via sqoop.
orderdetails_a = LOAD '/user/maria_dev/classicmodels/ingest/orderdetails/' USING PigStorage();

--load dimCustomer data already processed in Hive.
dimcustomer = LOAD 'classicmodels.DimCustomer' USING org.apache.hive.hcatalog.pig.HCatLoader();

--load dimProduct data already processed in Hive.
dimproduct = LOAD 'classicmodels.DimProduct' USING org.apache.hive.hcatalog.pig.HCatLoader();

--load dimOrderStatus data already processed in Hive.
dimorderstatus = LOAD 'classicmodels.DimOrderStatus' USING org.apache.hive.hcatalog.pig.HCatLoader();

--select fields required from orders data.
orders_b = FOREACH orders_a GENERATE
(int)$0 AS orderNumber,
(datetime)$1 AS orderDate,
(datetime)$2 AS requiredDate,
(datetime)$3 AS shippedDate,
(chararray)$4 AS status,
(chararray)$6 AS customerNumber;

--select fields required from order details data.
orderdetails_b = FOREACH orderdetails_a GENERATE
(int)$0 AS orderNumber,
(chararray)$1 AS productCode,
(int)$2 AS quantityOrdered,
(bigdecimal)$3 AS priceEach,
(int)$4 AS orderLineNumber;

--lookup ids from customers, products and status dimension using left outer join in all dimensions.
c = JOIN orders_b BY orderNumber, orderdetails_b BY orderNumber;
d = JOIN c BY orders_b::customerNumber LEFT OUTER, dimcustomer BY codcustomer;
e = JOIN d BY c::orders_b::status LEFT OUTER, dimorderstatus BY codorderstatus;
f = JOIN e BY d::c::orderdetails_b::productCode LEFT OUTER, dimproduct BY codproduct;

--generate all fields required for FactOrder table including dimension ids and calculates date ids for each different date. Also calculates total order line amount based on quantity and unit price.
g = FOREACH f GENERATE
e::d::c::orders_b::orderNumber AS numorder,
e::d::c::orderdetails_b::orderLineNumber AS numorderline,
e::dimorderstatus::idorderstatus AS idorderstatus,
CASE WHEN e::d::c::orders_b::orderDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::orderDate) * 10000 + GetMonth(e::d::c::orders_b::orderDate) * 100 + GetDay(e::d::c::orders_b::orderDate)
END AS idorderdate,
CASE WHEN e::d::c::orders_b::requiredDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::requiredDate) * 10000 + GetMonth(e::d::c::orders_b::requiredDate) * 100 + GetDay(e::d::c::orders_b::requiredDate)
END AS idrequireddate,
CASE WHEN e::d::c::orders_b::shippedDate IS NULL
THEN -1
ELSE GetYear(e::d::c::orders_b::shippedDate) * 10000 + GetMonth(e::d::c::orders_b::shippedDate) * 100 + GetDay(e::d::c::orders_b::shippedDate)
END AS idshippeddate,
e::d::dimcustomer::idcustomer AS idcustomer,
dimproduct::idproduct AS idproduct,
e::d::c::orderdetails_b::quantityOrdered AS qtyorder,
e::d::c::orderdetails_b::priceEach AS amtorder,
e::d::c::orderdetails_b::quantityOrdered * e::d::c::orderdetails_b::priceEach AS amttotalorder;

--stores the result in FactOrder table in Hive.
STORE g INTO 'classicmodels.FactOrder' USING org.apache.hive.hcatalog.pig.HCatStorer();

Mais uma vez estes programas em Pig podem ser executados num dos nós do cluster Hadoop através da ferramenta cliente do Pig que está instalada e que é executada na linha de comandos:

pig -f classicmodels_dimCustomer.pig -useHCatalog -exectype tez
pig -f classicmodels_factOrder.pig -useHCatalog -exectype tez

Notas:

• Cada programa Pig carrega uma tabela no modelo. Primeiro são executados os programas para processamento das tabelas dimensões e só depois deve ser executado o programa para processamento dan tabela factual uma vez que depende dos dados previamente carregados nas dimensões.
• O parâmetro useHCatalog tem de ser passado na execução do cliente Pig para permitir ao Pig aceder ao Hcatalog que é o repositório com os meta dados associados à instância do Hive. Sem passar este parâmetro a leitura e escrita de dados de tabelas em Hive a partir do Pig não é possível.
• O parâmetro exectype com o valor tez, permite executar os programas Pig num job do tipo Tez que é uma framework mais rápida que a tradicional framework de processamento de dados em Hadoop uma vez que executa um job apenas e guarda resultados intermédios em memória, ao contrário da framework Map Reduce que guarda resultados intermédios em disco e lança múltiplos jobs por cada operação. A existência deste modo Tez está disponível de raiz na distribuição de Hortonworks sendo que no caso do Pig esta framework não é usada por defeito, daí ser necessário passar o parâmetro de forma explícita aquando da execução dos programas. Por outro lado, todas as consultas SQL em Hive executam por defeito em jobs Tez o que permite executar consultas SQL de forma interativa.

3 - Análise de dados

O último passo do exercício tem a ver com a análise de dados do modelo dimensional que foi implementado e carregado usando Pig e Hive. Como já foi referido neste artigo, o Hive tem como principal beneficio ter um interpretador de linguagem SQL sendo essa a principal ferramenta para explorar os dados nesta solução implementada em Hadoop.

De seguida segue o exemplo e uma consulta de dados SQL executada através do Beeline que devolve o número de encomendas submetidas no primeiro trimestre do ano 2004 distribuídas pelos países de origem dos respetivos clientes:

SELECT c.dsccountry, SUM(f.qtyorder) AS qtyorder2014q3
FROM factorder f
INNER JOIN dimcustomer c
ON f.idcustomer = c.idcustomer
INNER JOIN dimdate d
ON f.idorderdate = d.iddate
WHERE d.numyearquarter = 20043
GROUP BY c.dsccountry
ORDER BY qtyorder2014q3;

Ilustração 4 – Resultado devolvido pela consulta SQL desenvolvida para dar resposta à análise de dados pretendida.

 

Outras ferramentas self-service para exploração de dados podem ser utilizadas para aceder às bases de dados Hive. Neste exercício e como exemplo recorri ao Power BI Desktop da Microsoft. A Hortonworks disponibiliza no seu website o conector ODBC que tem de ser instalado e configurado nos PCs do cliente. É este conector que é usado pelo Power BI Desktop para estabelecer ligação e extrair os dados para a aplicação do cliente por forma a ser explorada e inclusivamente tratada ao gosto de cada utilizador:

Ilustração 5 – O conector ODBC devolve os meta dados das bases de dados Hive permitindo ao utilizador do Power BI Desktop selecionar as tabelas pretendidas da base de dados classicmodels.

 

Ilustração 6 – Depois de selecionadas as tabelas e importados os dados, o Power BI Desktop carrega automaticamente o modelo estrela com base nas relações das tabelas por nome e tipo de dados das colunas.

 

Ilustração 7 – Relatório exemplo em Power BI que permite explorar quantidade e valor das encomendas nas diferentes dimensões de análise recorrendo apenas a uma abordagem drag and drop.

 

Conclusão

Neste artigo foram apresentados 3 passos essenciais para a criação de uma solução de dados em Hadoop e em particular na plataforma de dados Hortonworks. Isto trata-se de um caso simples para utilização das ferramentas referidas e não reproduz de forma completamente fiel uma implementação preparada para produção. Isso dependerá dos requisitos funcionais e não funcionais de cada solução sendo que, muito provavelmente, os mesmos poderão ser implementados recorrendo a estas mesmas ou outras ferramentas que estão disponíveis no ecossistema Hadoop da Hortonworks (ou outro distribuidor de Hadoop), de forma mais ou menos customizada. Destaco as seguintes ferramentas que podem e devem ser utilizadas caso haja essa necessidade:

Oozie: orquestrador e agendamento de processos
Flume: Ingestão de dados em tempo real
Ranger: Segurança e autorização
Spark: Processamento de dados; data science; machine learning