이전 블로그 게시물 우리는 Couchbase가 어떻게 하면 이벤트 중심 아키텍처. 카우치베이스 이벤트 서비스를 사용하여 문서 변경 사항을 Solace PubSub+ 대기열에서 구독 마이크로서비스에 데이터를 거의 실시간으로 제공합니다.
이 게시물에서는 토픽 구독자인 마이크로서비스가 확장 가능하고 탄력적인 데이터 저장소로 Couchbase를 활용하는 방법에 대해 중점적으로 설명합니다.
아키텍처 개요 및 샘플 애플리케이션
이 샘플 애플리케이션를 사용하여 Solace PubSub+ 토픽을 구독하고, 데이터를 처리하고, Couchbase에서 문서를 업데이트합니다.
예제 애플리케이션에서는 호텔 세부 정보를 Couchbase에 저장합니다. 호텔 구성에 대한 업데이트는 Solace PubSub+ 토픽에 게시됩니다. 토픽을 구독하고 업데이트된 데이터를 검색하여 Couchbase에서 기존 호텔 구성을 업데이트하는 애플리케이션을 구축합니다.
저희는 여행 샘플 버킷에는 다음과 같은 샘플 데이터가 들어 있습니다. 항공편, 경로 및 기타. 이 예에서는 호텔 문서에 저장된 인벤토리 범위의 호텔 컬렉션에 추가합니다. 모든 호텔 문서는 다양한 속성을 가진 JSON 문서로 구성됩니다.
이 튜토리얼에서는 다음과 같은 일부 속성만 업데이트합니다. pets_allowed 그리고 무료_인터넷.

샘플 애플리케이션은 솔라스 토픽을 구독하고 Couchbase에서 호텔 데이터를 업데이트합니다. 다른 애플리케이션은 호텔 구성 변경 사항을 제공할 수 있습니다. 우리는 토픽에 업데이트를 게시하는 간단한 서비스를 사용합니다.
캡션: 샘플 애플리케이션은 솔라스 토픽을 구독하고 Couchbase에서 호텔 데이터를 업데이트합니다. 다른 애플리케이션에서 호텔 구성 변경 사항을 제공할 수도 있습니다. 여기서는 토픽에 업데이트를 게시하는 간단한 서비스를 사용합니다.
전제 조건
이 샘플 애플리케이션의 전제 조건을 검토하세요:
카우치베이스 카펠라
무료 카우치베이스 카펠라 평가판 계정을 생성하고 안내에 따라 평가판 클러스터를 프로비저닝하세요. 이 과정은 몇 분 밖에 걸리지 않으며 다음과 같은 본격적인 Couchbase 클러스터를 제공합니다. 데이터 서비스 및 샘플 데이터 집합을 제공합니다, 여행 샘플, 이 블로그 게시물 전체에서 사용할 것입니다.
이 튜토리얼은 관리형 카우치베이스 카펠라 제품을 사용하지 않을 때 사용할 수 있습니다. The 여행 샘플 버킷은 Couchbase 서버 제품의 일부이며 Couchbase 콘솔을 사용하여 가져올 수 있습니다.
Solace PubSub+ 이벤트 브로커 클라우드
무료로 가입하기 Solace 클라우드 평가판 를 클릭하고 서비스/VPN을 생성합니다. VPN을 사용하여 솔라스 토픽에 연결합니다.
코드 검토 - Solace 구독자 및 게시자
솔라스 토픽의 메시지를 구독하려면 솔라스 자바 API를 사용합니다. 이 예제 구현은 Solace 문서의 예제에서 파생되었습니다. 구현에 대한 자세한 내용은 Solace 설명서를 참조하세요.
|
1 2 3 4 5 6 7 8 9 |
final JCSMPProperties properties = new JCSMPProperties(); properties.setProperty(JCSMPProperties.HOST, "tcps://yourhost.messaging.solace.cloud:55443"); properties.setProperty(JCSMPProperties.USERNAME, "solace-cloud-client"); properties.setProperty(JCSMPProperties.PASSWORD, "password"); properties.setProperty(JCSMPProperties.VPN_NAME, "couchbasedemo"); JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties); session.connect(); final Topic topic = JCSMPFactory.onlyInstance().createTopic("tutorial/topic"); |
우리는 JCSMP프로퍼티 Solace 웹 인터페이스에서 관련 액세스 정보를 사용할 수 있습니다. 그런 다음 JCSMPSession 및 주제.
토픽에 대한 메시지를 구독하려면 다음과 같이 구현합니다. XMLMessageConsumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### Message Receiver - Subscribe to Topic ### final XMLMessageConsumer cons = session.getMessageConsumer(new XMLMessageListener() { @Override public void onReceive(BytesXMLMessage msg) { if (msg instanceof TextMessage) { System.out.printf("SUBSCRIBE: Message received: '%s'%n", ((TextMessage) msg).getText()); UpdateHotelData.getInstance().upsertHotelData(((TextMessage) msg).getText()); } else { System.out.println("Message received."); } } @Override public void onException(JCSMPException e) { System.out.printf("Consumer received exception: %s%n", e); } }); session.addSubscription(topic); System.out.println("SUBSCRIBE: Connected. Awaiting message..."); cons.start(); |
에서 onReceive 메서드를 호출하면 메시지를 받아 문자열로 변환한 다음 업서트호텔데이터 방법의 호텔 데이터 업데이트 클래스에는 이 블로그 게시물의 뒷부분에서 설명하는 Couchbase 구현이 포함되어 있습니다.
세션에 주제 구독을 추가하고 메시지 소비자를 시작합니다.
구현을 테스트하기 위해 메시지 퍼블리셔도 생성합니다. 이를 위해 먼저 XMLMessageProducer.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### Message Producer - Send message to Topic XMLMessageProducer prod = session.getMessageProducer(new JCSMPStreamingPublishCorrelatingEventHandler() { @Override public void responseReceivedEx(Object key) { System.out.println("Producer received response for msg: " + key.toString()); } @Override public void handleErrorEx(Object key, JCSMPException cause, long timestamp) { System.out.printf("Producer received error for msg: %s@%s - %s%n", key.toString(), timestamp, cause); } }); |
메시지 프로듀서가 설치되었으므로 이제 아래와 같이 메시지를 보낼 수 있습니다.
|
1 2 3 4 5 6 7 8 9 |
// Send messages. Here we loop through and send the same message multiple times. for (int msgsSent = 0; msgsSent < 2; ++msgsSent) { TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class); msg.setText(data); System.out.printf("PUBLISH: Sending message '%s' to topic '%s'...%n", data, topic.getName()); prod.send(msg, topic); //Gives us some time to follow the console logs Thread.sleep(3000); } |
카우치베이스에 데이터 업로드하기
메시지를 구독하고 게시하는 로직이 준비되었으므로 이제 Couchbase에서 호텔 문서를 업데이트하는 방법에 대해 집중해 보겠습니다.
이 예에서는 토픽에서 검색된 메시지의 형식이 아래 JSON이라고 가정합니다. 여기에는 hotel_id 를 생성하여 Couchbase에 저장된 호텔 문서의 문서 ID와 변경할 수 있는 여러 속성으로 사용할 수 있습니다:
|
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": true, "free_breakfast": true, "free_internet": true, "free_parking": true } |
애플리케이션에서 카우치베이스 카펠라로 보안 연결을 설정하려면 다음과 같이 해야 합니다:
-
- 데이터베이스 사용자를 생성하고 이 사용자에게 호텔 데이터에 대한 액세스 권한을 부여합니다.
- 애플리케이션을 실행하는 호스트의 IP 주소를 화이트리스트에 추가합니다.
- 보안 연결 URL 캡처
이러한 모든 세부 사항은 Capella 관리 영역에서 관리할 수 있습니다. 카펠라에 로그인 를 클릭합니다:
- 열기 평가판 - 클러스터 클러스터로 이동하여 연결 탭
- 아래로 스크롤하여 데이터베이스 액세스 섹션을 클릭하고 자격 증명 관리

- 를 클릭합니다. 데이터베이스 자격 증명 만들기 를 클릭하고 데이터베이스 사용자 이름과 비밀번호를 입력합니다. 버킷 수준 액세스를 구성합니다. 인벤토리 범위의 여행 샘플 버킷을 만들고 읽기/쓰기 액세스 권한을 부여합니다.

- 를 클릭하여 애플리케이션 IP 주소를 화이트리스트에 추가합니다. 허용된 IP 관리 를 클릭한 다음 허용 IP 추가. 로컬 컴퓨터에서 애플리케이션을 실행하는 경우, 간단히 내 IP 추가 를 입력하면 외부 IP 주소가 자동으로 검색되어 추가됩니다.

- 에서 광역 네트워크 섹션에서 연결 URL을 복사합니다.
코드 검토 - Couchbase에 연결하여 문서 업데이트하기
첫 번째 단계로 클러스터 개체를 생성하여 Couchbase Capella에 연결합니다. 이전 단계에서 캡처한 엔드포인트 URL과 데이터베이스 사용자 자격 증명을 제공합니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// Update this to your cluster String endpoint = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; String username = "dbuser"; String password = "Passw0rd!"; // User Input ends here. String bucketName = "travel-sample"; // Cluster Environment configuration ClusterEnvironment env = ClusterEnvironment.builder() .securityConfig( SecurityConfig.enableTls(true).trustManagerFactory(InsecureTrustManagerFactory.INSTANCE)) .ioConfig(IoConfig.enableDnsSrv(true)).build(); // Initialize the Connection and provide database user credentials Cluster cluster = Cluster.connect(endpoint, ClusterOptions.clusterOptions(username, password).environment(env)); // Create bucket object bucket = cluster.bucket(bucketName); bucket.waitUntilReady(Duration.parse("PT10S")); // create collection object for the 'hotel' collection located in the 'inventory' scope collection = bucket.scope("inventory").collection("hotel"); |
버킷 객체가 생성되면 버킷 객체의 호텔 범위를 설정하고 나중에 문서 변경 사항을 삽입하는 데 사용합니다. 문서의 호텔 범위는 인벤토리 범위의 여행 샘플 버킷.
이후의 모든 요청에 동일한 컬렉션 객체를 사용하기 위해 싱글톤 패턴을 사용한다는 점에 유의하세요. 자세한 내용은 샘플 구현을 참조하세요. 내 GitHub 프로젝트.
이제 토픽에서 받은 데이터로 호텔 문서를 업데이트하는 방법을 살펴보겠습니다.
수신된 메시지는 문자열이므로 첫 번째 단계로 이를 JSON 객체로 변환해야 합니다.
|
1 2 3 4 |
public void upsertHotelData(String content) { … hotelData = JsonObject.fromJson(content); … |
토픽의 입력 데이터에는 호텔 문서의 모든 속성의 하위 집합만 포함되어 있으므로 문서 필드를 가장 효율적으로 업데이트하는 방법을 고려해야 합니다.
두 가지 옵션이 있습니다:
-
- 문서 ID를 사용하여 Couchbase에서 전체 문서를 검색하고, 속성을 업데이트하고, 문서를 다시 작성합니다.
- 전체 문서를 교체하는 대신 문서 내의 전용 필드만 업데이트하기 위해 Couchbase의 하위 문서 API를 사용합니다.
옵션 1은 일반적으로 문서를 업데이트하거나 삽입하는 데 사용됩니다. 그러나 더 큰 문서로 작업하고 일부만 업데이트하려는 경우 전체 문서를 네트워크를 통해 전송할 필요가 없습니다. 아래 구현에서는 하위 문서 API를 사용하여 호텔 문서의 필드를 확인하고 업데이트합니다.
|
1 2 3 4 5 6 7 8 9 |
//Instead of resolving the entire document we use the sub-document API //to only resolve the relevant parts of the hotel document prior to the update. //Here we can define the path to the relevant attributes of the document. //In our scenario 'pets_ok' and 'free_internet' are at the document root. LookupInResult result = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); //Display the current values prior to the document update System.out.println("Values prior to update: 'pets_ok': " + result.contentAs(0, String.class) + ", 'free_internet': " + result.contentAs(1, String.class)); |
저희는 룩업인 연산을 사용하여 문서에서 특정 경로를 쿼리할 수 있습니다. 여기서 경로의 다른 구성 요소는 점(.)으로 구분됩니다. 경로의 pets_ok 그리고 무료_인터넷 속성은 시나리오에서 문서 루트에 있습니다. 그런 다음 결과를 트래버스하여 현재 값을 표시합니다.
|
1 2 3 4 5 6 |
//Even here we use the sub-document API. //Instead of updating the entire document we only update the relevant attributes. //The UPSERT method will either update the attributes if they already exist or create them in case they do not exist in within the document MutateInResult upsertResult = collection.mutateIn(hotelData.getString("hotel_id"), Arrays.asList(upsert("pets_ok", hotelData.getBoolean("pets_ok")), upsert("free_internet", hotelData.getBoolean("free_internet")))); |
그리고 mutateIn 작업을 통해 문서에서 하나 이상의 경로를 업데이트할 수 있습니다. Couchbase에서 문서를 ID로 식별하고 설정하려는 경로와 값의 배열을 제공합니다.
마지막으로, 저희는 룩업인 작업을 수행하여 업데이트가 성공했는지 확인합니다.
|
1 2 3 4 |
//Resolve and display the updated attributes LookupInResult result1 = collection.lookupIn(hotelData.getString("hotel_id"), Arrays.asList(get("pets_ok"), get("free_internet"))); System.out.println("Values after the update: 'pets_ok': " + result1.contentAs(0, String.class) + ", 'free_internet': " + result1.contentAs(1, String.class)); |
다음 단계
이 문서에서는 Couchbase 및 Solace Java SDK를 사용하여 Solace 이벤트 브로커를 Couchbase와 통합하는 방법을 설명합니다.
- 전체 코드 예제는 다음에서 확인할 수 있습니다. 이 프로젝트에 대한 내 GitHub 저장소.
- 메시지 보내기 배우기 에서 카우치베이스에서 위안이 되는 토픽으로 이 블로그 게시물(1부)에서.
- 사용 가능한 카우치베이스 SDK는 여기.
- 사용 카우치베이스 놀이터 를 사용하여 빠르게 개발을 시작할 수 있습니다.
- 시도 Solace PubSub+ 클라우드 평가판을 사용하세요.
- 지금 바로 카우치베이스 사용을 시작하세요. 카우치베이스 카펠라 클라우드 데이터베이스.