새로운 Couchbase kafka 커넥터에 오신 것을 환영합니다! 이 커넥터는 카우치베이스 서버(3.0 이상)에서 카프카로 이벤트를 직접 스트리밍하는 기능을 제공합니다. 아직 개발 중이므로 주의해서 사용하고 이슈가 발견되면 공개해 주세요. 이슈 트래커는 다음 위치에 있습니다. https://issues.couchbase.com/browse/KAFKAC. PayPal의 Shibi에게 감사드립니다. 이전 인터페이스를 기반으로 다른 커넥터를 작성했습니다.이 작품에 영감을 주었습니다.
받기
프로젝트 소스 코드는 깃허브에서 찾을 수 있습니다. 여기. 개발자 미리 보기는 자체 maven 리포지토리를 통해 제공되며, GA 아티팩트는 maven central에서 사용할 수 있습니다. 좌표는 다음과 같습니다:
- 그룹 ID: com.couchbase.client
- 아티팩트 ID: 카프카-커넥터
- 버전: 1.0.0-dp1
1 2 3 4 5 6 7 8 9 10 11 |
신청하기 플러그인: 'java' 리포지토리 { mavenCentral() maven { URL { "http://files.couchbase.com/maven2" } } mavenLocal() } 종속성 { 컴파일(그룹: 'com.couchbase.client', name: 'kafka-connector', version: '1.0.0-dp1') } |
사용법
라이브러리 사용법은 매우 간단합니다. Couchbase 서버에서 모든 수정 사항을 수신하고 문서 본문만 Kafka로 보내고 싶다고 가정해 봅시다(기본적으로 커넥터는 문서 본문과 메타데이터를 JSON으로 직렬화합니다). 이를 위해서는 다음의 인스턴스만 허용하는 필터 클래스를 정의해야 합니다. 변이 메시지 를 눌러 통과합니다:
1 2 3 4 5 6 7 8 9 10 11 12 |
패키지 예제; 가져오기 com.카우치베이스.클라이언트.핵심.메시지.dcp.변이 메시지; 가져오기 com.카우치베이스.카프카.DCPEvent; 가져오기 com.카우치베이스.카프카.필터.필터; public 클래스 샘플 필터 구현 필터 { @오버라이드 public 부울 통과(final DCPEvent dcpEvent) { 반환 dcpEvent.메시지() 인스턴스 오브 변이 메시지; } } |
그리고 문서 값을 받는 인코더 클래스는 이를 바이트 배열로 변환합니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
패키지 예제; 가져오기 com.카우치베이스.클라이언트.핵심.메시지.dcp.변이 메시지; 가져오기 com.카우치베이스.클라이언트.deps.io.netty.활용.문자 집합 활용; 가져오기 com.카우치베이스.카프카.DCPEvent; 가져오기 com.카우치베이스.카프카.코더.추상 인코더; 가져오기 카프카.유틸리티.검증 가능한 속성; public 클래스 샘플 인코더 확장 추상 인코더 { public 샘플 인코더(final 검증 가능한 속성 속성) { super(속성); } @오버라이드 public 바이트[] 토바이트(final DCPEvent dcpEvent) { 변이 메시지 메시지 = (변이 메시지)dcpEvent.메시지(); 반환 메시지.콘텐츠().toString(문자 집합 활용.UTF_8).getBytes(); } } |
이 정도면 Couchbase-Kafka 브리지를 설정하는 데 충분합니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
패키지 예제; 가져오기 com.카우치베이스.카프카.카우치베이스카프카커넥터; 가져오기 com.카우치베이스.카프카.카우치베이스카프카환경; 가져오기 com.카우치베이스.카프카.기본 카우치베이스 카프카 환경; public 클래스 예 { public 정적 void 메인(문자열[] args) { 기본 카우치베이스 카프카 환경.빌더 빌더 = (기본 카우치베이스 카프카 환경.빌더) 기본 카우치베이스 카프카 환경 .빌더() .카프카필터클래스("example.SampleFilter") .카프카 가치 직렬화 클래스("example.SampleEncoder") .dcpEnabled(true); 카우치베이스카프카환경 환경 = 빌더.빌드(); 카우치베이스카프카커넥터 커넥터 = 카우치베이스카프카커넥터.create( 환경, "couchbase1.vagrant", "default", "", "kafka1.vagrant", "default"); 커넥터.실행(); } } |
그리고 couchbase1.vagrant 그리고 kafka1.vagrant 위의 주소는 각각 Couchbase Server와 Kafka의 위치이며, 다음에서 프로비저닝 스크립트를 사용하여 쉽게 설정할 수 있습니다. env/ 디렉터리를 찾습니다. 해당 디렉터리로 이동하여 다음을 실행하면 됩니다. 방랑자 업.