个性化离线实时分析系统pora -- 系统架构 -- IT技术博客大学习 -- 共学习 共进步!
个性化离线实时分析系统pora
欢迎您,来自新浪微博的朋友!如果喜欢,您也可以把这篇文章分享到新浪微博:,这样除了作为收藏还分享给了您的朋友!您也可以通过RSS订阅来获取我们的文章:Goolge源(推荐), FeedSky源。
祝您好运!
1.业务场景
伴随着市场和技术的发展,个性化已经成为淘宝搜索的一个重要目标。简单来说,个性化就是让每个用户在使用淘宝搜索时都能够获取自己最想要的结果,而不再是千篇一律的展示。实现个性化最直接的手段就是通过分析用户的历史行为日志,为用户打上不同的标签,在搜索中根据这些标签来展示最贴近的结果。
在淘宝,用户属性分析是通过每天在云梯上定时运行的map reduce job来完成的,产出结果导入我们的在线kv存储ups中,搜索引擎通过查询ups获取用户属性来为用户返回个性化的结果。在云梯上执行的全量计算能够进行复杂的模型计算,并且由于利用了云梯强大的计算能力,计算全部用户几十天的日志也只需花费几个小时。
全量计算的不足之处在每次计算的输入数据都是前一天到前N天的日志,无法将用户当天的行为考虑进去,因此得到的用户属性永远是滞后一天的,无法将某些用户当前的属性很好地反映出来。实时增量弥补了这一空缺,通过实时分析用户的行为日志,将最新的用户属性反馈给搜索引擎,能够为用户展现最贴近其当前需求的结果。
2.系统需求
结合我们的业务场景和现状,对实时分析系统大致有以下几点需求。
(1)不影响在线查询的效率。这是一个最基本的需求,也决定了我们系统的定位:离线分析,将相对较重的分析过程放在离线阶段完成,在线过程只需要查询离线计算产出的结果即可。
(2)实时。既然称为实时系统,这也是一个起码的要求。至于要多实时,初步的目标是从用户一次行为发生到最后的属性更新在几秒内完成。
(3)可水平扩展。个性化是一个需要长期打磨的系统工程,在不同的阶段对系统的容量自然有不同的需求,这就需要我们的系统能够具备良好的水平扩展能力。
(4)能应对复杂多变的业务。算法同学会在个性化方面做各种尝试,我们系统需要提供便利的方式来支持这些尝试,最好是能够将相对公用的东西与具体的业务逻辑剥离开,简单来说,就是算法逻辑插件化。
(5)高效。实时分析每天需要处理的日志量是巨大的,但是在其业务价值没有得到足够证明之前,是不可能占用太多的机器资源的,因此高效也成为了我们的一个基本需求。
3.系统架构
说到实时分析,前提是实时日志收集,这方面淘宝已经有了一套的强大的日志收集和分发系统-TimeTunnel,俗称TT,TT的延迟在几百毫秒以内,并且提供根据游标来取消息的功能,基本满足了我们消息对消息实时性和完整性的需求。全量计算的输出是实时分析系统的另一个重要的数据源,因为我们写入到ups提供给搜索引擎的是用户属性的最终结果,合并全量和增量的过程需要在实时分析系统中完成。全量计算是在云梯上完成的,结果存放在hdfs中,hdfs不能够提供记录级别的操作,考虑到我们的系统需求,必须要有另外一个提供高效的记录级操作的存储系统来保存这些数据。此外,由于算法逻辑通常会将用户近两天的行为都考虑进去,我们还需要保存用户近期的行为记录。我们选择hbase作为全量结果和近期行为数据的存储介质,一是由于hbase具有良好的水平扩展性,二是由于我们对hbase的使用比较熟悉。在计算系统的选型上,我们选择了人见人爱的开源系统storm.各个组件的选型确定,整个系统的架构也就出来了。
系统架构
(1)全量数据的导入。首先通过distcp方式将云梯上的数据拷贝到我们的hadoop集群中,然后使用bulk-load方式将数据导入到hbase表中。bulk-load是hbase提供的一种高效的数据批量导入工具,具体使用方法可以参考 http://hbase.apache.org/book/arch.bulk.load.html。 全量导入过程每天运行一次,我们会根据日期新建对应的表。
(2)全量数据的切换和删除。为了让运行在storm中的实时分析拓扑检测并使用到新全量表,我们另外创建了一张全量数据索引表,每次导入到新的全量数据表时更新对应的索引,实时分析拓扑定期扫描索引,在检测到索引更新时自动切换到使用新表。
(3)消息完整性的保证。实时分析拓扑中会保存消息处理的游标,并定期刷入到hbase中,这样即使在节点失败或者拓扑重启的情况下也能够恢复游标,处理堆积的消息。
4.实时分析拓扑
当一条日志进入pora系统后,首先通过解析器解析出若干字段,然后通过过滤逻辑来判断该条日志是否需要进行分析,如果需要,则会根据这些字段执行需要的join操作,例如将用户、宝贝的信息补全,然后将join好的日志以及用户的近期行为和全量属性传递给系统中的算法插件,依次进行分析,最后将最新的用户属性更新到ups中,提供给外部使用。分析流程对应于storm的拓扑结构大致如下:
(1)parser. 负责解析日志,根据配置文件取出需要的字段来。
(2)filter. 过滤逻辑,根据某些规则过滤掉一些不感兴趣的用户日志。
(3)joiner. 日志中的字段往往不能够提供完整的信息,需要一个join过程来补全字段。在当前的实现中,我们会根据日志中的”行为”字段来使用不同的join方式。
(4)analyzer. 主体分析逻辑。我们将这部分做成了一个 framework + plugins 的结构,其中framework负责取全量属性、取近期行为、取当前行为,合并计算结果。每个plugin只需要实现analyze(全量属性 + 近期行为 + 当前行为)的方法。framework对用户属性进行了字段切分,每个plugin只需要关心自己处理的那个字段即可。
在joiner和analyzer阶段,我们做了一个很小的批量处理,不一定每条日志都会触发计算,只有当累积够一定条数后,才做一次集中处理,这样在latency方面会有一些损失,但是能够将对hbase的访问打包,提高hbase的读写性能,从而大大提高系统的qps.这个批量的大小是可配的,用户可以根据场景选择配置,在qps和latency之间做trade-off,当配置为1的时候,就是完全的单条计算。
(5)updater.负责将analyzer计算后发生更新的用户属性发送到ups中,继而提供给搜索引擎使用。
5.系统监控
监控是一个线上系统必不可少的一部分。我们除了使用了一些基础的机器状态监控外,hbase集群还使用了集团hbase团队开发的专用监控系统,非常直观。此外,我们还需要一些业务指标的监控,例如我们的qps,latency,gap(日志处理时间与日志生产时间质检单 间隔),这方面也花费了我们一些心思。例如latency的监控,storm ui本身提供了即时数字的显示,但是没有我们想要的曲线图(或许0.9版本中会有吧)。最后我们选择了基于hbase的监控绘图工具openTSDB。我们通过借助storm的ack机制来统计消息处理的latency,打印到日志中,然后使用一个脚本来搜集这些信息发送给openTSDB服务器来展示曲线。
pora目前在淘宝个性化搜索中稳定运行,每天处理几十亿的日志信息,平均延迟在秒级。
6.经验教训
(1).zookeeper集群独立。因为zookeeper无论对于hbase还是storm都是至关重要的,最好将其单独搞一个负载较低的集群。
(2).hbase表的预分区。尽量将请求分散到各个节点上,至于预分区的原则,就根据业务场景来制定了。例如我们在存储用户全量属性数据时是按照用户名做哈希取模的。
(3).storm使用经验
(a).根据需要修改默认参数。这点是显然的,storm的默认参数并不能符合每个业务场景的需要,在storm源码中的conf/defaults.xml目录下有各个参数的默认取值,用户可根据需要修改。
(b).emit tuple时一定要new list.出于效率的考虑,storm底层的发送线程不会对该list进行深拷贝,会直接使用。如果用户不小心修改了该list,会导致一些莫名其妙的失误。
(c).重启supervopior前删除本地data目录。storm的supervisor会在本地data目录保存一些状态信息,在某些情况下这些状态与zk中的最新状态并不能保持一致,如果不删除data目录,容易导致supervisor重启失败。