Recentemente, vi uma pergunta em nossos fóruns em que alguém queria mover alguns dados do CouchDB para o Couchbase. Como costumo ajudar amigos que precisam de ajuda quando estão se mudando, pensei em ajudar. A mudança requer preparação, especialmente com móveis grandes como o Couch :D
Desculpe-me por essa metáfora, mas agora vou explicar como fiz isso. Meu objetivo é mover os dados do CouchDB para o Couchbase. Portanto, a primeira pergunta é: como obter os dados do CouchDB? Há várias opções disponíveis aqui e a mais direta para mim foi usar a API REST. Se você usar o endpoint _all_docs com o parâmetro include_doc definido como true, obterá todos os documentos. Isso é exatamente o que eu preciso.
Agora, em vez de fazer o download da resposta REST diretamente, posso usar a API de fluxo do Java 8. E como usarei o RxJava como parte do nosso SDK, preciso envolver esse fluxo em um observável. Na verdade, isso é bem simples:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
URL url = new URL(couchDBRequest); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(“GET”); conn.setRequestProperty(“Accept”, “application/json”); //assume this is going to be a big file… conn.setReadTimeout(0); if (conn.getResponseCode() != 200) { throw new RuntimeException(“Failed : HTTP error code : ” + conn.getResponseCode()); } BufferedReader inp2 = new BufferedReader(new InputStreamReader(conn.getInputStream())); final long[] totalRows = new long[2]; int count = Observable.from(inp2.lines()::iterator).count().toBlocking().single(); |
Com isso, tenho um observável que emite documentos do CouchDB. A próxima etapa é, obviamente, pegar esses documentos e enviá-los ao Couchbase. Primeiro, usarei um flatMap para analisar cada linha da resposta. Isso é fácil de fazer porque cada linha da resposta contém um documento, como você pode ver:
|
1 2 3 4 5 6 |
{"total_rows":2,"offset":0,"rows":[ {"id":"f750a4273b48b6c1146fe4ead1000c1b","key":"f750a4273b48b6c1146fe4ead1000c1b","value":{"rev":"2-bea53b374bf5a427ab15245bc029cac0"},"doc":{"_id":"f750a4273b48b6c1146fe4ead1000c1b","_rev":"2-bea53b374bf5a427ab15245bc029cac0","title":"A title"}}, {"id":"f750a4273b48b6c1146fe4ead1000c24","key":"f750a4273b48b6c1146fe4ead1000c24","value":{"rev":"1-9baa68f46c29940ad7a6d57ae1a04002"},"doc":{"_id":"f750a4273b48b6c1146fe4ead1000c24","_rev":"1-9baa68f46c29940ad7a6d57ae1a04002","title":"Another Title"}} ]} |
Tenho que tratar a primeira e a última linha separadamente. Na primeira linha, obtenho o total_rows e as informações de deslocamento. Não preciso fazer nada na última linha. Simplesmente retorno Observable.empty() para essas duas linhas, pois não tenho nada para alimentar o próximo operador. Todas as outras linhas contêm uma linha editada pelo CouchDB. Cada uma dessas linhas contém um documento JSON que podemos envolver em um JsonNode.
O operador seguinte também é um flatMap. Aqui eu extraio a chave do documento e seu conteúdo como uma string. Como tenho um objeto Json como String, não preciso de nenhum tipo de mapeamento usando Jackson ou algo semelhante. Posso usar diretamente um RawJsonDocument. Depois de ter o RawJsonDocument, posso importá-lo para o Couchbase. Para fazer isso, uso o método upsert. É um pouco como uma coisa "sem perguntas". Não nos importamos se o documento existe ou não: se não existir, ele será criado, enquanto que, se existir, será substituído. Esse pode não ser o comportamento que você deseja, mas é o mais simples para esse cenário, pois significa que não preciso lidar com erros quando uma chave já existe.
Atribuo um tempo limite de 500 milissegundos à operação, pois ela realmente não deve demorar muito. Em seguida, uso o RetryBuilder. É um bom auxiliar adicionado por Simon Basle para gerenciar facilmente a repetição de erros. Aqui eu tento novamente até 100 vezes se receber uma RequestCancelledException. Adiciono um atraso arbitrário de 31 segundos antes de cada nova tentativa. Faço o mesmo com a TemporaryFailureException e a BackpressureException. Aqui, em vez disso, uso um atraso de 100 milissegundos.
Em seguida, uso doOnError e doOnNext para registrar a chave do documento no arquivo de registro de sucesso ou no arquivo de registro de erros. Os métodos doOnX não alteram a semântica central do fluxo (como a transformação de dados, por exemplo), mas adicionam algum comportamento adicional, "efeitos colaterais". Nesse caso, escrevo uma String em um arquivo de registro usando FileWriter. Infelizmente, isso é síncrono e bloqueador. Talvez eu altere isso para usar um registrador assíncrono.
Em seguida, uso onErrorResumeNext para garantir que a importação continue mesmo se houver erros. Por fim, uso count().toBlocking().single() para saber quantas inserções fiz no Couchbase. Comparo o resultado com o número de total_rows no final.
No final, o código tem a seguinte aparência:
Ainda há muitos aprimoramentos possíveis. Por exemplo, ter uma opção de configuração para escolher o nível de consistência que você deseja durante a importação (opções PersistTo e ReplicateTo). Também seria bom fornecer o registro de erros contendo apenas as chaves dos documentos que não foram importados como entrada para o script. Dessa forma, você pode reproduzir uma importação apenas dos documentos com erro.
De qualquer forma, espero que tenha ajudado.