1. 目的
统计最近 10 分钟内网站点击次数 Top 10 的商品并输出其商品信息（每 10 秒刷新一次）。商品信息表存储在 MySQL 中，网站点击日志通过 socket 模拟输入。
2. 素材
1)mysql建立product表
mysql> select * from product;
+--------+----------+
| itemid | itemname |
+--------+----------+
| 001    | phone    |
| 002    | book     |
| 003    | paper    |
+--------+----------+
2)socket输入模拟点击log
在 Linux 上启动 netcat 程序，监听 9999 端口：
nc -lk 9999
逐行输入以下点击日志（每行一条，格式为：用户ID 商品ID 点击日期，字段之间以空格分隔）：
9342 001 20170909
4532 002 20170909
7159 001 20170909
5834 003 20170909
5521 003 20170909
2633 001 20170909

3. 代码
/**
 * Created by puwenchao on 2017-09-08.
 *
 * Computes the 10 most-clicked products over a 10-minute sliding window.
 * Product metadata is read from a MySQL table; click logs arrive as text lines on a socket.
 */
package Streaming

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}

/** One click-log record, parsed from a line of the form `userid itemid clicktime`. */
case class AccessLog(userid: String, itemid: String, clicktime: String)

object streaming_blacklist3 {
  // Aggregate over the last 10 minutes, re-evaluated every 10 seconds.
  // Both durations must be multiples of the 10-second batch interval used in main.
  val WINDOW_LENGTH = new Duration(600 * 1000)
  val SLIDE_INTERVAL = new Duration(10 * 1000)

  /**
   * Parses one raw click-log line.
   *
   * Fields may be separated by any run of whitespace; leading/trailing whitespace is ignored.
   *
   * @param log a raw line received from the socket
   * @return the parsed record, or None when the line does not contain exactly three fields
   *         (e.g. blank or malformed lines), so such lines are not counted as clicks
   */
  def parseLog(log: String): Option[AccessLog] =
    log.trim.split("\\s+") match {
      case Array(userid, itemid, clicktime) => Some(AccessLog(userid, itemid, clicktime))
      case _                                => None
    }

  def main(args: Array[String]): Unit = {
    // Reduce Spark / Jetty log noise so the per-window result tables stay readable.
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    // Create the Spark, SQL and streaming contexts (10-second batches).
    val sparkConf = new SparkConf().setAppName("streaming_blacklist3").setMaster("local[4]")
    val sc = new SparkContext(sparkConf)
    val sqlc = new SQLContext(sc)
    val streamc = new StreamingContext(sc, Seconds(10))
    import sqlc.implicits._

    // Expose the MySQL `product` table to Spark SQL under the name "product".
    // NOTE(review): this DataFrame is not cached, so each query re-reads it over JDBC;
    // call productDF.cache() if product data is static and MySQL load matters.
    val productDF = sqlc.read.format("jdbc").options(Map(
      "url"      -> "jdbc:mysql://192.168.252.141:3306/test",
      "driver"   -> "com.mysql.jdbc.Driver",
      "dbtable"  -> "product",
      "user"     -> "root",
      "password" -> "mysql"
    )).load()
    productDF.registerTempTable("product")

    // Click logs arrive one record per line over the socket.
    // For local file-based testing: streamc.textFileStream("D:/logs_incoming")
    val logLinesDStream = streamc.socketTextStream("192.168.252.141", 9999, StorageLevel.MEMORY_ONLY)

    // Malformed lines are dropped here rather than being counted under a placeholder item id.
    // No explicit cache(): window() already persists its parent DStream.
    val accessLogsDStream = logLinesDStream.flatMap(line => parseLog(line).toList)
    val windowDStream = accessLogsDStream.window(WINDOW_LENGTH, SLIDE_INTERVAL)

    // Join the windowed click counts with product data and print the ten most-clicked products.
    windowDStream.foreachRDD { accessLogs =>
      if (accessLogs.isEmpty()) {
        println("No logs received in this time interval")
      } else {
        accessLogs.toDF().registerTempTable("accessLogs")
        // Secondary sort on itemid keeps the Top-10 cut deterministic when counts tie.
        val sqlStr =
          "SELECT a.itemid, a.itemname, b.cnt FROM product a JOIN " +
            "(SELECT itemid, COUNT(*) cnt FROM accessLogs GROUP BY itemid) b " +
            "ON (a.itemid = b.itemid) ORDER BY cnt DESC, itemid LIMIT 10"
        val topTenClick = sqlc.sql(sqlStr)
        // The Top-10 result could equally be written to HDFS, a database, etc.
        topTenClick.show()
      }
    }

    streamc.start()
    streamc.awaitTermination()
  }
}
4. 输出
No logs received in this time interval
No logs received in this time interval
+------+--------+---+
|itemid|itemname|cnt|
+------+--------+---+
|   001|   phone|  3|
|   003|   paper|  2|
|   002|    book|  1|
+------+--------+---+