携程实时大数据平台介绍技术创新,引领未来目录缘起小试牛刀 成熟和完善新方向和新尝试 不断演进中的平台我的介绍缘起携程数据业务的特点:业务部门多,形态差别大:酒店 / 机票两大BU,超过15个SBU和公共部门业务复杂,变化快之前,各个业务部门也有一些实时数据应用,但存在着诸多问题:技术上五花八门力量薄弱,应用的稳定性无法保证缺少周边的配套设施数据和信息共享不顺畅我的介绍平台需求稳定可靠的平台:业务只需要关心业务逻辑的实现,平台维护交给专业同学完整的配套设施:测试环境,上线,监控,告警信息共享:数据共享,应用场景共享,互相启发及时的服务:解决从开发,上线,维护整个过程中遇到的问题我的介绍目录缘起小试牛刀成熟和完善新方向和新尝试 不断演进中的平台我的介绍技术选型消息队列:实时处理平台:主要出于稳定性的考虑,我们最后选择Storm作为数据处理的平台我的介绍Kafka简介消息在一个Topic Partition中会按照 它发送的顺序每个partition分布在集群的每台服 务器上,可以为每个partition来设 置 多 个 Replication (Leader / Follower)1个topic的replication factor是N, 能容忍N-1台机器Failed而没有数据 损失我的介绍Storm简介Spout:“水龙头”数据接入单元Bolt:数据处理单元Storm的并发的三个层次:Nimbus:Master节点Supervisor:Worker节点,用 来管理workerWorkerExecutorTask两者之间通过ZK来做通讯我的介绍系统架构示意图DBUBTServerStormKafkaRedisESBUClientsHBASE这样远远不够!我的介绍平台治理 – 数据共享数据共享:数据共享的前提是用户能够清楚地知道可以使用的数据源的业务 的含义以及其中数据的Schema我们的解决方法是统一的Portal的站点和使用AVRO来定义数据的Schema;我们在Storm之上封装了自己的API,来自动完成数据的反序列化用户在 Portal 上传 Schema系统生成POJO并将JAR加入Maven的 仓库用户在项 目中直接 添加POJO 的依赖我的介绍平台治理 – 资源的控制Portal允许用户对于作业设置,对每个Spout和Bolt设置并发相关的参数,通过审核后才能生效Storm之上封装自己的API,屏蔽这些参数的设置我的介绍平台治理 – 统一的管理Portal用户对于作业的管理都能通过Portal上提供的功能完成我的介绍初期的业务接入在平台搭建的同时,我们积极推进数据源和相关业务应用的接入数据源:UBT – 携程所有用户的行为日志Pprobe- 应用的访问日志 相关应用:基于UBT日志分析的实时报表基于Pprobe日志的实时反爬虫分析程序我的介绍平台搭建初期的经验最初尽可能地做好平台治理的规划:重要的设计和规划都需要提前做好, 后续调整时间越晚,付出的成本越大系统只实现核心的功能:集中力量尽量早接入业务前提是核心功能基本稳定系统只有真正被用起来才会得到不断的进化低优先级接入业务需要有一定的量:能够帮助整个平台更快地稳定下来积累技术和运维上的经验我的介绍目录缘起小试牛刀成熟和完善新方向和新尝试 不断演进中的平台我的介绍日志相关系统的完善ES:Logstash - Kanban方便用户进行查询Storm UI:我的介绍Metrics基于Storm封装的API中增加通用的埋点:消息从到达Kafka到开始被消费所花费的时间Topic / Task Level的一些统计信息实现自定义的Metrics Consumer把信息输出到携程的Dashboard和Graphite(告警)我的介绍告警系统任何Storm内置的或是用户自定义的Metrics都能够配置 默认配置Topology的Fails数的告警我的介绍通用Spout和Bolt的开发开发了适配携程通用MQ的Spout,使接入的数据源得到了进一步的扩展,更 多的业务数据能够被Storm使用通用的Bolt,开发了3种针对于不同数据源的Bolt,方便用户把数据输出到外部存储:Redis Bolt:仿照原生的实现,集成携程封装的Redis的客户端HBASE Bolt:支持Kerberos的认证DB Bolt:集成携程的DAL框架我的介绍封装API的版本迭代我们自己在Storm-core和Storm-kafka的基础上封装了自己的API:muise- coremuise-core在不断地迭代和升级,添加各种各样的小功能,并且修复各种 各样的问题,随着接入作业的变多,要推动业务进行升级变成一个很沉重 的负担在muise-core 2。
0版本我们把API相关的接口都整理了一下,之后的版本最大程度地不修改,然后推动业务全线升级了一遍(当时接入的业务不多)然后我们把muise-core作为标准的Jar放到每台Supervisor Storm安装目录的lib文件夹下,每次有API升级的时候可以直接替换,然后重启supervisor进程非强制升级 – 等到用户重启topology生效强制升级 – 在和用户确定影响后,重启每个topology我的介绍大规模的业务接入业务方从原来的1个部门(框架)增加到酒店,机票,度假,团队游,攻 略等BU以及搜索,风控,信息安全等技术部门,基本上覆盖了携程所有的 大部门应用类型也比初期要丰富地多,主要应用的类型和领域包括:实时数据报表业务数据的监控基于用户实时行为的营销风控和信息安全的应用我的介绍应用实例01我的介绍应用实例02功能:实时查看AB Testing的分流效果,有配置问题能够及时发现每个分组的订单数据的监控,如果订单出现下降可以及时停止AB Testing我的介绍应用实例03历史偏好 + 实时偏好-推荐产品相似应用:攻略根据用户实时的行为推送用户感兴趣的攻略团队游根据用户实时的访问推送限时的优惠券酒店根据用户实时的行为和订单的情况给用户推送营销类的Push消息我的介绍我们遇到的坑我们使用的版本是0。
9。4,在这个版本上,我们遇到过两个偶发的问题:STORM-763:Nimbus已经将worker分配到其他的节点,但是其他worker的netty客户端不连接新的worker应急处理:Kill掉这个worker的进程或是重启相关的作业STORM-643:当failed list不为空时,并且一些offset已经超出了Range 范围,KafkaUtils会不断重复地去取相关的message另外我们的用户在使用Storm的过程中也遇到过一些问题,这边简单和大家 分享下:localOrShuffleGrouping的使用:大多数情况下推荐用户使用;前提上下 游的Bolt数要批配;否则会出现下游的大多数Bolt没有收到数据的情况Bolt中的成员变量都要是可以序列化的我的介绍经验总结大量接入前,监控和告警的相关设施需要完善清晰的说明文档 / Q A能够节约很多支持的时间把握接入的节奏全员客服控制同时接入的项目数授人以“渔”我的介绍目录缘起小试牛刀 成熟和完善新方向和新尝试不断演进中的平台我的介绍Stream CQLStream CQL (Stream Continuous Query Language)是华为开源的实时流处理的SQL引擎,它的做法是把StreamCQL - Storm TopologyStream CQL的语法和标准的SQL或是HQL很类似,它支持实时处理的窗口函数下面我们通过一个简单的例子来“感受”下Stream CQL:从kafka中读取数据,类型为ubt_action取出其中的page,type,action,category等字段然后每五秒钟按照 page, type字段做一次聚合最后把结果写到console中我的介绍Stream CQL例子Storm:SteamCQL:public class CtripKafkaSpout implements IRichSpout {……}class ExtractBolt extends AbstractMuiseHermesBoltAutoAckedUserAction { ……}class ConsoleBolt extends CtripBaseBoltAutoAcked {……}class AnalyseBolt extends CtripBaseBoltWithoutAutoAcked { …。
。}public static void main(String[] args) {……CtripStormSubmitter。submitToCluster(conf, builder);……}create input stream kafka_avro (contextpageString, contexttype String, actioncategoryString , actiontype String) serde HermesSerDe source HermesSourceOpproperties(avroclass=hermes。ubt。action。UserAction ,topic=ubt。action,groupid=json_hermes);create output stream console_field(page String, type String, target String , actionType String, count Int) sink consoleoutput;insert into stream console_field select *,count(1) from kafka_avro [range 5 seconds batch] group by contextpage, contexttype;submit application ubt_cql_demo;我的介绍Stream CQL的尝试和工作增加Redis,HBASE,HIVE,DB(小表,加载内存)作为Data Source增加HBASE,MySQL / SQL Server,Redis作为数据输出的Sink修正MultiInsert语句解析错误,并反馈到社区为where语句增加了In的功能支持从携程的消息队列Hermes中读取数据我的介绍StreamingCQL的应用Streaming CQL作为Storm的补充目前的使用场景:能让BI的同学自主地开发逻辑相对简单的实时数据报表 和数据分析的应用实例:度假BU需要实时地统计每个用户访问“自由行”,“跟团游”,“半自助 游”产品的占比,进一步丰富用户画像的数据数据流:UBT的数据Data Source:使用Hive中的product的维度表输出:Hbase70左右的代码就能完成整个功能,缩短了开发时间我的介绍JStormJStorm是阿里开源的实时计算引擎,API上兼容Storm,内核使用Java编写去年它被Storm项目正式接纳,之后会逐步融合到Storm之中去目前与Storm比较,JStorm在计算性能上,资源的 隔离上有一定优势;他也支持与Twitter Heron类 似的Back pressure的机制,能更好地应对消息拥 塞的情况阿里的JStorm的团队非常Open,也非常 Professional,帮我们解决了不少问题,互相之间 的合作也非常愉快!我的介绍Jstorm的尝试和经验我们的目标:把携程现有的实时应用从Storm上迁到JStorm上去目前使用的版本:2。
1。1经验分享:1。 与Kafka集成:在Jstorm中,Spout的实现有两种不同的方式:Multi Thread(nextTuple,ack fail方法在不同的进程中调用)和Single Thread, 原生的Storm的Kafka Spout需要使用Single Thread的方式运行修复了Single Thread模式的1个问题(新版本已经修复)2。 Metrics:Jstorm不支持Storm的Metrics Consumer的机制,Jstorm有一套新的 Metrics的API,感兴趣的同学可以参看AsmMetricsT类,以及子类适配了Kafka Spout和我们Storm的API中的Metrics使用MetricsUploader的功能实现了数据写入Dashboard和Graphite的功能我的介绍目录缘起小试牛刀成熟和完善新方向和新尝试不断演进中的平台我的介绍技术架构的总结Muise PortalToolsCamus CanalAPIServiceHermes Producer APIHermes Consumer APIHermes Meta ServiceMuise Storm APIMuise Storm ServiceHermes KafkaServiceInfrastructureGraphiteESCQLStorm ClusterKafka ClusterDashboardJStorm我的介绍未来的方向平台整体向Jstorm迁移,贡献社区作业按照优先级逐步向Jstorm迁移熟悉Jstorm的架构和实现参与Jstorm的开发,贡献Jstorm的社区关注Spark 2。0在实时处理上的进展Structured Streaming,支持在流上的SQL的查询,查询可以在运行时改 变寻找能够落地的业务场景调研Flink进行基础的调研,对比Spark Streaming( 2。0中的Structured Streaming)寻找能够落地的业务场景