O RxJava é uma ferramenta incrível para programação reativa que também é útil como um leitor de CSV em Java.
Caso você nunca tenha usado ou sequer ouvido falar do RxJava, ele é uma forma de programar com fluxos de dados assíncronos. É também uma programação baseada em eventos em que você pode observar ou ouvir os dados do fluxo e fazer algo quando esse fluxo for descoberto. Os fluxos de dados podem ser qualquer coisa, desde variáveis até estruturas de dados.
Para este exemplo, digamos que uma linha de dados CSV seja um fluxo de dados. Muitos desenvolvedores precisam carregar dados CSV em seus bancos de dados com mais frequência do que esperam. Na maioria dos cenários, não é possível simplesmente ler as linhas de um arquivo CSV e inseri-las em um banco de dados. Neste exemplo, usaremos o RxJava para assinar um fluxo de dados e transformá-lo em Java para ler os arquivos CSV que serão salvos no Couchbase.
Os requisitos
Não há muitos requisitos para colocar esse projeto em funcionamento. No mínimo, você precisará do seguinte:
- JDK 1.8+
- Apache Maven 3.3+
- Servidor Couchbase 4.1+
Todo o desenvolvimento e o trabalho ocorrerão com o JDK 1.8 e o Maven, e isso inclui a execução do aplicativo.
Entendendo o conjunto de dados e o modelo de dados
Uma ótima maneira de se familiarizar com o RxJava é obter um conjunto de dados de amostra para brincar. Para simplificar, vamos inventar nosso próprio arquivo de valores separados por vírgula (CSV), mas se quiser algo mais extravagante, visite o site de ciência de dados, Kaggle.
Vamos supor que nosso conjunto de dados CSV simples tenha as seguintes colunas por linha:
- id
- primeiro_nome
- sobrenome
Do ponto de vista de consulta e análise, trabalhar com os dados no formato CSV é quase impossível. Em vez disso, esses dados serão armazenados como dados NoSQL para que possam ser processados posteriormente. Não entraremos na análise de números e nas consultas aqui, mas isso será abordado em um artigo futuro. No momento, queremos apenas colocá-los no formato NoSQL.
Quando carregado no Couchbase, cada linha do CSV terá uma aparência semelhante à seguinte:
1 2 3 4 5 6 7 8 |
{ "id": 1, "first_name": "Nic", "last_name": "Raboy", "twitter": "nraboy" } |
Sim, o bloco de dados acima é um documento JSON, que é o que o Couchbase suporta. Agora que conhecemos os objetivos dos dados, podemos começar a carregar os dados CSV no Couchbase com o RxJava.
Transformando os dados brutos e gravando no Couchbase
Para usar o RxJava para carregar dados CSV por meio de um aplicativo Java, algumas dependências devem ser incluídas. Precisamos incluir o RxJava, o OpenCSV e o Couchbase Java SDK. Como estamos usando o Maven, todas podem ser incluídas por meio do Maven pom.xml arquivo. Para incluir o RxJava, inclua a seguinte dependência em seu arquivo Maven:
1 2 3 4 5 6 7 |
io.reativox rxjava 1.1.2 |
Como os dados brutos estarão na forma de CSV, podemos usar a biblioteca CSV de código aberto para Java chamada OpenCSV. A dependência do Maven para o OpenCSV pode ser adicionada desta forma:
1 2 3 4 5 6 7 |
com.opencsv opencsv 3.7 |
Por fim, o Java precisa estar conectado ao Servidor Couchbase. Isso pode ser feito por meio do SDK Java do Couchbase. Para adicionar essa dependência em seu projeto Maven, adicione o seguinte ao seu arquivo pom.xml file:
1 2 3 4 5 6 7 |
com.couchbase.cliente java-cliente 2.2.0 |
Todas as dependências do projeto estão prontas para funcionar!
Para carregar, mas não ler, o arquivo CSV, você criará um novo arquivo CSVReader
da seguinte forma:
1 2 3 |
CSVReader leitor = novo CSVReader(novo Leitor de arquivos("PATH_TO_CSV_FILE")); |
Como esses dados acabarão sendo processados no Couchbase, devemos nos conectar ao nosso servidor e abrir nosso bucket.
1 2 3 |
Balde balde = CouchbaseCluster.criar("http://localhost:8091").openBucket("default", ""); |
O procedimento acima pressupõe que o Couchbase esteja sendo executado localmente e que os dados serão salvos no bucket padrão sem uma senha.
Para processar o conjunto de dados CSV, pode ser criado um observável RxJava:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
Observável .de(leitor) .mapa( csvRow -> { JsonObject objeto = JsonObject.criar(); objeto .colocar("first_name", csvRow[1]) .colocar("last_name", csvRow[2]) .colocar("twitter", csvRow[3]); retorno JsonDocument.criar(csvRow[0], objeto); } ) .assinar(documento -> balde.upsert(documento), erro -> Sistema.fora.println(erro)); |
Para detalhar o que está acontecendo no Observable, ocorrem as seguintes etapas.
O CSVReader
cria um Iterável
. O Observable usará o Iterável
como a fonte de dados usando o .de
método.
Os dados lidos serão uma matriz de strings, não algo que possa ser armazenado diretamente no banco de dados. Usando o .mapa
a matriz de strings pode ser transformada em qualquer coisa que decidirmos. Nesse caso, o objetivo é mapear cada linha do CSV para um documento do Couchbase. Durante esse processo de mapeamento, podemos fazer uma limpeza adicional dos dados. Por exemplo, poderíamos fazer algo como csvRow[*].trim()
para remover qualquer espaço em branco à esquerda e à direita em cada coluna CSV.
Finalmente, com cada linha de leitura processada, ela deve ser salva no Couchbase. O .assinar
se inscreverá nas notificações que o Observable emite, nesse caso, os dados manipulados.
Conclusão
Você acabou de experimentar o carregamento de dados CSV sujos no Couchbase usando o RxJava e o Couchbase Java SDK. Ao fazer programação reativa, você pode executar ações em qualquer fluxo de dados que estiver observando. Em nosso cenário, queríamos apenas carregar um arquivo CSV no Couchbase.
Se você tiver um conjunto de dados enorme, poderá levar este tutorial para o próximo nível usando o Apache Spark. Escrevi um carregador de CSV muito semelhante encontrado aqui que faz uso do Spark.