Atualmente, a Framework de Spark é amplamente usada em várias ferramentas e ambientes e, podemos ver isso em soluções locais como Cloudera, em soluções cloud como Databricks entre muitas outras.
- Apache Spark é uma estrutura de processamento de dados que permite executar rapidamente tarefas de processamento em conjuntos de dados muito grandes e também permite distribuir tarefas de processamento de dados em vários computadores, por si só ou em conjunto com outras ferramentas de computação distribuída.[1]
Um dos muitos desafios que enfrentamos ao usar processamento em Spark para transformações de dados, é a gravação desses resultados no disco (DataLake / DeltaLake). Se não for feito de forma correta, pode levar à criação de um grande número de pequenos arquivos no disco. Do ponto de vista da escrita, isso não é um problema, mas do ponto de vista da gestão e do consumo futuro dos dados gerados, isso pode causar um grave impacto no desempenho geral do DataLake/DeltaLake.
Alguns dos problemas causados [2]:
- Armazenamento ineficiente.
- Desempenho dos processos de computação.
- Sobrecarga das tarefas de agendamento internas do Spark.
É difícil encontrar uma solução ideal para esse problema e não existe uma abordagem out-of-the-box para isso. O adequado particionamento dos dados pode potencialmente minimizar o impacto, mas mesmo assim, não endereça o problema do elevado número de arquivos gerados. Na secção abaixo, descrevemos algumas abordagens que podem minimizar esse problema.
Abordagens
Abordagem 1 – Número fixo de arquivos
Solução
- Ao gravar em disco, podemos definir o número máximo de arquivos que podem ser gravados.
- Isso irá dividir os dados uniformemente, até o número máximo de partições definidas no método de repartição.
Exemplo
- Spark Geral
- DataBricks
Prós
- Fácil de implementar.
- Pode funcionar bem em âmbitos bem definidos, como tabelas não particionadas, onde o número de registos não altera muito.
Contras
- Âmbito limitado.
- Não funciona bem em tabelas particionadas não balanceadas. Como exemplo, situações em que temos partições de país e arquivos grandes para a China e arquivos muito pequenos para Luxemburgo.
Abordagem 2 – Redimensionamento dos arquivos pós-escrita
Solução
- Cria um processo independente que compactará os arquivos gerados pelo Spark.
- Este processo poderá será executado em uma schedule.
- Este irá ler os arquivos em uma pasta ou grupo de pastas e compactá-los de acordo com o tamanho especificado por arquivo.
Exemplo
- Spark Geral [3]
- O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
- DataBricks [4]
- O código é separado em 2 partes, uma calcula o número ideal de partições para o valor definido por arquivo e a outra grava os dados com o tamanho especificado.
Prós
- Separa o processo de compactação do processo normal de carregamento de dados.
- Pode agregar a compactação de múltiplos carregamentos isolados.
- Pode ser feito de forma programada e fora dos horários de pico do projeto.
- É possível ter limites de quando compactar dados ou não.
- O esforço de implementação não é alto.
Contras
- Requer uma leitura e gravação de dados adicional. Isso significa que os dados são gravados uma vez pelo processo normal, lidos e gravados novamente por este processo.
- Pode não ser fácil encontrar um horário para agendar esse processo, especialmente em processos que estão constantemente a gravar novos dados.
Abordagem 3 – Reparticionamento Dinâmico [5]
Solução
- Com base no particionamento natural da tabela, esta abordagem cria um cálculo dinâmico do número esperado de linhas para gravar por arquivo.
- Esta solução, usa o número de linhas como o fator de divisão, em vez do tamanho dos arquivos descritos na abordagem 2. Isso é necessário porque não sabemos o tamanho que os dados terão no disco até que os gravemos fisicamente.
- A ideia aqui é, com base nas partições definidas e no número de linhas desejadas por arquivo de saída, criar um repartition_seed que será usado ao gravar no disco, para definir o número de linhas que se espera que sejam gravadas por arquivo.
Exemplo
- Spark Geral
Explicação do Código
A primeira componente é para a inicialização da Sessão de Spark e Leitura de Dados, já na 2ª componente, são definidas as variáveis de particionamento:
- partition_by_columns – define as colunas de partição (se tivermos mais do que uma, temos de as separar por vírgulas).
- desired_rows_per_ouput_file – define o número de linhas que serão gravadas por arquivo.
- partition_count – define o número total de linhas por repartição.
- list_cols – como a coluna repartition_seed, usada para a divisão, fará parte da lista de saída das colunas a serem gravadas no disco, armazenamos numa variável as colunas originais que tínhamos no dataframe, para que apenas sejam escritas as colunas originais.
Na componente de particionamento:
- Há uma join entre o dataframe de entrada e o número de registos por ficheiro, pelas chaves de partição definidas. Isso replicará o número total de linhas por ficheiro em todas as linhas do dataframe de dados.
- Uma nova coluna é criada, chamada repartition_seed, que é um valor aleatório multiplicado pelo número de partições e dividido pelo número esperado de linhas por arquivo de saída.
- Este cálculo, junto com as colunas de partição originalmente definidas, é usado para a repartição de dados. Nota importante – a expansão do tuplo *, só funciona no Spark0 ou superior. Se estivermos a usar uma versão mais antiga, será preciso escrever explicitamente todas as chaves de partição.
- Finalmente, como não queremos salvar a coluna repartition_seed no disco, apenas selecionamos as colunas originais dos dados de entrada.
Como esta abordagem pode, numa primeira análise, não ser tão fácil de compreender, fizemos alguns testes com diferentes de linhas por ficheiro, para demonstrar o seu funcionamento:
Exemplo 1 – com um limite de linhas por ficheiro de 10.000
Exemplo 2 – para os mesmos dados, mas com um limite de linhas por ficheiro de 1 Milhão de registos
Exemplo 3 – diferença entre as as diferentes partições para uma definição de limite de linhas por ficheiro de 3.5 Milhões de registos
- DataBricks
Prós
- Não há necessidade de ler e escrever os dados duas vezes, para vermos o tamanho esperado e distribuí-los.
- Funcionam bem em tabelas particionadas não balanceadas. Por exemplo, para casos em que temos partições por país e arquivos grandes, por exemplo para a China, e arquivos muito pequenos, por exemplo para o Luxemburgo. Como esta abordagem se baseia no número de linhas por ficheiro, os arquivos para a China serão mais que os gerados para o Luxemburgo.
- A flexibilidade da solução é igualmente bastante interessante, apesar do código não trivial necessário para implementá-la.
Contras
- Como não sabemos, de antemão, qual será o espaço o espaço em disco que será usado por cada linha, precisamos de uma primeira de fazer um teste de escrita para ter uma ideia sobre o número de linhas a escrever por arquivo. Assim conseguimos ter uma ideia do tamanho desejado por arquivo.
- Se alterarmos a estrutura da tabela / arquivo, para ter mais ou menos colunas, podemos ter de recalcular as linhas desejadas por arquivo de escrita.
- Para implementar esta abordagem, é necessário incorporar a lógica em cada um dos processos existentes de Spark.
Conclusão
O objetivo deste artigo é o de partilhar algumas das formas mais comuns para otimizar o tamanho dos arquivos gerados em Spark. O objetivo não é o de selecionar a melhor abordagem, já que a sua seleção dependerá de uma série de diferentes fatores, mas sim mostrar algumas das soluções disponíveis para abordar este tema.
A aplicabilidade das 3 abordagens apresentadas acima, dependerá das necessidades do projeto e do esforço envolvido:
- Abordagem 1 – Número fixo de arquivos – Esta é a solução mais simples, que pode ser facilmente aplicada em muitos contextos, apesar das limitações descritas.
- Abordagem 2 – Redimensionamento dos arquivos pós-escrita – Esta solução tem custos de computação potencialmente mais elevados, mas apresenta grandes vantagens relacionadas à segregação de qualquer código Spark. Isso pode ser visto como um processo separado, usado exclusivamente para agregação de arquivos.
- Abordagem 3 – Particionamento Dinâmico – Esta é uma abordagem muito interessante para este problema, pois permite-nos ter uma abordagem mais flexível. A necessidade de alterar o código Spark original e a definição do número de linhas a guardar por arquivo, em contextos específicos, pode ser aceitável tendo em conta os benefícios obtidos.
Como nota final, a gestão correta deste problema pode ter um impacto significativo no seu projeto, com benefícios não só do ponto de vista de desempenho, mas também nos potenciais custos.
Bibliografia
- [1] Optimizing Delta/Parquet Data Lakes for Apache Spark (Matthew Powers)
- [2] Partitioning a large Skewed Dataset in S3 with Spark Partition By Method (Nick Chammas)
- [3] Tuning the Number of Partitions
- [4] What is Apache Spark? The big data platform that crushed Hadoop
- [5] Too Small Data — Solving Small Files issue using Spark
Pedro Nunes António Vilares
BIG DATA LEAD SENIOR CONSULTANT