TabNewsLake - Um Datalake para nossas News
TabNewsLake
Consumo da API pública do TabNews para criação de um Datalake.
Sobre
Essa iniciativa tem como objetivo as seguintes ações em relação aos dados:
- Coletar
- Armazenar
- Organizar
- Analisar
Bem como compartilhar os resultados obtidos e fazer uso dos mesmos dados para treinamentos voltados à área de Engenharia, Ciência e Análise de dados.
Etapas
1. Ingestão
No primeiro momento estamos coletando os dados diretamente da API fornecida pelo time da TabNews. Utilizamos como referência este post do GabrielSozinho.
Para coletar os dados estamos utilizando um script Python com a biblioteca requests no endpoint https://www.tabnews.com.br/api/v1/contents . Onde a busca é feita do content mais recente para o mais antigo. Vale dizer que por hora, fazemos uma varredura iterativa até que sejam coletados todos os contents com os devidos status atualizado. Isto é, com o valor atualizado de TabCoins, número de contents filhos, etc.
Foi aberta uma issue sugerindo a possibilidade de receber os contents mais atulizados no lugar dos mais recentes e/ou relevantes, assim, não seria necessário passar por todos contents da plataforma para atualizar a base toda.
def get_data(**kwargs):
url = "https://tabnews.com.br/api/v1/contents"
resp = requests.get(url, params=kwargs)
return resp
O script Python criado, em sua primeira versão, realizaria a ingestão diretamente no Datalake utilizando Apache Spark, entretando, o custo envolvido para disponibilizar o cluster não faria sentido para o trabalho. Assim, como alternativa (e melhor opção na realidade), subimos um delivery stream no Kinesis Data Firehose na AWS. Desta forma, o script Python realiza um PUT diretamente no Firehose, onde o mesmo entrega o dado no S3 em formato JSON.
def save_with_firehose(data, firehose_client):
data = [[i] for i in data]
d = json.dumps(data)[1:-1].replace("], [", "]\n[") + "\n"
firehose_client.put_record(
DeliveryStreamName="tabnews-contents",
Record={"Data": d},
)
return None
Levou um tempo para descobrir o formato ideal de como enviar os dados para o Firehose de maneira que o Apache Spark fosse capaz de ler, mas deu certo!
Para executar o script python basta:
$ python src/raw/contents.py --help
usage: contents.py [-h] [--date DATE] [--save_type {firehose,spark}]
optional arguments:
-h, --help show this help message and exit
--date DATE Data limite para busca (mais antiga): YYYY-MM-DD
--save_type {firehose,spark}
Modo de salvar os dados
Assim, caso deseje coletar dados do dia atual até 01/06/2022, enviando os dados para seu delivery stream:
$ python src/raw/contents.py --date 2022-06-01 --save_type firehose
2. Consumo e criação de tabela em Bronze
Estamos trabalhando com camadas de dados. Isto é, separando em níveis a qualidade de nosso dado. O S3 onde o dado é entregue pelo Firehose chamamos de Raw. Nesta camada, pode haver dados duplicados, com muitos arquivos (pois é onde chega tudo no formato mais cru) e com possíveis inconsistências.
Para criar tabelas que sejam mais fáceis de se trabalhar, e aplicar a delicinha do SQL, criamos a camada Bronze, utilizando Apache Spark Structed Streaming com Auto Loader do Databricks. Com esses carinhas, podemos ler os dados em Raw e gravá-los em Bronze no formato Delta, facilitando e otimizando a leitura, bem como garantindo maior integridade dos dados.
Ainda que o cluster de Apache Spark não fique ligado 24/7 esperando os dados de Raw chegarem, utilizamos o CloudFiles com Stream, o que facilita nossa vida, pois o mesmo é capaz de gerenciar quais arquivos ele já leu e quais não, iteração por iteração. Ou seja, ao realizar o processamento dos dados em Raw, ele lê, processa e armazena em Bronze os dados, na sua próxima iteração (agendada), haverão dados novos (ou não) em Raw, assim, ele lerá apenas esses novos dados e fará seu processo normalmente (mesmo que estejam no mesmo bucket://folder que os dados antigos, ignorando-os).
df_stream = (spark.readStream
.format("cloudFiles")
.schema(schema)
.option("cloudFiles.format", "json")
.load(raw_path))
stream = (df_stream.writeStream
.trigger(once=True)
.option("checkpointLocation", checkpoint_path)
.foreachBatch(lambda df, batchId: upsert(df, df_delta, table))
.start())
Próximos passos
- Realizar estatísticas descritivas dos posts e usuários da plataforma
Bônus
Algumas estatísticas a partir dos dados coletados:
Estatísticas por data
Quantidade histórica de conteúdos postados e usuários distintos realizando posts:
Claro que há interesse em entender o tamanho atual desta plataforma, assim podemos considerar os valores acumulados:
Estatísticas por usuário
Bora acompanhar os top 10 usuários que mais realizando posts por aqui?
Podemos fazer o mesmo para TabCoins? Está na mão!
Por enquanto é isso.
Muito legal os gráficos gerados, principalmente os dois primeiros. O TabNews tem a página /status que contém só algumas informações básicas. Será legal um dia poder consumir esse tipo de informação diretamente por lá, mesmo que os gráficos sejam atualizados apenas uma vez ao dia para evitar um consumo excessivo de processamento.
Excelente trabalho teomewhy! As ferramentas para datalake da AWS são muito boas, não só para estatística, como também para otimização de processos. Na empresa onde eu trabalho, conseguimos otimizar um processo importante de negócio (que não posso dar detalhes aqui, por questão de segurança) onde o processo demorava cerca de 12 horas para finalizar e conseguimos reduzir esse tempo para cerca 1h, somente utilizando datalake e lambda.
Que sensacional, tanto a abordagem dos dados quanto toda descrição do que foi feito aqui nessa publicação! Vou divulgar no meu LinkedIn amanhã 🎉
Uma dúvida: no último gráfico, ele representa a soma das TabCoins dos contents
de cada usuário? Se sim, isso inclui tanto contents com e sem parent_id
?
Muito legal Téo, meus parabéns. Seria legal subir essa base no Kaggle para que mais pessoas rodem análises. Vou pegar os dados e gerar algumas análises aqui.
Caramba, se eu te disser que inicei uma tentativa de webscraping, mas de uma forma bem mais caseira, acredita?!
Muito boa a sua ideia e conteúdo, parabéns!
Fala aí Teo!? Muito bacana o seu conteúdo... parabéns meu caro.
Deixo uma sugestão...
Fazer a infraestrutura do DL com Terraform e compartilhar junto com o código do projeto... T+
Muito bom! Caramba que baita exemplo do uso do delta e cloud files com streaming :D
Show de bola Teo!
Caramba! Que dahora! Ainda mais pra mim que estou começando nesse mundo Python!! Obrigado mesmo!
Muito show! Parabéns!!!