Couchbase는 DCP 커넥터를 사용하는 Apache Kafka의 소스로 훌륭합니다.
그러나 빠르고 메모리를 우선시하며 안정적인 스토리지를 제공하기 때문에 데이터를 소화하는 엔드포인트로도 훌륭합니다.
이 블로그 게시물에서는 생산자와 소비자를 위한 간단한 Java 애플리케이션을 구축하여 Kafka에서 게시된 메시지를 Couchbase에 저장하는 방법을 보여드리겠습니다.
여기서는 이미 Kafka 클러스터가 있다고 가정합니다(단일 노드 클러스터일지라도). 그렇지 않은 경우 해당 설치 가이드를 따르세요.
이 블로그 환경은 4가지 부분으로 구성되어 있습니다:
- 카프카 프로듀서
- 아파치 카프카 대기열
- 카프카 소비자
- 카우치베이스 서버
프로듀서
대기열에 메시지를 제출하려면 프로듀서가 필요합니다.
대기열에서 해당 메시지가 소화되고 있으며 해당 토픽을 구독한 모든 애플리케이션이 해당 메시지를 읽을 수 있습니다.
메시지의 소스는 Mockaroo를 사용하여 만든 더미 JSON 파일이 될 것이며, 이를 분할하여 대기열로 전송할 것입니다.
샘플 JSON 데이터는 다음과 유사합니다:
1 2 3 4 5 6 7 8 9 |
{ "id":1, "gender":"여성", "first_name":"Jane", "last_name":"Holmes", "이메일":"jholmes0@myspace.com", "ip_address":"230.49.112.20", "city":"Houston" } |
프로듀서 코드입니다:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 |
가져오기 com.fasterxml.잭슨.데이터 바인드.JsonNode; 가져오기 com.fasterxml.잭슨.데이터 바인드.오브젝트 맵퍼; 가져오기 com.fasterxml.잭슨.데이터 바인드.노드.ArrayNode; 가져오기 org.아파치.카프카.클라이언트.프로듀서.카프카 프로듀서; 가져오기 org.아파치.카프카.클라이언트.프로듀서.ProducerConfig; 가져오기 org.아파치.카프카.클라이언트.프로듀서.프로듀서 레코드; 가져오기 org.아파치.카프카.클라이언트.프로듀서.RecordMetadata; 가져오기 자바.io.파일; 가져오기 자바.nio.문자셋.문자 집합; 가져오기 자바.nio.파일.파일; 가져오기 자바.nio.파일.경로; 가져오기 자바.활용.ArrayList; 가져오기 자바.활용.해시맵; 가져오기 자바.활용.목록; 가져오기 자바.활용.지도; 가져오기 자바.활용.동시.미래; public 클래스 카프카심플프로듀서 { public 정적 void 메인(문자열[] args) 던지기 예외 { 지도<문자열, 개체> 구성 = new 해시맵<>(); 구성.put(ProducerConfig.부트스트랩_서버_구성, "localhost:9092"); 구성.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 구성.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); 카프카 프로듀서<문자열, 문자열> 프로듀서 = new 카프카 프로듀서<문자열, 문자열>(구성); 파일 입력 = new 파일("sampleJsonData.json"); 바이트[] 인코딩 = 파일.readAllBytes(경로.get(입력.getPath() )); 문자열 jsons = new 문자열(인코딩, 문자 집합.기본 문자 집합()); 시스템.out.println("jsons로 파일 분할하기...."); 목록 splittedJsons = 분할(jsons); 시스템.out.println("JsonDocuments로 변환하기...."); int docCount = splittedJsons.크기(); 시스템.out.println("문서 수는 " + docCount ); 시스템.out.println("kafka로 메시지 보내기 시작...."); int 카운트 = 0; 에 대한 ( 문자열 doc : splittedJsons) { 시스템.out.println("보내는 메시지...." + 카운트); 프로듀서 레코드<문자열,문자열> 기록 = new 프로듀서 레코드<>( "couchbaseTopic", doc ); 미래 메타 = 프로듀서.보내기(기록); 시스템.out.println("msg sent...." + 카운트); 카운트++; } 시스템.out.println("총 " + 카운트 + "메시지 전송"); 프로듀서.닫기(); } public 정적 목록 분할(문자열 jsonArray) 던지기 예외 { 목록 splittedJsonElements = new ArrayList(); 오브젝트 맵퍼 jsonMapper = new 오브젝트 맵퍼(); JsonNode jsonNode = jsonMapper.readTree(jsonArray); 만약 (jsonNode.isArray()) { ArrayNode 배열 노드 = (ArrayNode) jsonNode; 에 대한 (int i = 0; i < 배열 노드.크기(); i++) { JsonNode 개별 요소 = 배열 노드.get(i); splittedJsonElements.추가(개별 요소.toString()); } } 반환 splittedJsonElements; } } |

카프카 프로듀서 앱의 출력
소비자
이것은 매우 간단하며, 대기열에서 메시지를 가져오고 Couchbase Java SDK를 사용하여 Couchbase에 문서를 삽입하기만 하면 됩니다. 간단하게 하기 위해 동기화 자바 SDK를 사용하겠지만 비동기화를 사용하는 것도 충분히 가능하고 권장하기도 합니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
가져오기 com.카우치베이스.클라이언트.자바.버킷; 가져오기 com.카우치베이스.클라이언트.자바.클러스터; 가져오기 com.카우치베이스.클라이언트.자바.카우치베이스클러스터; 가져오기 com.카우치베이스.클라이언트.자바.문서.JsonDocument; 가져오기 com.카우치베이스.클라이언트.자바.문서.json.JsonObject; 가져오기 카프카.소비자.소비자; 가져오기 카프카.소비자.ConsumerConfig; 가져오기 카프카.소비자.카프카스트림; 가져오기 카프카.자바파이.소비자.소비자 커넥터; 가져오기 카프카.메시지.메시지 및 메타데이터; 가져오기 자바.활용.*; public 클래스 카프카심플컨슈머 { public 정적 void 메인(문자열[] args) { 속성 구성 = new 속성(); 구성.put("zookeeper.connect", "localhost:2181"); 구성.put("zookeeper.connectiontimeout.ms", "10000"); 구성.put("group.id", "default"); ConsumerConfig 소비자 구성 = new 카프카.소비자.ConsumerConfig(구성); 소비자 커넥터 소비자 커넥터 = 소비자.createJavaConsumerConnector(소비자 구성); 지도<문자열, 정수> topicCountMap = new 해시맵<>(); topicCountMap.put("couchbaseTopic", 1); 지도<문자열, 목록<카프카스트림<바이트[], 바이트[]>>> consumerMap = 소비자 커넥터.createMessageStreams(topicCountMap); 목록<카프카스트림<바이트[], 바이트[]>> 스트림 = consumerMap.get("couchbaseTopic"); 목록 노드 = new ArrayList<>(); 노드.추가("localhost"); 클러스터 클러스터 = 카우치베이스클러스터.create(노드); final 버킷 버킷 = 클러스터.오픈버킷("kafkaExample"); 시도 { 에 대한 (final 카프카스트림<바이트[], 바이트[]> 스트림 : 스트림) { 에 대한 (메시지 및 메타데이터<바이트[], 바이트[]> msgAndMetaData : 스트림) { 문자열 msg = 변환 페이로드 문자열(msgAndMetaData.메시지()); 시스템.out.println(msgAndMetaData.주제() + ": " + msg); 시도 { JsonObject doc = JsonObject.fromJson(msg); 문자열 id = UUID.randomUUID().toString(); 버킷.업서트(JsonDocument.create(id, doc)); } catch (예외 ex) { 시스템.out.println("json 객체가 아닙니다: " + ex.getMessage()); } } } } catch (예외 ex) { 시스템.out.println("예외!!!!" + ex.getMessage()); 클러스터.연결 끊기(); } 클러스터.연결 끊기(); } 비공개 정적 문자열 변환 페이로드 문자열(final 바이트[] 메시지) { 문자열 문자열 = new 문자열(메시지); 반환 문자열; } } |

카프카 소비자 콘솔 출력
카우치베이스 서버
이제 Couchbase 서버에서 결과를 확인할 수 있습니다.
카프카 예시 버킷 - 1000개의 문서로 채워져 있습니다.

카우치베이스 버킷
각 문서는 이와 비슷하게 생겼습니다:

샘플 문서
간단한 3단계 솔루션.
프로덕션 환경에서는 프로듀서, 소비자, 카프카 또는 카우치베이스가 각각 하나 이상의 머신에 설치되어 있다는 점에 유의하세요.
의 전체(Maven 종속성 포함) 코드 GitHub.
Roi.