博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spark--广播变量(Broadcast)源码分析
阅读量:6656 次
发布时间:2019-06-25

本文共 5952 字,大约阅读时间需要 19 分钟。

  hot3.png

类关系

BroadcastManager类中包含一个BroadcastFactory对象的引用。大部分操作通过调用BroadcastFactory中的方法来实现。

BroadcastFactory是一个Trait,有两个直接子类TorrentBroadcastFactory、HttpBroadcastFactory。这两个子类实现了对HttpBroadcast、TorrentBroadcast的封装,而后面两个又同时集成了Broadcast抽象类。

BroadcastManager的初始化

SparkContext初始化时会创建SparkEnv对象env,这个过程中会调用BroadcastManager的构造方法返回一个对象作为env的成员变量存在:

  • val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)  

构造BroadcastManager对象时会调用initialize方法,主要根据配置初始化broadcastFactory成员变量,并调用其initialize方法。

  • val broadcastFactoryClass =  

  •          conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")  

  •   

  •        broadcastFactory =  

  •          Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]  

  •   

  •        // Initialize appropriate BroadcastFactory and BroadcastObject  

  •        broadcastFactory.initialize(isDriver, conf, securityManager)  

两个工厂类的initialize方法都是对其相应实体类的initialize方法的调用,下面分开两个类来看。

HttpBroadcast的initialize方法

  • def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {  

  •   synchronized {  

  •     if (!initialized) {  

  •       bufferSize = conf.getInt("spark.buffer.size", 65536)  

  •       compress = conf.getBoolean("spark.broadcast.compress", true)  

  •       securityManager = securityMgr  

  •       if (isDriver) {  

  •         createServer(conf)  

  •         conf.set("spark.httpBroadcast.uri",  serverUri)  

  •       }  

  •       serverUri = conf.get("spark.httpBroadcast.uri")  

  •       cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)  

  •       compressionCodec = CompressionCodec.createCodec(conf)  

  •       initialized = true  

  •     }  

  •   }  

  • }  

除了一些变量的初始化外,主要做两件事情,一是createServer(只有在Driver端会做),其次是创建一个MetadataCleaner对象。

createServer

  • private def createServer(conf: SparkConf) {  

  •   broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))  

  •   server = new HttpServer(broadcastDir, securityManager)  

  •   server.start()  

  •   serverUri = server.uri  

  •   logInfo("Broadcast server started at " + serverUri)  

  • }  

首先创建一个存放广播变量的目录,默认是

  • conf.get("spark.local.dir",  System.getProperty("java.io.tmpdir")).split(',')(0)  

然后初始化一个HttpServer对象并启动(封装了jetty),启动过程中包括加载资源文件,起端口和线程用来监控请求等。这部分的细节在org.apache.spark.HttpServer类中,此处不做展开。

创建MetadataCleaner对象

一个MetadataCleaner对象包装了一个定时计划Timer,每隔一段时间执行一个回调函数,此处传入的回调函数为cleanup:

  • private def cleanup(cleanupTime: Long) {  

  •   val iterator = files.internalMap.entrySet().iterator()  

  •   while(iterator.hasNext) {  

  •     val entry = iterator.next()  

  •     val (file, time) = (entry.getKey, entry.getValue)  

  •     if (time < cleanupTime) {  

  •       iterator.remove()  

  •       deleteBroadcastFile(file)  

  •     }  

  •   }  

  • }  

即清楚存在吵过一定时长的broadcast文件。在时长未设定(默认情况)时,不清除:

  • if (delaySeconds > 0) {  

  •    logDebug(  

  •      "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +  

  •      "and period of " + periodSeconds + " secs")  

  •    timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)  

  • }  

TorrentBroadcast的initialize方法

  • def initialize(_isDriver: Boolean, conf: SparkConf) {  

  •   TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests  

  •   synchronized {  

  •     if (!initialized) {  

  •       initialized = true  

  •     }  

  •   }  

  • }  

Torrent在此处没做什么,这也可以看出和Http的区别,Torrent的处理方式就是p2p,去中心化。而Http是中心化服务,需要启动服务来接受请求。

创建broadcast变量

调用SparkContext中的 def broadcast[T: ClassTag](value: T): Broadcast[T]方法来初始化一个广播变量,实现如下:

  • def broadcast[T: ClassTag](value: T): Broadcast[T] = {  

  •     val bc = env.broadcastManager.newBroadcast[T](value, isLocal)  

  •     cleaner.foreach(_.registerBroadcastForCleanup(bc))  

  •     bc  

  •   }  

即调用broadcastManager的newBroadcast方法:

  • def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {  

  •   broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())  

  • }  

再调用工厂类的newBroadcast方法,此处返回的是一个Broadcast对象。

HttpBroadcastFactory的newBroadcast

  • def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =  

  •   new HttpBroadcast[T](value_, isLocal, id)  

即创建一个新的HttpBroadcast对象并返回。

构造对象时主要做两件事情:

  • HttpBroadcast.synchronized {  

  •    SparkEnv.get.blockManager.putSingle(  

  •      blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)  

  • }  

  •   

  • if (!isLocal) {  

  •    HttpBroadcast.write(id, value_)  

  • }  

1.将变量id和值放入blockManager,但并不通知master

2.调用伴生对象的write方法

  • def write(id: Long, value: Any) {  

  •     val file = getFile(id)  

  •     val out: OutputStream = {  

  •       if (compress) {  

  •         compressionCodec.compressedOutputStream(new FileOutputStream(file))  

  •       } else {  

  •         new BufferedOutputStream(new FileOutputStream(file), bufferSize)  

  •       }  

  •     }  

  •     val ser = SparkEnv.get.serializer.newInstance()  

  •     val serOut = ser.serializeStream(out)  

  •     serOut.writeObject(value)  

  •     serOut.close()  

  •     files += file  

  •   }  

write方法将对象值按照指定的压缩、序列化写入指定的文件。这个文件所在的目录即是HttpServer的资源目录,文件名和id的对应关系为:

  • case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {  

  •   def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)  

  • }  

TorrentBroadcastFactory的newBroadcast方法

  • def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =  

  •   new TorrentBroadcast[T](value_, isLocal, id)  

同样是创建一个TorrentBroadcast对象,并返回。

  • TorrentBroadcast.synchronized {  

  •   SparkEnv.get.blockManager.putSingle(  

  •     broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)  

  • }  

  •   

  •   

  • if (!isLocal) {  

  •   sendBroadcast()  

  • }  

做两件事情,第一步和Http一样,第二步:

  • def sendBroadcast() {  

  •   val tInfo = TorrentBroadcast.blockifyObject(value_)  

  •   totalBlocks = tInfo.totalBlocks  

  •   totalBytes = tInfo.totalBytes  

  •   hasBlocks = tInfo.totalBlocks  

  •   

  •   // Store meta-info  

  •   val metaId = BroadcastBlockId(id, "meta")  

  •   val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)  

  •   TorrentBroadcast.synchronized {  

  •     SparkEnv.get.blockManager.putSingle(  

  •       metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)  

  •   }  

  •   

  •   // Store individual pieces  

  •   for (i <- 0 until totalBlocks) {  

  •     val pieceId = BroadcastBlockId(id, "piece" + i)  

  •     TorrentBroadcast.synchronized {  

  •       SparkEnv.get.blockManager.putSingle(  

  •         pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)  

  •     }  

  •   }  

  • }  

可以看出,先将元数据信息缓存到blockManager,再将块信息缓存过去。开头可以看到有一个分块动作,是调用伴生对象的blockifyObject方法:

  • def blockifyObject[T](obj: T): TorrentInfo  

此方法将对象obj分块(默认块大小为4M),返回一个TorrentInfo对象,第一个参数为一个TorrentBlock对象(包含blockID和block字节数组)、块数量以及obj的字节流总长度。

元数据信息中的blockId为广播变量id+后缀,value为总块数和总字节数。

数据信息是分块缓存,每块的id为广播变量id加后缀及块变好,数据位一个TorrentBlock对象

 

更多分享请关注: 扫一扫超人学院二维码:164633_1wFW_2273204.jpg

转载于:https://my.oschina.net/crxy/blog/410464

你可能感兴趣的文章
oracle基本操作语句(初学者语句)
查看>>
【Android必备】应用小部件概述(23)
查看>>
【Interface&navigation】材料设计(20)
查看>>
我要学python之生成器
查看>>
ubuntu 13.04 安装QQ
查看>>
IOS图片的拉伸技巧
查看>>
tomcat安装
查看>>
KVM虚拟化的部署及使用
查看>>
Linux软链接和硬链接文件
查看>>
semaphore.h
查看>>
java学习笔记 --- 网络编程(套接字)
查看>>
tkinter 03 Listbox 列表部件
查看>>
Linux磁盘管理命令介绍
查看>>
一锤定音:高通(Qualcomm)370亿美元收购NXP,成为全球第一大汽车芯片供应商...
查看>>
JVM工作原理学习笔记
查看>>
windows 共享访问相关问题
查看>>
DC的sysvol目录管理!
查看>>
apache 防盗链 与 地址重写
查看>>
python3版本mysql的操作
查看>>
登录式shell与非登录式shell
查看>>