특히 대량의 데이터를 검색할 때 Couchbase와 같은 분산 데이터베이스로 작업할 때 성능과 효율성은 핵심적인 고려 사항입니다. 다양한 개발 또는 데이터베이스 배경을 가진 고객들이 “다중 가져오기” 또는 “대량 가져오기” 작업을 수행할 수 있는 Couchbase의 기능에 대해 문의하는 경우가 많습니다. 많은 데이터베이스는 키를 기반으로 수행할 여러 문서를 검색하는 기본 방법으로 “다중 가져오기”를 제공합니다. 반응형 프로그래밍은 특정 사용 사례에 맞게 일괄 처리를 구현할 수 있는 유연성을 제공하며 획일적이고 일반적인 방법보다 더 효과적인 경우가 많기 때문에 대부분의 Couchbase SDK는 일괄 처리를 위한 명시적인 API를 제공하지 않습니다.
대량 가져오기란 무엇인가요?
일괄 가져오기 작업을 사용하면 개별 GET 호출을 반복하지 않고 한 번의 작업으로 여러 문서를 요청할 수 있습니다. 기존의 키-값 저장소에서는 각 요청이 일반적으로 특정 노드를 대상으로 합니다. 하지만 카우치베이스와 같은 분산 환경에서는 이러한 작업을 여러 노드에 분산하면 수동으로 관리할 경우 오버헤드가 발생할 수 있습니다.
대량 작업을 위한 SDK 지원
Couchbase SDK(Java, .NET, Go 포함)는 일괄 가져오기 작업을 기본적으로 지원합니다. 이러한 SDK 메서드는 문서 키 목록을 받아들이고 개별 문서 키의 병렬 실행을 자동으로 관리하도록 설계되었습니다. GET 요청을 효율적으로 처리하는 데는 크게 세 가지 이유가 있습니다.
-
- 병렬 처리: SDK는 각 문서를 순차적으로 가져오는 대신 여러 요청을 동시에 시작합니다.
- 노드 타겟팅: SDK는 각 요청을 데이터가 있는 클러스터의 올바른 노드로 지능적으로 라우팅합니다.
- 비동기 실행: 각 SDK의 비동기 기능을 활용하여 작업이 비차단 방식으로 처리되므로 처리량이 증가하고 리소스 활용도가 향상됩니다.
카우치베이스는 반응형 프로그래밍과 비동기 프로그래밍을 사용하여 대량 가져오기 기능을 달성하는 두 가지 주요 방법을 제공합니다.
리액티브 API
Couchbase에서 대량 가져오기 작업을 최적화하려는 경우, 리액티브 프로그래밍이 효율적이고 쉬운 접근 방식을 제공합니다. 카우치베이스의 바이너리 프로토콜은 순서 외 실행을 지원하며 KV에서 비동기 작업을 강력하게 지원합니다. 비동기 데이터 흐름을 효율적으로 관리함으로써 높은 처리량과 짧은 지연 시간을 구현하여 분산 시스템에 이상적입니다. 이 기능을 최대한 활용하려면 데이터베이스부터 클라이언트까지 각 계층이 반응형 스트림을 지원하는 완전한 반응형 스택이 이상적입니다. 카우치베이스의 반응형 컬렉션 는 Project Reactor와 원활하게 통합되어 Couchbase 키-값(KV) 작업에 대한 완전한 비차단 액세스를 가능하게 합니다. 이러한 통합은 최신 반응형 아키텍처와 완벽하게 일치하므로 애플리케이션이 불필요한 스레드 차단을 방지하여 처리량이 많은 워크로드를 보다 효율적으로 처리할 수 있습니다.
즉, 기존 애플리케이션 전체를 리액티브 아키텍처로 마이그레이션하려면 상당한 작업이 필요할 수 있습니다. 새로운 프로젝트인 경우 Spring WebFlux와 같은 반응형 프레임워크를 채택할 것을 강력히 권장합니다. 그러나 반응형 애플리케이션이 아닌 경우에도 Couchbase CRUD 계층에서 반응형 접근 방식을 도입하는 것만으로도 의미 있는 이점을 얻을 수 있습니다. 이렇게 하면 스레드 차단을 최소화하고 CPU 스로틀링을 줄여 리소스 효율성을 높이고 확장성을 개선할 수 있습니다.
아래는 리액티브 API를 사용하여 Couchbase의 성능을 극대화하고 비리액티브 스택과 함께 작동할 수 있는 Java 코드의 예시입니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
/** * @param collection The collection to get documents from. * @param documentIds The IDs of the documents to return. * @param mapSupplier Factory for the returned map. Suggestion: * Pass {@code TreeMap::new} for sorted results, * or {@code HashMap::new} for unsorted. * @param concurrency Limits the number of Couchbases requests in flight * at the same time. Each invocation of this method has a separate quota. * Suggestion: Start with 256 and tune as desired. * @param mapValueTransformerScheduler The scheduler to use for converting * the result map values. Pass {@link Schedulers#immediate()} * to use the SDK's IO scheduler. Suggestion: If your value converter does IO, * pass {@link Schedulers#boundedElastic()}. * @param mapValueTransformer A function that takes a document ID and a result, * and returns the value to associated with that ID in the returned map. * @param <V> The return map's value type. * @param <M> The type of the map you'd like to store the results in. * @return a Map (implementation determined by {@code mapSupplier}) * where each given document ID is associated with the result of * getting the corresponding document from Couchbase. */ public static <V, M extends Map<String, V>> Map<String, V> bulkGet( ReactiveCollection collection, Iterable<String> documentIds, int concurrency, Supplier<M> mapSupplier, Scheduler mapValueTransformerScheduler, BiFunction<String, SuccessOrFailure<GetResult>, V> mapValueTransformer ) { return Flux.fromIterable(documentIds) .flatMap( documentId -> Mono.zip( Mono.just(documentId), collection.get(documentId) .map(SuccessOrFailure::success) .onErrorResume(error -> Mono.just(SuccessOrFailure.failure(error))) ), concurrency ) .publishOn(mapValueTransformerScheduler) .collect( mapSupplier, (map, idAndResult) -> { String documentId = idAndResult.getT1(); SuccessOrFailure<GetResult> successOrFailure = idAndResult.getT2(); map.put(documentId, mapValueTransformer.apply(documentId, successOrFailure)); } ) .block(); } } |
이 반응형 접근 방식은 ID를 사용하여 문서를 가져와서 맵 여기서 각 키는 문서 ID이고 값은 처리된 결과입니다. 결과를 목록으로 수집하고 나중에 재처리하는 것이 잘못된 것은 아니지만, (성능 및 코드 명확성 측면에서) 더 나은 전략은 결과를 ConcurrentHashMap 문서 ID로 색인됩니다. 이렇게 하면 반복적인 스캔을 피하고 결과 조회를 상시적으로 처리할 수 있습니다. 어떻게 작동하는지 단계별로 살펴보겠습니다.
-
- 문서 ID에서 리액티브 스트림 만들기
19번 라인에서는 플럭스 (반응형 스트림)을 문서 ID 목록에서 선택합니다. 각 문서 ID에 대해 collection.get(documentId) 를 사용하여 문서를 반응형으로 가져올 수 있습니다. - 래핑 결과는 다음과 같습니다. 성공 또는 실패
복원력을 보장하기 위해 각 비동기 작업은 결과를 래핑하여 성공 또는 실패 객체입니다. 이 래퍼는 성공적인 가져오기와 실패한 가져오기를 모두 캡처합니다. 기본적으로 collection.get(documentId) 가 오류(예: 네트워크 문제, 누락된 문서)를 발생시키면 전체 플럭스 스트림에 오류가 발생하여 처리가 중지됩니다. 이는 하나의 문서가 실패하더라도 다른 문서를 계속 처리해야 하는 대량 작업에는 적합하지 않습니다. 따라서 오류를 전파하는 대신 실패를 성공 또는 실패.실패(오류) 객체를 반환합니다. 이렇게 하면 다운스트림은 여전히 유효한 값(성공 또는 실패), 성공 또는 실패 여부에 관계없이 모든 문서ID에 대해. - 다음을 사용하여 문서 ID와 결과 페어링하기 Mono.zip
사용 Mono.zip 를 사용하면 명시적으로 documentId 및 비동기 get 결과를 튜플로 변환합니다. 이는 특히 동시성으로 인해 결과가 순서대로 도착하지 않을 때 문서 ID와 결과 간의 연관성을 식별하는 데 도움이 됩니다. - 동시성 컨트롤 병렬로 실행되는 문서 가져오기 수(한 번에 실행되는 요청 수)입니다.
- 병렬 처리 및 스케줄러 핸드오프
반응형 스트림은 기본적으로 차단되지 않지만 변환 로직(예: JSON 구문 분석, 데이터 변환)은 CPU를 많이 사용할 수 있습니다. 결과 튜플을 수집하기 전에 스트림은 다음을 사용하여 호출자가 지정한 스케줄러로 전환합니다. publishOn(...). 이렇게 하면 변환 작업이 IO 스레드에서 별도의 스레드 풀로 오프로드됩니다. 이렇게 하면 과중한 계산으로 인해 변환 작업으로 인해 IO 스레드가 차단되지 않습니다.
- 맵으로 수집하기
모든 결과가 입력되면 스트림은 튜플 쌍을 맵으로 수집합니다. 여기에는 mapSupplier 를 클릭하여 지도를 만듭니다. 각 (문서Id, 결과) 쌍을 적용하면 맵값 트랜스포머 를 사용하여 원시 결과를 도메인별 유형으로 변환합니다. V 를 클릭한 다음 변환된 값을 맵에 넣습니다.
- 최종 결과 검색을 위한 차단
여기의 모든 것이 비동기식(비차단)이기 때문입니다, block() 는 전체 스트림이 완료될 때까지 기다렸다가 생성된 스트림을 반환하는 데 사용됩니다. 지도 를 발신자에게 전달합니다.
- 문서 ID에서 리액티브 스트림 만들기
비동기 API
성능, 유연성, 내장된 백프레셔 처리 기능을 위해 반응형 API를 사용하는 것이 좋지만, 더 세밀한 제어와 성능 조정이 필요한 시나리오를 위해 낮은 수준의 비동기 API도 제공합니다. 그러나 효율적인 비동기 코드를 작성하려면 리소스 고갈을 방지하고 시간 초과를 방지하기 위해 동시성 및 백프레셔를 신중하게 관리해야 하는 등 나름의 어려움이 따릅니다.
아래는 비동기 API를 사용하여 Couchbase에서 대량 가져오기 성능을 향상시키는 방법을 보여주는 예제입니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 |
// async api to call get() for an array of keys List<CompletableFuture<GetResult>> futures = new LinkedList<>(); for (int i = 0; i < keys.size(); i++) { CompletableFuture<GetResult> f = collection.async().get( keys.get(i), (GetOptions) options ); futures.add(f); } // Wait for all Get operations to complete CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); // Convert results to JsonObjects List<JsonObject> results = new LinkedList<>(); for (CompletableFuture<GetResult> future : futures) { try { JsonObject json = future.join().contentAsObject(); results.add(json); } catch (CompletionException e) { e.printStackTrace(); results.add(null); // or skip / handle differently } } |
어떻게 작동하는지 단계별로 살펴보겠습니다.
-
- 문서 가져오기
여기서는 키를 반복하고 각 키에 대해 다음과 같이 호출합니다. collection.async().get(key, options), 를 반환하는 CompletableFuture 를 클릭한 다음 모든 선물을 목록에 저장합니다.
- 모든 가져오기가 완료될 때까지 기다립니다.
CompletableFuture.allOf(...) 는 배열의 모든 퓨처가 완료되면 완료되는 새로운 퓨처를 생성합니다...join() 는 모든 비동기 가져오기가 완료될 때까지 현재 스레드를 차단합니다.
- 변환 결과
모든 가져오기가 완료되면 최종 값을 일반 값으로 저장하기 위해 또 다른 목록을 만듭니다. List. 각 CompletableFuture, 를 호출하면 결과를 검색하고 변환합니다. 요구 사항에 따라 다음 중 하나를 추가하여 실패 오류를 처리할 수 있습니다. null 를 실패한 결과 또는 오류 마커 개체 대신 목록에 추가합니다.
변환 단계에서는 결과를 변환하기 전에 문서 가져오기가 완료되었다고 가정하지만, 비동기 연산을 계속 연쇄하는 것이 목표라면 다음과 같은 경우 목록을 만들 수 있습니다. 목록<완성 가능한 미래> 를 호출하고 다른 비동기 래퍼로 변환을 래핑합니다.
- 문서 가져오기
더 높은 수준의 동시성 메커니즘을 위한 통합 코드를 작성하거나 마지막 한 방울의 성능이 정말 필요한 경우에만 이 API를 사용하는 것이 좋습니다. 그 외의 모든 경우에는 반응형 API(풍부한 연산자를 위한)가 더 나은 선택일 수 있습니다.
결론
리액티브 프로그래밍은 카우치베이스에서 대량 가져오기 작업에서 고성능을 달성할 수 있는 가장 효율적인 방법 중 하나입니다. 논블로킹 동작과 확장성이 완전히 최적화된 전체 리액티브 스택에 적용될 때 그 진정한 성능을 발휘합니다.
하지만 완전한 반응형 아키텍처를 도입해야만 이점을 누릴 수 있는 것은 아닙니다. 실용적이고 영향력 있는 첫 번째 단계는 Couchbase CRUD 레이어만 반응형으로 마이그레이션하는 것입니다. 이렇게 하면 스레드 차단을 획기적으로 줄이고 CPU 스로틀링을 최소화하여 아키텍처를 완전히 개편할 필요 없이 시스템 응답성과 리소스 활용도를 향상시킬 수 있습니다.
성능과 확장성이 우선순위라면 반응형 프로그래밍은 부분적으로 구현하더라도 충분히 투자할 가치가 있습니다.
일반적인 대량 가져오기 기능 없이도 효율적으로 일괄 처리를 수행할 수 있는 방법에 대해 설명해 주신 Couchbase SDK 팀에 감사의 말씀을 전합니다.