Spark Streaming + SQL: the ten most-clicked products
Published: 2019-06-19

1. Goal

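    Output the product information for the ten most-clicked products on a website. Product information is stored in a MySQL table, and the site's click log is simulated by typing lines into a socket.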

2. Setup

    1) Create the product table in MySQL

    mysql> select * from product;
    +--------+----------+
    | itemid | itemname |
    +--------+----------+
    | 001    | phone    |
    | 002    | book     |
    | 003    | paper    |
    +--------+----------+

    2) Simulate click logs through a socket

        Start netcat on the Linux machine:

            nc -lk 9999

        Then type the following lines, one click per line (fields: userid itemid clicktime):

            9342 001 20170909
            4532 002 20170909
            7159 001 20170909
            5834 003 20170909
            5521 003 20170909
            2633 001 20170909
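Each line above carries three space-separated fields that the job maps onto AccessLog(userid, itemid, clicktime). The following is a minimal standalone sketch of that parsing step in plain Scala, with no Spark required. It assumes fields may be separated by any run of whitespace and that malformed lines should be dropped (returned as None) rather than turned into a placeholder record; ParseSketch is a hypothetical name used only for this illustration.

```scala
// Same shape as the AccessLog case class used by the streaming job.
final case class AccessLog(userid: String, itemid: String, clicktime: String)

object ParseSketch {
  // Assumption: any run of whitespace separates fields; lines that do not
  // have exactly three fields are rejected with None.
  def parseLog(line: String): Option[AccessLog] =
    line.trim.split("\\s+") match {
      case Array(userid, itemid, clicktime) => Some(AccessLog(userid, itemid, clicktime))
      case _                                => None
    }

  def main(args: Array[String]): Unit = {
    // Two valid sample lines plus an empty line and a two-field line.
    val input = Seq("9342 001 20170909", "", "4532 002 20170909", "bad line")
    // flatMap keeps only the lines that parsed successfully.
    input.flatMap(parseLog).foreach(println)
  }
}
```

Returning an Option lets the caller discard bad lines with flatMap; in a Spark job the same idea would be expressed with flatMap instead of map on the input stream.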

3. Code

/**
  * Created by puwenchao on 2017-09-08.
  * Top-10 products by click count over a 10-minute window. Product information
  * is stored in MySQL; click logs arrive over a socket.
  */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

// Format of one product click log line
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  // Both durations must be multiples of the 10 s batch interval
  val WINDOW_LENGTH = new Duration(600 * 1000) // 10 minutes, in milliseconds
  val SLIDE_INTERVAL = new Duration(10 * 1000) // 10 seconds

  def main(args: Array[String]): Unit = {
    // Set log levels
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create the required contexts; local mode needs at least 2 threads
    // because the socket receiver occupies one of them
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    // Load the product table from MySQL (the MySQL JDBC driver must be on the classpath)
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url" -> "jdbc:mysql://192.168.252.141:3306/test",
      "driver" -> "com.mysql.jdbc.Driver",
      "dbtable" -> "product",
      "user" -> "root",
      "password" -> "mysql"
    )).load()
    // registerTempTable is the Spark 1.x API; Spark 2.x deprecates it in favor of createOrReplaceTempView
    productDF.registerTempTable("product")

    // Parse one click log line. Malformed lines become AccessLog("0", "0", "0");
    // the JOIN below drops them because no product in the table has itemid "0"
    def parseLog(log: String): AccessLog = {
      val logInfo = log.trim.split("\\s+")
      if (logInfo.length == 3) AccessLog(logInfo(0), logInfo(1), logInfo(2))
      else AccessLog("0", "0", "0")
    }

    // Read click logs from the socket. Alternative file-based source:
    // val logLinesDStream = streamc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)
    val accessLogsDStream = logLinesDStream.map(parseLog).cache()
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    // Join click data with product data and print the top ten products with their click counts
    windowDStream.foreachRDD(accessLogs => {
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        val sqlStr = "SELECT a.itemid, a.itemname, b.cnt FROM product a JOIN " +
          "(SELECT itemid, COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
          "ON (a.itemid = b.itemid) ORDER BY cnt DESC LIMIT 10"
        val topTenClick = sqlc.sql(sqlStr)
        // The top ten could also be written to HDFS, a database, etc.
        topTenClick.show()
      }
    })

    streamc.start()
    streamc.awaitTermination()
  }
}
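For each window, the SQL inside foreachRDD counts clicks per itemid, inner-joins those counts with product, and keeps the ten largest. Below is a plain-Scala sketch of the same computation on the sample data from section 2, with in-memory collections standing in for the two tables (TopClicksSketch and topN are hypothetical names). The tie-break on itemid is an addition of this sketch: ORDER BY cnt DESC by itself leaves the order of equal counts unspecified.

```scala
object TopClicksSketch {
  // Sample product table: itemid -> itemname.
  val products: Map[String, String] = Map("001" -> "phone", "002" -> "book", "003" -> "paper")

  // itemid field of the six sample click lines, in input order.
  val clickedItems: Seq[String] = Seq("001", "002", "001", "003", "003", "001")

  // Collections equivalent of:
  //   SELECT a.itemid, a.itemname, b.cnt FROM product a JOIN
  //     (SELECT itemid, COUNT(*) cnt FROM accessLogs GROUP BY itemid) b
  //   ON (a.itemid = b.itemid) ORDER BY cnt DESC LIMIT 10
  def topN(products: Map[String, String], clicks: Seq[String], n: Int): Seq[(String, String, Int)] = {
    // GROUP BY itemid with COUNT(*)
    val counts = clicks.groupBy(identity).map { case (id, hits) => id -> hits.size }
    counts.toSeq
      // Inner join: ids without a matching product are dropped
      .flatMap { case (id, cnt) => products.get(id).map(name => (id, name, cnt)) }
      // ORDER BY cnt DESC, then itemid ascending as an explicit tie-break
      .sortBy { case (id, _, cnt) => (-cnt, id) }
      // LIMIT n
      .take(n)
  }

  def main(args: Array[String]): Unit =
    topN(products, clickedItems, 10).foreach(println)
}
```

On the sample input this yields 001/phone/3, 003/paper/2 and 002/book/1, the same rows as the table in section 4.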

4. Output

    No logs received in this time interval

    No logs received in this time interval
    +------+--------+---+
    |itemid|itemname|cnt|
    +------+--------+---+
    |   001|   phone|  3|
    |   003|   paper|  2|
    |   002|    book|  1|
    +------+--------+---+
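Because the window is 600 s long and slides every 10 s, each window computation looks back over the last ten minutes of batches. If no new lines are typed, the clicks from section 2 therefore keep being counted, and a table like the one above keeps being printed, for about 60 consecutive slides before they age out. Below is a simplified model of that bookkeeping. It assumes window computations fire at 10 s, 20 s, 30 s, ... and that each one covers the batches whose end time lies in (end - 600, end]. WindowSketch, inWindow and windowsSeeing are hypothetical names.

```scala
object WindowSketch {
  // Durations from the job, in seconds (the batch interval is also 10 s).
  val WindowSeconds = 600 // WINDOW_LENGTH
  val SlideSeconds  = 10  // SLIDE_INTERVAL

  // Simplified model: a window computed at `windowEnd` includes the batch
  // ending at `batchEnd` when windowEnd - WindowSeconds < batchEnd <= windowEnd.
  def inWindow(batchEnd: Int, windowEnd: Int): Boolean =
    batchEnd > windowEnd - WindowSeconds && batchEnd <= windowEnd

  // Window computations fire every SlideSeconds; list those up to `horizon`
  // seconds that still include the batch ending at `batchEnd`.
  def windowsSeeing(batchEnd: Int, horizon: Int): Seq[Int] =
    (SlideSeconds to horizon by SlideSeconds).filter(inWindow(batchEnd, _))

  def main(args: Array[String]): Unit = {
    val seen = windowsSeeing(batchEnd = 20, horizon = 1000)
    println(s"A batch ending at 20 s is counted in ${seen.size} windows, from ${seen.head} s to ${seen.last} s")
  }
}
```

Spark additionally requires both the window length and the slide interval to be multiples of the batch interval, which holds here (600 s and 10 s against a 10 s batch).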

Reposted from: https://my.oschina.net/puwenchao/blog/1531825
