7x24快讯 ·

干货:Spark在360商业数据部的应用实践

Spark的应用现状

Spark需求背景

随着数据规模的持续增长,数据需求越来越多,原有的以MapReduce为代表的Hadoop平台越来越显示出其局限性。主要体现在以下两点:

? ?任务执行时间比较长。特别是某些复杂的SQL任务,或者一些复杂的机器学习迭代。

? ?不能很好的支持像机器学习、实时处理这种新的大数据处理需求。

Spark作为新一代大数据处理的计算平台,使得我们可以用Spark这一种平台统一处理数据处理的各种复杂需求,非常好的支持了我们目前现有的业务。与原有MapReduce模型相比,其具有下面3个特点:

? ?充分使用内存作为框架计算过程存储的介质,与磁盘相比大大提高了数据读取速度。利用内存缓存,显着降低算法迭代时频繁读取数据的开销。

? ?更好的DAG框架。原有在MapReduce M-R-M-R的模型,在Spark框架下,更类似与M-R-R,优化掉无用流程节点。

? ?丰富的组件支持。如支持对结构化数据执行SQL操作的组件Spark-SQL,支持实时处理的组件Spark-Streaming,支持机器学习的组件Mllib,支持图形学习的Graphx。

以Spark为核心的数据平台结构

?

?

商业数据部的数据平台架构如上图所示,Spark在其中起到一个非常核心作用。目前每天提交的Spark作业有1200多个,使用的资源数Max Resources: ,每日处理的数据量约有100TB。

Spark的几种典型应用

基于SparkStreaming的实时处理需求

商业数据部内部有大量的实时数据处理需求,如实时广告收入计算,实时线上ctr预估,实时广告重定向等,目前主要通过SparkStreaming完成。

实时数据处理的第一步,需要有实时的数据。360的用户产品,几乎全国各地都部署有机房,主要有4大主力机房。实时数据的收集过程如下:

?

?

使用Apache flume实时将服务器的日志上传至本地机房的Kafka,数据延迟在100ms以内。

使用Kafka MirorMaker将各大主力机房的数据汇总至中心机房洛阳,数据延迟在200ms以内。由于公司的网络环境不是很好,为了保证低延迟,在MirorMaker机房的机器上,申请了带宽的QOS保 证,以降低延迟。

数据处理的实时链路如下所示:

? 1种方式是通过Apache Flume实时写入Hdfs,用于第二天全量数据的离线计算

? 1种方式是通过SparkSteaming实时处理,处理后数据会回流至Kafka或者Redis,便于后续流程使用。

?

?

基于SparkSQL和DataFrame的qq不断发红包的群号码需求

SparkSQL是Spark的核心组件,作为新一代的SQL on Hadoop的解决方案,完美的支持了对现有Hive数据的存取。在与Hive进行集成的同时,Spark SQL也提供了JDBC/ODBC接口,便于第三方工具如Tableau、Qlik等通过该接口接入Spark SQL。

由于之前大部分qq不断发红包的群号码工作都是通过使用hive命令行完成的,为了将迁移至SparkSQL的代价最小,360系统部的同事开发了SparkSQL的命令行版本spark-hive。原有的以hive 命令运行的脚本,简单的改成spark-hive便可以运行。360系统部的同事也做了大量兼容性的工作。spark-hive目前已经比较稳定,成为qq不断发红包的群号码的首选。

DataFrma是Spark 1.3引入的新API,与RDD类似,DataFrame也是一个分布式数据容器。

但与RDD不同的是,DataFrame除了数据以外,还掌握更多数据的结构信息,即schema。同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上 看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

大数据开发过程中,可能会遇到各种类型的数据源,而DataFrame与生俱来就支持各种数据类型,如下图,包括JSON文件、Parquet文件、Hive表格、本地文件系统、分布式文件系统(HDFS)以及云存储(S3)。同时,配合JDBC,它还可以读取外部关系型数据库系统如Mysql,Oracle中的数据。对于自带Schema的数据类型,如Parquet,DataFrame还能够自动解析列类型。

?

参与评论