M. David Allen은 10년 넘게 거의 모든 프로그래밍 언어와 데이터베이스 시스템을 다뤄온 풀스택 소프트웨어 엔지니어이자 기업가입니다. 이전에는 금융, 의료, 정부 등 다양한 산업 분야에서 일했으며, 주로 대규모 데이터 통합 과제, 응용 연구 전환, 신기술 혁신에 주력해 왔습니다. 그는 버지니아 커먼웰스 대학교에서 석사 학위를 받았지만, 정규 교육을 떠난 후에도 여전히 기술을 공부하는 학생으로 남아 있습니다.
카우치베이스의 모든 것의 역사: DCP
약어 DCP(데이터베이스 변경 프로토콜) 뒤에 숨어 있는 Couchbase에는 비밀스러운 초능력이 있습니다. 대부분의 사람들은 데이터베이스를 특정 시점의 데이터를 저장하는 스토리지로 생각합니다. 하지만 데이터베이스 변경 프로토콜(DCP)를 통해 Couchbase 클러스터를 지속적인 변화의 흐름으로 볼 수 있습니다.
기본적으로 Couchbase는 "역사를 되감기"하여 데이터베이스에 발생한 모든 일을 처음부터 다시 재생할 수 있습니다. 그렇게 함으로써 그 이후의 모든 내부 상태를 해결할 수 있습니다. 이 글에서는 애초에 왜 그런 미친 짓을 하려고 했는지, 그리고 이 강력한 기능을 활용하여 문서로 더 멋진 일을 할 수 있는 방법에 대해 다뤄보겠습니다.
DCP란 무엇인가요?
클러스터형 데이터베이스인 Couchbase는 데이터베이스 아키텍처가 해결해야 하는 여러 가지 특수한 문제를 해결합니다. 한 노드에 일시적인 문제가 발생하거나 노드 간에 네트워크 중단이 발생하더라도 여러 노드가 서로 동기화 상태를 유지해야 합니다. 또한 대량의 새 문서가 들어오고 발생하는 쿼리를 처리할 수 있을 만큼 충분히 빨라야 합니다.
카우치베이스의 기본 설계와 이를 가능하게 하는 방법의 일부는 스트림 변경을 통해 이루어집니다. 추가 전용 접근 방식[1]. 즉, 문서를 변경할 때 Couchbase는 문서를 찾아서 디스크의 제자리에서 수정하지 않습니다. 그 대신 변경 결과가 무엇이든 키와 값의 로그에 추가하고 계속 이동합니다. 이러한 방식으로 쓰기를 매우 빠르게 수행할 수 있으며 여러 노드를 조정하기가 더 쉽습니다.
관계형 데이터베이스에 익숙하다면 DCP의 접근 방식은 다음과 다소 유사합니다. 미리 쓰기 로깅 다른 소프트웨어에서 볼 수 있습니다.
카우치베이스 내부에서 이를 어떻게 처리할까요?
어느 정도 수준에서는 데이터베이스가 다음과 같은 사항에 대해 걱정하기를 원합니다. 에 대한 하지 않아도 됩니다. 하지만 DCP의 경우 이러한 이벤트 스트림을 뱀파이어처럼 활용하고 이를 통해 온갖 멋진 일을 할 수 있기 때문에 우리가 관심을 갖는 것입니다:
완전히 다른 데이터베이스에 데이터 복제. 카우치베이스에는 이미 여러 가지 사용 가능한 커넥터를 사용할 수 있습니다. 별도의 Elasticsearch 클러스터를 설정하고 여기에 Couchbase의 모든 데이터 복사본을 포함하도록 하여 다른 데이터베이스가 제공할 수 있는 기능에 모든 데이터를 노출시킬 수 있습니다. 이 두 클러스터를 유지하는 것은 일반적으로 매우 까다로운 동기화 문제이지만, 커넥터는 대부분 DCP에 의해 공급되기 때문에 DCP 메시지를 다른 데이터베이스를 지속적으로 최신 상태로 유지하는 "git 커밋"의 스트림으로 생각할 수 있습니다.
대기열 기반 접근 방식을 사용하고 다른 마이크로서비스와 통합합니다. 해당 DCP 스트림을 수신하는 코드를 작성할 수 있다면 관심 있는 메시지만 수신하도록 필터링할 수도 있습니다. 문서가 삭제된 경우나 특정 유형의 문서에 대한 업데이트만 있을 수도 있습니다. 두 경우 모두 DCP 클라이언트를 사용하여 해당 메시지 스트림을 필터링하고 추가 메시지를 RabbitMQ 대기열에 게시할 수 있습니다, 카프카 주제또는 필요한 모든 것을 사용할 수 있습니다. 이는 다른 복잡한 비즈니스 로직을 구현하는 데 매우 유용할 수 있습니다. 예를 들어, 고객의 상태에 따라 규정 준수와 관련된 몇 가지 추가 로직을 트리거해야 하는 보험 회사를 이미징한다고 가정해 보세요. 고객 문서의 변형을 확인하고, 주별로 필터링하고, 올바른 대기열에 게시하고, 버지니아주와 메릴랜드주 고객에 대해 수행해야 하는 작업의 차이에 대해 다른 서비스가 걱정하도록 할 수 있습니다.
시계열/수정 분석. 데이터베이스의 각 값은 시간이 지남에 따라 변화합니다. 어떤 것은 한 번 쓰여지고 잊혀지기도 합니다. 다른 것들은 자주 업데이트될 수 있습니다. 자주 업데이트되는 항목의 경우 DCP를 사용하면 모든 항목을 시계열. 이전에는 어떤 값이었는지, 언제 변경되었는지, 어떤 값으로 변경되었는지 알 수 있습니다. 이를 통해 흥미로운 다운스트림 데이터 분석이 가능합니다. 사람들이 금요일과 토요일 밤 또는 화요일 아침에 사이트에서 맥주에 대한 평가를 가장 많이 한다는 것을 짐작할 수 있을까요?
기본적으로 DCP는 일종의 아키텍처 접착제로 사용할 수 있으며, Couchbase에서 다른 시스템으로 데이터를 가져와서 다른 시스템과 더 쉽고 유연하게 통합할 수 있게 해줍니다.
마지막으로, 백업 및 재해 복구를 위해 다른 Couchbase 클러스터로 복제할 수 있는 Couchbase의 표준 데이터 센터 간 복제(XDCR) 기능도 언급해야 합니다. 는 거의 동일한 방식으로 DCP를 사용할 수 있습니다.. 따라서 기본적으로 DCP는 단순히 "있으면 좋은 추가 기능"이 아니라 스토리지 아키텍처의 작동 방식에 대한 기본부터 XDCR과 같은 다른 Couchbase 기능의 기반을 제공하는 것까지 핵심에 내장되어 있습니다.
만약 를 하고 싶다면 어떻게 해야 하나요?
DCP 메시지를 수락하고 처리할 수 있는 소프트웨어 모듈인 DCP 클라이언트가 필요합니다. 이 글을 쓰는 현재, 다음과 같은 좋은 클라이언트가 준비되어 있습니다. Java에 하나, 그리고 Python 도 오래됐을 수 있습니다.
DCP에 대한 몇 가지 세부 정보 ...
DCP는 기본적으로 변조 및 삭제 메시지의 스트림으로 생각할 수 있습니다. 변이는 키 또는 그 뒤에 있는 콘텐츠가 어떤 식으로든 변경되는 것을 말합니다. 삭제는 말 그대로 '삭제'입니다. 다른 유형의 DCP 메시지가 있습니다.하지만 지금은 삭제와 변경으로만 설명하겠습니다.
DCP 메시지에는 어떤 내용이 포함되나요? 두 가지 중요한 항목이 포함됩니다:
- vBucket 식별자입니다. 무대 뒤에서 Couchbase는 거대한 키/값 저장소에 불과합니다. 분산형 데이터베이스라는 것을 알고 있기 때문에 Couchbase는 데이터베이스의 모든 키를 잘게 쪼개서 vBuckets라고 하는 서로 다른 "파티션"에 할당합니다. vBucket 식별자는 데이터베이스에 키 공간의 어느 세그먼트가 변경되는지 알려줄 뿐입니다. 이 주제에 대한 자세한 정보는 다음과 같습니다. 를 참조하세요. 더 자세히 살펴보고 싶으시다면 Couchbase에서 vBuckets를 중심으로 많은 구성이 가능합니다. 이 이미지에서는 3개의 서버에 9개의 vBucket이 중복 없이 분산되어 있는 것을 볼 수 있습니다.
- 증가하는 숫자입니다. 숫자의 값은 그다지 중요하지 않지만 중요한 것은 항상 올라간다는 것입니다. 이것이 바로 Couchbase가 모든 DCP 메시지를 제때에 주문할 수 있는 방법이며, 이는 매우 중요합니다.
이미지 출처: http://static.couchbaseinc.hosting.ca.onehippo.com/images/server/3.x/20170420-170703/vbuckets.png
간단한 사용 사례
Couchbase의 맥주 샘플 데이터 버킷을 예로 들어보겠습니다. 사용자들은 맥주를 평가할 수 있으며, 맥주의 총 평점이 특정 임계값에 도달하면 마케팅 담당자는 이를 알고 싶어 합니다. 마케팅 담당자는 맥주 회사에 전화를 걸어 맥주 웹사이트에 광고를 하고 싶은지 물어볼 것입니다. 아니면 별 5개를 받은 맥주가 있다면 다음 회사 모임에 사용할 맥주를 사재기할 수도 있습니다. 어느 쪽이든 그들은 별 다섯 개를 받은 맥주에 대해 알고 싶어합니다.
문제는 마케팅 담당자가 별도의 리드 시스템을 가지고 있다는 것입니다. 우리는 그들의 시스템에 간단한 HTTP 게시물을 작성하여 알림을 보낼 수 있습니다. 맥주가 별 5개를 받으면, 우리는 그들의 리드 시스템에 알림을 보내 Foo Brew를 확인하도록 합니다.
코드
아래 코드는 튜토리얼의 Java DCP 클라이언트. 일단 무슨 일이 일어나고 있는지 이해하면 필요에 맞게 조정하는 것은 매우 간단합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 |
패키지 com.foo.앱; 가져오기 com.카우치베이스.클라이언트.dcp.클라이언트; 가져오기 com.카우치베이스.클라이언트.dcp.메시지.DcpMutationMessage; 가져오기 com.카우치베이스.클라이언트.dcp.데이터 이벤트 핸들러; 가져오기 com.카우치베이스.클라이언트.dcp.컨트롤이벤트핸들러; 가져오기 com.카우치베이스.클라이언트.deps.io.netty.버퍼.ByteBuf; 가져오기 com.카우치베이스.클라이언트.dcp.운송.netty.채널 흐름 컨트롤러; 가져오기 com.카우치베이스.클라이언트.dcp.메시지.DcpDeletionMessage; 가져오기 com.카우치베이스.클라이언트.dcp.StreamFrom; 가져오기 com.카우치베이스.클라이언트.dcp.StreamTo; 가져오기 자바.활용.동시.시간 단위; 가져오기 com.카우치베이스.클라이언트.deps.io.netty.활용.문자 집합 활용; 가져오기 com.구글.gson.JsonParser; 가져오기 com.구글.gson.JsonObject; public 클래스 앱 { public 정적 void 메인(문자열[] args) 던지기 예외 { final JsonParser 파서 = new JsonParser(); final 클라이언트 클라이언트 = 클라이언트.구성().호스트 이름("localhost").버킷("맥주 샘플").빌드(); // 이 예제에서는 제어 이벤트로 아무 작업도 하지 않습니다. 클라이언트.제어 이벤트 핸들러(new 컨트롤이벤트핸들러() { public void onEvent(채널 흐름 컨트롤러 흐름 컨트롤러, ByteBuf 이벤트) { 이벤트.릴리스(); } }); 클라이언트.데이터 이벤트 핸들러(new 데이터 이벤트 핸들러() { public void onEvent(채널 흐름 컨트롤러 흐름 컨트롤러, ByteBuf 이벤트) { 만약 (DcpMutationMessage.는(이벤트)) { JsonObject 객체 = 파서.parse(DcpMutationMessage.콘텐츠(이벤트).toString(문자 집합 활용.UTF_8)).getAsJsonObject(); 만약 (객체.get("rating") != null && 객체.get("rating").getAsInt() > 4) { // 세상에, 마케팅 담당자들이 이걸 좋아하겠네... 시스템.out.println("맛있는 맥주가 있습니다: " + 객체.get("name").getAsString()); } } else 만약 (DcpDeletionMessage.는(이벤트)) { // System.out.println("안녕, 맛있는 맥주! " + DcpDeletionMessage.toString(event)); } 이벤트.릴리스(); } }); // 소켓 연결 클라이언트.연결().기다림(); // 상태 초기화(지금 시작, 멈추지 않음) 클라이언트.초기화 상태(StreamFrom.시작, StreamTo.INFINITY).기다림(); // 모든 파티션에서 스트리밍 시작 클라이언트.시작 스트리밍().기다림(); // 돌연변이를 인쇄하려면 잠시 동안 잠자기 상태로 전환합니다. // 인쇄는 IO 스레드에서 이루어집니다! 스레드.수면(시간 단위.분.toMillis(10)); // 시간이 끝나면 종료합니다. 클라이언트.연결 끊기().기다림(); } } |
이 코드에서 무슨 일이 일어나고 있나요?
- 클라이언트 인스턴스(및 특정 버킷)에 연결하기
- 클라이언트에게 관심 있는 메시지 시간 범위를 알려주세요. 전체 클러스터 기록을 대상으로 하겠지만 원하는 기간을 선택할 수 있습니다: StreamFrom.BEGINNING, StreamTo.INFINITY
- 클라이언트에게 개별 메시지를 처리하는 방법을 알려주는 것이 바로 데이터이벤트핸들러 코드입니다.
- 우리의 경우, 기본 문서의 JSON을 파싱하고 맥주의 등급을 확인하고 있습니다.
- 이 경우 프로세스가 계속 실행되어 새 메시지가 발생하면 계속해서 새 메시지를 받습니다.
유의해야 할 사항
단순화된 예제에서 Couchbase에 저장하는 것은 항상 JSON 문서입니다. 하지만 반드시 그럴 필요는 없습니다. 따라서 애플리케이션에 관심 있는 키를 기반으로 필터링하기 위해 몇 가지 스마트 기능을 추가하고 모든 것을 JSON으로 파싱하지 않도록 해야 할 것입니다. DCP 이벤트의 콘텐츠는 바이트 버퍼이며 문자열이나 JSON 문서가 아닙니다. 이것은 Couchbase가 실제로 내부에 저장하는 형식에 해당합니다.
DCP는 또한 단일 시점의 돌연변이도 보고합니다. 한 가지 세부 사항은 특정 맥주의 등급이 4점 이상일 수 있지만, 이것이 해당 맥주의 등급을 보장하는 것은 아니라는 점입니다. 지금 바로를 사용하여 일단 변이되면 를 해당 상태로 전환합니다. 보다 정교한 빌드 아웃에서는 이를 고려할 수 있습니다.
마지막으로 클러스터의 크기와 변이 속도에 따라 실제로 길어질 수 있다는 점을 염두에 두시기 바랍니다. 따라서 다음 사항을 살펴볼 수 있습니다. 흐름 제어 기술 를 사용하여 클라이언트가 이벤트 흐름을 따라잡을 수 있도록 합니다.
다음 단계는 어디인가요?
이제 DCP에 익숙해졌으니 먼저 다음 사항을 확인해 보시기 바랍니다. 카우치베이스의 사용 가능한 커넥터는 대부분 이미 DCP에 의존하고 있으며, 코딩 없이도 이 모든 기능을 활용할 수 있는 방법을 제공합니다.
살펴봐야 할 또 다른 주제는 스트리밍 데이터 시스템그리고 저장된 데이터가 아닌 데이터 스트림에서 "즉시" 수행할 수 있는 분석 유형입니다. AWS, Google Compute Engine 등 대부분의 클라우드 컴퓨팅 제공업체는 이 분야에서 놀라운 성과를 거두고 있으며, DCP를 사용하면 이러한 강력한 다운스트림 도구와 연결할 수 있는 데이터 스트림에 Couchbase 클러스터에 있는 모든 것을 완벽하게 넣을 수 있습니다.
[1] 조금 더 복잡하지만, 이 주제에 대해 더 자세히 알아볼 수 있도록 스토리지 아키텍처를 단순화하고 연결해 보았습니다.