【云计算】SparkStreaming解析
小标 2018-12-13 来源 : 阅读 801 评论 0

摘要:本文主要向大家介绍了【云计算】SparkStreaming解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】SparkStreaming解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


1.UpdateBykey和mapWithState


俩个实现的功能都是累加,但是updateBykey是1.6版本之前的,mapWithState是之后的并且更加实用!!!源码记得去看


https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala


2.transform


(1)作用:DStream整合RDD


package Streaming02


import org.apache.spark.streaming.{Seconds, StreamingContext}


import org.apache.spark.{SparkConf, SparkContext}


import scala.collection.mutable.ListBuffer


object LeftJoinStramingApp {


def main(args: Array[String]): Unit = {


val sparkconf=new SparkConf().setMaster("local[2]").setAppName("LeftJoinStramingApp")


//这三个真是一个都不能少


val ssc=new StreamingContext(sparkconf,Seconds(10))


//数据二:rdd


val input2=new ListBuffer[(String,Boolean)]


input2.append(("www.baidu.com",true))


val data2=ssc.sparkContext.parallelize(input2) //构建rdd是sparkContext的,但是我们没有,所以调用一个再parallelize


//因为下面的调用是先创建在调用,所以得移到上面来


//数据一:nc -lk 9999 过来


val lines=ssc.socketTextStream("192.168.137.251",9999)//创建输入流


lines.map(x=>(x.split(",")(0),x)) .transform(rdd=>{


rdd.leftOuterJoin(data2).filter(x=>{ //这一步就是DS结合RDD的操作!!!!这是一个实时过滤,


会有个test给你处理,你需要将他过滤,然而这不是最好的,最好的直接将test广播变量出去,然后直接filter就好


x._2._2.getOrElse(false)!=true


}).map(_._2._1)


}).print()


//这是一个DStream,DStram是由一系列的RDD构成的不加x的时候表示分割去除第一个值,加了x之后表示成为key,value的类型


ssc.start()


ssc.awaitTermination()


}


}


3.做leftjoin取出需要的数据!这是RddJoin


package Streaming02


import org.apache.spark.{SparkConf, SparkContext}


import scala.collection.mutable.ListBuffer


object LeftJoinApp {


def main(args: Array[String]): Unit = {


val sparkconf=new SparkConf().setAppName("LeftJoinApp").setMaster("local[2]")


//setMaster不能乱写!要记得为什么不能这样写?


val sc=new SparkContext(sparkconf)


//数据一。域名,和流量


val input1=new ListBuffer[(String,Long)] //本地创建数据


input1.append(("www.ruozedata.com",8888))


input1.append(("www.ruozedata.com",9999))


input1.append(("www.baidu.com",7777))


val data1=sc.parallelize(input1) //装成RDD


//数据二


val inpu2=new ListBuffer[(String,Boolean)]


inpu2.append(("www.baidu.com",true))


val data2=sc.parallelize(inpu2)


data1.leftOuterJoin(data2) //这个得到是左边表的所有数据但这不是我们想要的


.filter(x=>{


x._2._2.getOrElse(false)!=true //过滤取出第二个不是true的值,但还有none的值


})


.map(x=>(x._1,x._2._1)) //过滤之后直接根据数据模型来取出值


.collect().foreach(println)


sc.stop()


}


}


4.foreachRDD


作用:一般在输出的时候用,例如写到MySQL中去写数据库都是这样的


package Streaming02


import java.sql.DriverManager


import org.apache.spark.SparkConf


import org.apache.spark.streaming.{Seconds, StreamingContext}


object foreachRDDApp {


def main(args: Array[String]): Unit = {


val sparkconf=new SparkConf().setAppName("foreacRDDApp").setMaster("local[2]")


val ssc=new StreamingContext(sparkconf,Seconds(10))


//WC写到MySQL里面去


val lines=ssc.socketTextStream("192.168.137.251",9998)


val re=lines.flatMap(_.split(",")).map((_,1))reduceByKey(_+_)


//re结果写入MySQL


/*re.foreachRDD{rdd=>


//val con=createConnection() //将connection放在driver端,这是有问题的因为,connection应该是在work上的


rdd.foreach{ rec=>


val con=createConnection() //将connection放在excutor端来,这也有问题,如果是一百万的数据,那需要多少个connection


val word=rec._1


val count=rec._2


val sql=s"insert into wc(word,c) values('$word',$count)"


con.createStatement().execute(sql)


con.close()


}


}*/


re.foreachRDD{ rdd=>


rdd.foreachPartition{ //这是工作中最常用的方式


par=> val con=createConnection()


par.foreach(rec=>{


val word=rec._1


val count=rec._2


val sql=s"insert into wc(word,c) values('$word',$count)"


con.createStatement().execute(sql)


})


con.close()


}


}


ssc.start()


ssc.awaitTermination()


}


def createConnection()={


Class.forName("com.mysql.jdbc.Driver")


DriverManager.getConnection("jdbc:mysql://192.168.137.251:3306/g3","root","123456")


//注意这个url的格式!!!!!


}


}


(2)最好的方法是ConnectionPool


我们用的是BoneCP ,添加依赖


com.jolbox


bonecp


0.8.0.RELEASE


5.window


(1)这个spark官网看看


本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程