首页 首页 大数据 查看内容

当MongoDB遇见Spark

木马童年 2019-2-13 12:20 39 0

传统Spark生态系统 和 MongoDB在Spark生态的角色 传统Spark生态系统 Spark生态系统 那么Mongodb作为一个database, 可以担任什么样的角色呢? 就是数据存储这部分, 也就是图中的黑色圈圈HDFS的部分, 如下图 用MongoD ...

传统Spark生态系统 和 MongoDB在Spark生态的角色

传统Spark生态系统

当MongoDB遇见Spark

Spark生态系统

那么Mongodb作为一个database, 可以担任什么样的角色呢? 就是数据存储这部分, 也就是图中的黑色圈圈HDFS的部分, 如下图

用MongoDB替换HDFS后的Spark生态系统

当MongoDB遇见Spark

Spark+Mongodb生态系统

为什么要用MongoDB替换HDFS

存储方式上, HDFS以文件为单位,每个文件64MB~128MB不等, 而MongoDB作为文档数据库则表现得更加细颗粒化

MongoDB支持HDFS所没有的索引的概念, 所以在读取上更加快

MongoDB支持的增删改功能比HDFS更加易于修改写入后的数据

HDFS的响应级别为分钟, 而MongoDB通常是毫秒级别

如果现有数据库已经是MongoDB的话, 那就不用再转存一份到HDFS上了

可以利用MongoDB强大的Aggregate做数据的筛选或预处理

MongoDB Spark Connector介绍

支持读取和写入,即可以将计算后的结果写入MongoDB

将查询拆分为n个子任务, 如Connector会将一次match,拆分为多个子任务交给spark来处理, 减少数据的全量读取

MongoDB Spark 示例代码

计算用类型Type=1的message字符数并按userid进行分组

开发Maven dependency配置

这里用的是mongo-spark-connector_2.11 的2.0.0版本和spark的spark-core_2.11的2.0.2版本:

org.mongodb.spark

mongo-spark-connector_2.11

2.0.0

org.apache.spark

spark-core_2.11

2.0.2

示例代码

import com.mongodb.spark._

import org.apache.spark.{SparkConf, SparkContext}

import org.bson._

val conf = new SparkConf()

.setMaster("local")

.setAppName("Mingdao-Score")

//同时还支持mongo驱动的readPreference配置, 可以只从secondary读取数据

.set("spark.mongodb.input.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/inputDB.collectionName")

.set("spark.mongodb.output.uri", "mongodb://xxx.xxx.xxx.xxx:27017,xxx.xxx.xxx:27017,xxx.xxx.xxx:27017/outputDB.collectionName")

val sc = new SparkContext(conf)

// 创建rdd

val originRDD = MongoSpark.load(sc)

// 构造查询

val dateQuery = new BsonDocument()

.append("$gte", new BsonDateTime(start.getTime))

.append("$lt", new BsonDateTime(end.getTime))

val matchQuery = new Document("$match", BsonDocument.parse("{\"type\":\"1\"}"))

// 构造Projection

val projection1 = new BsonDocument("$project", BsonDocument.parse("{\"userid\":\"$userid\",\"message\":\"$message\"}")

val aggregatedRDD = originRDD.withPipeline(Seq(matchQuery, projection1))

//比如计算用户的消息字符数

val rdd1 = aggregatedRDD.keyBy(x=>{

Map(

"userid" -> x.get("userid")

)

})

val rdd2 = rdd1.groupByKey.map(t=>{

(t._1, t._2.map(x => {

x.getString("message").length

}).sum)

})

rdd2.collect().foreach(x=>{

println(x)

})

//保持统计结果至MongoDB outputurl 所指定的数据库

MongoSpark.save(rdd2)

总结

MongoDB Connector 的文档只有基础的示例代码, 具体详情需要看GitHub中的example和部分源码。

参考链接

MongoDB Connector for Spark官方文档:https://docs.mongodb.com/spark-connector/v2.0/

Mongo Spark 源码:https://github.com/mongodb/mongo-spark

欢迎加入本站公开兴趣群

软件开发技术群

兴趣范围包括:Java,C/C++,Python,PHP,Ruby,shell等各种语言开发经验交流,各种框架使用,外包项目机会,学习、培训、跳槽等交流

QQ群:26931708

在不久的将来,多智时代一定会彻底走入我们的生活,有兴趣入行未来前沿产业的朋友,可以收藏多智时代,及时获取人工智能、大数据、云计算和物联网的前沿资讯和基础知识,让我们一起携手,引领人工智能的未来!

数据存储 数据库
0
为您推荐
大数据技术改变城市的运作方式,智慧城市呼之欲出

大数据技术改变城市的运作方式,智慧城市呼

纽奥良虽像大多数城市一样有火灾侦测器安装计划,但直到最近还是要由市民主动申装。纽…...

大数据分析面临生死边缘,未来之路怎么走?

大数据分析面临生死边缘,未来之路怎么走?

大数据分析开始朝着营销落地,尤其像数果智能这类服务于企业的大数据分析供应商,不仅…...

什么是工业大数据,要通过3B和3C来理解?

什么是工业大数据,要通过3B和3C来理解?

核心提示:工业视角的转变如果说前三次工业革命分别从机械化、规模化、标准化、和自动…...

大数据普及为什么说肥了芯片厂商?

大数据普及为什么说肥了芯片厂商?

科技界默默无闻的存在,芯片行业年规模增长到了3520亿美元。半导体给无人驾驶汽车带来…...

大数据技术有哪些,为什么说云计算能力是大数据的根本!

大数据技术有哪些,为什么说云计算能力是大

历史规律告诉我们,任何一次大型技术革命,早期人们总是高估它的影响,会有一轮一轮的…...

个人征信牌照推迟落地,大数据 重新定义个人信用!!

个人征信牌照推迟落地,大数据 重新定义个

为金融学的基础正日益坚实。通过互联网大数据精准记录海量个人行为,进而形成分析结论…...