博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
029 RDD Join相关API,以及程序
阅读量:6589 次
发布时间:2019-06-24

本文共 6381 字,大约阅读时间需要 21 分钟。

1.数据集  

  A表数据:

    1 a
    2 b
    3 c
  B表数据:
    1 aa1
    1 aa2
    2 bb1
    2 bb2
    2 bb3
    4 dd1

 

2.join的分类

  inner join

  left outer join

  right outer join

  full outer join

  left semi join

  

 

3.集中join的结果

  A inner join B:

    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3

  A left outer join B:

    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    3 c null null

  A right outer join B:

    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    null null 4 dd1

  A full outer join B:

    1 a 1 aa1
    1 a 1 aa2
    2 b 2 bb1
    2 b 2 bb2
    2 b 2 bb3
    3 c null null
    null null 4 dd1

  A left semi join B:(。。。。。注意。。。。。。)

    1 a
    2 b

 

4.API  

def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]

  返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value,
  b的第二个元素是来自参数other这个RDD的value

def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]

  对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据

def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]

  对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据

def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]

  返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据

 

5.其他方式实现join

  

 

6.join程序以及非join实现join

1 package com.ibeifeng.senior.join  2   3 import org.apache.spark.{SparkConf, SparkContext}  4   5 /**  6   * RDD数据Join相关API讲解  7   * Created by ibf on 02/09.  8   */  9 object RDDJoin { 10   def main(args: Array[String]): Unit = { 11     val conf = new SparkConf() 12       .setMaster("local[*]") 13       .setAppName("RDD-Join") 14     val sc = SparkContext.getOrCreate(conf) 15  16     // ==================具体代码====================== 17     // 模拟数据产生 18     val rdd1 = sc.parallelize(Array( 19       (1, "张三1"), 20       (1, "张三2"), 21       (2, "李四"), 22       (3, "王五"), 23       (4, "Tom"), 24       (5, "Gerry"), 25       (6, "莉莉") 26     ), 1) 27  28     val rdd2 = sc.parallelize(Array( 29       (1, "上海"), 30       (2, "北京1"), 31       (2, "北京2"), 32       (3, "南京"), 33       (4, "纽约"), 34       (6, "深圳"), 35       (7, "香港") 36     ), 1) 37  38     // 调用RDD API实现内连接 39     val joinResultRDD = rdd1.join(rdd2).map { 40       case (id, (name, address)) => { 41         (id, name, address) 42       } 43     } 44     println("----------------") 45     joinResultRDD.foreachPartition(iter => { 46       iter.foreach(println) 47     }) 48     // 调用RDD API实现左外连接 49     val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map { 50       case (id, (name, addressOption)) => { 51         (id, name, addressOption.getOrElse("NULL")) 52       } 53     } 54     println("----------------") 55     leftJoinResultRDd.foreachPartition(iter => { 56       iter.foreach(println) 57     }) 58     // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in) 59     println("----------------") 60     rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map { 61       case (id, (name, _)) => (id, name) 62     }.foreachPartition(iter => { 63       iter.foreach(println) 64     }) 65  66     // 右外连接 67     println("----------------") 68     rdd1 69       .rightOuterJoin(rdd2) 70       .map { 71         case (id, (nameOption, address)) => { 72           (id, nameOption.getOrElse("NULL"), address) 73         } 74       } 75       .foreachPartition(iter => iter.foreach(println)) 76  77     // 全外连接 78     println("----------------") 79     rdd1 80       .fullOuterJoin(rdd2) 81       .map { 82         case (id, (nameOption, addressOption)) => { 83           (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL")) 84         } 85       } 86       .foreachPartition(iter => iter.foreach(println)) 87  88     ///假设rdd2的数据比较少,将rdd2的数据广播出去/// 89     val leastRDDCollection = rdd2.collect() 90     val broadcastRDDCollection = sc.broadcast(leastRDDCollection) 93     // Inner Join 95     rdd1 96       // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉 97       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1)) 98       // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用fla  tMap 99       .flatMap {100       case (id, name) => {101         broadcastRDDCollection.value.filter(_._1 == id).map {102           case (_, address) => {103             (id, name, address)104           }105         }106       }107     }108       .foreachPartition(iter => iter.foreach(println))109 110     // 左外连接111     println("---------------------")112     rdd1113       .flatMap {114         case (id, name) => {115           // 从右表所属的广播变量中获取对应id的集合列表116           val list = broadcastRDDCollection.value.filter(_._1 == id)117           // 对应id的集合可能为空,也可能数据有多个118           if (list.nonEmpty) {119             // 存在多个120             list.map(tuple => (id, name, tuple._2))121           } else {122             // id在右表中不存在,填默认值123             (id, name, "NULL") :: Nil124           }125         }126       }127       .foreachPartition(iter => iter.foreach(println))128 129     // 右外连接130     /**131       * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下132       **/133     println("---------------------")134     // 将rdd1中符合条件的数据过滤出来保存到driver中135     val stage1 = rdd1136       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))137       .collect()138     // 将driver中两个集合进行right join139     val stage2 = leastRDDCollection.flatMap {140       case (id, address) => {141         val list = stage1.filter(_._1 == id)142         if (list.nonEmpty) {143           list.map(tuple => (id, tuple._2, address))144         } else {145           Iterator.single((id, "NULL", address))146         }147       }148     }149     stage2.foreach(println)150 151     // TODO: 全外连接,不写代码,因为代码比较复杂152   153     //====================================154     // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次155     println("+++++++++++++++++")156     println("-----------------------")157     rdd1158       .join(rdd2)159       .map {160         case (id, (name, _)) => (id, name)161       }162       .distinct()163       .foreachPartition(iter => iter.foreach(println))164     println("------------------------")165     rdd1166       .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))167       .foreachPartition(iter => iter.foreach(println))168 169     // 休眠为了看4040页面170         Thread.sleep(1000000)171   }172 }

 

6.

 

转载地址:http://pukio.baihongyu.com/

你可能感兴趣的文章
【开发问题记录①】关于滑动CollectionView时ContentSize变化的问题
查看>>
java中GC的基本概念
查看>>
building xxx gradle project info的解决办法
查看>>
Vagrant (一) - 基本知识
查看>>
在 CentOS 7 上搭建 Jenkins + Maven + Git 持续集成环境
查看>>
数据结构与算法 | Leetcode 19. Remove Nth Node From End of List
查看>>
一起来读you don't know javascript(一)
查看>>
[LeetCode] 862. Shortest Subarray with Sum at Least K
查看>>
【分享】终端命令工具 自动生成vue组件文件以及修改router.js
查看>>
[LeetCode] Student Attendance Record I
查看>>
PHP回顾之多进程编程
查看>>
spring boot + redis
查看>>
Ajax技术细节
查看>>
nuxt.js部署vue应用到服务端过程
查看>>
删除数组中的指定元素 | JavaScript
查看>>
CSS3+JS实现静态圆形进度条【清晰、易懂】
查看>>
关于树形插件展示中数据结构转换的算法
查看>>
图片加载框架之Fresco
查看>>
Spotify开源其Cassandra编排工具cstar
查看>>
高性能web建站规则(将js放在页面底部)
查看>>