Contagem de palavras com PySpark

Link da documentação oficial do Hadoop:

http://hadoop.apache.org/

Link do meu Github:

https://github.com/toticavalcanti

Contagem de palavras com PySpark

  • O SPARK É UMA ESTRUTURA DE PROCESSAMENTO BIG DATA, OPENSOURCE DESENVOLVIDA PARA TER VELOCIDADE, FACILIDADE DE USO E POSSIBILTAR ANÁLISE DE DADOS SOFISTICADAS
  • O SPARK PODE SER 100X MAIS RÁPIDO QUE O HADOOP PARA PROCESSAMENTO DE DADOS EM LARGA ESCALA, EXPLORANDO COMPUTAÇÃO EM MEMÓRIA E OUTRAS OTIMIZAÇÕES
  • TEM APIS FÁCEIS DE USAR PARA OPERAR EM GRANDES CONJUNTOS DE DADOS
  • VEM COM BIBLIOTECAS DE ALTO NÍVEL, INCLUINDO SUPORTE PARA CONSULTAS SQL, DADOS DE FLUXO CONTÍNUO (STREAMING DE DADOS), APRENDIZADO DE MÁQUINA E PROCESSAMENTO DE GRAFOS
  • PYSPARK É A LIGAÇÃO PYTHON PARA A PLATAFORMA SPARK E A API SPARK
  • NÃO MUITO DIFERENTE DAS VERSÕES JAVA / SCALA
  • O PYTHON É DINAMICAMENTE TIPADO, PORTANTO, OS RDDS (RESILIENT DISTRIBUTED DATASET) PODEM CONTER OBJETOS DE VÁRIOS TIPOS
  • QUANDO EXECUTAMOS QUALQUER APLICATIVO SPARK, UM PROGRAMA DRIVER QUE TEM A FUNÇÃO MAIN É INICIADO E SEU SPARKCONTEXT É INICIADO TAMBÉM
  • O DRIVER EM SEGUIDA EXECUTA AS OPERAÇÕES DENTRO DOS EXECUTORES NOS NÓS TRABALHADORES (WORKER NODES)
  • O SPARKCONTEXT USA O PY4J PARA INICIAR UMA JVM E CRIA UM JAVASPARKCONTEXT
  • POR PADRÃO, O PYSPARK TEM O SPARKCONTEXT DISPONÍVEL COMO “SC“, PORTANTO, CRIAR UM NOVO SPARKCONTEXT PODERÁ NÃO FUNCIONAR OU MOSTRAR UM WARNING
  • RESILIENT DISTRIBUTED DATASETS É A ABSTRAÇÃO DE DADOS DO APACHE SPARK, OS RECURSOS COM OS QUAIS SÃO CRIADOS E IMPLEMENTADOS SÃO RESPONSÁVEIS POR SUA VELOCIDADE SIGNIFICATIVA
  • OS RDDS SÃO ARMAZENAMENTOS DE DADOS PARTICIONADOS, SOMENTE PARA LEITURA, SÃO DISTRIBUÍDOS EM VÁRIAS MÁQUINAS (NORMALMENTE EM UM CLUSTER)
  • OS PRINCIPAIS MOTIVOS PELOS QUAIS OS RDDS FUNCIONAM MELHOR PARA O PROCESSAMENTO DISTRIBUÍDO DE DADOS É PORQUE ELES NÃO APRESENTAM ALGUNS DOS PROBLEMAS QUE O PARADIGMA MAIS ANTIGO DE PROCESSAMENTO DE DADOS − O MAPREDUCE − TEM, (O SPARK ESTÁ SUBSTITUINDO CADA VEZ MAIS O HADOOP MAP-REDUCE)

PORQUE?

  • REPLICAÇÃO: REPLICAÇÃO DE DADOS EM DIFERENTES PARTES DE UM CLUSTER É UM RECURSO DO HDFS QUE PERMITE QUE OS DADOS SEJAM ARMAZENADOS DE MANEIRA TOLERANTE A FALHAS. OS RDDS DO SPARK TRATAM A TOLERÂNCIA A FALHAS USANDO UM GRAFO DE LINHAGEM. O NOME DIFERENTE (RESILIENTE, EM OPOSIÇÃO A REPLICADO) INDICA ESSA DIFERENÇA DE IMPLEMENTAÇÃO NA FUNCIONALIDADE PRINCIPAL DO SPARK
  • SERIALIZAÇÃO: A SERIALIZAÇÃO NO MAPREDUCE PREJUDICA, EM TERMOS DE VELOCIDADE, OPERAÇÕES COMO SHUFFLING E SORTING
  • DISK IO: EMBORA O APACHE SPARK POSSA ARMAZENAR EM CACHE E PERSISTIR RDDS PARA ECONOMIZAR TEMPO DURANTE A COMPUTAÇÃO EM MEMÓRIA, OS RDDS SÃO PRINCIPALMENTE UM MECANISMO DE PROCESSAMENTO EM MEMÓRIA , O I/O DE DISCO É DISPENDIOSO E CONSOME TEMPO EM COMPUTAÇÕES PESADAS. EM CADA ESTÁGIO DE UM MAP-REDUCE HÁ I/O DE DISCO, O QUE NÃO ACONTECE COM O SPARK

VAMOS CRIAR UM APLICATIVO SPARK USANDO PYTHON QUE LEIA UM ARQUIVO E CONTE O NÚMERO DE VEZES QUE AS PALAVRAS IRÃO OCORRER NO ARQUIVO E TAMBÉM IGNORE TODAS AS LINHAS VAZIAS

  • PASSO 1: ENTRE NO PYSPARK ABRINDO UM TERMINAL E DIGITANDO:
pyspark
  • PASSO 2: CRIAR UM APLICATIVO SPARK, PRIMEIRO IMPORTANDO O SPARKCONTEXT E O SPARKCONF PARA O PYSPARK COM O COMANDO:
from pyspark import SparkContext, SparkConf
  • PASSO 3: CRIE UM OBJETO DE CONFIGURAÇÃO E DEFINA O NOME DO APLICATIVO:
#Cria a app com o nome WordCount
conf = SparkConf().setAppName("WordCount")
#Instacia o SparkContext
sc = SparkContext.getOrCreate()
  • PASSO 4: CARREGUE O(S) TEXTO(S) PARA O HDFS, NESSE CASO O TEXTO shakespeare.txt

#Dentro da pasta local da máquina Cloudera, no meu caso /Toti/textos/ rode o
#comando abaixo para copiar o shakespeare.txt para o HDFS
hadoop fs -put shakespeare.txt /user/toti/textos/
#Cria o RDD com o conteúdo do shakespeare.txt
contentRDD = sc.textFile("/user/toti/textos/shakespeare.txt")
  • PASSO 5:  FILTRAR LINHAS NÃO VAZIAS DO ARQUIVO CARREGADO (shakespeare.txt)
#Elimina as linha em branco
filter_empty_lines = contentRDD.filter(lambda x: len(x) > 0)
  • PASSO 6: DIVIDIR O CONTEÚDO COM BASE NO ESPAÇO
#Splita as palavras pelo espaço em branco entre elas
words = filter_empty_lines.flatMap(lambda x: x.split(' '))
  • PASSO 7: CONTE AS PALAVRAS
#Map-Reduce da contagem das palavras
wordcount = words.map(lambda x:(x,1)) \
.reduceByKey(lambda x, y: x + y) \
.map(lambda x: (x[1], x[0])).sortByKey(False)
  • PASSO 8: VISUALIZAR O ARQUIVO APÓS O FILTRO

#Imprime o resultado
for word in wordcount.collect():
    print(word)

#Salva o resultado no HDFS dentro da pasta /user/toti/textos/Wordcount/

wordcount.saveAsTextFile("/user/toti/textos/Wordcount")
  • PASSO 9: SALVE OS DADOS FINAIS NO HDFS
#Salva o resultado no HDFS dentro da pasta /user/toti/textos/Wordcount/
wordcount.saveAsTextFile("/user/toti/textos/Wordcount")

Obrigado

Até a próxima

Aula 06 – Contagem de palavras com PySpark

About The Author
-

You may use these HTML tags and attributes: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <s> <strike> <strong>