Chronicle-----流计算中避免FullGC

背景

最近在Jstorm中遇到一个问题,在一个流量很大的Jstorm集群中,其中一个Spout会定期从mysql中同步一些元信息并广播到下游的相关Bolt。当元数据信息不大时,这种广播的方式并不会造成太大的影响。

然而,当元数据信息愈发庞大时,广播元数据带来了各种各样的负面影响。

  1. 首先,当Spout读元数据广播时,大量元数据信息被序列化后,发送给下游Bolt,当下游Bolt收到时,元信息又被反序列化为一个大的对象,并更新。这时候对GC产生了很大的压力。
  2. 其次,每个Bolt作为Worker的一个线程去分别持有一份元数据信息,会占用很大的内存。

解决FGC引起的流量异常

从我们流计算的业务监控中,看到流量突然下跌。几分钟后重新恢复流量。立马看了一下Jstorm日志,发现没有发生系统调度,所以说在Jstorm层面应该没有引起流量的下跌。

那么既然不是Jstorm的原因的话,那只能从Topology层面找找原因。这时候在@简离的指导下,用Jstat看了对应Worker的GC情况。 发现FGC 4->5 的时候,old区增长了近10%,同时触发了FGC。这时候就要结合业务日志来看,这个时间点系统到底在做什么导致old区的迅速增长,由于有近10%的内存增长,应该是一个很大的对象分配。 从业务log中发现,发生FGC的时候,Spout从我们的元数据库读了一份元信息,并广播给下游Bolt。这份元信息是一个有近200W+ entry的HashMap。 为了验证这一猜想,我去看了其他集群运行较久的Job。 可以看到这个Job运行了9天,发生了5000+次FGC,平均下来5分钟发生近两次FGC。正好是我们Spout同步元数据的频率。

小插曲

在部署某个集群时,发现Job总是运行个五分钟就挂了,当时也是“百思不得骑姐”。放大worker的内存,以及适当降低并发数就正常了,当时只是简单的觉得是因为OOM,机器资源不足导致的。后面想到并发数特别高时,每个线程持有元数据相当占用内存空间,比较容易引起OOM。

怎么解决?

又是@简离的一番教育,介绍了我什么是堆外内存,什么是FreeGC编程-。- 主角登场chronicle-map,这个是开源的堆外内存实现的ConcurrentHashMap。原本应用在高频交易的场景下,在高频交易中FGC也是灾难性的。

ChronicleMap is a ConcurrentMap implementation which stores the entries off-heap, serializing/deserializing key and value objects to/from off-heap memory transparently. Chronicle Map supports

Key and value objects caching/reusing for making zero allocations (garbage) on queries. Flyweight values for eliminating serialization/deserialization cost and allowing direct read/write access to off-heap memory.

堆存储 VS 非堆存储

堆存储的优势
  • 常见的,写普通的Java代码。所有有经验的Java开发人员都可以做到。
  • 访问内存的安全性问题。
  • 自动的GC服务——无需自身管理的malloc()/free()操作。
  • 完整的 Java Lock API和JMM相结合。
  • 添加无序列化/复制数据到一个结构中去。
非堆存储的优势
  • 控制"停止一切(Stop the World)"的GC事件到你比较满意的层次。
  • 可以超越在规模上的堆存储结构(当使用堆存储的时候会变得很高)
  • 可以作为一个本地的IPC传输(无需java.net.Socket的IP回送)
  • NIO DirectByteBuffer到/dev/shm (tmpfs)的map 或者直接sun.misc.Unsafe.malloc()

什么才是非堆存储

上图介绍了两个使用ShardHashMap(SHM)作为进程间通信(IPC)的Java VM过程(PID1和PID2)。图表底部的水平轴代表的是完全SHM操作系统的所在域。当OpenHFT对象被操作时,它就会在操作系统中物理内存的用户地址空间或者内核地址空间的某处。往深一层思考,我们知道他们以“关于进程”的局部开始着手。从Linux操作系统来看,JVM是一个a.out (通过调用 gcc呈现的)。当a.out在运行的时候会有一个PID。一个 PID的 a.out (在运行时)包含以下三个方面:

  • 文档(低地址……执行代码的地方)
  • 数据(通过sbrk(2)从低地址升级到高地址来掌管)
  • 栈(从高地址到低地址来掌管)

这是PID在操作系统中的表现形式。也就是说,PID是一个执行的JVM,JVM有它自己操作对象潜在的局部性。

从JVM来看,操作对象作为On-PID-on-heap(一般的Java)或者On-PID-off-heap(通过Unsafe或者NIO到Linux mmap(2)的桥梁)。无论在On-PID-on-heap还是在On-PID-off-heap,所有的操作对象仍然存活在用户的地址空间。在C/C++中,API(操作系统调用的)提供了允许C++操作对象有 Off-PID-off-heap的地方,这些操作对象都寄存在内核地址空间内。

小插曲

在使用chronicle-map的过程中,因为Jstorm只支持java 1.7的缘故,我们只能使用chronicle-map 2.x的版本,在使用过程中,我的测试起了10个进程,每个进程500个线程去创建chronicle-map,发现了其builder源码的bug,源码中,当指定的共享文件存在时并且文件内容长度不为0时,它会通过创建一个ObjectStream 8个bytes地读取文件,然而当并发足够大时,会出现第一个线程开始向文件中写入meta信息,而其他的线程读文件时,恰巧8个bytes中读到了EOF引发EOFERROR,在公司内部版本中我已经修复了这个bug。

使用效果

从图中可以看出,FGC时,内存并不像之前那样出现了old区跳跃式增长;同时从业务监控中,也没有再出现断崖式地流量下跌。然而,我们Job每秒大概处理接近200W+的sql信息,db写入接近每秒600Mb,由于大量的YGC,整个Job并没有能够完全避免FGC。

路漫漫其修远兮,吾将上下而求索。

数据采集的优化之旅

最近在做一个数据同步的小工具,具体场景就是将某个数据库中的几张表,同步到另一个数据库中。

没有采取抓binlog的方式去做,一是解析binlog不是一个很简单的工作。二是对延迟的要求不是很高,2min是可接受范围。所以采取扫全表,全量同步的方式去做。

简单的业务代码写完,发现同步一张9w条数据的表,竟然TMD要近10分钟。立马检查代码的问题。首先要确认主要耗时的是SQL执行、还是在序列化阶段的耗时,通过Guava库中的stopwatch,很好的分析了各个阶段的耗时。惊奇的发现一个简单的SELECT查询,竟然扫了全表。立马检查表结果,发现select查询中的column并没有建立索引。这是踩的第一个坑。

解决了索引问题之后,测试之后,扫一张9w条数据的表耗时接近2min,然而我们线上环境,一张表的数据接近90w条,那么单线程扫一遍,耗时也将接近20min,同样是一个不可接受的时间。

自然而然地想到采用了多线程的方式去完成同步任务。首先,将数据分片,select min(id), max(id) from table;查询到主键最大、最小ID之后,将ID range到不同的线程去执行业务逻辑。

测试之后,发现有的线程耗时很低,有的线程耗时远远超过其他线程几个数量级的耗时。这时,多线程的耗时,取决于最长耗时。立马想到,主键id分布不是均匀的。

为了保证绝对的平均任务数,首先select count(*) from table;得到表中数据的数量之后,num = count / threadNums;相当于每个线程执行的row行数是相同的。select max(id) from (select id from table where id >= min order by id limit num) as t;这条SQL就拿到了每个线程的最大ID值。select * from table where id >= min and id < max;这样就得到了每个线程的数据。

测试之后,任务耗时接近9s,属于可接受范围。

做的一个小工具,也是学到了不少知识。还有很多值得优化的地方。

sql parser By ANTLR

Antlr

识别出所有有效的句子、词组、字词组等,识别语言的程序叫做解析器(Parser)或者语法分析器(syntax analyzer)。

  • recognize all the valid sentences and subphrase
  • parser of syntax analyzers: recognize languages
  • ANTLR meta-language
  • sentence as a input stream, look up words in dictionary

phrase 1:

  • lexical analysis: group characters into words or symbols(tokens)
  • lexer: group the related tokens into token classes or token types

phrase 2:

  • feed off these tokens to recognize the sentence structure
  • parse tree or syntax tree: ANTLR generated parsers build a data structure

basic data flow of a language recognizer

Implementing Parsers

  • recursive-descent parser -> kind of top-down parser implementation

  • graph traced out by invoking methods stat(), assign(), and expr() mirrors the interior parse tree nodes.

charStream -> tokenStream -> parse tree

parse-tree Listeners

ANTLR会为Token生成子类--parseTreeListener,并且实现了每个规则的进入和退出的方法。
ANTLR为每个Rule都会生成一个Context对象,它会记录识别时的所有信息。ANTLR提供了Listener和Visitor两种遍历机制。Listener是全自动化的,ANTLR会主导深度优先遍历过程,我们只需处理各种事件就可以了。而Visitor则提供了可控的遍历方式,我们可以自行决定是否显示地调用子结点的visit方法。

presto中ANTLR的应用

what is Presto?

Presto is an open source distributed SQL query engine for running interactive analytic queries against data sources of all sizes ranging from gigabytes to petabytes.

Presto was designed and written from the ground up for interactive analytics and approaches the speed of commercial data warehouses while scaling to the size of organizations like Facebook.

presto-parser

首先来看一下SqlBase.g4的内容,其中定义了SQL Base的TOKEN。

截取一部分

grammar SqlBase;  
tokens {  
    DELIMITER
}
singleStatement  
    : statement EOF
    ;
singleExpression  
    : expression EOF
    ;
statement  
    : query                                                            #statementDefault
    | USE schema=identifier                                            #use
    | USE catalog=identifier '.' schema=identifier                     #use
    | CREATE TABLE (IF NOT EXISTS)? qualifiedName
        (WITH tableProperties)? AS query
        (WITH (NO)? DATA)?                                             #createTableAsSelect
    | CREATE TABLE (IF NOT EXISTS)? qualifiedName
        '(' tableElement (',' tableElement)* ')'
        (WITH tableProperties)?                                        #createTable
    | DROP TABLE (IF EXISTS)? qualifiedName                            #dropTable
    | INSERT INTO qualifiedName columnAliases? query                   #insertInto
    | DELETE FROM qualifiedName (WHERE booleanExpression)?             #delete
    | ALTER TABLE from=qualifiedName RENAME TO to=qualifiedName        #renameTable
    | ALTER TABLE tableName=qualifiedName
        RENAME COLUMN from=identifier TO to=identifier                 #renameColumn
    | ALTER TABLE tableName=qualifiedName
        ADD COLUMN column=tableElement                                 #addColumn
    | CREATE (OR REPLACE)? VIEW qualifiedName AS query                 #createView
    | DROP VIEW (IF EXISTS)? qualifiedName                             #dropView
    | CALL qualifiedName '(' (callArgument (',' callArgument)*)? ')'   #call
    | GRANT
        (privilege (',' privilege)* | ALL PRIVILEGES)
        ON TABLE? qualifiedName TO grantee=identifier
        (WITH GRANT OPTION)?                                           #grant
    | REVOKE
        (GRANT OPTION FOR)?
        (privilege (',' privilege)* | ALL PRIVILEGES)
        ON TABLE? qualifiedName FROM grantee=identifier                #revoke
    | EXPLAIN ANALYZE?
        ('(' explainOption (',' explainOption)* ')')? statement        #explain

定义好g4之后,antlr会根据定义的Token生成对应的Parser。 包com.facebook.presto.parser.SqlParser中InvokeParser的实现:

private Node invokeParser(String name, String sql, Function<SqlBaseParser, ParserRuleContext> parseFunction)  
    {
        try {
            SqlBaseLexer lexer = new SqlBaseLexer(new CaseInsensitiveStream(new ANTLRInputStream(sql)));
            CommonTokenStream tokenStream = new CommonTokenStream(lexer);
            SqlBaseParser parser = new SqlBaseParser(tokenStream);
            parser.addParseListener(new PostProcessor());
            lexer.removeErrorListeners();
            lexer.addErrorListener(ERROR_LISTENER);
            parser.removeErrorListeners();
            parser.addErrorListener(ERROR_LISTENER);
            ParserRuleContext tree;
            try {          parser.getInterpreter().setPredictionMode(PredictionMode.SLL);
                tree = parseFunction.apply(parser);
            }
            catch (ParseCancellationException ex) {
                tokenStream.reset(); // rewind input stream
                parser.reset();
                parser.getInterpreter().setPredictionMode(PredictionMode.LL);
                tree = parseFunction.apply(parser);
            }
            return new AstBuilder().visit(tree);
        }
        catch (StackOverflowError e) {
            throw new ParsingException(name + " is too large (stack overflow while parsing)");
        }
    }

InvokeParser() 返回的是一个AST语法树。
ASTBuilder 类中定义了各个Node节点的访问方法。

记阿里人工智能研讨会

题外话

之前没有了解过知识图谱,同样也是第一次参加研讨会。 总体而言,研讨会能够快速,高效地了解到学术界目前的研究方向。

Background

知识图谱,简而言之,就是以三元组代表(HeadEntity, Relationship, TailEntity)头实体,尾实体以及它们之间的关系。

知识图谱(Mapping Knowledge Domain)也被称为科学知识图谱,在图书情报界称为知识域可视化或知识领域映射地图,是显示知识发展进程与结构关系的一系列各种不同的图形,用可视化技术描述知识资源及其载体,挖掘、分析、构建、绘制和显示知识及它们之间的相互联系。

具体来说,知识图谱是通过将应用数学、图形学、信息可视化技术、信息科学等学科的理论与方法与计量学引文分析、共现分析等方法结合,并利用可视化的图谱形象地展示学科的核心结构、发展历史、前沿领域以及整体知识架构达到多学科融合目的的现代理论。它把复杂的知识领域通过数据挖掘、信息处理、知识计量和图形绘制而显示出来,揭示知识领域的动态发展规律,为学科研究提供切实的、有价值的参考。

Google知识图谱Wiki

其他代表知识库:

  1. WordNet
  2. Freebase

目前研究方向

分布式表示学习(distributed representation, embeddings)

主要研究思路: 将知识图谱嵌入到低维向量空间
  • 实体和关系都表示为低维向量
  • 有效表示和度量实体、关系间的语义关联

知识表示代表模型:

对每个事实(head, relation, tail),将relation看做从head到tail的翻译操作。

训练的优化目标为: h + r = t

此外还有Neural Tensor Network(NTN)以及Energy Model。 NTN Energy Model

表示学习在处理一对多、多对一、多对多的关系时,不能较好的处理。当出现多个结果时,每个结果的权重相当。

在TransE的基础上考虑关系对实体的影响

有以下两个典型的算法:

  • TransH
  • TransR

TransH TransR

Path Ranking

关系路径的表示学习: Recursive Neural Network(RNN)

考虑了关系路径的TransE算法为PTransE:

relation之间的组合语义,通常包括 ADD, MULTIPLY, RNN

通常关系之间的每个组合,需要单独训练一个目标函数。 在大规模复杂的知识图谱中,目标函数也会呈现指数级增长。

Probabilistic Graphical Models

这个算法,由于落地难的问题,大家都没有讲=.=

王志春-讲解了规则学习的几个方法:

  • 归纳逻辑程序设计 ILP
  • 类似数据挖掘中的关联规则
  • 关系路径
  • 分布式表示

韩先培-介绍了相关无监督语义关系抽取:

  • bootstrapping
  • distant supervision
  • Open IE(Stanford OpenIE)

写在最后

刘知远讲解的TransE非常的Solid,而且开源了算法实现https://github.com/thunlp/KG2E

王泉研究员,我只能献上我的膝盖了,语速很快,思路无敌清晰。简简单单的一个slide就能把当前知识图谱的研究方向洋洋洒洒的讲出来。

最后附上 刘知远的 ppt 大规模知识图谱的表示学习

大规模分布式系统

背景:

Google发表了MapReduce计算范型及其框架的论文。MapReduce和并行数据库系统(MPP)各有优劣并且两者有一定的互补和学习。与传统MPP架构相比,MapReduce更适合非结构化数据的ETL处理类操作,并且可扩展性和容错性占优,但是单机处理效率较低。
DAG计算模型是MapReduce计算机制的一种扩展。MapReduce对于子任务之间复杂的交互和依赖关系缺乏表达能力,DAG计算模型可以表达复杂的并发任务之间的依赖关系。
Spark本质上是DAG批处理系统,其最能发挥特长的领域是迭代式机器学习。

MapReduce计算模型与架构

计算模型

MapReduce计算任务的输入是Key/Value数据对,输出也以Key/Value数据对方式表示。开发者要根据业务逻辑实现Map和Reduce两个接口函数内的具体操作内容,即可完成大规模数据的并行批处理任务。

实例一:单词统计

map(String key, String value):  
    //key: 文档名
    //value: 文档内容
    for each word in value:
        Emit Intermedia(w, "1");

reduce(String key, Iterator values):  
    //key: 单词
    //value: 出现次数列表
    int result = 0;
    for each v in values:
        result += ParseInt(v);//累加values值
    Emit(AsString(result));

实例二: 链表反转

map(String source_url, Iterator outlinks):  
    //key: 网页url
    //value: 出链列表
    for each outlink o in outlinks:
        Emit Intermedia(o, source_url)

reduce(String target_url, Iterator source_urls):  
    //key: target网页url
    //values: source网页url
    list result = [];
    for each v in source_urls:
        Result.append(v);
    Emit(AsString(result));

实例三: 页面点击统计

map(String tuple_id, String tuple):  
    Emit Intermedia(url, "1");

reduce(String url, Iterator list_tuples):  
    int result = 0;
    for each v in list_tuples:
        result += ParseInt(v);
    Emit(AsString(result));

系统架构

MapReduce计算框架架构

处理流程:

  1. MapReduce框架将应用的输入数据切分成M个模块,典型的数据块大小为64MB,然后可以启动位于集群中不同机器上若干程序。
  2. 全局唯一的主控Master以及若干个Worker,Master负责为Worker分配具体的Map任务或Reduce任务并做一些全局管理。
  3. Map任务的Worker读取对应的数据块内容,从数据块中解析一个个Key/Value记录数据并将其传给用户自定义的Map函数,Map函数输出的中间结果Key/Value数据在内存中缓存
  4. 缓存的Map函数产生的中间结果周期性写入磁盘,每个Map函数中间结果在写入磁盘前被分割函数切割成R份,R是Reduce个数。一般用key对R进行哈希取模。Map函数完成对应数据块处理后将R个临时文件位置通知Master,Master再转交给Reduce任务的Worker
  5. Reduce任务Worker接到通知时,通过RPC远程调用将Map产生的M份数据文件pull到本地。(只有所有Map函数完成,Reduce才能执行)。Reduce任务根据中间数据的Key对记录进行排序,相同key的记录聚合在一起
  6. Reduce任务Worker遍历有序数据,将同一个Key及其对应的多个Value传递给用户定义的Reduce函数,reduce函数执行业务逻辑后将结果追加到Reduce对应的结果文件末尾
  7. 所有Map、Reduce任务完成,Master唤醒用户应用程序

为了优化执行效率,MapReduce计算框架在Map阶段还可以执行Combiner操作。

hadoop的MapReduce运行机制基本与google的类似。
hadoop的MapReduce运行机制 不同的是,hadoop采用https协议来进行数据传输,并采用归并排序对中间结果进行排序。
Google的MapReduce框架支持细粒度的容错机制。Master周期性Ping各个Worker,如果Worker没有响应,则认为其已经发生故障。
如果Master故障则单点失效,重新提交任务。

MapReduce不足

  1. 无高层抽象数据操作语言
  2. 数据无Schema及索引
  3. 单节点效率低下
  4. 任务流描述方法单一

优势:

  1. 数据吞吐量高
  2. 支持海量数据处理的大规模并行处理
  3. 细粒度容错
    但是不适合对时效性高的应用场景,比如交互查询或流处理,也不适合迭代计算类的机器学习及数据挖掘类应用。

由于:

  1. 启动时间长
  2. 多处读写磁盘及网络传输

DAG计算模型

有向无环图的简称。在大数据处理领域,DAG计算模型是将计算任务在内部分解成若干子任务,这些子任务之间由逻辑关系或运行先后顺序等因素被构建成有向无环图结构。

DAG计算系统三层结构

  • 最上层是应用表达层,通过一定手段将计算任务分解成若干子任务形成的DAG结构
  • 最下层是物理机集群,由大量物理机器搭建的分布式计算环境
  • 中间层是DAG执行引擎层,将上层以特殊方式表达的DAG计算任务通过转换和映射,将其部署到下层的物理机集群中运行

Dryad

微软DAG计算系统,dryad在实现时以共享内存、TCP连接以及临时文件的方式进行数据传输

Dryad整体架构

Dryad整体架构

job manager负责将逻辑形式存在的DAG描述映射到物理机。NS负责维护集群中当前可以的机器资源。Daemon守护进程作为JM在计算节点上的代理,具体负责子任务的执行和监控。

FlumeJava和Tez

  • FlumeJava构建了java库,本质上是在MapReduce基础上的DAG计算系统,图中每个节点可以看作单个MapReduce子任务。
  • Tez使用Map任务或者Reduce任务作为DAG的图节点,图节点的有向边是数据传输通道。Tez消除Map阶段中间文件输出到磁盘过程以及引入Reduce-Reduce结构改进措施提升执行效率