Lançamento do Spark Connector 1.0.0
Depois de duas prévias para desenvolvedores e uma versão beta, estou muito feliz em anunciar a primeira versão estável do nosso Couchbase Spark Connector. O momento não é coincidência, já que na próxima semana Spark Summit Europa 2015 está acontecendo em Amsterdã. Estamos patrocinando o evento e, como resultado, você poderá encontrar a mim e a meus colegas no estande do Couchbase!
Essa versão estável marca o fim das grandes mudanças significativas, trazendo estabilidade para a API e um caminho claro para o futuro. Se você ainda não leu os anúncios anteriores, a postagem a seguir oferece uma visão geral dos recursos e capacidades.
O Conector é distribuído a partir do Maven Central (assim como o spark-packages.org), portanto, se quiser fazer experiências com ele usando o spark-shell, isso é tudo de que você precisa para começar a trabalhar:
Para abrir seu apetite, aqui está um exemplo de código completo que você pode executar em nosso conjunto de dados "travel-sample". Ele usa Spark SQL para criar um quadro de dados para todas as companhias aéreas (com base em um predicado que você especificar) e, em seguida, seleciona alguns campos e aplica a ordenação, bem como um limite:
Isso imprime:
Em poucas linhas de código, você pode executar todos os tipos de consultas para análise de dados, ETL ou aprendizado de máquina sobre o Couchbase. Para mim, isso é fantástico - se você também gosta disso, continue lendo para saber todos os detalhes.
A propósito, a documentação completa pode ser encontrada em aqui.
Spark Core - A base escalável
A API de menor contato com o usuário no Spark é a RDD (conjuntos de dados distribuídos resilientes). Trata-se basicamente de uma coleção de dados que o Spark distribui por todo o cluster. Como o Spark é uma máquina de processamento de big data, mas não um banco de dados, ele precisa de mecanismos para criar RDDs, bem como para persistir RDDs no final dos cálculos. Para ajudar com isso, o Couchbase fornece:
- API para criar RDDs por meio de KeyValue, Views e N1QL
- Persistir RDDs em um bucket do Couchbase por meio de KeyValue
A documentação detalhada para essas tarefas está disponível aqui. Os exemplos de código a seguir mostram como criar RDDs facilmente, bem como mantê-los. Observe que esses exemplos esperam apenas que um SparkContext esteja disponível.
E aqui um exemplo mais complicado que lê todas as companhias aéreas, executa uma contagem clássica de palavras em seus nomes, agrega os resultados e os armazena em um documento no cluster do Couchbase:
Como você pode imaginar, muitas coisas estão acontecendo nos bastidores. A API é transformada em consultas do Couchbase, mas o mais importante é que o conector lida com os recursos de forma totalmente transparente. Como seus cálculos serão executados em trabalhadores arbitrários no cluster, o conector abre conexões onde for necessário de forma eficiente. Portanto, você só precisa informar ao Spark o que buscar ou persistir - o conector cuidará do resto.
Se você executar workers do Spark lado a lado com nós do Couchbase, o conector tentará indicar o worker adequado para operações KeyValue (novamente, de forma transparente). Dessa forma, as operações caras de embaralhamento da rede são reduzidas, o que leva a um desempenho ainda melhor em tais configurações. Observe que essa é uma otimização pura, você pode executar qualquer topologia que desejar e ela simplesmente funcionará.
Spark SQL - Uma história de amor com o N1QL
O Spark SQL é um módulo para trabalhar com dados estruturados. Ele permite que o usuário coloque um esquema sobre um RDD, que é então chamado de DataFrame (anteriormente SchemaRDD). Como o Spark agora tem informações sobre a estrutura dos dados com os quais está trabalhando, ele pode aplicar todos os tipos de transformações e otimizações.
O Couchbase Server 4.0 inclui a novíssima linguagem de consulta N1QL, que se integra perfeitamente às APIs Spark SQL. Há apenas um problema: os documentos armazenados no Couchbase não precisam aderir a um esquema específico - esse é um de seus recursos. Então, como podemos trazer estrutura em um mundo sem esquema?
A resposta para isso é a inferência automática de esquema. Se você criar um DataFrame em cima do Couchbase, precisará fornecer um "schemaFilter" que, por sua vez, criará internamente um predicado. Em seguida, carregaremos vários documentos com esse predicado e inferiremos o esquema a partir daí. O exemplo a seguir mostra como criar um DataFrame para companhias aéreas no intervalo "travel-sample", que são identificadas pelo atributo type no próprio documento:
Isso imprime:
Se seus documentos forem mais ou menos semelhantes, essa abordagem funcionará bem. Se os documentos não tiverem nenhum esquema, de modo que cada documento tenha uma aparência muito diferente, você também poderá fornecer o esquema manualmente. Dessa forma, você especifica apenas os campos potencialmente necessários:
Por fim, se isso ainda não funcionar, você sempre poderá recorrer a uma consulta RDD e gerar um DataFrame a partir dos resultados:
Isso imprime:
Você pode ver como ele detecta até mesmo a estrutura recursiva de objetos e matrizes JSON. Isso também pode ser utilizado no momento da consulta, proporcionando flexibilidade tanto na modelagem de dados quanto na consulta.
Agora que o DataFrame foi criado, você pode realizar todos os tipos de consultas com ele:
Isso imprime:
Aqui está um exemplo diferente que mostra como você pode criar um DataFrame do HDFS e uni-lo às linhas do Couchbase:
Uma parte importante disso também é tratada de forma oculta: os campos e predicados necessários são transferidos para o mecanismo de consulta N1QL no servidor, de modo que só computamos e transferimos os dados essenciais, o que permite uma rede mais eficiente e o manuseio de recursos da CPU.
Spark Streaming - In-N-Out em tempo real (suave)
O Spark Streaming traz uma abordagem de streaming de microbatch para o Spark, permitindo que você execute aplicativos de batch e streaming em um único sistema. O Couchbase permite que você mantenha esses fluxos no Couchbase, além de criar (experimentalmente) esse fluxo por meio de seu protocolo interno de alteração de documentos (DCP).
A persistência de um DStream funciona da mesma forma que a persistência de um RDD - você só precisa usar a importação implícita correta e convertê-la em uma representação de documento. Os exemplos a seguir mostram como persistir o conteúdo dos tweets em um feed do Twitter no Couchbase:
Você pode encontrar mais informações sobre o suporte do Spark Streaming aqui.
O caminho a seguir
Foi importante conseguir lançar essa primeira versão estável. A próxima versão (1.1) trará compatibilidade oficial com o Spark 1.5, além de outros aprimoramentos e correções de estabilidade. Como sempre, experimente o conector e dê seu feedback sobre o que você acha que deve ser melhorado.
Feliz hacking, sem bugs e com operações rápidas de embaralhamento!
Hi.
Qual seria a aparência desse código no databricks? Se você executar atualmente, haverá um erro: os desenvolvedores devem utilizar o SparkContext compartilhado em vez de criar um usando o construtor. Nos notebooks Scala e Python, o contexto compartilhado pode ser acessado como sc. Ao executar um trabalho, você pode acessar o contexto compartilhado chamando SparkContext.getOrCreate()
Código ao qual estou me referindo:
// Gerar o contexto genérico do Spark
val sc = new SparkContext(new SparkConf().setAppName(\"example\")
.setMaster(\"local[*]\")
.set(\"com.couchbase.bucket.travel-sample\", \"\"))
// Configuração do Spark SQL
val sql = new SQLContext(sc)
// Criar um DataFrame com inferência de esquema
val airlines = sql.read.couchbase(schemaFilter = EqualTo(\"type\", \"airline\"))
// Executar a consulta
companhias aéreas
.select(\"name\", \"iata\", \"icao\")
.sort(airlines(\"name\").asc)
.limit(5)
.show()
Obrigado,
Marca