Contagem de palavras com PySpark
Link da documentação oficial do Hadoop:
http://hadoop.apache.org/
Link do meu Github:
https://github.com/toticavalcanti
- O SPARK É UMA ESTRUTURA DE PROCESSAMENTO BIG DATA, OPENSOURCE DESENVOLVIDA PARA TER VELOCIDADE, FACILIDADE DE USO E POSSIBILTAR ANÁLISE DE DADOS SOFISTICADAS
- O SPARK PODE SER 100X MAIS RÁPIDO QUE O HADOOP PARA PROCESSAMENTO DE DADOS EM LARGA ESCALA, EXPLORANDO COMPUTAÇÃO EM MEMÓRIA E OUTRAS OTIMIZAÇÕES
- TEM APIS FÁCEIS DE USAR PARA OPERAR EM GRANDES CONJUNTOS DE DADOS
- VEM COM BIBLIOTECAS DE ALTO NÍVEL, INCLUINDO SUPORTE PARA CONSULTAS SQL, DADOS DE FLUXO CONTÍNUO (STREAMING DE DADOS), APRENDIZADO DE MÁQUINA E PROCESSAMENTO DE GRAFOS
- PYSPARK É A LIGAÇÃO PYTHON PARA A PLATAFORMA SPARK E A API SPARK
- NÃO MUITO DIFERENTE DAS VERSÕES JAVA / SCALA
- O PYTHON É DINAMICAMENTE TIPADO, PORTANTO, OS RDDS (RESILIENT DISTRIBUTED DATASET) PODEM CONTER OBJETOS DE VÁRIOS TIPOS
- QUANDO EXECUTAMOS QUALQUER APLICATIVO SPARK, UM PROGRAMA DRIVER QUE TEM A FUNÇÃO MAIN É INICIADO E SEU SPARKCONTEXT É INICIADO TAMBÉM
- O DRIVER EM SEGUIDA EXECUTA AS OPERAÇÕES DENTRO DOS EXECUTORES NOS NÓS TRABALHADORES (WORKER NODES)
- O SPARKCONTEXT USA O PY4J PARA INICIAR UMA JVM E CRIA UM JAVASPARKCONTEXT
- POR PADRÃO, O PYSPARK TEM O SPARKCONTEXT DISPONÍVEL COMO “SC“, PORTANTO, CRIAR UM NOVO SPARKCONTEXT PODERÁ NÃO FUNCIONAR OU MOSTRAR UM WARNING
- RESILIENT DISTRIBUTED DATASETS É A ABSTRAÇÃO DE DADOS DO APACHE SPARK, OS RECURSOS COM OS QUAIS SÃO CRIADOS E IMPLEMENTADOS SÃO RESPONSÁVEIS POR SUA VELOCIDADE SIGNIFICATIVA
- OS RDDS SÃO ARMAZENAMENTOS DE DADOS PARTICIONADOS, SOMENTE PARA LEITURA, SÃO DISTRIBUÍDOS EM VÁRIAS MÁQUINAS (NORMALMENTE EM UM CLUSTER)
- OS PRINCIPAIS MOTIVOS PELOS QUAIS OS RDDS FUNCIONAM MELHOR PARA O PROCESSAMENTO DISTRIBUÍDO DE DADOS É PORQUE ELES NÃO APRESENTAM ALGUNS DOS PROBLEMAS QUE O PARADIGMA MAIS ANTIGO DE PROCESSAMENTO DE DADOS − O MAPREDUCE − TEM, (O SPARK ESTÁ SUBSTITUINDO CADA VEZ MAIS O HADOOP MAP-REDUCE)
PORQUE?
- REPLICAÇÃO: REPLICAÇÃO DE DADOS EM DIFERENTES PARTES DE UM CLUSTER É UM RECURSO DO HDFS QUE PERMITE QUE OS DADOS SEJAM ARMAZENADOS DE MANEIRA TOLERANTE A FALHAS. OS RDDS DO SPARK TRATAM A TOLERÂNCIA A FALHAS USANDO UM GRAFO DE LINHAGEM. O NOME DIFERENTE (RESILIENTE, EM OPOSIÇÃO A REPLICADO) INDICA ESSA DIFERENÇA DE IMPLEMENTAÇÃO NA FUNCIONALIDADE PRINCIPAL DO SPARK
- SERIALIZAÇÃO: A SERIALIZAÇÃO NO MAPREDUCE PREJUDICA, EM TERMOS DE VELOCIDADE, OPERAÇÕES COMO SHUFFLING E SORTING
- DISK IO: EMBORA O APACHE SPARK POSSA ARMAZENAR EM CACHE E PERSISTIR RDDS PARA ECONOMIZAR TEMPO DURANTE A COMPUTAÇÃO EM MEMÓRIA, OS RDDS SÃO PRINCIPALMENTE UM MECANISMO DE PROCESSAMENTO EM MEMÓRIA , O I/O DE DISCO É DISPENDIOSO E CONSOME TEMPO EM COMPUTAÇÕES PESADAS. EM CADA ESTÁGIO DE UM MAP-REDUCE HÁ I/O DE DISCO, O QUE NÃO ACONTECE COM O SPARK
VAMOS CRIAR UM APLICATIVO SPARK USANDO PYTHON QUE LEIA UM ARQUIVO E CONTE O NÚMERO DE VEZES QUE AS PALAVRAS IRÃO OCORRER NO ARQUIVO E TAMBÉM IGNORE TODAS AS LINHAS VAZIAS
- PASSO 1: ENTRE NO PYSPARK ABRINDO UM TERMINAL E DIGITANDO:
pyspark
- PASSO 2: CRIAR UM APLICATIVO SPARK, PRIMEIRO IMPORTANDO O SPARKCONTEXT E O SPARKCONF PARA O PYSPARK COM O COMANDO:
from pyspark import SparkContext, SparkConf
- PASSO 3: CRIE UM OBJETO DE CONFIGURAÇÃO E DEFINA O NOME DO APLICATIVO:
#Cria a app com o nome WordCount
conf = SparkConf().setAppName("WordCount")
#Instacia o SparkContext
sc = SparkContext.getOrCreate()
- PASSO 4: CARREGUE O(S) TEXTO(S) PARA O HDFS, NESSE CASO O TEXTO shakespeare.txt
#Dentro da pasta local da máquina Cloudera, no meu caso /Toti/textos/ rode o
#comando abaixo para copiar o shakespeare.txt para o HDFS
hadoop fs -put shakespeare.txt /user/toti/textos/
#Cria o RDD com o conteúdo do shakespeare.txt
contentRDD = sc.textFile("/user/toti/textos/shakespeare.txt")
- PASSO 5: FILTRAR LINHAS NÃO VAZIAS DO ARQUIVO CARREGADO (shakespeare.txt)
#Elimina as linha em branco
filter_empty_lines = contentRDD.filter(lambda x: len(x) > 0)
- PASSO 6: DIVIDIR O CONTEÚDO COM BASE NO ESPAÇO
#Splita as palavras pelo espaço em branco entre elas
words = filter_empty_lines.flatMap(lambda x: x.split(' '))
- PASSO 7: CONTE AS PALAVRAS
#Map-Reduce da contagem das palavras
wordcount = words.map(lambda x:(x,1)) \
.reduceByKey(lambda x, y: x + y) \
.map(lambda x: (x[1], x[0])).sortByKey(False)
- PASSO 8: VISUALIZAR O ARQUIVO APÓS O FILTRO
#Imprime o resultado
for word in wordcount.collect():
print(word)
#Salva o resultado no HDFS dentro da pasta /user/toti/textos/Wordcount/
wordcount.saveAsTextFile("/user/toti/textos/Wordcount")
- PASSO 9: SALVE OS DADOS FINAIS NO HDFS
#Salva o resultado no HDFS dentro da pasta /user/toti/textos/Wordcount/
wordcount.saveAsTextFile("/user/toti/textos/Wordcount")