【云计算】Hive_SQL知识点代码解析
小标 2019-01-14 来源 : 阅读 1023 评论 0

摘要:本文主要向大家介绍了【云计算】Hive_SQL知识点代码解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。

本文主要向大家介绍了【云计算】Hive_SQL知识点代码解析,通过具体的内容向大家展现,希望对大家学习云计算有所帮助。


Last login: Tue Aug 29 09:11:12 2017 from 10.72.37.75
[root@bdddev-agent-205 ~]# su hive
[hive@bdddev-agent-205 root]$ cd
[hive@bdddev-agent-205 ~]$ hive
2017-08-30 08:56:44,327 WARN  [main] conf.HiveConf: HiveConf of name hive.server2.enable.impersonation does not exist

Logging initialized using configuration in file:/etc/hive/2.5.0.0-1245/0/hive-log4j.properties
hive> use yxpt;
OK
Time taken: 0.916 seconds
hive> describe pi_cust_item_recommend;
OK
cust_id                 string
item_id                 bigint
advise_level            decimal(10,0)
date                    int
cust_code               varchar(30)
pack_bar                varchar(30)
ymday                   string

# Partition Information
# col_name              data_type               comment

ymday                   string
Time taken: 0.181 seconds, Fetched: 12 row(s)
hive> select * from pi_cust_item_recommend limit 3;
OK
Time taken: 0.082 seconds


[hdfs@bdddev-agent-205 bin]$ ./pyspark
Python 2.7.5 (default, Nov  6 2016, 00:28:07)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-11)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Python version 2.7.5 (default, Nov  6 2016 00:28:07)
SparkSession available as 'spark'.
>>> from __future__ import division
>>> from pyspark.mllib.recommendation import ALS
>>> from pyspark.sql import HiveContext
>>> from pyspark.sql import SparkSession
>>> from collections import namedtuple
>>> import math
>>> import datetime
>>> spark = SparkSession.builder.appName("bjrecommender").enableHiveSupport().getOrCreate()
>>> sc = spark.sparkContext
>>> hiveCtx = HiveContext(sc)
>>> Rating = namedtuple("Rating", ["user", "product", "rating"])
>>> tid = namedtuple('tid',['id','cust_id'])
>>> now = datetime.datetime.now()
>>> begin_date = (now-datetime.timedelta(days=150)).strftime('%Y%m%d')
>>> begin_date = (now-datetime.timedelta(days=200)).strftime('%Y%m%d')
>>> end_date = now.strftime('%Y%m%d')
>>> sql="select dense_rank() over(order by cust_id) id,cust_id,item_id,need_score+sold_score score from (select cust_id,item_id,qty_need,qty_sold,ntile(5) over(partition by cust_id order by qty_need) need_score,ntile(5) over(partition by cust_id order by qty_sold) sold_score from (select cust_id,item_id,sum(qty_need) qty_need,sum(qty_sold) qty_sold from yxpt.pi_cust_item_day where date1>=" +begin_date + " and date1<="+end_date+ " group by cust_id,item_id) a1) b1"
>>> total = hiveCtx.sql(sql)
>>> id_custid=total.rdd.map(lambda x : tid(str(x[0]),x[1])).distinct()
17/08/30 09:18:03 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
>>> id_custid.toDF().registerTempTable("id_cid")
>>> ratings=total.rdd.map(lambda x : Rating(str(x[0]),int(x[2]),float(x[3])))
>>> ratings.toDF().show(3)
17/08/30 09:29:31 ERROR Utils: Uncaught exception in thread stdout writer for
+----+--------+------+
|user| product|rating|
+----+--------+------+
|   1|42010319|   2.0|
|   1|31010401|   2.0|
|   1|22240114|   2.0|
+----+--------+------+
only showing top 3 rows

>>>
>>> model = ALS.train(ratings, rank=15, iterations=10,seed=0,lambda_=0.001)
17/08/30 09:34:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeSystemBLAS
17/08/30 09:34:59 WARN BLAS: Failed to load implementation from: com.github.fommil.netlib.NativeRefBLAS
17/08/30 09:34:59 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeSystemLAPACK
17/08/30 09:34:59 WARN LAPACK: Failed to load implementation from: com.github.fommil.netlib.NativeRefLAPACK
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4644:
[rdd_3272_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4645:
[rdd_3273_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4646:
[rdd_3272_0]
17/08/30 09:35:03 WARN Executor: 1 block locks were not released by TID = 4647:
[rdd_3273_0]
>>> all_rating=model.recommendProductsForUsers(30).map(lambda x:x[1]).collect()
>>> len(all_rating)
33695
>>> len(all_rating[0])
30
>>> userProducts = ratings.map(lambda rating:(rating.user,rating.product))
>>> predictions = model.predictAll(userProducts).map(lambda rating:((rating.user,rating.product),rating.rating))
>>> type(predictions)

>>> predictions.toDF().show(3)
[Stage 258:=====================>                                   (3 + 5) / 8]17/08/30 09:44:26 WARN Executor: Managed memory leak detected; size = 15977666 bytes, TID = 4787
17/08/30 09:44:27 WARN Executor: Managed memory leak detected; size = 15977666 bytes, TID = 4788
+---------------+-----------------+
|             _1|               _2|
+---------------+-----------------+
|[4904,37020312]|4.338272324285362|
|[4904,32010112]|2.763827789148973|
|[4904,12010504]|6.962521675730641|
+---------------+-----------------+
only showing top 3 rows

>>> ratingsAndPredictions = ratings.map(lambda rating:((int(rating.user),rating.product),rating.rating)).join(predictions)
>>> ratingsAndPredictions.toDF().show(3)
+----------------+--------------------+
|              _1|                  _2|
+----------------+--------------------+
| [4075,53100103]|[4.0,3.1492042815...|
|[20152,13070515]|[3.0,4.0453910858...|
| [1335,34030227]|[4.0,3.9336990908...|
+----------------+--------------------+
only showing top 3 rows

>>> MSE = ratingsAndPredictions.map(lambda ((x,y),(m,n)):math.pow(m-n,2)).reduce(lambda x,y:x+y)/ratingsAndPredictions.count()
>>> print "***************" +str(math.sqrt(MSE)) + "*****************"
***************1.39966771197*****************
>>> k=[]
>>> for row in all_rating:
...     k.extend(row)
...
>>> all_rating_rdd = sc.parallelize(k)
>>> all_rating_rdd.toDF().registerTempTable("all_score")
17/08/30 10:04:58 WARN TaskSetManager: Stage 415 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
>>> hiveCtx.sql("select * from all_score limit 5").show(3)
17/08/30 10:08:19 WARN TaskSetManager: Stage 416 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-----+--------+------------------+
| user| product|            rating|
+-----+--------+------------------+
|27456|51520615| 35.52432167919441|
|27456|90020219|29.566229211420946|
|27456|34030316| 28.08260143903327|
+-----+--------+------------------+
only showing top 3 rows

>>> hiveCtx.sql("select a2.cust_id,a1.product,rating," + end_date +" date  "+ " from all_score a1,id_cid a2 " + "where a1.user=a2.id").show(5)
17/08/30 10:18:22 WARN TaskSetManager: Stage 417 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+------------+--------+------------------+--------+
|     cust_id| product|            rating|    date|
+------------+--------+------------------+--------+
|110101100985|90190202| 25.33671962331747|20170830|
|110101100985|34030326|23.906456902069216|20170830|
|110101100985|90020726|16.010945324507635|20170830|
|110101100985|90190101|15.628847628582498|20170830|
|110101100985|90020727|15.442605374580097|20170830|
+------------+--------+------------------+--------+
only showing top 5 rows

>>> hiveCtx.sql("SELECT NATION_CUST_CODE FROM yxpt.CO_CUST limit 3").show(3)
+----------------+
|NATION_CUST_CODE|
+----------------+
|    110114190495|
|    110115203224|
|    110108209145|
+----------------+

>>> hiveCtx.sql("SELECT * from id_cid limit 3").show(3)                                                                                  +-----+------------+
|   id|     cust_id|
+-----+------------+
| 4549|110105106838|
|12992|110108207746|
|30968|110228100250|
+-----+------------+

>>> hiveCtx.sql("SELECT * from id_cid limit 5").show(5)
+-----+------------+
|   id|     cust_id|
+-----+------------+
| 4549|110105106838|
|12992|110108207746|
|30968|110228100250|
|22213|110114100048|
|19728|110113101105|
+-----+------------+

>>> hiveCtx.sql("select B.NATION_CUST_CODE CUST_CODE "+" from yxpt.CO_CUST B,id_cid A "+" where B.CUST_ID=A.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+

>>> hiveCtx.sql("select B.NATION_CUST_CODE CUST_CODE "+" from yxpt.CO_CUST B,id_cid A "+" where B.CUST_ID=A.CUST_ID limit 3 ").show(3)   +------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+

>>> hiveCtx.sql("select C.PACK_BAR "+" from yxpt.PLM_ITEM C,all_score D "+" where C.ITEM_ID=D.product limit 3").show(3)
17/08/30 15:08:24 WARN TaskSetManager: Stage 448 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-------------+
|     PACK_BAR|
+-------------+
|6901028032957|
|4893225033276|
|6901028208550|
+-------------+

>>> hiveCtx.sql("select * from all_score limit 5").show(3)
17/08/30 16:09:04 WARN TaskSetManager: Stage 465 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-----+--------+------------------+
| user| product|            rating|
+-----+--------+------------------+
|27456|51520615| 35.52432167919441|
|27456|90020219|29.566229211420946|
|27456|34030316| 28.08260143903327|
+-----+--------+------------------+
only showing top 3 rows

>>>
>>> hiveCtx.sql("select CO_CUST_T.NATION_CUST_CODE CUST_CODE "+" from yxpt.CO_CUST CO_CUST_T,id_cid id_cid_t "+" where CO_CUST_T.CUST_ID=id_cid_t.CUST_ID limit 3").show(3)
+------------+
|   CUST_CODE|
+------------+
|110105106838|
|110108207746|
|110228100250|
+------------+

>>> hiveCtx.sql("select PLM_ITEM_T.PACK_BAR "+" from yxpt.PLM_ITEM PLM_ITEM_T,all_score all_score_t "+" where PLM_ITEM_T.ITEM_ID=all_score_t.product limit 3").show(3)
17/08/30 18:32:13 WARN TaskSetManager: Stage 481 contains a task of very large size (3118 KB). The maximum recommended task size is 100 KB.
+-------------+
|     PACK_BAR|
+-------------+
|6901028032957|
|4893225033276|
|6901028208550|
+-------------+

>>>


          

本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据安全频道!

本文由 @小标 发布于职坐标。未经许可,禁止转载。
喜欢 | 0 不喜欢 | 0
看完这篇文章有何感觉?已经有0人表态,0%的人喜欢 快给朋友分享吧~
评论(0)
后参与评论

您输入的评论内容中包含违禁敏感词

我知道了

助您圆梦职场 匹配合适岗位
验证码手机号,获得海同独家IT培训资料
选择就业方向:
人工智能物联网
大数据开发/分析
人工智能Python
Java全栈开发
WEB前端+H5

请输入正确的手机号码

请输入正确的验证码

获取验证码

您今天的短信下发次数太多了,明天再试试吧!

提交

我们会在第一时间安排职业规划师联系您!

您也可以联系我们的职业规划师咨询:

小职老师的微信号:z_zhizuobiao
小职老师的微信号:z_zhizuobiao

版权所有 职坐标-一站式IT培训就业服务领导者 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
 沪公网安备 31011502005948号    

©2015 www.zhizuobiao.com All Rights Reserved

208小时内训课程