최근 포럼에서 누군가 CouchDB에서 Couchbase로 데이터를 옮기고 싶다는 질문을 보았습니다. 평소 어려운 처지에 있는 친구의 이사를 도와주곤 했기 때문에 제가 도와야겠다고 생각했습니다. 이사는 특히 카우치처럼 큰 가구를 옮길 때는 준비가 필요합니다.
비유해서 죄송합니다. 이제 제가 어떻게 했는지 설명해드리겠습니다. 제 목표는 CouchDB에서 Couchbase로 데이터를 옮기는 것입니다. 따라서 첫 번째 질문은 CouchDB에서 데이터를 어떻게 가져올 수 있을까요? 여기에는 몇 가지 옵션이 있으며 가장 간단한 방법은 REST API를 사용하는 것이었습니다. include_doc 매개변수를 true로 설정한 상태에서 _all_docs 엔드포인트를 사용하면 모든 문서를 가져올 수 있습니다. 이것이 바로 저에게 필요한 기능입니다.
이제 REST 응답을 직접 다운로드하는 대신 Java 8 스트림 API를 사용할 수 있습니다. 그리고 SDK의 일부인 RxJava를 사용할 것이므로 해당 스트림을 관찰 가능 항목으로 래핑해야 합니다. 사실 아주 간단합니다:
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
URL url = new URL(couchDBRequest); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod(“GET”); conn.setRequestProperty(“Accept”, “application/json”); //assume this is going to be a big file… conn.setReadTimeout(0); if (conn.getResponseCode() != 200) { throw new RuntimeException(“Failed : HTTP error code : ” + conn.getResponseCode()); } BufferedReader inp2 = new BufferedReader(new InputStreamReader(conn.getInputStream())); final long[] totalRows = new long[2]; int count = Observable.from(inp2.lines()::iterator).count().toBlocking().single(); |
이것으로 CouchDB 문서를 내보내는 옵저버블이 생겼습니다. 다음 단계는 당연히 이 문서를 가져와서 Couchbase로 보내는 것입니다. 먼저 플랫맵을 사용해 응답의 각 줄을 파싱할 것입니다. 보시다시피 응답의 각 줄에는 하나의 문서가 포함되어 있기 때문에 쉽게 할 수 있습니다:
|
1 2 3 4 5 6 |
{"total_rows":2,"offset":0,"rows":[ {"id":"f750a4273b48b6c1146fe4ead1000c1b","key":"f750a4273b48b6c1146fe4ead1000c1b","value":{"rev":"2-bea53b374bf5a427ab15245bc029cac0"},"doc":{"_id":"f750a4273b48b6c1146fe4ead1000c1b","_rev":"2-bea53b374bf5a427ab15245bc029cac0","title":"A title"}}, {"id":"f750a4273b48b6c1146fe4ead1000c24","key":"f750a4273b48b6c1146fe4ead1000c24","value":{"rev":"1-9baa68f46c29940ad7a6d57ae1a04002"},"doc":{"_id":"f750a4273b48b6c1146fe4ead1000c24","_rev":"1-9baa68f46c29940ad7a6d57ae1a04002","title":"Another Title"}} ]} |
첫 번째 줄과 마지막 줄을 따로 처리해야 합니다. 첫 번째 줄에서는 총_행과 오프셋 정보를 가져옵니다. 마지막 줄에서는 아무 작업도 할 필요가 없습니다. 다음 연산자에게 전달할 내용이 없으므로 이 두 줄에 대해서는 Observable.empty()을 반환하기만 하면 됩니다. 다른 모든 줄에는 CouchDB가 편집한 행이 포함되어 있습니다. 이러한 각 행에는 JsonNode로 래핑할 수 있는 JSON 문서가 포함되어 있습니다.
다음 연산자 역시 플랫맵입니다. 여기서는 문서의 키와 그 내용을 문자열로 추출합니다. Json 객체를 String으로 가지고 있으므로 Jackson 등을 사용한 어떤 종류의 매핑도 필요하지 않습니다. RawJsonDocument를 직접 사용할 수 있습니다. RawJsonDocument가 있으면 이를 Couchbase로 가져올 수 있습니다. 이를 위해 저는 업서트 메서드를 사용합니다. 이것은 "묻지도 따지지도 않는" 것과 비슷합니다. 문서가 존재하는지 여부는 중요하지 않습니다. 문서가 존재하지 않으면 생성되고, 존재하면 대체됩니다. 원하는 동작이 아닐 수도 있지만 이 시나리오에서는 키가 이미 존재할 때 오류를 처리할 필요가 없으므로 가장 간단합니다.
실제로 오래 걸리지 않아야 하므로 작업에 500밀리초의 타임아웃을 할당합니다. 그런 다음 재시도 빌더를 사용합니다. 오류 시 재시도를 쉽게 관리하기 위해 Simon Basle이 추가한 멋진 도우미입니다. 여기서는 요청 취소 예외가 발생하면 최대 100회까지 재시도합니다. 각 재시도 전에 31초의 임의 지연을 추가합니다. 임시 실패 예외와 백압 예외에 대해서도 동일한 작업을 수행합니다. 여기서는 대신 100밀리초의 지연을 사용합니다.
그런 다음 doOnError와 doOnNext를 사용하여 성공 로그 파일 또는 오류 로그 파일에 문서의 키를 기록합니다. doOnX 메서드는 스트림의 핵심 의미를 변경하지 않고(예를 들어 데이터 변환처럼) 측면에 일부 동작, 즉 "부작용"을 추가합니다. 여기서는 FileWriter를 사용하여 로그 파일에 문자열을 씁니다. 이것은 안타깝게도 동기식이며 차단됩니다. 대신 비동기 로거를 사용하도록 변경할 수도 있습니다.
그런 다음 오류 발생 시에도 가져오기가 계속되도록 하기 위해 onErrorResumeNext를 사용합니다. 마지막으로 count().toBlocking().single()을 사용하여 Couchbase에서 얼마나 많은 삽입을 수행했는지 확인합니다. 결과를 마지막에 총_행 수와 비교합니다.
결국 코드는 다음과 같습니다:
아직 개선할 수 있는 부분이 많이 남아 있습니다. 예를 들어 가져오기 중에 원하는 일관성 수준을 선택할 수 있는 구성 옵션(PersistTo 및 ReplicateTo 옵션)을 추가하는 것입니다. 또한 가져오지 않은 문서의 키만 포함된 오류 로그를 스크립트의 항목으로 제공하는 것도 좋을 것 같습니다. 이렇게 하면 오류가 발생한 문서만 가져오기를 다시 실행할 수 있습니다.
어쨌든 도움이 되었기를 바라며, 피드백을 환영합니다!