Kafka와의 통합의 다음 주요 반복에 대한 첫 번째 개발자 프리뷰를 발표하게 되어 기쁘게 생각합니다. 이 버전은 DCP용 새 라이브러리를 기반으로 하며, Kafka Connect 프레임워크를 지원합니다. 이 포스팅에서는 이를 통합하여 Couchbase에서 HDFS로 데이터를 릴레이하는 방법을 보여드리겠습니다.
여기서는 CentOS/Fedora Linux 배포판에 대한 단계를 보여드리겠습니다. 다른 OS의 단계도 비슷합니다. 먼저 Confluent 플랫폼(http://docs.confluent.io/3.0.0/installation.html#rpm-packages-via-yum)에서 커넥터가 통합된 Couchbase zip 아카이브를 다운로드합니다. http://packages.couchbase.com/clients/kafka/3.0.0-DP1/kafka-connect-couchbase-3.0.0-DP1.zip
커넥터를 등록하려면 기본 클래스 경로에 내용을 추출하면 됩니다(예: CentOS(Fedora)의 경우). /usr/share/java:
|
1 2 |
압축 해제 카프카-연결-카우치베이스-3.0.0-DP1.zip sudo cp -a 카프카-연결-카우치베이스-3.0.0-DP1/공유 /usr/ |
이제 Confluent 제어 센터 및 모든 종속 서비스를 실행하세요. Confluent에서 이러한 명령의 기능에 대해 자세히 알아보세요. 빠른 시작 가이드
|
1 2 3 4 5 6 |
sudo 동물원 관리자-서버-시작 /등/카프카/동물원 관리자.속성 sudo 카프카-서버-시작 /등/카프카/서버.속성 sudo 스키마-레지스트리-시작 /등/스키마-레지스트리/스키마-레지스트리.속성 sudo 연결-분산 /등/카프카/연결-분산.속성 sudo 제어-center-시작 /등/컨플루언트-제어-center/제어-center.속성 |
이제 모든 것이 준비되었으므로 Kafka Connect를 사용하여 Couchbase에서 HDFS로 문서를 전송하는 링크를 설정할 수 있습니다. 다음에서 Couchbase Server를 실행 중이라고 가정합니다. http://127.0.0.1:8091/ 및 컨플루언트 제어 센터의 http://127.0.0.1:9021/. 이 예제에서는 여행 샘플 버킷을 로드해야 합니다. 클러스터를 설정할 때 설정하지 않은 경우 웹 UI의 설정 부분을 통해 추가할 수 있습니다.
이러한 모든 전제 조건이 완료되면 Confluent 제어 센터에서 "Kafka 연결" 섹션으로 이동합니다. "새 소스"를 선택한 다음 커넥터 클래스로 "CouchbaseSourceConnector"를 선택하고 최종 JSON이 다음과 유사하도록 설정을 입력합니다:
|
1 2 3 4 5 6 7 |
{ "connector.class": "com.couchbase.connect.kafka.CouchbaseSourceConnector", "name": "travel-source", "connection.bucket": "travel-sample", "connection.cluster_address": "127.0.0.1", "topic.name": "여행 주제" } |
소스 연결을 저장하면 연결 디먼이 돌연변이를 수신하여 지정된 Kafka 토픽에 저장하기 시작합니다. 전체 파이프라인을 시연하기 위해, Kafka에서 데이터를 가져오기 위해 Sink 연결을 설정해 보겠습니다. 그러려면 "Sinks" 탭으로 이동하여 "New sink" 버튼을 클릭합니다. 흥미로운 데이터가 저장된 토픽을 묻는 메시지가 표시되면 "travel-topic"을 입력합니다. 그런 다음 "HdfsSinkConnector"를 선택하고 설정을 입력하면 JSON 구성이 다음과 같이 표시됩니다(HDFS 이름 노드가 수신 대기 중이라고 가정할 때 hdfs://127.0.0.1:8020/):
|
1 2 3 4 5 6 7 8 9 10 |
{ "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector", "name": "hdfs-travel-sink", "flush.size": "10", "partitioner.class": "io.confluent.connect.hdfs.partitioner.FieldPartitioner", "partition.field.name": "파티션", "hdfs.url": "hdfs://127.0.0.1:8020", "주제": "여행 주제" } |
싱크 연결이 구성되면 다음에서 HDFS에 표시되는 데이터를 볼 수 있습니다. /topics/travel-topic/ 를 기본 토픽 디렉터리로 설정합니다. 그중 하나를 살펴봅시다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 |
$ hdfs dfs -fs hdfs://localhost:8020 -cat /topics/travel-topic/partition=89/travel-topic+0+0000000101+0000000101.avro | avropipe / [] /0 {} /0/파티션 89 /0/키 "route_28879" /0/만료 0 /0/플래그 33554438 /0/cas 1471633063247347712 /0/lockTime 0 /0/bySeqno 1 /0/revSeqno 1 /0/콘텐츠 "{"id":28879,"유형":"경로","항공사":"G4","airlineid":"항공사_35","소스공항":"AZA","목적지공항":"FWA","멈춤":0,"장비":"319","일정":[{"일":0,"UTC":"01:59:00","비행":"G4097"},{"일":1,"UTC":"09:30:00","비행":"G4697"},{"일":1,"UTC":"09:50:00","비행":"G4879"},{"일":1,"UTC":"07:44:00","비행":"G4310"},{"일":1,"UTC":"01:23:00","비행":"G4226"},{"일":2,"UTC":"19:58:00","비행":"G4921"},{"일":2,"UTC":"09:49:00","비행":"G4376"},{"일":2,"UTC":"17:57:00","비행":"G4446"},{"일":2,"UTC":"21:06:00","비행":"G4032"},{"일":3,"UTC":"17:05:00","비행":"G4198"},{"일":3,"UTC":"12:21:00","비행":"G4098"},{"일":3,"UTC":"19:31:00","비행":"G4571"},{"일":4,"UTC":"05:27:00","비행":"G4001"},{"일":4,"UTC":"07:03:00","비행":"G4023"},{"일":4,"UTC":"16:50:00","비행":"G4631"},{"일":5,"UTC":"18:13:00","비행":"G4757"},{"일":6,"UTC":"20:35:00","비행":"G4157"},{"일":6,"UTC":"21:52:00","비행":"G4582"},{"일":6,"UTC":"00:55:00","비행":"G4348"},{"일":6,"UTC":"06:01:00","비행":"G4731"}],"거리":2483.859992489083}" |
이것이 제가 간략하게 살펴본 예시입니다! DCP 클라이언트는 아직 개발 중이며 다양한 토폴로지 변경, 장애 시나리오를 처리하기 위해 몇 가지 추가 기능이 추가되고 있습니다. 다음 몇 차례의 Kafka 커넥터 업데이트에는 이러한 업데이트가 반영될 예정입니다. 또한 간단히 언급해야 할 사항은 카우치베이스의 DCP 클라이언트 인터페이스는 변동성이 있는 것으로 간주해야 합니다. 를 사용할 수 없습니다. 다양한 프로젝트에서 사용하고 있지만, 사용자 책임 하에 직접 사용해야 합니다.
커넥터의 소스 코드는 다음 링크에서 확인할 수 있습니다. https://github.com/couchbaselabs/kafka-connect-couchbase. 이슈 트래커는 https://issues.couchbase.com/projects/KAFKAC에 질문이 있으시면 언제든지 문의해 주세요. https://www.couchbase.com/forums/.