이전 블로그 게시물 우리는 Couchbase가 어떻게 하면 이벤트 중심 아키텍처. 카우치베이스 이벤트 서비스를 사용하여 문서 변경 사항을 Solace PubSub+ 대기열에서 구독 마이크로서비스에 데이터를 거의 실시간으로 제공합니다.
이 게시물에서는 토픽 구독자인 마이크로서비스가 확장 가능하고 탄력적인 데이터 저장소로 Couchbase를 활용하는 방법에 대해 중점적으로 설명합니다.
아키텍처 개요 및 샘플 애플리케이션
이 샘플 애플리케이션를 사용하여 Solace PubSub+ 토픽을 구독하고, 데이터를 처리하고, Couchbase에서 문서를 업데이트합니다.
예제 애플리케이션에서는 호텔 세부 정보를 Couchbase에 저장합니다. 호텔 구성에 대한 업데이트는 Solace PubSub+ 토픽에 게시됩니다. 토픽을 구독하고 업데이트된 데이터를 검색하여 Couchbase에서 기존 호텔 구성을 업데이트하는 애플리케이션을 구축합니다.
저희는 여행 샘플 버킷에는 다음과 같은 샘플 데이터가 들어 있습니다. 항공편, 경로 및 기타. 이 예에서는 호텔 문서에 저장된 인벤토리 범위의 호텔 컬렉션에 추가합니다. 모든 호텔 문서는 다양한 속성을 가진 JSON 문서로 구성됩니다.
이 튜토리얼에서는 다음과 같은 일부 속성만 업데이트합니다. pets_allowed 그리고 무료_인터넷.

샘플 애플리케이션은 솔라스 토픽을 구독하고 Couchbase에서 호텔 데이터를 업데이트합니다. 다른 애플리케이션은 호텔 구성 변경 사항을 제공할 수 있습니다. 우리는 토픽에 업데이트를 게시하는 간단한 서비스를 사용합니다.
캡션: 샘플 애플리케이션은 솔라스 토픽을 구독하고 Couchbase에서 호텔 데이터를 업데이트합니다. 다른 애플리케이션에서 호텔 구성 변경 사항을 제공할 수도 있습니다. 여기서는 토픽에 업데이트를 게시하는 간단한 서비스를 사용합니다.
전제 조건
이 샘플 애플리케이션의 전제 조건을 검토하세요:
카우치베이스 카펠라
무료 카우치베이스 카펠라 평가판 계정을 생성하고 안내에 따라 평가판 클러스터를 프로비저닝하세요. 이 과정은 몇 분 밖에 걸리지 않으며 다음과 같은 본격적인 Couchbase 클러스터를 제공합니다. 데이터 서비스 및 샘플 데이터 집합을 제공합니다, 여행 샘플, 이 블로그 게시물 전체에서 사용할 것입니다.
이 튜토리얼은 관리형 카우치베이스 카펠라 제품을 사용하지 않을 때 사용할 수 있습니다. The 여행 샘플 버킷은 Couchbase 서버 제품의 일부이며 Couchbase 콘솔을 사용하여 가져올 수 있습니다.
Solace PubSub+ 이벤트 브로커 클라우드
무료로 가입하기 Solace 클라우드 평가판 를 클릭하고 서비스/VPN을 생성합니다. VPN을 사용하여 솔라스 토픽에 연결합니다.
코드 검토 - Solace 구독자 및 게시자
솔라스 토픽의 메시지를 구독하려면 솔라스 자바 API를 사용합니다. 이 예제 구현은 Solace 문서의 예제에서 파생되었습니다. 구현에 대한 자세한 내용은 Solace 설명서를 참조하세요.
|
1 2 3 4 5 6 7 8 9 |
final JCSMP프로퍼티 속성 = new JCSMP프로퍼티(); 속성.setProperty(JCSMP프로퍼티.호스트, "tcps://yourhost.messaging.solace.cloud:55443"); 속성.setProperty(JCSMP프로퍼티.사용자 이름, "솔라스-클라우드-클라이언트"); 속성.setProperty(JCSMP프로퍼티.비밀번호, "비밀번호"); 속성.setProperty(JCSMP프로퍼티.VPN_NAME, "couchbasedemo"); JCSMPSession 세션 = JCSMPFactory.onlyInstance().createSession(속성); 세션.연결(); final 주제 주제 = JCSMPFactory.onlyInstance().createTopic("튜토리얼/주제"); |
우리는 JCSMP프로퍼티 Solace 웹 인터페이스에서 관련 액세스 정보를 사용할 수 있습니다. 그런 다음 JCSMPSession 및 주제.
토픽에 대한 메시지를 구독하려면 다음과 같이 구현합니다. XMLMessageConsumer:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// ### 메시지 수신기 - 주제 구독 ### final XMLMessageConsumer 단점 = 세션.getMessageConsumer(new XML메시지 리스너() { 오버라이드 public void onReceive(BytesXMLMessage msg) { 만약 (msg 인스턴스 오브 문자 메시지) { 시스템.out.printf("구독: 메시지가 수신되었습니다: '%s'%n", ((문자 메시지) msg).getText()); 호텔 데이터 업데이트.getInstance().업서트호텔데이터(((문자 메시지) msg).getText()); } else { 시스템.out.println("메시지 수신됨."); } } 오버라이드 public void onException(JCSMPException e) { 시스템.out.printf("소비자가 예외를 받았습니다: %s%n", e); } }); 세션.구독 추가(주제); 시스템.out.println("구독: 연결됨. 메시지 대기 중..."); 단점.시작(); |
에서 onReceive 메서드를 호출하면 메시지를 받아 문자열로 변환한 다음 업서트호텔데이터 방법의 호텔 데이터 업데이트 클래스에는 이 블로그 게시물의 뒷부분에서 설명하는 Couchbase 구현이 포함되어 있습니다.
세션에 주제 구독을 추가하고 메시지 소비자를 시작합니다.
구현을 테스트하기 위해 메시지 퍼블리셔도 생성합니다. 이를 위해 먼저 XMLMessageProducer.
|
1 2 3 4 5 6 7 8 9 10 11 12 |
// ### 메시지 프로듀서 - 토픽에 메시지 보내기 XMLMessageProducer prod = 세션.getMessageProducer(new JCSMPS스트리밍퍼블리시연관 이벤트 핸들러() { 오버라이드 public void 응답받은Ex(개체 키) { 시스템.out.println("프로듀서가 메시지에 대한 응답을 받았습니다: " + 키.toString()); } 오버라이드 public void 핸들 에러엑스(개체 키, JCSMPException 원인, long 타임스탬프) { 시스템.out.printf("생산자가 메시지에 대한 오류 수신: %s@%s - %s%n", 키.toString(), 타임스탬프, 원인); } }); |
메시지 프로듀서가 설치되었으므로 이제 아래와 같이 메시지를 보낼 수 있습니다.
|
1 2 3 4 5 6 7 8 9 |
// 메시지 보내기. 여기서는 동일한 메시지를 반복해서 여러 번 전송합니다. 에 대한 (int msgsSent = 0; msgsSent < 2; ++msgsSent) { 문자 메시지 msg = JCSMPFactory.onlyInstance().createMessage(문자 메시지.클래스); msg.setText(데이터); 시스템.out.printf("게시: '%s' 토픽에 '%s' 메시지 보내기...%n", 데이터, 주제.getName()); prod.보내기(msg, 주제); //콘솔 로그를 추적할 시간을 줍니다. 스레드.수면(3000); } |
카우치베이스에 데이터 업로드하기
메시지를 구독하고 게시하는 로직이 준비되었으므로 이제 Couchbase에서 호텔 문서를 업데이트하는 방법에 대해 집중해 보겠습니다.
이 예에서는 토픽에서 검색된 메시지의 형식이 아래 JSON이라고 가정합니다. 여기에는 hotel_id 를 생성하여 Couchbase에 저장된 호텔 문서의 문서 ID와 변경할 수 있는 여러 속성으로 사용할 수 있습니다:
|
1 2 3 4 5 6 7 |
{ "hotel_id": "hotel_10025", "pets_ok": true, "free_breakfast": true, "free_인터넷": true, "free_parking": true } |
애플리케이션에서 카우치베이스 카펠라로 보안 연결을 설정하려면 다음과 같이 해야 합니다:
-
- 데이터베이스 사용자를 생성하고 이 사용자에게 호텔 데이터에 대한 액세스 권한을 부여합니다.
- 애플리케이션을 실행하는 호스트의 IP 주소를 화이트리스트에 추가합니다.
- 보안 연결 URL 캡처
이러한 모든 세부 사항은 Capella 관리 영역에서 관리할 수 있습니다. 카펠라에 로그인 를 클릭합니다:
- 열기 평가판 - 클러스터 클러스터로 이동하여 연결 탭
- 아래로 스크롤하여 데이터베이스 액세스 섹션을 클릭하고 자격 증명 관리

- 를 클릭합니다. 데이터베이스 자격 증명 만들기 를 클릭하고 데이터베이스 사용자 이름과 비밀번호를 입력합니다. 버킷 수준 액세스를 구성합니다. 인벤토리 범위의 여행 샘플 버킷을 만들고 읽기/쓰기 액세스 권한을 부여합니다.

- 를 클릭하여 애플리케이션 IP 주소를 화이트리스트에 추가합니다. 허용된 IP 관리 를 클릭한 다음 허용 IP 추가. 로컬 컴퓨터에서 애플리케이션을 실행하는 경우, 간단히 내 IP 추가 를 입력하면 외부 IP 주소가 자동으로 검색되어 추가됩니다.

- 에서 광역 네트워크 섹션에서 연결 URL을 복사합니다.
코드 검토 - Couchbase에 연결하여 문서 업데이트하기
첫 번째 단계로 클러스터 개체를 생성하여 Couchbase Capella에 연결합니다. 이전 단계에서 캡처한 엔드포인트 URL과 데이터베이스 사용자 자격 증명을 제공합니다.
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
// 클러스터에 업데이트 문자열 엔드포인트 = "cb.8c-f5di7pqrfjy8.cloud.couchbase.com"; 문자열 사용자 이름 = "dbuser"; 문자열 비밀번호 = "Passw0rd!"; // 사용자 입력은 여기서 끝납니다. 문자열 버킷 이름 = "travel-sample"; // 클러스터 환경 구성 클러스터 환경 환경 = 클러스터 환경.빌더() .보안 구성( SecurityConfig.enableTls(true).신뢰 관리자 팩토리(인시큐어트러스트매니저팩토리.인스턴스)) .ioConfig(IoConfig.enableDnsSrv(true)).빌드(); // 연결 초기화 및 데이터베이스 사용자 자격 증명 제공 클러스터 클러스터 = 클러스터.연결(엔드포인트, 클러스터 옵션.클러스터 옵션(사용자 이름, 비밀번호).환경(환경)); // 버킷 객체 생성 버킷 = 클러스터.버킷(버킷 이름); 버킷.waitUntilReady(기간.parse("PT10S")); // '인벤토리' 범위에 있는 '호텔' 컬렉션에 대한 컬렉션 객체를 생성합니다. 컬렉션 = 버킷.범위("인벤토리").컬렉션("호텔"); |
버킷 객체가 생성되면 버킷 객체의 호텔 범위를 설정하고 나중에 문서 변경 사항을 삽입하는 데 사용합니다. 문서의 호텔 범위는 인벤토리 범위의 여행 샘플 버킷.
이후의 모든 요청에 동일한 컬렉션 객체를 사용하기 위해 싱글톤 패턴을 사용한다는 점에 유의하세요. 자세한 내용은 샘플 구현을 참조하세요. 내 GitHub 프로젝트.
이제 토픽에서 받은 데이터로 호텔 문서를 업데이트하는 방법을 살펴보겠습니다.
수신된 메시지는 문자열이므로 첫 번째 단계로 이를 JSON 객체로 변환해야 합니다.
|
1 2 3 4 |
public void 업서트호텔데이터(문자열 콘텐츠) { ... 호텔데이터 = JsonObject.fromJson(콘텐츠); ... |
토픽의 입력 데이터에는 호텔 문서의 모든 속성의 하위 집합만 포함되어 있으므로 문서 필드를 가장 효율적으로 업데이트하는 방법을 고려해야 합니다.
두 가지 옵션이 있습니다:
-
- 문서 ID를 사용하여 Couchbase에서 전체 문서를 검색하고, 속성을 업데이트하고, 문서를 다시 작성합니다.
- 전체 문서를 교체하는 대신 문서 내의 전용 필드만 업데이트하기 위해 Couchbase의 하위 문서 API를 사용합니다.
옵션 1은 일반적으로 문서를 업데이트하거나 삽입하는 데 사용됩니다. 그러나 더 큰 문서로 작업하고 일부만 업데이트하려는 경우 전체 문서를 네트워크를 통해 전송할 필요가 없습니다. 아래 구현에서는 하위 문서 API를 사용하여 호텔 문서의 필드를 확인하고 업데이트합니다.
|
1 2 3 4 5 6 7 8 9 |
//전체 문서를 확인하는 대신 하위 문서 API를 사용합니다. //를 사용하여 업데이트 전에 호텔 문서의 관련 부분만 해결합니다. //여기에서 문서의 관련 속성에 대한 경로를 정의할 수 있습니다. //이 시나리오에서 'pets_ok'과 'free_internet'은 문서 루트에 있습니다. 룩업인결과 결과 = 컬렉션.룩업인(호텔데이터.getString("hotel_id"), 배열.asList(get("pets_ok"), get("free_인터넷"))); //문서 업데이트 전의 현재 값을 표시합니다. 시스템.out.println("업데이트 전 값: 'pets_ok': " + 결과.contentAs(0, 문자열.클래스) + ", 'free_internet': " + 결과.contentAs(1, 문자열.클래스)); |
저희는 룩업인 연산을 사용하여 문서에서 특정 경로를 쿼리할 수 있습니다. 여기서 경로의 다른 구성 요소는 점(.)으로 구분됩니다. 경로의 pets_ok 그리고 무료_인터넷 속성은 시나리오에서 문서 루트에 있습니다. 그런 다음 결과를 트래버스하여 현재 값을 표시합니다.
|
1 2 3 4 5 6 |
//여기에서도 하위 문서 API를 사용합니다. //문서 전체를 업데이트하는 대신 관련 속성만 업데이트합니다. //업서트 메서드는 속성이 이미 있는 경우 업데이트하거나 문서 내에 속성이 없는 경우 생성합니다. 결과 내 뮤테이트 업서트 결과 = 컬렉션.mutateIn(호텔데이터.getString("hotel_id"), 배열.asList(업서트("pets_ok", 호텔데이터.getBoolean("pets_ok")), 업서트("free_인터넷", 호텔데이터.getBoolean("free_인터넷")))); |
그리고 mutateIn 작업을 통해 문서에서 하나 이상의 경로를 업데이트할 수 있습니다. Couchbase에서 문서를 ID로 식별하고 설정하려는 경로와 값의 배열을 제공합니다.
마지막으로, 저희는 룩업인 작업을 수행하여 업데이트가 성공했는지 확인합니다.
|
1 2 3 4 |
//업데이트된 속성 확인 및 표시 룩업인결과 결과1 = 컬렉션.룩업인(호텔데이터.getString("hotel_id"), 배열.asList(get("pets_ok"), get("free_인터넷"))); 시스템.out.println("업데이트 후 값: 'pets_ok': " + 결과1.contentAs(0, 문자열.클래스) + ", 'free_internet': " + 결과1.contentAs(1, 문자열.클래스)); |
다음 단계
이 문서에서는 Couchbase 및 Solace Java SDK를 사용하여 Solace 이벤트 브로커를 Couchbase와 통합하는 방법을 설명합니다.
- 전체 코드 예제는 다음에서 확인할 수 있습니다. 이 프로젝트에 대한 내 GitHub 저장소.
- 메시지 보내기 배우기 에서 카우치베이스에서 위안이 되는 토픽으로 이 블로그 게시물(1부)에서.
- 사용 가능한 카우치베이스 SDK는 여기.
- 사용 카우치베이스 놀이터 를 사용하여 빠르게 개발을 시작할 수 있습니다.
- 시도 Solace PubSub+ 클라우드 평가판을 사용하세요.
- 지금 바로 카우치베이스 사용을 시작하세요. 카우치베이스 카펠라 클라우드 데이터베이스.