이 게시물에서는 끝까지 반응할 것입니다!
일부 카우치베이스 고객은 Vert.x
완전히 비동기식 애플리케이션을 작성하기 위한 프레임워크인 카우치베이스 자바 SDK
는 처음부터 비동기식이며, 이 그림에 잘 맞고 RxJava
-기반 비동기 API를 사용합니다.
이제 Couchbase로 빠르게 진행하는 방법을 살펴보겠습니다. 수직
에 대한 연결을 스핀업합니다. 클러스터
그리고 버킷
그런 다음 Java 8을 사용하여 데이터베이스에서 JSON 문서를 제공합니다.
이 블로그 게시물에서는 사용자가 Vert.x의 기본 사항. 다음은 간단한 목차입니다:
새 Vert.x 프로젝트 시작
먼저 프로젝트의 루트 폴더를 만들고 Maven 디렉토리 구조를 초기화(또는 선호하는 Maven 아키타입 사용)하여 새 Maven 기반 프로젝트를 만들어 보겠습니다. 예를 들어 다음 명령을 사용할 수 있습니다: "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/
“.
이제 pom.xml
를 프로젝트의 루트에 추가합니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml 버전="1.0" 인코딩="UTF-8"?--> 4.0.0 com.카우치베이스.데모 cbvertx 1.0-스냅샷 io.vertx vertx-핵심 3.1.0 io.vertx vertx-rx-자바 3.1.0 com.카우치베이스.클라이언트 자바-클라이언트 2.2.2 <!-- 이 는 실제로 이미 a 전이적 종속성 의 의 Java SDK--> io.리액티브엑스 rxjava 1.0.15 log4j log4j 1.2.17 maven-컴파일러-플러그인 3.3 1.8 1.8 |
보시다시피 Vert.x
버전 3.1.0
및 RxJava의 바인딩을 위한 확장 기능입니다, 카우치베이스 자바 SDK
버전 2.2.2
그리고 RxJava
버전 1.0.15
…
버티클의 스켈레톤
저희는 카우치베이스 버티클
에서 AbstractVerticle
in io.vertx.rxjava.core
(vertx-rx-java 확장자에서). 프로젝트에서 스켈레톤을 만들어 보겠습니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
패키지 com.카우치베이스.데모.vertx; 가져오기 자바.활용.ArrayList; 가져오기 자바.활용.목록; 가져오기 com.카우치베이스.클라이언트.자바.카우치베이스비동기클러스터; 가져오기 io.vertx.핵심.컨텍스트; 가져오기 io.vertx.핵심.Vertx; 가져오기 io.vertx.핵심.json.JsonArray; 가져오기 io.vertx.핵심.json.JsonObject; 가져오기 io.vertx.핵심.로깅.로거; 가져오기 io.vertx.핵심.로깅.로거 팩토리; 가져오기 io.vertx.rxjava.핵심.AbstractVerticle; public 클래스 카우치베이스 버티클 확장 AbstractVerticle { 비공개 정적 final 로거 로거 = 로거 팩토리.getLogger(카우치베이스 버티클.클래스); 비공개 카우치베이스비동기클러스터 클러스터; } |
이 직후에 작성할 초기화 단계에서는 Vert.x 구성을 사용하여 런타임에 Couchbase 클러스터에서 부트스트랩할 노드를 결정하는 방법을 보여줄 것입니다. 인스턴스화 카우치베이스클러스터
는 여전히 초기화 중에 이렇게 할 수 있을 정도로 가볍습니다.
다음 init 메서드를 카우치베이스 버티클
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@오버라이드 public void init(Vertx vertx, 컨텍스트 컨텍스트) { super.init(vertx, 컨텍스트); //설정 JSON 가져오기 JsonObject 구성 = 컨텍스트.구성(); //부트스트랩 노드를 JSON 배열로 가져옵니다(기본값은 localhost). JsonArray 시드 노드 배열 = 구성.getJsonArray("couchbase.seedNodes", new JsonArray().추가("localhost")); //목록으로 변환 목록 seedNodes = new ArrayList<>(시드 노드 배열.크기()); 에 대한 (개체 seedNode : 시드 노드 배열) { seedNodes.추가((문자열) seedNode); } //이를 사용하여 클러스터를 부트스트랩합니다. 이.클러스터 = 카우치베이스비동기클러스터.create(seedNodes); } |
비동기적으로 버킷 가져오기
Couchbase API의 주요 진입점은 다음과 같습니다. 버킷
인터페이스 또는 비동기 버킷
를 사용해야 합니다. 버킷에 대한 연결을 설정('열기')하는 작업은 훨씬 더 무겁기 때문에 비동기적으로 수행해야 합니다.
먼저 버티클의 수명 기간 동안 사용할 버킷을 열어 버티클을 시작하는 방법을 살펴보겠습니다. 이 버킷에 대한 참조를 유지하면서 시작(미래 시작미래)
메서드를 사용하여 Vert.x에 버티클이 준비되었음을 비동기적으로 알릴 수 있습니다:
1 2 3 4 5 6 7 8 9 10 11 |
비공개 휘발성 비동기 버킷 버킷; @오버라이드 public void 시작(미래 startFuture) 던지기 예외 { 클러스터.오픈버킷(구성().getString("couchbase.bucketName", "default"), 구성().getString("couchbase.bucketPassword", "")) .doOnNext(열린버킷 -> 로거.정보("버킷이 열렸습니다." + 열린버킷.이름())) .구독( 열린버킷 -> 버킷 = 열린버킷, startFuture::실패, startFuture::완료); } |
먼저 Vert.x 설정에서 버킷의 이름(및 관련 비밀번호)을 동적으로 가져옵니다. 버킷을 비동기적으로 열어 SDK 내부에 연결과 리소스를 설정합니다.
그리고 doOnNext
메서드를 사용하여 버킷의 열림을 기록합니다.
그런 다음 관찰 가능
스트리밍하고 최종 데이터를 '소비'하는 방법을 설명합니다:
- 버킷 참조를 받으면 나중에 사용할 수 있도록 필드에 저장합니다.
- 도중에 오류가 발생하면 버티클의 시작에 실패합니다.
Future#fail
메서드를 사용합니다. - 그렇지 않으면 Vert.x에 버티클이 성공적으로 시작되었음을 알립니다.
미래#완료
메서드를 사용합니다.
꽤 좋은 시작입니다!
SDK를 우아하게 해체하기
버티클이 중지되면 SDK에서 생성한 리소스를 적절히 폐기해야 합니다. 버티클의 클러스터
객체에는 연결 끊기
메서드를 재귀적으로 호출하여 이 작업을 수행하는 닫기
모든 버킷
열렸습니다(닫으면 하나의 버킷을 폐기할 수 있습니다).
또한 1.0.15
RxJava에는 내부 스레드를 종료하는 메서드가 있습니다: Schedulers.shutdown
. 하지만 이 함수는 애플리케이션에서 이후 RxJava를 사용하지 않을 경우에만 호출해야 하므로 Vert.x 종료 시 호출하는 것이 좋습니다...
다시 한 번 수직선을 비동기적으로 중지하기 위해 미래
를 사용하여 프레임워크에 완료 중지를 알립니다:
1 2 3 4 5 6 7 8 9 |
@오버라이드 public void 중지(미래 stopFuture) 던지기 예외 { 클러스터.연결 끊기() .doOnNext(isDisconnectedCleanly -> 로거.정보("연결이 끊긴 클러스터(정리된 스레드: " + isDisconnectedCleanly + ")")) .구독( isDisconnectedCleanly -> stopFuture.완료(), stopFuture::실패, 스케줄러::종료); } |
(SDK 연결 해제 완료 시 RxJava를 종료하도록 선택했습니다.)
참고 SDK를 조정할 수 있습니다.
카우치베이스 환경
생성 시클러스터
. 이 경우 다음과 같이 전화하는 것도 사용자의 책임입니다.종료
를 사용하여 전체 SDK를 종료할 때(즉, 환경을 사용하던 모든 클러스터(일반적으로는 한 클러스터만)를 종료할 때) 환경을 종료할 수 있습니다.특정 환경을 생성하지 않은 경우 SDK는 내부적으로 환경을 생성하고 제대로 종료하며, 그 결과는 위에 있는
isDisconnectedCleanly
변수입니다.
실제로 보기
빠른 메인
를 호출하여 버티클을 배포한 다음 중지합니다. 이것은 카운트다운래치를 사용하는 매우 순진한 구현이며, 일반적으로 명령줄이나 런처
를 메인 클래스로 사용합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public 정적 void 메인(문자열[] args) 던지기 중단된 예외 { Vertx vertx = Vertx.vertx(); final 카운트다운 래치 startLatch = new 카운트다운 래치(1); vertx.배포 버티클(new 카우치베이스 버티클(), 이벤트 -> { 만약 (이벤트.성공()) 로거.정보("버티클 배포됨 - " + 이벤트.결과()); else 로거.오류("버티클 배포 오류", 이벤트.원인()); startLatch.카운트다운(); }); startLatch.기다림(); final 카운트다운 래치 stopLatch = new 카운트다운 래치(1); vertx.닫기(이벤트 -> { 만약 (이벤트.성공()) 로거.정보("Vert.x 중지됨 - " + 이벤트.결과()); else 로거.오류("Vert.x 중지 오류", 이벤트.원인()); stopLatch.카운트다운(); }); stopLatch.기다림(); } |
이 작업을 실행하면 다음과 같은 내용이 표시됩니다(타임스탬프 형식이 달라진 것이 보이시나요?). 2015-12-11
는 SDK에서 가져온 것이고 Dec 11, 2015
는 Vert.x에서 가져온 것입니다):
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 정보 노드:135 - 연결됨 에 노드 localhost 2015-12-11 16:21:20 정보 구성 공급자:263 - 열림 버킷 기본값 12월 11, 2015 4:21:20 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 정보: 버킷 열린 기본값 12월 11, 2015 4:21:20 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 정보: 수직 배포됨 - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 정보 구성 공급자:284 - 닫힘 버킷 기본값 2015-12-11 16:21:20 정보 노드:145 - 연결 해제 에서 노드 localhost 12월 11, 2015 4:21:22 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 정보: 연결 해제 클러스터 (청소 스레드: true) 12월 11, 2015 4:21:22 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 정보: Vert.x 중지됨 - null |
오류 동작을 어떻게 확인하나요? 로그를 확인하기 위해 비밀번호를 잘못된 비밀번호로 변경하기만 하면 됩니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 경고 엔드포인트:283 - [null][키값 엔드포인트]: 인증 실패. 2015-12-11 16:25:45 경고 응답 상태 변환기:129 - 알 수 없음 응답 상태 와 함께 프로토콜 HTTP: 401 2015-12-11 16:25:45 경고 응답 상태 변환기:129 - 알 수 없음 응답 상태 와 함께 프로토콜 HTTP: 401 12월 11, 2015 4:25:45 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 심각: 수직 배포 오류 com.카우치베이스.클라이언트.자바.오류.InvalidPasswordException: 비밀번호 에 대한 버킷 "default" do not 일치. 에서 com.카우치베이스.클라이언트.자바.카우치베이스비동기클러스터$1.call(카우치베이스비동기클러스터.자바:210) 에서 com.카우치베이스.클라이언트.자바.카우치베이스비동기클러스터$1.call(카우치베이스비동기클러스터.자바:200) 에서 rx.내부.연산자.연산자 오류 재개 다음 비아 함수$1.onError(연산자 오류 재개 다음 비아 함수.자바:99) 에서 rx.내부.연산자.오퍼레이터 맵$1.onError(오퍼레이터 맵.자바:48) 에서 rx.관찰자.구독자$5.onError(구독자.자바:229) 에서 rx.내부.연산자.오퍼레이터 옵저브온$옵저브온 구독자.pollQueue(오퍼레이터 옵저브온.자바:197) 에서 rx.내부.연산자.오퍼레이터 옵저브온$옵저브온 구독자$2.call(오퍼레이터 옵저브온.자바:170) 에서 rx.내부.스케줄러.ScheduledAction.실행(ScheduledAction.자바:55) 에서 자바.활용.동시.실행자$실행 가능한 어댑터.통화(실행자.자바:511) 에서 자바.활용.동시.미래 작업.실행(미래 작업.자바:266) 에서 자바.활용.동시.스케줄된 스레드 풀 실행기$예약된 미래 작업.액세스$201(스케줄된 스레드 풀 실행기.자바:180) 에서 자바.활용.동시.스케줄된 스레드 풀 실행기$예약된 미래 작업.실행(스케줄된 스레드 풀 실행기.자바:293) 에서 자바.활용.동시.스레드풀 실행자.런워커(스레드풀 실행자.자바:1142) 에서 자바.활용.동시.스레드풀 실행자$Worker.실행(스레드풀 실행자.자바:617) 에서 자바.lang.스레드.실행(스레드.자바:745) 12월 11, 2015 4:25:45 PM com.카우치베이스.데모.vertx.카우치베이스 버티클 정보: Vert.x 중지됨 - null |
그래서 저희는 첫 번째 Couchbase Verticle을 성공적으로 배포(그리고 중단)했습니다!
하이파이브!
/! 비밀번호를 올바른 비밀번호로 다시 변경하는 것을 잊지 마세요.
더 나아가기
이 Verticle로 조금 더 해보겠습니다. Couchbase에서 샘플 데이터를 준비하여 Vert.x에서 관리하는 REST 엔드포인트에서 제공해 보도록 해 보겠습니다.
시작 시 Couchbase에서 샘플 데이터 만들기
Verticle을 시작하는 동안 Couchbase에서 두 개의 샘플 문서, 사용자 Alice와 Bob을 만들어 보겠습니다.
두 가지를 사용하여 Couchbase에 JSON을 저장할 수 있습니다. 문서
구현:
JsonDocument
가 기본값입니다. 이는 SDK에서 제공하는 간단한 JSON 표현인JsonObject
.RawJsonDocument
는 애플리케이션에 이미 JSON 마샬링/언마샬링이 있을 때 유용합니다(또는 Vert.x 자체의JsonObject
). 이 구현에서 전달되는 것은 원시 JSON 문자열 표현입니다.
다음은 두 가지 대안을 모두 사용하여 만든 앨리스와 밥입니다:
1 2 |
JsonDocument.create("user1", com.카우치베이스.클라이언트.자바.문서.json.JsonObject.create() .put("name", "Alice").put("age", 26)) |
그리고
1 |
RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) |
이제 시작 방법에 약간의 조정이 필요합니다. 구독에 버킷에 대한 참조를 저장하는 대신 구독의 doOnNext
. 그런 다음, 문서를 생성하고 Observable.just
. 다음을 사용하여 삽입을 위해 SDK로 전달할 수 있습니다. 플랫맵
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@오버라이드 public void 시작(미래 startFuture) 던지기 예외 { 클러스터.오픈버킷(구성().getString("couchbase.bucketName", "default"), 구성().getString("couchbase.bucketPassword", "")) .doOnNext(열린버킷 -> 로거.정보("버킷이 열렸습니다." + 열린버킷.이름())) .doOnNext(열린버킷 -> 버킷 = 열린버킷) .플랫맵(nowIgnoreBucket -> 관찰 가능.그냥( JsonDocument.create("user1", com.카우치베이스.클라이언트.자바.문서.json.JsonObject.create() .put("name", "Alice").put("age", 26)), RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) )) .플랫맵(doc -> 버킷.업서트(doc)) .구독(작업.빈(), startFuture::실패, startFuture::완료); } |
사용 업서트
를 사용하면 키가 데이터베이스에 이미 있는지 여부에 관계없이 문서가 생성됩니다.
Couchbase에서 JSON 데이터 서비스하기
바로 멈추지 않고 데이터베이스에서 json 문서를 검색하여 클라이언트로 전송하는 HTTP 서버를 돌리는 대신 경로가 다음과 같도록 수정해 보겠습니다. user/{id}
가 사용됩니다:
다음은 Vert.x를 빠르고 간단하게 사용하는 방법입니다. 런처
를 클릭하여 프로그램을 시작합니다(Vert.x 코어가 바로 중지되지는 않음). 우리의 메인
메서드에 다음과 같이 추가합니다:
1 2 |
문자열[] vertxArgs = new 문자열[] { "run", "com.couchbase.demo.vertx.CouchbaseVerticle" }; 런처.메인(vertxArgs); |
참고: 실제 애플리케이션에서,
런처
가 보통 jar의 메인 클래스가 되고 명령줄 인수를 직접 전달하게 됩니다.
이제 Verticle에서 HTTP 서버를 시작하겠습니다. 체인에 다음 코드를 추가합니다. 시작
메서드 바로 뒤에 flatMap(doc -> bucket.upsert(doc))
호출합니다:
1 2 3 4 |
.마지막() .플랫맵(무시 -> vertx.createHttpServer() .요청 핸들러(이::핸들) .listenObservable(8080)) |
우리는 핸들
메서드를 사용하여 해당 경로를 설정합니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
비공개 void 핸들(HttpServerRequest r) { 문자열[] 경로 = r.경로().분할("/"); 만약 (경로.길이 == 3 && "users".같음(경로[1])) { 버킷.get(경로[2], RawJsonDocument.클래스) .switchIfEmpty(관찰 가능.오류(new 문서가 존재하지 않음 예외(경로[2]))) .구독(doc -> r.응답() .putHeader("콘텐츠 유형", "application/json") .끝(doc.콘텐츠()), 오류 -> { r.응답() .putHeader("콘텐츠 유형", "application/json") .설정 상태 코드(500).설정 상태 메시지(오류.toString()) .끝("{"오류": "" + error.toString() + ""}"); }); } } |
테스트해 보겠습니다. 애플리케이션을 실행하고 이 URL로 이동합니다: http://localhost:8080/users/user1. Couchbase에서 직접 제공되는 Alice의 JSON을 보실 수 있습니다!
1 2 3 4 |
{ "name": "Alice", "age": 26 } |
For 또 다른 키를 클릭하면 JSON 형식으로 예외가 표시됩니다:
1 2 3 |
{ "오류": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
HTTP 엔드포인트를 통해 버티클 중지하기
재미와 이익을 위해 Vert.x를 중지하는 경로를 빠르게 추가해 봅시다 :)
1 2 3 4 |
//...`handle`의 마지막 줄부터 바꾸기 } else 만약 (r.경로().같음("/stop")) { r.응답() .끝(" |
Couchbase 및 Vertx 종료
vertx 스타터에서 실행하면 메인 스레드가 죽지 않습니다.
"); vertx.close(); }
열기 http://localhost:8080/stop 를 호출하면 전체 Vert.x 애플리케이션이 중지되고 배포된 Verticles가 제거됩니다.
참고: 메시지에서 설명한 것처럼 IDE에서 실행해도 프로세스가 종료되지 않습니다.
결론
이 블로그 게시물에서는 다음과 같은 방법을 알아봤습니다. Vert.x
및 카우치베이스 자바 SDK
는 완전한 비동기 애플리케이션을 구축하기 위해 함께 작업할 수 있습니다.
행복한 비동기 코딩!
위의 구현을 사용할 수 있는 깃허브 저장소 링크가 있나요?