RxJava는 반응형 프로그래밍을 위한 멋진 도구로, Java CSV 리더로도 유용합니다.

RxJava를 처음 사용하거나 들어본 적이 없다면, 비동기 데이터 스트림으로 프로그래밍하는 방법입니다. 또한 스트림 데이터를 관찰하거나 수신하고 해당 스트림이 발견되면 무언가를 수행할 수 있는 이벤트 기반 프로그래밍입니다. 데이터 스트림은 변수부터 데이터 구조에 이르기까지 무엇이든 될 수 있습니다.

이 예제에서는 한 줄의 CSV 데이터를 데이터 스트림이라고 가정해 보겠습니다. 많은 개발자가 예상보다 더 자주 CSV 데이터를 데이터베이스에 로드해야 하는 상황에 직면하게 됩니다. 대부분의 시나리오에서는 CSV 파일에서 한 줄을 읽어 데이터베이스에 놓을 수 없습니다. 이 예제에서는 RxJava를 사용하여 데이터 스트림을 구독하고 이를 Java로 변환하여 Couchbase에 저장될 CSV 파일을 읽도록 하겠습니다.

요구 사항

이 프로젝트를 시작하고 실행하는 데 필요한 요구 사항은 그리 많지 않습니다. 최소한 다음이 필요합니다:

모든 개발 및 작업은 JDK 1.8 및 Maven을 통해 이루어지며, 여기에는 애플리케이션 실행이 포함됩니다.

데이터 집합 및 데이터 모델 이해

RxJava에 익숙해지는 가장 좋은 방법은 샘플 데이터 세트를 가져와서 사용해 보는 것입니다. 간단하게 하기 위해 쉼표로 구분된 값(CSV) 파일을 직접 만들겠지만, 좀 더 화려한 것을 원한다면 데이터 과학 웹사이트를 방문하세요, Kaggle.

간단한 CSV 데이터 집합에 행당 다음과 같은 열이 있다고 가정해 보겠습니다:

  1. id
  2. first_name
  3. last_name
  4. 트위터

쿼리 및 분석 관점에서 볼 때, CSV 형식의 데이터로 작업하는 것은 거의 불가능합니다. 대신, 이 데이터는 나중에 처리할 수 있도록 NoSQL 데이터로 저장됩니다. 여기서는 넘버 크런칭과 쿼리에 대해서는 다루지 않겠지만, 추후 다른 글에서 다룰 예정입니다. 지금은 그냥 NoSQL 형식으로 가져오고 싶습니다.

Couchbase에 로드된 CSV의 각 행은 다음과 같이 표시됩니다:

예, 위의 데이터 청크는 JSON 문서이며, 이는 Couchbase가 지원하는 것입니다. 이제 데이터 목표를 알았으니 RxJava를 사용하여 CSV 데이터를 Couchbase에 로드하기 시작할 수 있습니다.

원시 데이터 변환 및 카우치베이스에 쓰기

RxJava를 사용하여 Java 애플리케이션을 통해 CSV 데이터를 로드하려면 몇 가지 종속성을 포함해야 합니다. RxJava, OpenCSV, Couchbase Java SDK를 포함해야 합니다. Maven을 사용하고 있으므로 모두 Maven을 통해 포함할 수 있습니다. pom.xml 파일을 추가합니다. RxJava를 포함하려면 Maven 파일에 다음 종속성을 포함하세요:

원시 데이터는 CSV 형식이므로 OpenCSV라는 Java용 오픈 소스 CSV 라이브러리를 사용할 수 있습니다. OpenCSV에 대한 Maven 종속성은 다음과 같이 추가할 수 있습니다:

마지막으로 Java는 다음에 연결해야 합니다. 카우치베이스 서버. 이 작업은 카우치베이스 자바 SDK. 이 종속성을 Maven 프로젝트에 추가하려면 다음 내용을 pom.xml file:

모든 프로젝트 종속성을 사용할 수 있습니다!

CSV 파일을 읽지 않고 로드하려면 새로운 CSVReader 객체를 다음과 같이 설정합니다:

이 데이터는 결국 Couchbase로 처리되므로 서버에 연결하여 버킷을 열어야 합니다.

위의 내용은 Couchbase가 로컬에서 실행 중이며 데이터가 비밀번호 없이 기본 버킷에 저장된다고 가정합니다.

CSV 데이터 집합을 처리하기 위해 RxJava Observable을 생성할 수 있습니다:

Observable에서 일어나는 일을 분석하려면 다음 단계를 수행합니다.

그리고 CSVReaderIterable. 옵저버블은 Iterable 를 사용하여 데이터 소스로 .from 메서드를 사용합니다.

읽은 데이터는 데이터베이스에 직접 저장할 수 있는 것이 아니라 문자열 배열이 됩니다. 이 경우 .map 함수를 사용하면 문자열 배열을 원하는 대로 변환할 수 있습니다. 이 경우 목표는 CSV의 각 줄을 Couchbase 문서에 매핑하는 것입니다. 이 매핑 과정에서 추가 데이터 정리를 수행할 수 있습니다. 예를 들어 다음과 같은 작업을 수행할 수 있습니다. csvRow[*].trim() 를 사용하여 각 CSV 열의 앞뒤 공백을 제거합니다.

마지막으로 각 읽기 행이 처리될 때마다 Couchbase에 저장해야 합니다. 그리고 .subscribe 메서드는 옵저버블이 내보내는 알림(이 경우 조작된 데이터)을 구독합니다.

결론

방금 RxJava와 Couchbase Java SDK를 사용하여 더티 CSV 데이터를 Couchbase에 로드하는 것을 맛보았습니다. 반응형 프로그래밍을 수행하면 관찰 중인 모든 데이터 스트림에 대해 조치를 취할 수 있습니다. 이 시나리오에서는 CSV 파일을 Couchbase에 로드하기만 하면 됩니다.

방대한 데이터 세트가 있는 경우 Apache Spark를 사용하여 이 튜토리얼을 한 단계 더 발전시킬 수 있습니다. 저는 매우 유사한 CSV 로더를 작성했습니다. 여기 Spark를 사용합니다.

작성자

게시자 Nic Raboy, 개발자 옹호자, Couchbase

닉 라보이는 최신 웹 및 모바일 개발 기술을 옹호하는 사람입니다. 그는 Java, JavaScript, Golang 및 Angular, NativeScript, Apache Cordova와 같은 다양한 프레임워크에 대한 경험이 있습니다. Nic은 웹 및 모바일 개발을 보다 쉽게 이해할 수 있도록 자신의 개발 경험에 대해 글을 쓰고 있습니다.

댓글 남기기