技术开发 频道

干货:Spark Streaming工业应用实例

  【IT168 案例】Spark Streaming 是Spark API进行数据流计算的核心,下面就为大家分享在HBase上如何使用Apache Spark Streaming。

  Spark Streaming是什么?

  什么是数据流呢?数据流是一个连续不断的数据序列。Streaming将连续输入的数据流分割成离散的数据单元。流处理具有低延迟性。Spark Streaming是Spark API的核心扩展应用,可扩展性好,吞吐量高,容错性也不错。Spark Streaming的用例需要使用大量的实时数据。Spark Streaming的应用实例如下:

  · 网站、网络监测(Website monitoring, network monitoring)

  · 欺诈检测(Fraud detection)

  · 网站点击(Web clicks)

  · 广告信息(Advertising)

  · 物联网传感器(Internet of Things sensors)

  Spark Streaming支持多种数据源,例如HDFS目录、TCP套接字、Kafka、Flume、Twitter等等。Data Streams的处理工具也有很多,例如Spark 核心API、DataFrames SQL以及一些机器学习的API。输出格式主要有HDFS、Databases以及其他一些Hadoop输出的数据源格式。

干货来袭:Spark Streaming应用实例

  Spark Streaming的工作方式

  Spark Streaming将连续的数据持久化、离散化,然后批量处理。这种方式被称为Dstreams,其内部是一个RDD的序列。Spark应用程序使用Spark API来处理这些RDD,然后批量返回结果。

干货来袭:Spark Streaming应用实例

  Streaming Application的架构

干货来袭:Spark Streaming应用实例

  Spark Streaming 的基本组成部分:

  · Reads streaming data.

  · Processes the streaming data.

  · Writes the processed data to an HBase Table.

  Spark 的组成部分

  · Reads HBase Table data written by the streaming code

  · Calculates daily summary statistics

  · Writes summary statistics to the HBase table Column Family stats

  数据集

  油泵传感器是工业中最常见的传感器之一,下面就以在油泵传感器中收集到的数据为例。数据在目录文件中以逗号为分隔值。Spark Streaming监视该目录并且可以在目录中创建文件。(如前所述,Spark Streaming支持不同的数据源,为简单起见,本例将使用文件。)下面是一些CSV文件的数据样本:

干货来袭:Spark Streaming应用实例

  我们使用Scala case类来定义传感器数据的CSV文件,parsesensor函数来解析case类中的CSV数据。

  HBase 表结构

  流数据的HBase表结构如下:

  · 表的行主要有泵的名称和时间戳

  · Column Family数据列对应数据的输入域,Column Family报警列对应报警值的过滤器。数据和警报列超过一定的时间会被设置成过期值。

  常用的Schema:

  · 泵的名称和日期

  · Column Family stats

  · 最大值、最小值和平均值

干货来袭:Spark Streaming应用实例

  接下来,将Sensor对象转换为HBase Put对象,在HBase中插入一行数据。

  写入HBase Table配置

  将一个Spark TableOutputFormat 类写入HBase表,它的做法类似于从MapReduce写入HBase表。下面我们设置将TableOutputFormat类写入到HBase配置中。

  Spark Streaming实例:

  Spark Streaming的基本步骤:

  1.初始化一个 Spark StreamingContext 对象。

  2.申请转换,输出为DStreams。

  3.使用streamingContext.start()接收数据并做相应的处理。

  4.使用streamingContext.awaitTermination()等待处理结束。

  初始化StreamingContext

  创建StreamingContext

干货来袭:Spark Streaming应用实例

  使用StreamingContext textfilestream方法来创建一个输入流,用于监视Hadoop-compatible 文件系统新文件的创建以及其它文件操作。

干货来袭:Spark Streaming应用实例

  使用linesDstream代表数据流,每条记录是文本的一行。内部Dstream是一个RDD序列,每批次间隔一个RDD。

干货来袭:Spark Streaming应用实例

  申请转换,输出为DStreams

  将数据解析到Sensor对象中,然后对linesDStream做map操作。

干货来袭:Spark Streaming应用实例

  使用Sensor.parseSensor函数将linesDStream的RDD转换为Sensor对象的RDD。

干货来袭:Spark Streaming应用实例

  使用DStream foreachRDD方法处理DStream中的每个RDD。筛选出合适的低磅压力值创建报警,然后转换为Put对象将传感器和报警数据写入到HBase中,利用PairRDDFunctions saveAsHadoopDataset 方法将RDD输出到Hadoop支持的存储系统中。

干货来袭:Spark Streaming应用实例

  sensorRDD对象转换为put对象并写入到HBase中。

干货来袭:Spark Streaming应用实例

  接收数据

  在StreamingContext中调用start()接收数据,调用awaitTermination()等待流计算的结束。

干货来袭:Spark Streaming应用实例

  Spark读写操作

  读取HBase传感器的表数据,统计每天的数据并写入到column family。

干货来袭:Spark Streaming应用实例

  读取HBase传感器表中的psi列的数据,使用statCount()统计数据并写入到column family。

干货来袭:Spark Streaming应用实例

  从newAPIHadoopRDD输出一行RDD。使用 PairRDDFunctions saveAsHadoopDataset 转换为Put对象并存入HBase。

干货来袭:Spark Streaming应用实例

  软件支持

  这个实例运行在MapR Sandbox。

  程序运行

  以上代码可以作为一个独立的程序运行,具体可以参照Getting Started with Spark on MapR Sandbox。

  步骤总结:

  1. 登陆MapR Sandbox,具体方法参照Getting Started with Spark on MapR Sandbox,用户ID为user01,密码为mapr。使用maven新建程序。

  2. 使用scp复制jar文件和数据文件,并放置在 /user/user01 的目录下。

  3. 运行Streaming 。

  4. 复制流数据文件到下面的目录:cp sensordata.csv /user/user01/stream/。

干货来袭:Spark Streaming应用实例

  5. 读取并计算一列数据。

干货来袭:Spark Streaming应用实例

  6. 计算所有行。

干货来袭:Spark Streaming应用实例

  总结

  这只是Spark Streaming在HBase上的一个小实例,希望大家可以从这个实例中学到一些知识。

  原文出处:https://dzone.com/articles/spark-streaming-1

0
相关文章