视频:JDBCRDD源码及自定义JDBCRDD的分区策略


jdbcRDD虽然是鸡肋,但是也值得一讲。帮助大家更进一步理解RDD。

1,JDBCRDD使用

val data = new JdbcRDD(sc, getConnection

, "SELECT id,aa FROM bbb where ? <= ID AND ID <= ?", lowerBound = 3, upperBound =5, numPartitions = 1, mapRow = extractValues)

参数解释:

1,sparkcontext。

2,一个创建链接的函数。

3,sql。必须有? <= ID AND ID <= ?。

4,要取数据的id最小行。

5,要取数据的id最大行号。

6,分区数。

7,一个将ResultSet转化为需要类型的方法。

2,JdbcRDD的getPartition方法

override def getPartitions: Array[Partition] = { // bounds are inclusive, hence the + 1 here and - 1 on end val length = BigInt(1) + upperBound - lowerBound (0 until numPartitions).map(i => { val start = lowerBound + ((i * length) / numPartitions) val end = lowerBound + (((i + 1) * length) / numPartitions) - 1 new JdbcPartition(i, start.toLong, end.toLong) }).toArray }

3,JdbcRDD的compute方法

就是一个通过jdbc获取指定范围数据的过程。

val part = thePart.asInstanceOf[JdbcPartition] val conn = getConnection() val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY) stmt.setLong(1, part.lower) stmt.setLong(2, part.upper) val rs = stmt.executeQuery()

4,重写JDBC方法

重写分区的方法即可。

如:

CustomizedJdbcRDD[T: ClassTag]( sc: SparkContext, getConnection: () => Connection, sql: String, getCustomizedPartitions: () => Array[Partition], prepareStatement: (PreparedStatement, CustomizedJdbcPartition) => PreparedStatement, mapRow: (ResultSet) => T = CustomizedJdbcRDD.resultSetToObjectArray _)

同时把getPartition方法重写为:

override def getPartitions: Array[Partition] = { getCustomizedPartitions(); }

视频内容
优质内容筛选与推荐>>
1、归途,奋斗的起点
2、CentOS 6环境安装 iRedmail 邮件服务器及配置过程
3、【翻译】Ext JS最新技巧——2016-3-4
4、java jdbc数据库操作
5、并行计算框架-hama,Ubuntu下伪分布安装


长按二维码向我转账

受苹果公司新规定影响,微信 iOS 版的赞赏功能被关闭,可通过二维码转账支持公众号。

    阅读
    好看
    已推荐到看一看
    你的朋友可以在“发现”-“看一看”看到你认为好看的文章。
    已取消,“好看”想法已同步删除
    已推荐到看一看 和朋友分享想法
    最多200字,当前共 发送

    已发送

    朋友将在看一看看到

    确定
    分享你的想法...
    取消

    分享想法到看一看

    确定
    最多200字,当前共

    发送中

    网络异常,请稍后重试

    微信扫一扫
    关注该公众号