필요성
아, 데이터베이스 마이그레이션. JSON 문서 데이터 표현이 훨씬 더 유연한 Couchbase로 마이그레이션한 후에는 변경 요청 파이프라인을 통해 개발 프로세스를 비틀어 다운타임, 오류 가능성 및 행과 열에 바인딩할 때의 일반적인 불쾌감을 겪을 필요가 더 이상 없습니다.
즉, Couchbase를 사용한다고 해서 항상 Couchbase의 데이터로 데이터 변환을 할 필요가 없다는 의미는 아닙니다. 다만 그 빈도가 훨씬 적을 뿐입니다.
이곳은 실제로 Apache Spark에 흥미로운 장소입니다!
부연 설명...
Spark 커넥터와 Couchbase 4.0에는 Spark 사용과 관련된 인터페이스가 하나뿐 아니라 네 가지가 있습니다. K-V 인터페이스, 데이터베이스 변경 프로토콜(일명 DCP) 스트리밍 인터페이스, Spark SQL을 통한 N1QL 쿼리 인터페이스 및 보기 쿼리 인터페이스가 바로 그것입니다.
이러한 데이터는 Spark 에코시스템의 다양한 데이터 소스와 결합하여 여러 가지 방법으로 데이터를 통합하고 조작할 수 있습니다. 예를 들어, DCP를 통해 Couchbase에서 데이터를 스트리밍하여 HDFS 데이터 소스와 혼합한 다음 대상 결과를 다른 Couchbase 버킷에 다시 넣을 수 있습니다.
솔루션...
간단한 예를 들어, Spark를 사용하여 Couchbase 내부의 데이터 집합을 효율적으로 변환하는 코드를 작성하려면 어떻게 해야 할까요?
게임 플레이어에 대한 새로운 데이터 세트를 JSON 형식으로 확보한 시나리오를 상상해 보세요. 이들은 모두 곧 새로운 FizzBuzz 게임을 플레이할 예정이고, 프로필은 파트너로부터 전달받았습니다. 수신되는 프로필은 모두 다음과 비슷합니다:
1 2 3 4 5 6 |
{ "givenname": "Joel", "성": "Smith", "이메일": "joelsmith@g00글메일.com", "자격 토큰": 78238743 } |
문제는 FizzBuzz 프로필이 모두 다음과 같이 보인다는 것입니다:
1 2 3 4 5 6 |
{ "fname": "Matt", "lname": "인젠트론", "이메일": "매트@카우치베이스.com", "커런츠코어": 1000000 } |
일반적으로 데이터에 다른 형태가 있다면 읽기 및 쓰기 시점에 매핑을 위한 약간의 로직을 추가합니다. 그러나 이 특별한 전환은 일회성 프로세스이며 추가적인 문제가 있습니다. 이 '자격 토큰'은 보유하고 있는 MySQL 데이터베이스 백업에서 조회해야 합니다. 출시일 트래픽을 처리하기 위해 대규모 MySQL 배포를 프로비저닝하거나 유지 관리할 필요가 없으므로 출시 전에 한 번만 변환하는 것이 더 좋습니다.
데이터를 스트리밍하여 원하는 '모양'을 가진 데이터를 찾은 다음 SQL 쿼리를 기반으로 Spark로 변환하는 것이 가장 이상적입니다.
먼저 연결을 설정하고 데이터를 스트리밍하여 이미 가져온 JSON의 모양을 찾아야 합니다. 여기서는 Couchbase의 DCP 인터페이스를 사용하여 데이터를 스트리밍합니다.
1 2 3 4 5 6 |
val ssc = new 스트리밍 컨텍스트(sc, 초(5)) ssc.카우치베이스스트림("혁신적") .필터(_.isInstanceOf[돌연변이]) .지도(m => (new 문자열(m.asInstanceOf[돌연변이].키), new 문자열(m.asInstanceOf[돌연변이].콘텐츠))) |
현재의 한계는 DStream이 멈추지 않는다는 것이지만, 이 간단한 경우에 대한 해결 방법으로 더 이상 데이터가 변환되지 않는지 모니터링할 수 있습니다.
그런 다음 항목별로 이 MySQL 조회를 기반으로 변환을 적용해야 합니다. 그러기 위해서는 MySQL에서 데이터를 로드해야 합니다. MySQL 테이블이 다음과 같다고 가정합니다:
1 2 3 4 5 6 7 8 9 10 |
mysql> 설명 프로필; +------------------+-------------+------+-----+---------+-------+ | 필드 | 유형 | Null | 키 | 기본값 | 추가 | +------------------+-------------+------+-----+---------+-------+ | givenname | varchar(20) | 예 | | NULL | | | 성 | varchar(20) | 예 | | NULL | | | 이메일 | varchar(20) | 예 | | NULL | | | 자격 토큰 | int(11) | 예 | | NULL | | +------------------+-------------+------+-----+---------+-------+ 4 행 in set (0.00 초) |
MySQL 데이터를 데이터프레임으로 로드하고 싶을 것입니다. 스트리밍 컨텍스트는 조인할 RDD를 제공하므로, 나중에 스트림 내에서 조인할 수 있도록 RDD 집합으로 변환하겠습니다. Spark 1.6을 사용하면 이 작업이 더 쉬워질 수 있습니다. 변환은 다음과 같습니다(다시 사용할 수 있도록 함수로 추출됨):
1 2 3 4 5 6 |
/** 문서에서 추출한 이메일 주소를 기반으로 RDD를 반환합니다 */. def CreateMappableRdd(s: (문자열, 문자열)): (문자열, JsonDocument) = { val 반환_문서 = JsonDocument.create(s._1, JsonObject.fromJson(s._2)) (반환_문서.콘텐츠().getString("이메일"), 반환_문서) } |
또한 새 자격 토큰(역시 추출됨)을 추가해야 합니다:
1 2 3 4 5 6 7 8 |
/** 자격 토큰으로 강화된 JsonDocument를 반환합니다 */. def mergeIntoDoc(t: (문자열, (JsonDocument, 정수))): JsonDocument = { val jsonToEnrich = t._2._1.콘텐츠() val 자격출발 = t._2._2 jsonToEnrich.put("자격 토큰", 자격출발) t._2._1 } |
결국 우리는 비행 중에 변경이 필요한 RDD를 수정하는 변환에 대해 유창하게 설명할 수 있게 되었습니다. 이렇게 하면 궁극적으로 변환된 데이터가 Couchbase에 다시 쓰여지고, K-V 인터페이스를 사용하여 항목을 덮어쓰게 됩니다.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
// MySQL에서 모든 사용자의 데이터 프레임을 로드합니다. // 데이터 양에 따라 여기에 .cache()를 추가하는 것이 적절할 수도 있고 그렇지 않을 수도 있습니다. val 자격 = mysqlReader.load() /* 로드 중입니다: +---------+-----------+-----------------+----------------+ |이름| 성| 이메일| 자격 토큰| +---------+-----------+-----------------+----------------+ | 매트| 인젠트론| matt@email.com| 11211| | Michael|Nitschinger|michael@email.com| 11210| +---------+-----------+-----------------+----------------+ */ val 자격산스케마 = 자격.rdd.지도[(문자열, 정수)](f => (f.getAs[문자열]("이메일"), f.getAs[정수]("자격 토큰"))) val ssc = new 스트리밍 컨텍스트(sc, 초(5)) ssc.카우치베이스스트림("혁신적") .필터(_.isInstanceOf[돌연변이]) .지도(m => (new 문자열(m.asInstanceOf[돌연변이].키), new 문자열(m.asInstanceOf[돌연변이].콘텐츠))) .지도(s => CreateMappableRdd(s)) .필터(_._2.콘텐츠().get("자격 토큰").eq(null)) .foreachRDD(rdd => { rdd .join(자격산스케마) .지도(mergeIntoDoc) //.foreach(println) // 효과를 확인하기 좋은 곳입니다. .저장하기("혁신적") }) ssc.시작() ssc.기다림종료() |
그리고 전체 예제는 couchbase-spark-samples에 있습니다. 리포지토리에 저장합니다.
이 예시의 장점은 무슨 일이 일어나고 있는지 이해하기 쉽고 확장하기가 매우 간단하다는 것입니다. 여러분만의 변형은 더 복잡할 수 있지만, 이 예시를 통해 어떤 것이 가능하고 어떤 것을 기반으로 구축할 수 있는지에 대한 감을 잡을 수 있을 것입니다.
개선의 여지는 항상 존재합니다.
한 가지 문제는 MySQL이 메모리에 로드하려는 것보다 클 수 있다는 것입니다. Spark는 데이터 프레임을 분할하는 방법을 제공함으로써 이 문제를 해결합니다. 여기서는 그런 기능이 필요하지 않았고 샘플을 읽을 수 있기를 원했습니다. 또 다른 도움이 될 수 있는 것은 기존 스트리밍 컨텍스트 내에서 SparkContext를 참조할 수 있는 기능입니다. 현재 Spark는 여러 가지 이유로 이를 허용하지 않지만, 스트림 내부에서 단일 레코드 조회를 수행하는 이 간단한 사용 사례는 어느 정도 의미가 있다고 생각합니다.
현재 카우치베이스 커넥터에서 DCP 인터페이스는 휘발성으로 분류되어 있으며 실험적인 것으로 간주해야 합니다. 또한 위의 예는 매우 빠르지만 확장하는 데 약간의 도움이 필요합니다. 제 동료인 Sergey Avseyev가 곧 업데이트할 예정으로, 이 변환을 병렬화하기 위해 Spark 작업자 간에 DCP 스트림을 분할할 수 있게 될 것입니다.
결론
Spark는 이러한 종류의 전환을 위한 훌륭한 새 도구입니다. 관계형 데이터베이스와 같은 다른 데이터 소스에서 Couchbase로 마이그레이션할 때도 동일한 기술을 적용할 수 있습니다. 심지어 이 기술은 Spark의 머신 러닝을 통해 확장되어 결과를 예측하기 위해 Couchbase의 데이터 스트리밍을 중심으로 모델을 구축할 수도 있습니다.