【云计算】ScalaActor简介
小标 2019-01-14 来源 : 阅读 784 评论 0

摘要:本文主要向大家介绍了【云计算】ScalaActor简介,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】ScalaActor简介,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


Scala Actor


课程目标 目标一:熟悉Scala Actor并发编程 目标二:为学习Akka做准备

注:我们现在学的Scala Actor是scala 2.10.x版本及以前版本的Actor。


Scala在2.11.x版本中将Akka加入其中,作为其默认的Actor,老版本的Actor已经废弃


什么是Scala Actor 概念

Scala中的Actor能够实现并行编程的强大功能,它是基于事件模型的并发机制,Scala是运用消息(message)的发送、接收来实现多线程的。使用Scala能够更容易地实现多线程应用的开发。


传统java并发编程与Scala Actor编程的区别

对于Java,我们都知道它的多线程实现需要对共享资源(变量、对象等)使用synchronized 关键字进行代码块同步、对象锁互斥等等。而且,常常一大块的try…catch语句块中加上wait方法、notify方法、notifyAll方法是让人很头疼的。原因就在于Java中多数使用的是可变状态的对象资源,对这些资源进行共享来实现多线程编程的话,控制好资源竞争与防止对象状态被意外修改是非常重要的,而对象状态的不变性也是较难以保证的。 而在Scala中,我们可以通过复制不可变状态的资源(即对象,Scala中一切都是对象,连函数、方法也是)的一个副本,再基于Actor的消息发送、接收机制进行并行编程。


Actor方法执行顺序 首先调用start()方法启动Actor 调用start()方法后其act()方法会被执行 向Actor发送消息 发送消息的方式
 


 

!


 

 

发送异步消息,没有返回值。


 

 

!


 

 

发送同步消息,等待返回值。


 

 

!!


 

 

发送异步消息,返回值是 Future[Any]。


 


Actor实战 第一个例子
 


 package com.qf.actor

//注意导包是scala.actors.Actor
 import scala.actors.Actor

 

 object MyActor1 extends Actor{

 //重新act方法

 def act(){

 for(i <- 1 to 10){

 println("actor-1 " + i)

 Thread.sleep(2000)

 }

 }

}

  

  object MyActor2 extends Actor{

 //重新act方法

 def act(){

 for(i <- 1 to 10){

 println("actor-2 " + i)

 Thread.sleep(2000)

 }

 }

}

  

  object ActorTest extends App{

 //启动Actor

 MyActor1.start()

 MyActor2.start()

}
 

     


说明:上面分别调用了两个单例对象的start()方法,他们的act()方法会被执行,相当于在java中开启了两个线程,线程的run()方法会被执行


注意:这两个Actor是并行执行的,act()方法中的for循环执行完成后actor程序就退出了


2.第二个例子(可以不断地接收消息)



 
 
 


 package com.qf.actor

 
  import scala.actors.Actor
 

 class MyActor extends Actor {

  

 override def act(): Unit = {

 while (true) {

 receive {

 case "start" => {

 println("starting ...")

 Thread.sleep(5000)

 println("started")

 }

 case "stop" => {

 println("stopping ...")

 Thread.sleep(5000)

 println("stopped ...")

 }

 }

 }

 }

}
 
  object MyActor {

 def main(args: Array[String]) {

 val actor = new MyActor

 actor.start()

 actor ! "start"

 actor ! "stop"

 println("消息发送完成!")
  }

}
 

     


说明:在act()方法中加入了while (true) 循环,就可以不停的接收消息


注意:发送start消息和stop的消息是异步的,但是Actor接收到消息执行的过程是同步的按顺序执行


3.第三个例子(react方式会复用线程,比receive更高效)



 
 
 


 package com.qf.actor

 
  import scala.actors.Actor
 
  class YourActor extends Actor {
 
 override def act(): Unit = {

 loop {

 react {

 case "start" => {

 println("starting ...")

 Thread.sleep(5000)

 println("started")

 }

 case "stop" => {

 println("stopping ...")

 Thread.sleep(8000)

 println("stopped ...")

 }

 }

 }

 }

}
 

 
  object YourActor {

 def main(args: Array[String]) {

 val actor = new YourActor

 actor.start()

 actor ! "start"

 actor ! "stop"

 println("消息发送完成!")
  }

}
 

     


说明:react 如果要反复执行消息处理,react外层要用loop,不能用while


4.第四个例子(结合case class发送消息)



 
 
 


 package com.qf.actor

  import scala.actors.Actor

  

  class AppleActor extends Actor {

  

 def act(): Unit = {

 while (true) {

 receive {

 case "start" => println("starting ...")

 case SyncMsg(id, msg) => {

 println(id + ",sync " + msg)

 Thread.sleep(5000)

 sender ! ReplyMsg(3,"finished")

 }

 case AsyncMsg(id, msg) => {

 println(id + ",async " + msg)

 Thread.sleep(5000)

 }

 }

 }

 }

}

  

  object AppleActor {

 def main(args: Array[String]) {

 val a = new AppleActor

 a.start()

 //异步消息

 a ! AsyncMsg(1, "hello actor")

 println("异步消息发送完成")

 //同步消息

 //val content = a.!(1000, SyncMsg(2, "hello actor"))

 //println(content)

 val reply = a !! SyncMsg(2, "hello actor")

 println(reply.isSet)

 //println("123")

 val c = reply.apply()

 println(reply.isSet)

 println(c)

 }

}

  case class SyncMsg(id : Int, msg: String)

  case class AsyncMsg(id : Int, msg: String)

  case class ReplyMsg(id : Int, msg: String)
 

     


练习

用actor并发编程写一个单机版的WorldCount,将多个文件作为输入,计算完成后将多个任务汇总,得到最终的结果



 
 
 


 package com.qf.actor

  

  import java.io.File

  

  import scala.actors.{Actor, Future}

  import scala.collection.mutable

  import scala.io.Source

  

  class Task extends Actor {

  

 override def act(): Unit = {

 loop {

 react {

 case SubmitTask(fileName) => {

 val contents = Source.fromFile(new File(fileName)).mkString

 val arr = contents.split("\r\n")

 val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.length)

 //val result = arr.flatMap(_.split(" ")).map((_, 1)).groupBy(_._1).mapValues(_.foldLeft(0)(_ + _._2))

 sender ! ResultTask(result)

 }

 case StopTask => {

 exit()

 }

 }

 }

 }

}

  

  object WorkCount {

 def main(args: Array[String]) {

 val files = Array("c://words.txt", "c://words.log")

  

 val replaySet = new mutable.HashSet[Future[Any]]

 val resultList = new mutable.ListBuffer[ResultTask]

  

 for(f <- files) {

 val t = new Task

 val replay = t.start() !! SubmitTask(f)

 replaySet += replay

 }

  

 while(replaySet.size > 0){

 val toCumpute = replaySet.filter(_.isSet)

 for(r <- toCumpute){

 val result = r.apply()

 resultList += result.asInstanceOf[ResultTask]

 replaySet.remove(r)

 }

 Thread.sleep(100)

 }

 val finalResult = resultList.map(_.result).flatten.groupBy(_._1).mapValues(x => x.foldLeft(0)(_ + _._2))

 println(finalResult)

 }

}

  

  case class SubmitTask(fileName: String)

  case object StopTask

  case class ResultTask(result: Map[String, Int])
          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程