1.数据集
A表数据:
1 a 2 b 3 c B表数据: 1 aa1 1 aa2 2 bb1 2 bb2 2 bb3 4 dd1
2.join的分类
inner join
left outer join
right outer join
full outer join
left semi join
3.集中join的结果
A inner join B:
1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3A left outer join B:
1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 3 c null nullA right outer join B:
1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 null null 4 dd1A full outer join B:
1 a 1 aa1 1 a 1 aa2 2 b 2 bb1 2 b 2 bb2 2 b 2 bb3 3 c null null null null 4 dd1A left semi join B:(。。。。。注意。。。。。。)
1 a 2 b
4.API
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
返回值是RDD,RDD中的类型是一个二元组(a),a第一个元素是KEY类型的值(join的key), a第二个元素又是二元组(b), b的第一个元素是来自调用join函数的RDD的value, b的第二个元素是来自参数other这个RDD的valuedef leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
对于右边的数据返回的是Option类型是数据,所以如果右表数据不存在,返回的是None;否则是一个Some的具体数据def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))]
对于左边的数据返回的是Option类型是数据,所以如果左表数据不存在,返回的是None;否则是一个Some的具体数据def fullOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], Option[W]))]
返回的value类型是Option封装后的数据,如果数据不存在, 返回的是None,存在返回的是Some具体数据
5.其他方式实现join
6.join程序以及非join实现join
1 package com.ibeifeng.senior.join 2 3 import org.apache.spark.{SparkConf, SparkContext} 4 5 /** 6 * RDD数据Join相关API讲解 7 * Created by ibf on 02/09. 8 */ 9 object RDDJoin { 10 def main(args: Array[String]): Unit = { 11 val conf = new SparkConf() 12 .setMaster("local[*]") 13 .setAppName("RDD-Join") 14 val sc = SparkContext.getOrCreate(conf) 15 16 // ==================具体代码====================== 17 // 模拟数据产生 18 val rdd1 = sc.parallelize(Array( 19 (1, "张三1"), 20 (1, "张三2"), 21 (2, "李四"), 22 (3, "王五"), 23 (4, "Tom"), 24 (5, "Gerry"), 25 (6, "莉莉") 26 ), 1) 27 28 val rdd2 = sc.parallelize(Array( 29 (1, "上海"), 30 (2, "北京1"), 31 (2, "北京2"), 32 (3, "南京"), 33 (4, "纽约"), 34 (6, "深圳"), 35 (7, "香港") 36 ), 1) 37 38 // 调用RDD API实现内连接 39 val joinResultRDD = rdd1.join(rdd2).map { 40 case (id, (name, address)) => { 41 (id, name, address) 42 } 43 } 44 println("----------------") 45 joinResultRDD.foreachPartition(iter => { 46 iter.foreach(println) 47 }) 48 // 调用RDD API实现左外连接 49 val leftJoinResultRDd = rdd1.leftOuterJoin(rdd2).map { 50 case (id, (name, addressOption)) => { 51 (id, name, addressOption.getOrElse("NULL")) 52 } 53 } 54 println("----------------") 55 leftJoinResultRDd.foreachPartition(iter => { 56 iter.foreach(println) 57 }) 58 // 左外连接稍微变化一下:需要左表出现,右表不出现的数据(not in) 59 println("----------------") 60 rdd1.leftOuterJoin(rdd2).filter(_._2._2.isEmpty).map { 61 case (id, (name, _)) => (id, name) 62 }.foreachPartition(iter => { 63 iter.foreach(println) 64 }) 65 66 // 右外连接 67 println("----------------") 68 rdd1 69 .rightOuterJoin(rdd2) 70 .map { 71 case (id, (nameOption, address)) => { 72 (id, nameOption.getOrElse("NULL"), address) 73 } 74 } 75 .foreachPartition(iter => iter.foreach(println)) 76 77 // 全外连接 78 println("----------------") 79 rdd1 80 .fullOuterJoin(rdd2) 81 .map { 82 case (id, (nameOption, addressOption)) => { 83 (id, nameOption.getOrElse("NULL"), addressOption.getOrElse("NULL")) 84 } 85 } 86 .foreachPartition(iter => iter.foreach(println)) 87 88 ///假设rdd2的数据比较少,将rdd2的数据广播出去/// 89 val leastRDDCollection = rdd2.collect() 90 val broadcastRDDCollection = sc.broadcast(leastRDDCollection) 93 // Inner Join 95 rdd1 96 // 过滤rdd1中的数据,只要在rdd1中出现的数据,没有出现的数据过滤掉 97 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1)) 98 // 数据合并,由于一条rdd1的数据可能在rdd2中存在多条对应数据,所以使用fla tMap 99 .flatMap {100 case (id, name) => {101 broadcastRDDCollection.value.filter(_._1 == id).map {102 case (_, address) => {103 (id, name, address)104 }105 }106 }107 }108 .foreachPartition(iter => iter.foreach(println))109 110 // 左外连接111 println("---------------------")112 rdd1113 .flatMap {114 case (id, name) => {115 // 从右表所属的广播变量中获取对应id的集合列表116 val list = broadcastRDDCollection.value.filter(_._1 == id)117 // 对应id的集合可能为空,也可能数据有多个118 if (list.nonEmpty) {119 // 存在多个120 list.map(tuple => (id, name, tuple._2))121 } else {122 // id在右表中不存在,填默认值123 (id, name, "NULL") :: Nil124 }125 }126 }127 .foreachPartition(iter => iter.foreach(println))128 129 // 右外连接130 /**131 * rdd2中所有数据出现,由于rdd2中的数据在driver中可以存储,可以认为rdd1和rdd2通过right join之后的数据也可以在driver中保存下132 **/133 println("---------------------")134 // 将rdd1中符合条件的数据过滤出来保存到driver中135 val stage1 = rdd1136 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))137 .collect()138 // 将driver中两个集合进行right join139 val stage2 = leastRDDCollection.flatMap {140 case (id, address) => {141 val list = stage1.filter(_._1 == id)142 if (list.nonEmpty) {143 list.map(tuple => (id, tuple._2, address))144 } else {145 Iterator.single((id, "NULL", address))146 }147 }148 }149 stage2.foreach(println)150 151 // TODO: 全外连接,不写代码,因为代码比较复杂152 153 //====================================154 // 左半连接:只出现左表数据(要求数据必须在右表中也出现过),如果左表的数据在右表中出现多次,最终结果只出现一次155 println("+++++++++++++++++")156 println("-----------------------")157 rdd1158 .join(rdd2)159 .map {160 case (id, (name, _)) => (id, name)161 }162 .distinct()163 .foreachPartition(iter => iter.foreach(println))164 println("------------------------")165 rdd1166 .filter(tuple => broadcastRDDCollection.value.map(_._1).contains(tuple._1))167 .foreachPartition(iter => iter.foreach(println))168 169 // 休眠为了看4040页面170 Thread.sleep(1000000)171 }172 }
6.