이 블로그 게시물에서는 새로운 Java SDK의 필수 구성 요소 중 하나로 RxJava를 선택한 이유와 동기를 설명합니다.

동기 부여

API를 설계하는 방법에는 여러 가지가 있으며 각 방법마다 고유한 장점과 단점이 있습니다. 새로운 API를 설계하는 과정에서 주요 질문 중 하나는 사용자에게 어떻게 노출할 것인가 하는 것이었습니다. 

우리가 스스로에게 물어볼 필요가 없었던 한 가지 질문은 동기식인가 비동기식인가 하는 것이었습니다. 저희는 비동기 API가 여러분이 자주 필요로 하는 성능과 확장성을 얻을 수 있는 유일한 합리적인 방법이며, 비동기에서 동기식으로 전환하는 것이 그 반대보다 훨씬 쉽다고 굳게 믿고 있습니다. 현재 안정적인 SDK(작성 시점 기준 1.4.3)는 이미 비동기 응답을 제공하기 위해 다양한 방식으로 퓨처를 많이 활용하고 있으며, 이는 spymemcached가 처음 이 개념을 API에 도입한 2006/7년으로 거슬러 올라갑니다.

Java Future 인터페이스는 다른 솔루션(예: Scala 퓨처)에 비해 매우 제한적이라는 것은 잘 알려져 있습니다. 또한, 하나의 계산이 다른 계산에 의존하는 비동기 데이터 흐름을 구축해야 하고 모든 것을 비동기화하려는 경우 코딩하기가 약간 까다롭습니다. 최근 버전에서는 리스너에 대한 지원이 추가되어 상황이 상당히 개선되었지만 여전히 이상적인 솔루션은 아닙니다.

지난 몇 년 동안 다른 라이브러리와 패턴이 등장했고, 저희는 이를 면밀히 따랐습니다. 성숙한 개념 중 하나는 Microsoft와 .NET에서 시작된 반응형 확장으로 알려져 있습니다. 이 개념은 애플리케이션이 이벤트 지향적이어야 하고 비동기 방식으로 이벤트에 반응해야 한다는 아이디어를 기반으로 합니다. 데이터로 수행할 수 있는 작업(데이터 수정, 결합, 필터링 등)에 대한 매우 풍부한 연산자 집합을 정의합니다. 최근 넷플릭스는 이를 Java로 포팅하고 RxJava라는 별명을 붙였습니다(현재 이 프로젝트는 넷플릭스 네임스페이스 아래에 있지만 조만간 "io.reactivex"로 이전될 예정입니다). 이 프로젝트는 매우 안정적이며 Scala, Groovy, JRuby와 같은 다른 JVM 언어에 대한 어댑터도 제공하여 지원을 확대하려는 계획과도 잘 맞습니다.

컨셉

Rx의 주요 아이디어는 옵저버블과 그 옵저버를 중심으로 전개됩니다. 이 개념을 처음 접하는 분들을 위해 설명하자면, 옵저버블은 이터러블의 비동기 및 푸시 기반 사촌(또는 더 정식으로 이중이라고도 함)이라고 생각하시면 됩니다. 좀 더 구체적으로 이들의 관계를 설명하면 다음과 같습니다:

이벤트 반복 가능(풀) 관찰 가능(푸시)
데이터 검색 T next() onNext(T)
오류 발견 예외를 던집니다. onError(예외)
완료 반환 onCompleted()

옵저버블에 데이터가 푸시될 때마다, 구독된 모든 옵저브는 onNext() 메서드에서 데이터를 받습니다. 옵저버블이 최종적으로 완료되면(항상 그럴 필요는 없지만) onCompleted 메서드가 호출됩니다. 이제 프로세스의 어느 지점에서든 오류가 발생하면 onError 메서드가 호출되고 Observable도 완료된 것으로 간주됩니다.

문법을 좋아한다면 계약서는 다음과 같습니다: 

온넥스트*(온완료 | 온에러)?

특히 1 또는 N개의 데이터만 반환되는지 여부는 구분되지 않으며, 이는 일반적으로 호출하는 메서드와 문서화 방식에서 유추할 수 있습니다. 어쨌든 프로그래밍 흐름은 바뀌지 않습니다. 다소 추상적이므로 구체적인 예를 살펴보겠습니다. CouchbaseCluster 클래스에는 필요한 모든 리소스를 초기화한 다음 작업할 수 있는 Bucket 인스턴스를 반환하는 openBucket이라는 메서드가 있습니다. 소켓을 열고, 구성을 가져오는 등의 작업에 시간이 걸리는 것을 상상할 수 있으므로 이 메서드는 완벽한 후보입니다. 차단 API는 다음과 같습니다:

인터페이스 클러스터 {
        버킷 열기 버킷(문자열 이름, 문자열 암호);
}

어떻게 비동기로 만들 수 있을까요? 옵저버블로 래핑해야 합니다:

인터페이스 클러스터 {
        관찰 가능 openBucket(문자열 이름, 문자열 비밀번호);
}

이제 사용할 수 있는 버킷 인스턴스와 함께 반환되는 옵저버블을 반환합니다. 옵저버를 추가해 보겠습니다:

클러스터.오픈버킷().구독(new 옵저버<버킷>() {
    오버라이드
    public void on완료() {
        시스템.out.println("관찰 가능 완료!");
    }

    오버라이드
    public void onError(던지기 가능 e) {
        시스템.err.println("무슨 일이 생겼어요");
        e.프린트스택트레이스();
    }

    오버라이드
    public void onNext(버킷 버킷) {
        시스템.out.println("버킷을 받았습니다: " + 버킷);
    }
});

이 메서드는 다른 스레드에서 호출되므로 코드를 이렇게 남겨두고 메인 스레드를 종료하면 아무 것도 표시되지 않을 수 있습니다. 이제 나머지 코드를 모두 onNext 메서드에 작성할 수도 있지만, 이것이 최선의 방법은 아닐 수 있습니다. 버킷은 
를 미리 열고 싶을 경우, 이를 차단한 다음 나머지 코드를 진행할 수 있습니다. 모든 옵저버블은 이터러블처럼 느껴지는 블로킹 옵저버블로 변환할 수 있습니다:

차단 관찰 가능 블로킹옵저버블 = cluster.openBucket().toBlocking();

차단 방식으로 수신된 그타를 반복하는 방법은 여러 가지가 있지만, 단 하나의 값만 기대하는 경우(저희의 경우처럼) 속기 방법도 있습니다:

버킷 bucket = cluster.openBucket().toBlocking().single();

여기서 내부적으로 일어나는 일은 onNext에서 호출된 값을 저장했다가 onComplete가 호출되면 반환하는 것입니다. onError가 호출되면 throwable이 직접 던져지고 이를 잡을 수 있습니다.

API 통합

지금까지 보신 것은 이제 겨우 표면에 불과합니다. 버킷 오프닝은 미래에도 매우 잘 처리될 수 있습니다. 혼자서. Observables가 빛을 발하는 경우는 둘 이상의 결과를 반환해야 할 때입니다. 이 경우, Future 는 더 이상 적합하지 않으며 향후

다시 구체적인 예를 살펴보겠습니다. SDK는 하나의 문서를 반환하는 get 메서드를 노출합니다. 다음과 같이 보입니다:

인터페이스 버킷 {
        관찰 가능 get(String id);
}

그러나 잠재적으로 둘 이상의 결과를 반환할 수 있는 쿼리(보기, N1QL)도 지원합니다(또는 전혀 반환하지 않을 수도 있음). Observable 계약 덕분에 이와 같은 API를 구축할 수 있습니다:

인터페이스 버킷 {
        관찰 가능 쿼리(뷰 쿼리 쿼리);
}

보이시죠? 컨트랙트에서는 "쿼리를 전달하면 N개의 ViewResults를 반환한다"라고 암시적으로 말하는데, 이는 Observable이 어떻게 동작해야 하는지 알고 있기 때문입니다. 그리고 더 큰 그림을 위해 직관적으로 예상대로 작동하는 더 많은 메서드가 있습니다.

인터페이스 버킷 {
    <D 확장 문서> 관찰 가능<D> insert(D 문서);
<D extends 문서> 관찰 가능<D> 업서트(D 문서);
    <D 확장 문서> 관찰 가능<D> 대체(D 문서);

    관찰 가능<ViewResult> 쿼리(뷰 쿼리 쿼리);
    관찰 가능<QueryResult> 쿼리(쿼리 쿼리);
    관찰 가능<QueryResult> 쿼리(문자열 쿼리);

    관찰 가능<부울> 플러시();
}

내 데이터 흐름을 비동기화하세요!

지금까지 Observables가 어떤 기능을 제공하며, 일관되고 단순하면서도 비동기적인 API를 제공하는 데 어떻게 도움이 되는지 살펴보았습니다. 하지만 Observables는 구성 가능성 측면에서 정말 빛을 발합니다. 이 글에서 모두 다룰 수는 없지만 Observables로 많은 일을 할 수 있습니다. RxJava에는 여기에서 찾을 수 있는 매우 훌륭한 참조 문서가 있으니 확인해 보세요. 비동기 데이터 흐름의 작동 방식을 보여주기 위해 마블 다이어그램을 사용하고 있는데, 이 또한 향후 문서의 일부로 제공하고자 하는 것입니다.

실제 예를 들어 보겠습니다: 사용자 세부 정보가 포함된 완전한 JSON 객체인 카우치베이스에서 문서를 로드하고 싶지만 코드의 더 아래쪽에서 이름에 대한 작업을 수행하려고 합니다. 맵 함수를 사용하여 JsonDocument에서 이름 문자열로 매핑할 수 있습니다:

버킷
    .get("user::1")
    .지도(new Func1<JsonDocument, 문자열>() {
        오버라이드
        public 문자열 통화(제이슨문서 제이슨문서) {
            반환 jsonDocument.콘텐츠().getString("이름");
        }
    })
    .구독(new Action1<문자열>() {
        오버라이드
        public void 통화(문자열 이름) {
            시스템.out.println(이름);
        }
    });

여기에는 두 가지 중요한 측면이 있습니다: 여기서 체인화된 모든 메서드는 비동기적으로 실행되므로 원래 스레드를 차단하지 않습니다. 카우치베이스에 대한 get 호출이 반환되면 JSON 문서에서 이름을 매핑한 다음 마지막으로 인쇄합니다. 완전한 Observer를 제공할 필요는 없으며, onNext 값에만 관심이 있는 경우 해당 값만 구현하면 됩니다(여기에 표시된 것처럼). 더 많은 예제는 오버로드된 메서드를 참조하세요.

또한 여기에서는 일부러 Java 6/7 스타일의 익명 클래스를 보여드리고 있다는 점에 유의하세요. Java 8도 지원하지만 이에 대해서는 나중에 자세히 설명하겠습니다. 이제 이름이 "a"로 시작하는 경우에만 이름을 출력하려면 이 체인을 어떻게 확장할 수 있을까요?

버킷
    .get("user::1")
    .지도(new Func1<JsonDocument, 문자열>() {
        오버라이드
        public 문자열 통화(제이슨문서 제이슨문서) {
            반환 jsonDocument.콘텐츠().getString("이름");
        }
    })
    .필터(new Func1<문자열, 부울>() {
        오버라이드
        public 부울 통화(문자열 s) {
            반환 s.startsWith("a");
        }
    })
    .구독(new Action1<문자열>() {
        오버라이드
        public void 통화(문자열 이름) {
            시스템.out.println(이름);
        }
    });

물론 간단한 if 문으로도 충분하지만, 필터링하는 코드가 훨씬 더 복잡할 수도 있고 다른 것을 호출할 수도 있다고 상상할 수 있습니다. 마지막으로 관측값 변환에 대한 마지막 예로, 문서를 로드하고 콘텐츠를 수정한 다음 다시 couchbase에 저장하는 매우 자주 발생하는 작업을 해보겠습니다:

버킷
    .get("user::1")
    .지도(new Func1<JsonDocument, 제이슨도큐먼트>() {
        오버라이드
        public JsonDocument 호출(JsonDocument 원본) {
            원본.콘텐츠().put("이름", "SomethingElse");
            반환 원본;
        }
    })
    .플랫맵(new Func1<JsonDocument, 관찰 가능<JsonDocument>>() {
        오버라이드
        public 관찰 가능<JsonDocument> 통화(JsonDocument 수정됨) {
            반환 버킷.대체(수정됨);
        }
    }).구독();

FlatMap은 맵과 매우 유사하게 작동하지만, 차이점은 옵저버블 자체를 반환하므로 비동기 연산을 통한 매핑에 매우 적합하다는 점입니다.

또 한 가지 측면은 Observables를 사용하면 정교한 오류 처리가 손끝에서 바로 가능하다는 점입니다. 2초의 타임아웃을 적용하고 호출이 실패하면 다른 것을 반환하는 예제를 구현해 보겠습니다:

버킷
    .get("user::1")
    .시간 초과(2, TimeUnit.)
    .onErrorReturn(new Func1<던지기 가능, JsonDocument>() {
        오버라이드
        public JsonDocument 호출(던지기 가능 던지기 가능) {
            반환 JsonDocument.create("사용자::익명", JsonObject.().put("이름", "john-doe"));
        }
    });

여기서는 2초 내에 get 호출이 반환되지 않으면 더미 문서가 반환됩니다(이 예제에서는 합리적인 기본값을 가정하여). 이것은 간단한 예시일 뿐이지만 재시도, 다른 관측값으로 분기하는 등 예외를 사용하여 많은 작업을 수행할 수 있습니다. 올바른 사용 방법은 공식 문서(및 Rx의 문서)를 참조하세요.

잠깐, 더 있습니다

서로 다른 관측값을 결합(병합, 압축, 연결)하고, 결과를 시간 간격으로 일괄 처리하고, 부작용을 수행하는 등 훨씬 더 많은 기능을 사용할 수 있습니다. 개념을 이해하는 초기 (작은) 장애물을 넘고 나면 매우 자연스러워지고 다시 돌아가고 싶지 않을 것입니다(하지만 저희가 틀렸다면 언제든지 Observable을 차단하거나 미래로 전환할 수 있습니다).

RxJava는 Java 8도 제대로 지원하므로 이미 프로젝트에 사용할 수 있는 운이 좋은 사람이라면 위의 예제를 다음과 같이 단순화할 수 있습니다:

버킷
    .get("user::1")
    .map(jsonDocument -> jsonDocument.content().getString("firstname"))
    .filter(s -> s.startsWith("a"))
    .subscribe(System.out::println);

멋지지 않나요? RxJava는 또한 그 위에 다양한 언어 어댑터를 제공하는데, 현재 Scala, Clojure, Groovy, JRuby 및 Kotlin을 지원합니다. 이러한 어댑터는 더 많은 언어별 통합을 제공하는 데 사용될 수 있으며, 향후 수요에 따라 각 언어에 대한 카우치베이스 지원을 강화하는 데도 일부 사용할 계획입니다. Java SDK를 제외한 최우선 순위는 단연 Scala이므로 조만간 발표될 내용을 기대해 주세요!

여러분도 저희만큼이나 기대가 크시길 바라며, 일반적인 채널을 통해 여러분의 피드백과 질문을 기다리겠습니다!

작성자

게시자 마이클 니칭거

마이클 니칭어는 Couchbase의 수석 소프트웨어 엔지니어로 일하고 있습니다. 그는 JVM에서 최초의 완전 반응형 데이터베이스 드라이버 중 하나인 Couchbase Java SDK의 설계자이자 유지 관리자입니다. 또한 Couchbase Spark Connector를 작성하고 유지 관리하고 있습니다. Michael은 오픈 소스 커뮤니티에서 활발히 활동 중이며, RxJava 및 Netty와 같은 다양한 프로젝트에 기여하고 있습니다.

댓글 하나

  1. 알렉산더 자비스 9월 6, 2014에서 10:04 오전

    스칼라와 관련된 발표가 정말 기대됩니다. 방금 살펴본 http://reactivecouchbase.org/ 하지만 현재 1.4 Java SDK에 의존하고 있습니다. 현재 mongodb와 ReactiveMongo를 사용하고 있는 애플리케이션을 포팅하기 전에 발표를 기다려도 되나요?

  2. [...] 비동기 코드. 일부 데이터베이스 드라이버는 이미 비동기 드라이버에서 Observable을 사용하고 있습니다. MongoDB는 [...]를 게시했습니다.

댓글 남기기