摘要:本文主要向大家介绍了【云计算】SparkStreaming解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
本文主要向大家介绍了【云计算】SparkStreaming解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。
1.UpdateBykey和mapWithState
俩个实现的功能都是累加,但是updateBykey是1.6版本之前的,mapWithState是之后的并且更加实用!!!源码记得去看
https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala
2.transform
(1)作用:DStream整合RDD
package Streaming02
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object LeftJoinStramingApp {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setMaster("local[2]").setAppName("LeftJoinStramingApp")
//这三个真是一个都不能少
val ssc=new StreamingContext(sparkconf,Seconds(10))
//数据二:rdd
val input2=new ListBuffer[(String,Boolean)]
input2.append(("www.baidu.com",true))
val data2=ssc.sparkContext.parallelize(input2) //构建rdd是sparkContext的,但是我们没有,所以调用一个再parallelize
//因为下面的调用是先创建在调用,所以得移到上面来
//数据一:nc -lk 9999 过来
val lines=ssc.socketTextStream("192.168.137.251",9999)//创建输入流
lines.map(x=>(x.split(",")(0),x)) .transform(rdd=>{
rdd.leftOuterJoin(data2).filter(x=>{ //这一步就是DS结合RDD的操作!!!!这是一个实时过滤,
会有个test给你处理,你需要将他过滤,然而这不是最好的,最好的直接将test广播变量出去,然后直接filter就好
x._2._2.getOrElse(false)!=true
}).map(_._2._1)
}).print()
//这是一个DStream,DStram是由一系列的RDD构成的不加x的时候表示分割去除第一个值,加了x之后表示成为key,value的类型
ssc.start()
ssc.awaitTermination()
}
}
3.做leftjoin取出需要的数据!这是RddJoin
package Streaming02
import org.apache.spark.{SparkConf, SparkContext}
import scala.collection.mutable.ListBuffer
object LeftJoinApp {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("LeftJoinApp").setMaster("local[2]")
//setMaster不能乱写!要记得为什么不能这样写?
val sc=new SparkContext(sparkconf)
//数据一。域名,和流量
val input1=new ListBuffer[(String,Long)] //本地创建数据
input1.append(("www.ruozedata.com",8888))
input1.append(("www.ruozedata.com",9999))
input1.append(("www.baidu.com",7777))
val data1=sc.parallelize(input1) //装成RDD
//数据二
val inpu2=new ListBuffer[(String,Boolean)]
inpu2.append(("www.baidu.com",true))
val data2=sc.parallelize(inpu2)
data1.leftOuterJoin(data2) //这个得到是左边表的所有数据但这不是我们想要的
.filter(x=>{
x._2._2.getOrElse(false)!=true //过滤取出第二个不是true的值,但还有none的值
})
.map(x=>(x._1,x._2._1)) //过滤之后直接根据数据模型来取出值
.collect().foreach(println)
sc.stop()
}
}
4.foreachRDD
作用:一般在输出的时候用,例如写到MySQL中去写数据库都是这样的
package Streaming02
import java.sql.DriverManager
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
object foreachRDDApp {
def main(args: Array[String]): Unit = {
val sparkconf=new SparkConf().setAppName("foreacRDDApp").setMaster("local[2]")
val ssc=new StreamingContext(sparkconf,Seconds(10))
//WC写到MySQL里面去
val lines=ssc.socketTextStream("192.168.137.251",9998)
val re=lines.flatMap(_.split(",")).map((_,1))reduceByKey(_+_)
//re结果写入MySQL
/*re.foreachRDD{rdd=>
//val con=createConnection() //将connection放在driver端,这是有问题的因为,connection应该是在work上的
rdd.foreach{ rec=>
val con=createConnection() //将connection放在excutor端来,这也有问题,如果是一百万的数据,那需要多少个connection
val word=rec._1
val count=rec._2
val sql=s"insert into wc(word,c) values('$word',$count)"
con.createStatement().execute(sql)
con.close()
}
}*/
re.foreachRDD{ rdd=>
rdd.foreachPartition{ //这是工作中最常用的方式
par=> val con=createConnection()
par.foreach(rec=>{
val word=rec._1
val count=rec._2
val sql=s"insert into wc(word,c) values('$word',$count)"
con.createStatement().execute(sql)
})
con.close()
}
}
ssc.start()
ssc.awaitTermination()
}
def createConnection()={
Class.forName("com.mysql.jdbc.Driver")
DriverManager.getConnection("jdbc:mysql://192.168.137.251:3306/g3","root","123456")
//注意这个url的格式!!!!!
}
}
(2)最好的方法是ConnectionPool
我们用的是BoneCP ,添加依赖
com.jolbox
bonecp
0.8.0.RELEASE
5.window
(1)这个spark官网看看
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!
您输入的评论内容中包含违禁敏感词
我知道了
请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号