Ao trabalhar com bancos de dados distribuídos como o Couchbase, o desempenho e a eficiência são considerações importantes, especialmente ao recuperar uma grande quantidade de dados. Muitas vezes, quando os clientes vêm de diferentes experiências em desenvolvimento ou banco de dados, eles perguntam sobre a capacidade do Couchbase de fazer operações “multi-get” ou “bulk get”. Muitos bancos de dados oferecem “multi-get” como um método pronto para uso para recuperar vários documentos a serem executados com base em suas chaves. A maioria dos SDKs do Couchbase não oferece APIs explícitas para a obtenção de lotes porque a programação reativa oferece a flexibilidade de implementar lotes adaptados ao seu caso de uso específico e, muitas vezes, é mais eficaz do que um método genérico de tamanho único.
O que é Bulk Get?
Uma operação de obtenção em massa permite que você solicite vários documentos em uma única operação, em vez de fazer chamadas GET individuais repetidas. Nos armazenamentos tradicionais de valores-chave, cada solicitação geralmente é direcionada a um nó específico. No entanto, em um ambiente distribuído como o Couchbase, a distribuição dessas operações entre os nós pode gerar sobrecarga se for gerenciada manualmente.
Suporte do SDK para operações em massa
Os SDKs do Couchbase (incluindo Java, .NET e Go) oferecem suporte integrado para operações de obtenção em massa. Esses métodos do SDK foram projetados para aceitar uma lista de chaves de documentos e gerenciar automaticamente a execução paralela de operações individuais de OBTER de forma eficiente devido a três motivos principais.
-
- Paralelismo: Em vez de buscar cada documento sequencialmente, os SDKs iniciam várias solicitações ao mesmo tempo.
- Direcionamento de nós: Os SDKs encaminham de forma inteligente cada solicitação para o nó correto no cluster em que os dados residem.
- Execução assíncrona: Aproveitando os recursos assíncronos de cada SDK, as operações são tratadas de forma não bloqueada, garantindo maior rendimento e melhor utilização dos recursos.
O Couchbase oferece duas maneiras principais de obter a capacidade de obtenção em massa usando a programação reativa e a programação assíncrona.
API reativa
Se o seu objetivo é otimizar as operações de obtenção em massa no Couchbase, a programação reativa oferece uma abordagem eficiente e mais fácil. O protocolo binário do Couchbase tem execução fora de ordem e oferece forte suporte a operações assíncronas no KV. Ao gerenciar com eficiência os fluxos de dados assíncronos, ele permite alta taxa de transferência e baixa latência, o que o torna ideal para sistemas distribuídos. Para aproveitar totalmente seus recursos, é ideal uma pilha totalmente reativa em que cada camada, do banco de dados ao cliente, ofereça suporte a fluxos reativos. Os recursos do Couchbase Coleção Reativa integra-se perfeitamente ao Project Reactor, permitindo o acesso totalmente sem bloqueio às operações de valor-chave (KV) do Couchbase. Essa integração se alinha perfeitamente às arquiteturas reativas modernas, permitindo que os aplicativos lidem com cargas de trabalho de alto rendimento de forma mais eficiente, evitando o bloqueio desnecessário de threads.
Dito isso, a migração de todo um aplicativo existente para uma arquitetura reativa pode envolver um trabalho significativo. Se for um projeto novo, é altamente recomendável adotar uma estrutura reativa como o Spring WebFlux. No entanto, mesmo em aplicativos não reativos, a introdução de uma abordagem reativa apenas na camada CRUD do Couchbase pode proporcionar ganhos significativos. Ao fazer isso, você pode minimizar o bloqueio de thread e reduzir o estrangulamento da CPU, o que leva a uma melhor eficiência de recursos e a uma escalabilidade aprimorada.
Abaixo está um exemplo de código Java que pode maximizar o desempenho do Couchbase usando a API reativa e pode funcionar com uma pilha não reativa.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
/** * @param collection The collection to get documents from. * @param documentIds The IDs of the documents to return. * @param mapSupplier Factory for the returned map. Suggestion: * Pass {@code TreeMap::new} for sorted results, * or {@code HashMap::new} for unsorted. * @param concurrency Limits the number of Couchbases requests in flight * at the same time. Each invocation of this method has a separate quota. * Suggestion: Start with 256 and tune as desired. * @param mapValueTransformerScheduler The scheduler to use for converting * the result map values. Pass {@link Schedulers#immediate()} * to use the SDK's IO scheduler. Suggestion: If your value converter does IO, * pass {@link Schedulers#boundedElastic()}. * @param mapValueTransformer A function that takes a document ID and a result, * and returns the value to associated with that ID in the returned map. * @param <V> The return map's value type. * @param <M> The type of the map you'd like to store the results in. * @return a Map (implementation determined by {@code mapSupplier}) * where each given document ID is associated with the result of * getting the corresponding document from Couchbase. */ public static <V, M extends Map<String, V>> Map<String, V> bulkGet( ReactiveCollection collection, Iterable<String> documentIds, int concurrency, Supplier<M> mapSupplier, Scheduler mapValueTransformerScheduler, BiFunction<String, SuccessOrFailure<GetResult>, V> mapValueTransformer ) { return Flux.fromIterable(documentIds) .flatMap( documentId -> Mono.zip( Mono.just(documentId), collection.get(documentId) .map(SuccessOrFailure::success) .onErrorResume(error -> Mono.just(SuccessOrFailure.failure(error))) ), concurrency ) .publishOn(mapValueTransformerScheduler) .collect( mapSupplier, (map, idAndResult) -> { String documentId = idAndResult.getT1(); SuccessOrFailure<GetResult> successOrFailure = idAndResult.getT2(); map.put(documentId, mapValueTransformer.apply(documentId, successOrFailure)); } ) .block(); } } |
Essa abordagem reativa está buscando documentos usando seus IDs e retornando um Mapa em que cada chave é um ID de documento e o valor é o resultado processado. Embora não seja errado coletar os resultados em uma lista e reprocessá-los posteriormente, uma estratégia melhor (tanto em termos de desempenho quanto de clareza do código) é coletar os resultados em um ConcurrentHashMap indexados por ID de documento. Isso evita a varredura repetida e torna as pesquisas de resultados operações em tempo constante. Vamos detalhar como isso funciona passo a passo.
-
- Criação de um fluxo reativo a partir de IDs de documentos
Na linha 19, estamos criando um Fluxo (fluxo reativo) a partir da lista de IDs de documentos. Para cada ID de documento, ele chama collection.get(documentId) para buscar o documento de forma reativa. - A embalagem resulta em SucessoOuFracasso
Para garantir a resiliência, cada operação assíncrona envolve o resultado em um SuccessOrFailure objeto. Esse wrapper captura as buscas bem-sucedidas e as que falharam. Por padrão, se collection.get(documentId) gera um erro (por exemplo, problema de rede, documento ausente), todo o Fluxo apresentará erro e interromperá o processamento. Isso não é ideal para operações em massa, pois queremos continuar processando outros documentos mesmo que um falhe. Portanto, em vez de propagar o erro, ele converte a falha em um SuccessOrFailure.failure(error) objeto. Dessa forma, o downstream ainda recebe um valor válido (SucessoOuFracasso) para cada documentID, seja ele bem-sucedido ou não. - Emparelhamento de IDs de documentos com resultados usando Mono.zip
Usando Mono.zip torna explícito que você está combinando o documentId e o async obter resultado em uma tupla. Isso ajuda a identificar a associação entre documentID e resultado, especialmente quando os resultados chegam fora de ordem devido à simultaneidade. - Concorrência controles quantas buscas de documentos são executadas em paralelo (quantas solicitações estão em andamento ao mesmo tempo).
- Paralelismo e transferência de agendador
Os fluxos reativos não são bloqueados por padrão, mas a lógica de transformação (por exemplo, análise de JSON, conversão de dados) pode consumir muita CPU. Antes de coletarmos as tuplas resultantes, o fluxo alterna para um agendador especificado pelo chamador usando publishOn(...). Isso descarrega o trabalho de transformação dos threads de IO em um pool de threads separado. Isso garante que os threads de IO não sejam bloqueados pelo trabalho de transformação devido à computação pesada.
- Coleta em um mapa
Quando todos os resultados são recebidos, o fluxo coleta os pares de tuplas em um mapa. Ele usa mapSupplier para criar o mapa. Para cada (documentId, result) par, ele se aplica mapValueTransformer para transformar o resultado bruto em um tipo específico do domínio V e, em seguida, coloca o valor transformado no mapa.
- Bloqueio para obter o resultado final
Como tudo aqui é assíncrono (sem bloqueio), block() é usado para aguardar a conclusão de todo o fluxo e retornar o mapa para o autor da chamada.
- Criação de um fluxo reativo a partir de IDs de documentos
API assíncrona
Embora recomendemos o uso das APIs reativas por seu desempenho, flexibilidade e tratamento de contrapressão incorporado, o Couchbase também oferece uma API assíncrona de baixo nível para cenários em que você precisa de um controle ainda mais refinado e ajuste de desempenho. No entanto, escrever um código assíncrono eficiente traz seus próprios desafios, pois requer um gerenciamento cuidadoso da concorrência e da contrapressão para evitar a exaustão de recursos e timeouts.
Veja a seguir um exemplo que demonstra como usar a API Async para melhorar o desempenho da obtenção em massa no Couchbase:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// async api to call get() for an array of keys List<CompletableFuture<GetResult>> futures = new LinkedList<>(); for (int i = 0; i < keys.size(); i++) { CompletableFuture<GetResult> f = collection.async().get( keys.get(i), (GetOptions) options ); futures.add(f); } // Wait for all Get operations to complete CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // Convert results to JsonObjects List<JsonObject> results = new LinkedList<>(); for (CompletableFuture<GetResult> future : futures) { try { JsonObject json = future.join().contentAsObject(); results.add(json); } catch (CompletionException e) { e.printStackTrace(); results.add(null); // or skip / handle differently } } |
Vamos explicar passo a passo como isso funciona.
-
- Buscar documentos
Aqui, iteramos sobre as chaves e, para cada chave, chamamos collection.async().get(key, options), que retorna um CompletableFuture e, em seguida, armazenamos todos esses futuros em uma lista.
- Aguardar a conclusão de todas as buscas
CompletableFuture.allOf(...) cria um novo futuro que é concluído quando todos os futuros na matriz são concluídos..join() bloqueia o thread atual até que todas as buscas assíncronas sejam concluídas.
- Transformar resultados
Quando todas as buscas estiverem concluídas, criaremos outra lista para manter os valores finais em Lista. Para cada CompletableFuture, recuperamos e transformamos o resultado. Dependendo do requisito, você pode tratar o erro de falha adicionando nulo para a lista no lugar do resultado de falha ou de um objeto marcador de erro.
A etapa de transformação pressupõe que a busca dos documentos esteja concluída antes da transformação dos resultados; no entanto, se o objetivo for continuar encadeando operações assíncronas, poderemos criar uma lista de futuros Lista<CompletableFuture> e envolver a transformação em outro wrapper assíncrono.
- Buscar documentos
Recomendamos usar essa API somente se estiver escrevendo código de integração para mecanismos de simultaneidade de nível superior ou se realmente precisar da última gota de desempenho. Em todos os outros casos, a API reativa (para riqueza de operadores) é provavelmente a melhor opção.
Conclusão
A programação reativa oferece uma das maneiras mais eficientes de obter alto desempenho para operações de obtenção em massa com o Couchbase. Seu verdadeiro poder é revelado quando aplicado em uma pilha totalmente reativa, em que o comportamento sem bloqueio e a escalabilidade são totalmente otimizados.
Dito isso, você não precisa de uma arquitetura totalmente reativa para começar a colher os benefícios. Uma primeira etapa prática e impactante é migrar apenas a camada CRUD do Couchbase para reativa. Fazer isso pode reduzir drasticamente o bloqueio de thread e minimizar a limitação da CPU, levando a uma melhor capacidade de resposta do sistema e utilização de recursos sem exigir uma revisão completa da arquitetura.
Se o desempenho e a escalabilidade forem prioridades, vale a pena investir na programação reativa, mesmo em uma implementação parcial.
O autor agradece a equipe do SDK do Couchbase e sua excelente explicação sobre como podemos obter o agrupamento de forma eficiente sem a necessidade de uma função genérica de obtenção em massa.