이 게시물에서는 끝까지 반응할 것입니다!
일부 카우치베이스 고객은 Vert.x완전히 비동기식 애플리케이션을 작성하기 위한 프레임워크인 카우치베이스 자바 SDK 는 처음부터 비동기식이며, 이 그림에 잘 맞고 RxJava-기반 비동기 API를 사용합니다.
이제 Couchbase로 빠르게 진행하는 방법을 살펴보겠습니다. 수직 에 대한 연결을 스핀업합니다. 클러스터 그리고 버킷 그런 다음 Java 8을 사용하여 데이터베이스에서 JSON 문서를 제공합니다.
이 블로그 게시물에서는 사용자가 Vert.x의 기본 사항. 다음은 간단한 목차입니다:
새 Vert.x 프로젝트 시작
먼저 프로젝트의 루트 폴더를 만들고 Maven 디렉토리 구조를 초기화(또는 선호하는 Maven 아키타입 사용)하여 새 Maven 기반 프로젝트를 만들어 보겠습니다. 예를 들어 다음 명령을 사용할 수 있습니다: "mkdir -p cbvertx/src/main/java/com/couchbase/demo/vertx/“.
이제 pom.xml 를 프로젝트의 루트에 추가합니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 |
<!--?xml version="1.0" encoding="UTF-8"?--> 4.0.0 com.couchbase.demo cbvertx 1.0-SNAPSHOT io.vertx vertx-core 3.1.0 io.vertx vertx-rx-java 3.1.0 com.couchbase.client java-client 2.2.2 <!-- this is actually already a transitive dependency of the Java SDK--> io.reactivex rxjava 1.0.15 log4j log4j 1.2.17 maven-compiler-plugin 3.3 1.8 1.8 |
보시다시피 Vert.x 버전 3.1.0 및 RxJava의 바인딩을 위한 확장 기능입니다, 카우치베이스 자바 SDK 버전 2.2.2 그리고 RxJava 버전 1.0.15...
버티클의 스켈레톤
저희는 카우치베이스 버티클 에서 AbstractVerticle in io.vertx.rxjava.core (vertx-rx-java 확장자에서). 프로젝트에서 스켈레톤을 만들어 보겠습니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package com.couchbase.demo.vertx; import java.util.ArrayList; import java.util.List; import com.couchbase.client.java.CouchbaseAsyncCluster; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.core.json.JsonArray; import io.vertx.core.json.JsonObject; import io.vertx.core.logging.Logger; import io.vertx.core.logging.LoggerFactory; import io.vertx.rxjava.core.AbstractVerticle; public class CouchbaseVerticle extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(CouchbaseVerticle.class); private CouchbaseAsyncCluster cluster; } |
이 직후에 작성할 초기화 단계에서는 Vert.x 구성을 사용하여 런타임에 Couchbase 클러스터에서 부트스트랩할 노드를 결정하는 방법을 보여줄 것입니다. 인스턴스화 카우치베이스클러스터 는 여전히 초기화 중에 이렇게 할 수 있을 정도로 가볍습니다.
다음 init 메서드를 카우치베이스 버티클:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
@Override public void init(Vertx vertx, Context context) { super.init(vertx, context); //getting the configuration JSON JsonObject config = context.config(); //getting the bootstrap node, as a JSON array (default to localhost) JsonArray seedNodeArray = config.getJsonArray("couchbase.seedNodes", new JsonArray().add("localhost")); //convert to a List List seedNodes = new ArrayList<>(seedNodeArray.size()); for (Object seedNode : seedNodeArray) { seedNodes.add((String) seedNode); } //use that to bootstrap the Cluster this.cluster = CouchbaseAsyncCluster.create(seedNodes); } |
비동기적으로 버킷 가져오기
Couchbase API의 주요 진입점은 다음과 같습니다. 버킷 인터페이스 또는 비동기 버킷 를 사용해야 합니다. 버킷에 대한 연결을 설정('열기')하는 작업은 훨씬 더 무겁기 때문에 비동기적으로 수행해야 합니다.
먼저 버티클의 수명 기간 동안 사용할 버킷을 열어 버티클을 시작하는 방법을 살펴보겠습니다. 이 버킷에 대한 참조를 유지하면서 시작(미래 시작미래) 메서드를 사용하여 Vert.x에 버티클이 준비되었음을 비동기적으로 알릴 수 있습니다:
|
1 2 3 4 5 6 7 8 9 10 11 |
private volatile AsyncBucket bucket; @Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .subscribe( openedBucket -> bucket = openedBucket, startFuture::fail, startFuture::complete); } |
먼저 Vert.x 설정에서 버킷의 이름(및 관련 비밀번호)을 동적으로 가져옵니다. 버킷을 비동기적으로 열어 SDK 내부에 연결과 리소스를 설정합니다.
그리고 doOnNext 메서드를 사용하여 버킷의 열림을 기록합니다.
그런 다음 관찰 가능 스트리밍하고 최종 데이터를 '소비'하는 방법을 설명합니다:
- 버킷 참조를 받으면 나중에 사용할 수 있도록 필드에 저장합니다.
- 도중에 오류가 발생하면 버티클의 시작에 실패합니다.
Future#fail메서드를 사용합니다. - 그렇지 않으면 Vert.x에 버티클이 성공적으로 시작되었음을 알립니다.
미래#완료메서드를 사용합니다.
꽤 좋은 시작입니다!
SDK를 우아하게 해체하기
버티클이 중지되면 SDK에서 생성한 리소스를 적절히 폐기해야 합니다. 버티클의 클러스터 객체에는 연결 끊기 메서드를 재귀적으로 호출하여 이 작업을 수행하는 닫기 모든 버킷 열렸습니다(닫으면 하나의 버킷을 폐기할 수 있습니다).
또한 1.0.15 RxJava에는 내부 스레드를 종료하는 메서드가 있습니다: Schedulers.shutdown. 하지만 이 함수는 애플리케이션에서 이후 RxJava를 사용하지 않을 경우에만 호출해야 하므로 Vert.x 종료 시 호출하는 것이 좋습니다...
다시 한 번 수직선을 비동기적으로 중지하기 위해 미래 를 사용하여 프레임워크에 완료 중지를 알립니다:
|
1 2 3 4 5 6 7 8 9 |
@Override public void stop(Future stopFuture) throws Exception { cluster.disconnect() .doOnNext(isDisconnectedCleanly -> LOGGER.info("Disconnected Cluster (cleaned threads: " + isDisconnectedCleanly + ")")) .subscribe( isDisconnectedCleanly -> stopFuture.complete(), stopFuture::fail, Schedulers::shutdown); } |
(SDK 연결 해제 완료 시 RxJava를 종료하도록 선택했습니다.)
참고 SDK를 조정할 수 있습니다.
카우치베이스 환경생성 시클러스터. 이 경우 다음과 같이 전화하는 것도 사용자의 책임입니다.종료를 사용하여 전체 SDK를 종료할 때(즉, 환경을 사용하던 모든 클러스터(일반적으로는 한 클러스터만)를 종료할 때) 환경을 종료할 수 있습니다.특정 환경을 생성하지 않은 경우 SDK는 내부적으로 환경을 생성하고 제대로 종료하며, 그 결과는 위에 있는
isDisconnectedCleanly변수입니다.
실제로 보기
빠른 메인 를 호출하여 버티클을 배포한 다음 중지합니다. 이것은 카운트다운래치를 사용하는 매우 순진한 구현이며, 일반적으로 명령줄이나 런처 를 메인 클래스로 사용합니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 |
public static void main(String[] args) throws InterruptedException { Vertx vertx = Vertx.vertx(); final CountDownLatch startLatch = new CountDownLatch(1); vertx.deployVerticle(new CouchbaseVerticle(), event -> { if (event.succeeded()) LOGGER.info("Verticle Deployed - " + event.result()); else LOGGER.error("Verticle deployment error", event.cause()); startLatch.countDown(); }); startLatch.await(); final CountDownLatch stopLatch = new CountDownLatch(1); vertx.close(event -> { if (event.succeeded()) LOGGER.info("Vert.x Stopped - " + event.result()); else LOGGER.error("Vert.x stopping error", event.cause()); stopLatch.countDown(); }); stopLatch.await(); } |
이 작업을 실행하면 다음과 같은 내용이 표시됩니다(타임스탬프 형식이 달라진 것이 보이시나요?). 2015-12-11 는 SDK에서 가져온 것이고 Dec 11, 2015 는 Vert.x에서 가져온 것입니다):
|
1 2 3 4 5 6 7 8 9 10 11 12 |
2015-12-11 16:21:20 INFO Node:135 - Connected to Node localhost 2015-12-11 16:21:20 INFO ConfigurationProvider:263 - Opened bucket default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Bucket opened default Dec 11, 2015 4:21:20 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Verticle Deployed - caf06dd3-c8d1-4b89-8de0-58f09467b805 2015-12-11 16:21:20 INFO ConfigurationProvider:284 - Closed bucket default 2015-12-11 16:21:20 INFO Node:145 - Disconnected from Node localhost Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Disconnected Cluster (cleaned threads: true) Dec 11, 2015 4:21:22 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
오류 동작을 어떻게 확인하나요? 로그를 확인하기 위해 비밀번호를 잘못된 비밀번호로 변경하기만 하면 됩니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
2015-12-11 16:25:45 WARN Endpoint:283 - [null][KeyValueEndpoint]: Authentication Failure. 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 2015-12-11 16:25:45 WARN ResponseStatusConverter:129 - Unknown ResponseStatus with Protocol HTTP: 401 Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle SEVERE: Verticle deployment error com.couchbase.client.java.error.InvalidPasswordException: Passwords for bucket "default" do not match. at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:210) at com.couchbase.client.java.CouchbaseAsyncCluster$1.call(CouchbaseAsyncCluster.java:200) at rx.internal.operators.OperatorOnErrorResumeNextViaFunction$1.onError(OperatorOnErrorResumeNextViaFunction.java:99) at rx.internal.operators.OperatorMap$1.onError(OperatorMap.java:48) at rx.observers.Subscribers$5.onError(Subscribers.java:229) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber.pollQueue(OperatorObserveOn.java:197) at rx.internal.operators.OperatorObserveOn$ObserveOnSubscriber$2.call(OperatorObserveOn.java:170) at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) Dec 11, 2015 4:25:45 PM com.couchbase.demo.vertx.CouchbaseVerticle INFO: Vert.x Stopped - null |
그래서 저희는 첫 번째 Couchbase Verticle을 성공적으로 배포(그리고 중단)했습니다!
하이파이브!
/! 비밀번호를 올바른 비밀번호로 다시 변경하는 것을 잊지 마세요.
더 나아가기
이 Verticle로 조금 더 해보겠습니다. Couchbase에서 샘플 데이터를 준비하여 Vert.x에서 관리하는 REST 엔드포인트에서 제공해 보도록 해 보겠습니다.
시작 시 Couchbase에서 샘플 데이터 만들기
Verticle을 시작하는 동안 Couchbase에서 두 개의 샘플 문서, 사용자 Alice와 Bob을 만들어 보겠습니다.
두 가지를 사용하여 Couchbase에 JSON을 저장할 수 있습니다. 문서 구현:
JsonDocument가 기본값입니다. 이는 SDK에서 제공하는 간단한 JSON 표현인JsonObject.RawJsonDocument는 애플리케이션에 이미 JSON 마샬링/언마샬링이 있을 때 유용합니다(또는 Vert.x 자체의JsonObject). 이 구현에서 전달되는 것은 원시 JSON 문자열 표현입니다.
다음은 두 가지 대안을 모두 사용하여 만든 앨리스와 밥입니다:
|
1 2 |
JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)) |
그리고
|
1 |
RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) |
이제 시작 방법에 약간의 조정이 필요합니다. 구독에 버킷에 대한 참조를 저장하는 대신 구독의 doOnNext. 그런 다음, 문서를 생성하고 Observable.just. 다음을 사용하여 삽입을 위해 SDK로 전달할 수 있습니다. 플랫맵:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
@Override public void start(Future startFuture) throws Exception { cluster.openBucket(config().getString("couchbase.bucketName", "default"), config().getString("couchbase.bucketPassword", "")) .doOnNext(openedBucket -> LOGGER.info("Bucket opened " + openedBucket.name())) .doOnNext(openedBucket -> bucket = openedBucket) .flatMap(nowIgnoreBucket -> Observable.just( JsonDocument.create("user1", com.couchbase.client.java.document.json.JsonObject.create() .put("name", "Alice").put("age", 26)), RawJsonDocument.create("user2", new JsonObject().put("name", "Bob").put("age", 31).encode()) )) .flatMap(doc -> bucket.upsert(doc)) .subscribe(Actions.empty(), startFuture::fail, startFuture::complete); } |
사용 업서트 를 사용하면 키가 데이터베이스에 이미 있는지 여부에 관계없이 문서가 생성됩니다.
Couchbase에서 JSON 데이터 서비스하기
바로 멈추지 않고 데이터베이스에서 json 문서를 검색하여 클라이언트로 전송하는 HTTP 서버를 돌리는 대신 경로가 다음과 같도록 수정해 보겠습니다. user/{id} 가 사용됩니다:
다음은 Vert.x를 빠르고 간단하게 사용하는 방법입니다. 런처 를 클릭하여 프로그램을 시작합니다(Vert.x 코어가 바로 중지되지는 않음). 우리의 메인 메서드에 다음과 같이 추가합니다:
|
1 2 |
String[] vertxArgs = new String[] { "run", "com.couchbase.demo.vertx.CouchbaseVerticle" }; Launcher.main(vertxArgs); |
참고: 실제 애플리케이션에서,
런처가 보통 jar의 메인 클래스가 되고 명령줄 인수를 직접 전달하게 됩니다.
이제 Verticle에서 HTTP 서버를 시작하겠습니다. 체인에 다음 코드를 추가합니다. 시작 메서드 바로 뒤에 flatMap(doc -> bucket.upsert(doc)) 호출합니다:
|
1 2 3 4 |
.last() .flatMap(ignore -> vertx.createHttpServer() .requestHandler(this::handle) .listenObservable(8080)) |
우리는 핸들 메서드를 사용하여 해당 경로를 설정합니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
private void handle(HttpServerRequest r) { String[] path = r.path().split("/"); if (path.length == 3 && "users".equals(path[1])) { bucket.get(path[2], RawJsonDocument.class) .switchIfEmpty(Observable.error(new DocumentDoesNotExistException(path[2]))) .subscribe(doc -> r.response() .putHeader("content-type", "application/json") .end(doc.content()), error -> { r.response() .putHeader("content-type", "application/json") .setStatusCode(500).setStatusMessage(error.toString()) .end("{"error": "" + error.toString() + ""}"); }); } } |
테스트해 보겠습니다. 애플리케이션을 실행하고 이 URL로 이동합니다: https://localhost:8080/users/user1. Couchbase에서 직접 제공되는 Alice의 JSON을 보실 수 있습니다!
|
1 2 3 4 |
{ "name": "Alice", "age": 26 } |
For 또 다른 키를 클릭하면 JSON 형식으로 예외가 표시됩니다:
|
1 2 3 |
{ "error": "com.couchbase.client.java.error.DocumentDoesNotExistException: user3" } |
HTTP 엔드포인트를 통해 버티클 중지하기
재미와 이익을 위해 Vert.x를 중지하는 경로를 빠르게 추가해 봅시다 :)
|
1 2 3 4 |
//...replacing from the last line in `handle` } else if (r.path().equals("/stop")) { r.response() .end(" |
Couchbase 및 Vertx 종료
vertx 스타터에서 실행하면 메인 스레드가 죽지 않습니다.
"); vertx.close(); }
열기 https://localhost:8080/stop 를 호출하면 전체 Vert.x 애플리케이션이 중지되고 배포된 Verticles가 제거됩니다.
참고: 메시지에서 설명한 것처럼 IDE에서 실행해도 프로세스가 종료되지 않습니다.
결론
이 블로그 게시물에서는 다음과 같은 방법을 알아봤습니다. Vert.x 및 카우치베이스 자바 SDK 는 완전한 비동기 애플리케이션을 구축하기 위해 함께 작업할 수 있습니다.
행복한 비동기 코딩!
위의 구현을 사용할 수 있는 깃허브 저장소 링크가 있나요?