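Class relationships
BroadcastManager holds a reference to a BroadcastFactory object, and most of its operations are implemented by delegating to methods of that factory.
BroadcastFactory is a trait with two direct implementations, HttpBroadcastFactory and TorrentBroadcastFactory. They wrap HttpBroadcast and TorrentBroadcast respectively, and both of those classes in turn extend the abstract class Broadcast.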
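The class relationships can be sketched as a stripped-down Scala model. This is not Spark's code. The class names mirror the text, but each broadcast is reduced to an id and a value. As a further simplification, this BroadcastManager receives the factory and the id as plain parameters.

```scala
import scala.reflect.ClassTag

// Simplified stand-ins for Spark's classes (hypothetical; no Spark dependency).
// Abstract base class: every broadcast variable has an id and a value.
abstract class Broadcast[T](val id: Long) {
  def value: T
}

// Both concrete broadcast types extend Broadcast.
class HttpBroadcast[T](v: T, id: Long) extends Broadcast[T](id) {
  override def value: T = v
}

class TorrentBroadcast[T](v: T, id: Long) extends Broadcast[T](id) {
  override def value: T = v
}

// The factory trait, with one implementation per broadcast type.
trait BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T]
}

class HttpBroadcastFactory extends BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
    new HttpBroadcast[T](value, id)
}

class TorrentBroadcastFactory extends BroadcastFactory {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
    new TorrentBroadcast[T](value, id)
}

// The manager only holds a factory reference and delegates to it.
class BroadcastManager(broadcastFactory: BroadcastFactory) {
  def newBroadcast[T: ClassTag](value: T, isLocal: Boolean, id: Long): Broadcast[T] =
    broadcastFactory.newBroadcast[T](value, isLocal, id)
}
```

Swapping the factory passed to BroadcastManager switches the broadcast implementation without changing the manager.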
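BroadcastManager initialization
When SparkContext is initialized, it creates a SparkEnv object, env. Along the way it calls the BroadcastManager constructor, and the resulting object is kept as a member of env: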
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
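Constructing a BroadcastManager calls its initialize method. This method instantiates the broadcastFactory member from the configuration (the class named by spark.broadcast.factory, HttpBroadcastFactory by default) and then calls that factory's own initialize method: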
val broadcastFactoryClass =
  conf.get("spark.broadcast.factory", "org.apache.spark.broadcast.HttpBroadcastFactory")
broadcastFactory =
  Class.forName(broadcastFactoryClass).newInstance.asInstanceOf[BroadcastFactory]
// Initialize appropriate BroadcastFactory and BroadcastObject
broadcastFactory.initialize(isDriver, conf, securityManager)
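The selection logic boils down to three steps: read a class name from the configuration, fall back to a default, and instantiate the class. The sketch below keeps the key and default value from the code above. To stay self-contained it makes two substitutions: an explicit registry of constructors replaces Class.forName, and a plain Map[String, String] replaces SparkConf. FactoryLoader is a hypothetical name.

```scala
// Minimal factory types for this sketch (hypothetical).
trait BroadcastFactory { def kind: String }
final class HttpBroadcastFactory extends BroadcastFactory { val kind = "http" }
final class TorrentBroadcastFactory extends BroadcastFactory { val kind = "torrent" }

object FactoryLoader {
  // Same key and default as in BroadcastManager.initialize.
  val FactoryKey = "spark.broadcast.factory"
  val DefaultFactory = "org.apache.spark.broadcast.HttpBroadcastFactory"

  // Registry standing in for reflection: fully qualified name -> constructor.
  private val registry: Map[String, () => BroadcastFactory] = Map(
    "org.apache.spark.broadcast.HttpBroadcastFactory" -> (() => new HttpBroadcastFactory),
    "org.apache.spark.broadcast.TorrentBroadcastFactory" -> (() => new TorrentBroadcastFactory)
  )

  def load(conf: Map[String, String]): BroadcastFactory = {
    val className = conf.getOrElse(FactoryKey, DefaultFactory)
    registry.get(className) match {
      case Some(make) => make()
      // Mirrors what Class.forName would throw for an unknown class.
      case None => throw new ClassNotFoundException(className)
    }
  }
}
```

The trade-off is flexibility. With Spark's reflection, any factory on the classpath can be plugged in by name. A registry like this one only knows the classes listed in it.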
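The initialize methods of both factory classes simply call the initialize method of their corresponding broadcast class, so we look at the two classes separately below.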
HttpBroadcast's initialize method
def initialize(isDriver: Boolean, conf: SparkConf, securityMgr: SecurityManager) {
  synchronized {
    if (!initialized) {
      bufferSize = conf.getInt("spark.buffer.size", 65536)
      compress = conf.getBoolean("spark.broadcast.compress", true)
      securityManager = securityMgr
      if (isDriver) {
        createServer(conf)
        conf.set("spark.httpBroadcast.uri", serverUri)
      }
      serverUri = conf.get("spark.httpBroadcast.uri")
      cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf)
      compressionCodec = CompressionCodec.createCodec(conf)
      initialized = true
    }
  }
}
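Apart from initializing some fields, it does two main things. First, on the driver only, it calls createServer and records the server address in spark.httpBroadcast.uri, which every node then reads back into serverUri. Second, it creates a MetadataCleaner.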
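The initialize pattern has three parts. It runs only once, under a lock. The driver starts the server and writes its address into the configuration. Every node then reads the address back. HttpBroadcastInit and its startServer function are hypothetical stand-ins: no real server is started, and a mutable Map plays the role of SparkConf.

```scala
import scala.collection.mutable

// Hypothetical sketch of HttpBroadcast.initialize's control flow.
// startServer stands in for createServer and returns the server's address.
class HttpBroadcastInit(startServer: () => String) {
  private var initialized = false
  private var uri: Option[String] = None

  def serverUri: Option[String] = uri

  def initialize(isDriver: Boolean, conf: mutable.Map[String, String]): Unit = synchronized {
    if (!initialized) {
      if (isDriver) {
        // Only the driver starts a server and publishes its address.
        conf("spark.httpBroadcast.uri") = startServer()
      }
      // Driver and executors both read the address from the conf; in this sketch
      // executors are assumed to receive a copy of the driver's conf.
      uri = Some(conf("spark.httpBroadcast.uri"))
      initialized = true
    }
  }
}
```

Because of the initialized flag, calling initialize a second time does not start a second server.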
createServer
private def createServer(conf: SparkConf) {
  broadcastDir = Utils.createTempDir(Utils.getLocalDir(conf))
  server = new HttpServer(broadcastDir, securityManager)
  server.start()
  serverUri = server.uri
  logInfo("Broadcast server started at " + serverUri)
}
It first creates a directory for the broadcast files inside the first local directory, which by default is:
conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)
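It then creates and starts an HttpServer, a wrapper around Jetty. Starting the server sets up the resources it serves, binds a port, and starts threads that listen for requests. The details are in org.apache.spark.HttpServer and are not covered here.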
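The directory choice takes the first entry of a comma-separated list, or java.io.tmpdir if none is configured, and then creates a uniquely named subdirectory there. A minimal sketch, with a few assumptions: a Map replaces SparkConf, java.nio.file.Files.createTempDirectory replaces Utils.createTempDir, and the "spark-" prefix is made up. BroadcastDir is a hypothetical name.

```scala
import java.io.File
import java.nio.file.Files

object BroadcastDir {
  // Mirrors conf.get("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0).
  def rootLocalDir(conf: Map[String, String]): String =
    conf.getOrElse("spark.local.dir", System.getProperty("java.io.tmpdir")).split(',')(0)

  // Creates a fresh, uniquely named subdirectory under the root local dir.
  def createBroadcastDir(conf: Map[String, String]): File = {
    val root = new File(rootLocalDir(conf))
    root.mkdirs() // make sure the parent exists
    Files.createTempDirectory(root.toPath, "spark-").toFile
  }
}
```

Only the first configured local directory is used, even when several are listed.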
Creating the MetadataCleaner
A MetadataCleaner wraps a Timer that runs a callback at a fixed interval. Here the callback passed in is cleanup:
private def cleanup(cleanupTime: Long) {
  val iterator = files.internalMap.entrySet().iterator()
  while (iterator.hasNext) {
    val entry = iterator.next()
    val (file, time) = (entry.getKey, entry.getValue)
    if (time < cleanupTime) {
      iterator.remove()
      deleteBroadcastFile(file)
    }
  }
}
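That is, it deletes broadcast files that have been kept longer than a set duration. When that duration is not set (the default), nothing is cleaned up, because MetadataCleaner only schedules its timer when delaySeconds is positive: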
if (delaySeconds > 0) {
  logDebug(
    "Starting metadata cleaner for " + name + " with delay of " + delaySeconds + " seconds " +
    "and period of " + periodSeconds + " secs")
  timer.schedule(task, periodSeconds * 1000, periodSeconds * 1000)
}
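The cleanup mechanism has three pieces. A map records when each entry was added. A callback drops entries older than a cutoff. A timer runs the callback, and it is only scheduled for a positive TTL. The sketch below reproduces that pattern with java.util.Timer and a ConcurrentHashMap. TimestampedFiles and CleanerSketch are hypothetical names, and entries are strings rather than files. The period formula max(10, delaySeconds / 10) is an assumption. As far as I recall, the TTL in Spark 1.x comes from spark.cleaner.ttl, which is disabled by default.

```scala
import java.util.{Timer, TimerTask}
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for the timestamped file set used by HttpBroadcast.
class TimestampedFiles {
  val entries = new ConcurrentHashMap[String, java.lang.Long]()

  def add(name: String, time: Long): Unit = entries.put(name, java.lang.Long.valueOf(time))

  // Same shape as HttpBroadcast.cleanup: drop entries older than the cutoff.
  def cleanup(cleanupTime: Long): Unit = {
    val it = entries.entrySet().iterator()
    while (it.hasNext) {
      val e = it.next()
      if (e.getValue.longValue < cleanupTime) {
        it.remove() // Spark would also call deleteBroadcastFile(file) here
      }
    }
  }
}

object CleanerSketch {
  // Schedules the cleanup only when delaySeconds > 0, like MetadataCleaner.
  def schedule(delaySeconds: Int, cleanupFunc: Long => Unit): Option[Timer] =
    if (delaySeconds > 0) {
      val periodSeconds = math.max(10, delaySeconds / 10) // assumed period formula
      val timer = new Timer("broadcast cleanup timer", true) // daemon thread
      val task = new TimerTask {
        // Anything older than "now minus the TTL" gets cleaned.
        override def run(): Unit = cleanupFunc(System.currentTimeMillis() - delaySeconds * 1000L)
      }
      timer.schedule(task, periodSeconds * 1000L, periodSeconds * 1000L)
      Some(timer)
    } else None
}
```

Removing entries through the iterator keeps the loop safe while the map is being modified.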
TorrentBroadcast's initialize method
def initialize(_isDriver: Boolean, conf: SparkConf) {
  TorrentBroadcast.conf = conf // TODO: we might have to fix it in tests
  synchronized {
    if (!initialized) {
      initialized = true
    }
  }
}
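Beyond storing conf and setting the initialized flag, TorrentBroadcast does nothing here. This reflects the difference between the two approaches. Torrent works peer-to-peer and is decentralized, while Http is a centralized service that must start a server to accept requests.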
Creating a broadcast variable
A broadcast variable is created by calling SparkContext's def broadcast[T: ClassTag](value: T): Broadcast[T] method, which is implemented as follows:
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
In other words, it calls broadcastManager's newBroadcast method:
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean) = {
  broadcastFactory.newBroadcast[T](value_, isLocal, nextBroadcastId.getAndIncrement())
}
This in turn calls the factory's newBroadcast method with a new id taken from nextBroadcastId, and returns the resulting Broadcast object.
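On the way from SparkContext.broadcast to the factory, two things are added: a unique id drawn from an atomic counter, and optional registration with the context cleaner. A simplified sketch follows. The hypothetical SimpleContext, SimpleManager and SimpleCleaner types stand in for SparkContext, BroadcastManager and the cleaner. There is no factory: the manager builds the broadcast object itself.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical, simplified broadcast object.
final case class SimpleBroadcast[T](id: Long, value: T)

class SimpleManager {
  private val nextBroadcastId = new AtomicLong(0)
  // Stand-in for broadcastFactory.newBroadcast(value, isLocal, id).
  def newBroadcast[T](value: T, isLocal: Boolean): SimpleBroadcast[T] =
    SimpleBroadcast(nextBroadcastId.getAndIncrement(), value)
}

class SimpleCleaner {
  val registered = ArrayBuffer.empty[Long]
  def registerBroadcastForCleanup(b: SimpleBroadcast[_]): Unit = registered += b.id
}

// The cleaner is optional, as in SparkContext (cleaner.foreach(...)).
class SimpleContext(cleaner: Option[SimpleCleaner], isLocal: Boolean = false) {
  private val manager = new SimpleManager
  def broadcast[T](value: T): SimpleBroadcast[T] = {
    val bc = manager.newBroadcast(value, isLocal)
    cleaner.foreach(_.registerBroadcastForCleanup(bc))
    bc
  }
}
```

getAndIncrement keeps ids unique even if several threads create broadcast variables at the same time.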
HttpBroadcastFactory's newBroadcast
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  new HttpBroadcast[T](value_, isLocal, id)
That is, it simply creates and returns a new HttpBroadcast object.
The constructor does two main things:
HttpBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    blockId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
if (!isLocal) {
  HttpBroadcast.write(id, value_)
}
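1. It puts the value into the local BlockManager under blockId (StorageLevel.MEMORY_AND_DISK) without notifying the master (tellMaster = false).
2. Unless running in local mode, it calls the companion object's write method: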
def write(id: Long, value: Any) {
  val file = getFile(id)
  val out: OutputStream = {
    if (compress) {
      compressionCodec.compressedOutputStream(new FileOutputStream(file))
    } else {
      new BufferedOutputStream(new FileOutputStream(file), bufferSize)
    }
  }
  val ser = SparkEnv.get.serializer.newInstance()
  val serOut = ser.serializeStream(out)
  serOut.writeObject(value)
  serOut.close()
  files += file
}
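write serializes the value with SparkEnv's serializer, compresses it if spark.broadcast.compress is set, and writes it to the file returned by getFile(id). That file lives in the HttpServer's resource directory. Its name is derived from the id through BroadcastBlockId, i.e. broadcast_<id>: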
case class BroadcastBlockId(broadcastId: Long, field: String = "") extends BlockId {
  def name = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}
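write and the naming rule can be exercised end to end with JDK classes only. This sketch makes two substitutions: ObjectOutputStream replaces SparkEnv's serializer, and GZIPOutputStream replaces Spark's CompressionCodec. BroadcastFileStore is a hypothetical name, and readBack exists only to check the round trip.

```scala
import java.io._
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Same naming rule as BroadcastBlockId above (without the BlockId parent).
case class BroadcastBlockId(broadcastId: Long, field: String = "") {
  def name: String = "broadcast_" + broadcastId + (if (field == "") "" else "_" + field)
}

class BroadcastFileStore(dir: File, compress: Boolean, bufferSize: Int = 65536) {
  def getFile(id: Long): File = new File(dir, BroadcastBlockId(id).name)

  // Serialize, optionally compress, and write the value to broadcast_<id>.
  def write(id: Long, value: Any): File = {
    val file = getFile(id)
    val raw = new FileOutputStream(file)
    val out: OutputStream =
      if (compress) new GZIPOutputStream(raw) else new BufferedOutputStream(raw, bufferSize)
    val serOut = new ObjectOutputStream(out)
    try serOut.writeObject(value) finally serOut.close() // close also closes `out`
    file
  }

  // Inverse of write, used here only to verify the round trip.
  def readBack(id: Long): Any = {
    val raw = new FileInputStream(getFile(id))
    val in: InputStream =
      if (compress) new GZIPInputStream(raw) else new BufferedInputStream(raw, bufferSize)
    val objIn = new ObjectInputStream(in)
    try objIn.readObject() finally objIn.close()
  }
}
```

The file name depends only on the id. Any node that knows the broadcast id can therefore work out which file on the server holds it.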
TorrentBroadcastFactory's newBroadcast
def newBroadcast[T: ClassTag](value_ : T, isLocal: Boolean, id: Long) =
  new TorrentBroadcast[T](value_, isLocal, id)
Likewise, this creates and returns a TorrentBroadcast object. Its constructor contains:
TorrentBroadcast.synchronized {
  SparkEnv.get.blockManager.putSingle(
    broadcastId, value_, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
}
if (!isLocal) {
  sendBroadcast()
}
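Again the constructor does two things. The first step is the same as in HttpBroadcast. The second, when not running in local mode, is sendBroadcast: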
def sendBroadcast() {
  val tInfo = TorrentBroadcast.blockifyObject(value_)
  totalBlocks = tInfo.totalBlocks
  totalBytes = tInfo.totalBytes
  hasBlocks = tInfo.totalBlocks
  // Store meta-info
  val metaId = BroadcastBlockId(id, "meta")
  val metaInfo = TorrentInfo(null, totalBlocks, totalBytes)
  TorrentBroadcast.synchronized {
    SparkEnv.get.blockManager.putSingle(
      metaId, metaInfo, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
  }
  // Store individual pieces
  for (i <- 0 until totalBlocks) {
    val pieceId = BroadcastBlockId(id, "piece" + i)
    TorrentBroadcast.synchronized {
      SparkEnv.get.blockManager.putSingle(
        pieceId, tInfo.arrayOfBlocks(i), StorageLevel.MEMORY_AND_DISK, tellMaster = true)
    }
  }
}
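So sendBroadcast first caches the metadata in the BlockManager and then caches each block. Unlike the initial putSingle of the whole value, these calls pass tellMaster = true, so the master is informed about the metadata and the pieces. The splitting step at the start is done by the companion object's blockifyObject method: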
def blockifyObject[T](obj: T): TorrentInfo
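This method splits obj into blocks (4 MB each by default) and returns a TorrentInfo object. The TorrentInfo has three fields: an array of TorrentBlock objects (each holding a blockID and the block's byte array), the number of blocks, and the total length of obj's serialized bytes.
In the metadata entry, the block id is the broadcast id plus the suffix meta (broadcast_<id>_meta), and the value records the total number of blocks and the total number of bytes.
The data itself is cached block by block. Each piece's id is the broadcast id plus the suffix piece and the piece number (broadcast_<id>_piece<i>), and its value is a TorrentBlock object.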
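Together, blockifyObject and sendBroadcast turn one object into a meta entry plus numbered pieces. The sketch below makes two assumptions: JDK serialization handles the object-to-bytes step, and a mutable map keyed by block-id name replaces the BlockManager. TorrentSketch is a hypothetical name. TorrentBlock and TorrentInfo reuse the field names from the text.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable

case class TorrentBlock(blockID: Int, byteArray: Array[Byte])
case class TorrentInfo(arrayOfBlocks: Array[TorrentBlock], totalBlocks: Int, totalBytes: Int)

object TorrentSketch {
  val DefaultBlockSize: Int = 4 * 1024 * 1024 // 4 MB, the default block size from the text

  // Object -> bytes (JDK serialization is an assumption of this sketch).
  def serialize(obj: Any): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Split the bytes into consecutive blocks of at most blockSize bytes.
  def blockifyBytes(bytes: Array[Byte], blockSize: Int = DefaultBlockSize): TorrentInfo = {
    require(blockSize > 0, "blockSize must be positive")
    val blocks = bytes.grouped(blockSize).zipWithIndex
      .map { case (chunk, i) => TorrentBlock(i, chunk) }
      .toArray
    TorrentInfo(blocks, blocks.length, bytes.length)
  }

  // Store the metadata first (arrayOfBlocks = null, as in sendBroadcast), then each piece.
  def sendBroadcast(id: Long, info: TorrentInfo, store: mutable.Map[String, Any]): Unit = {
    store(s"broadcast_${id}_meta") = TorrentInfo(null, info.totalBlocks, info.totalBytes)
    for (i <- 0 until info.totalBlocks) {
      store(s"broadcast_${id}_piece$i") = info.arrayOfBlocks(i)
    }
  }
}
```

Only the last block can be shorter than blockSize. Concatenating the pieces in order gives back the original bytes.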