推广

IMPALA&HIVE大数据平台数据血缘与数据地图

iseeyu2年前 (2024-02-21)推广127

测试:使用impala-shell 指定impala daemon节点启动命令行,执行SQL命令,然后查看该daemon节点最新日志。

impala-shell -i uathd01

这里我创建一个视图:

然后到uathd01的节点看最新血缘日志:

把这段json串拿出来格式化一下看看:

queryText:执行的命令

queryId : impala的执行ID

hash: sql的hash

user:执行该命令的用户

timestamp: 开始时间戳

endTime:结束时间戳

edges: 记录每个source到target的映射关系,edgeType为PREDICATE的部分是所有source字段到所有的target字段id的映射,edgeType为PROJECTION的是每个source字段到每个target字段的映射,这里是多对一的关系,即如果有一个目标字段是由两个源字段处理得来的话,这里的sourceid和targetid就是一个多对一的关系,但如果是一个源字段处理出了两个目标字段,在这里仍旧是两个代码块。

vertices: 该SQL内所有的源和目标字段,与edges中的id一一对应。

{

“queryText”:”create view vw_lineage_test5 as select acc.gid,acc.decrypt_name,ind.company_name from dl_nccp.account acc inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted=’0′ and acc.is_valid=’0′ limit 100″,

“queryId”:”1d435f512faba59e:adc0fa8c00000000″,

“hash”:”f1b6e2813084ca457ebb78292715144c”,

“user”:”hive@NOAHGROUPTEST.COM.LOCAL”,

“timestamp”:1586397809,

“endTime”:1586397818,

“edges”: [

{

“sources”: [

1

],

“targets”: [

0

],

“edgeType”:”PROJECTION”

},

{

“sources”: [

3

],

“targets”: [

2

],

“edgeType”:”PROJECTION”

},

{

“sources”: [

5

],

“targets”: [

4

],

“edgeType”:”PROJECTION”

},

{

“sources”: [

1,

6,

7,

8

],

“targets”: [

0,

2,

4

],

“edgeType”:”PREDICATE”

}

],

“vertices”: [

{

“id”:0,

“vertexType”:”COLUMN”,

“vertexId”:”default.vw_lineage_test5.gid”

},

{

“id”:1,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.account.gid”

},

{

“id”:2,

“vertexType”:”COLUMN”,

“vertexId”:”default.vw_lineage_test5.decrypt_name”

},

{

“id”:3,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.account.decrypt_name”

},

{

“id”:4,

“vertexType”:”COLUMN”,

“vertexId”:”default.vw_lineage_test5.company_name”

},

{

“id”:5,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.individual.company_name”

},

{

“id”:6,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.account.is_deleted”

},

{

“id”:7,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.account.is_valid”

},

{

“id”:8,

“vertexType”:”COLUMN”,

“vertexId”:”dl_nccp.individual.gid”

}

]

}

HIVE:

 同impala类似,不再赘述,区别仅仅是日志的json格式以及记录的详细程度的区别。

 应用:

接下来就是如何使用这些血缘的日志,我们已经分析了impala血缘日志的结构,接下来只要使用日志采集工具filebeat或flume,logstash等工具采集每个impala daemon节点上的日志,然后对每个json串进行解析即可,后面的文章会演示如何实时采集impala血缘到kafka,消费kafka里的血缘数据处理后写入neo4j数据库内进行数据血缘数据地图的展示。

impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

—————————————–实时采集impala血缘日志推送到kafka—————————————————–

使用filebeat采集impala的血缘日志并推送到kafka

 采用filebeat的主要原因是因为轻量,对impala的血缘日志采集不需要进行数据过滤和格式转换,因此不需要使用flume或logstash这样占用资源较大的工具。

filebeat的安装及使用请参考官方手册:

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-overview.html

参数配置:

vim conf/filebeat_impala_lineage.yml

#=========================== Filebeat inputs =============================

filebeat.inputs:

-type:log

# Change to true to enable this input configuration.

enabled:true

# Paths that should be crawled and fetched. Glob based paths.

paths:

#这里指定impala血缘目录,会读取该目录下所有日志

-/var/log/impalad/lineage/*

#============================= Filebeat modules ===============================

filebeat.config.modules:

# Glob pattern for configuration loading

path:${path.config}/modules.d/*.yml

# Set to true to enable config reloading

reload.enabled:false

# Period on which files under path should be checked for changes

#reload.period: 10s

#===========kafka output===============

output.kafka:

#指定kafka的节点和topic

hosts:[“uatka01:9092″,”uatka02:9092″,”uatka03:9092”]

topic:wyk_filebeat_impala_lineage_new_demo

required_acks:1

#output.console:

#  pretty: true

DEMO:

启动filebeat,注意每个机器上只能启动一个filebeat进程,因此上面的读取日志不要指定文件名。

$FILEBEAT_HOME/filebeat –c $FILEBEAT_HOME/conf/filebeat_impala_lineage.yml -e

启动kafka consumer:

./kafka-console-consumer.sh –bootstrap-server uatka01:9092,uatka02:9092,uatka03:9092 –topic wyk_filebeat_impala_lineage_new_demo –zookeeper uatka01:2181,uatka02:2181,uatka03:2181

启动impala-shell:

impala-shell -i uathd03

1. 在impala-shell内建一个视图:vw_lineage_test11

 2.查看impala lineage 日志文件,血缘已记录日志:

3. 查看filebeat控制台,已监听日志文件并写入kafka topic内:

4. 查看kafka consumer是否消费到该血缘记录:

流程结束:

impalaSQL–> impala血缘日志–>Filebeat–>Kafka

完成监控impala脚本并将血缘日志推送到kafka内。

后续只需要实时消费kafka里的信息即可。

impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

3. 实时消费血缘记录写入neo4j并验证

—————————————–实时消费impala血缘数据写入neo4j—————————————————–

前两篇介绍了如何采集impala和hive的血缘日志以及如何实时将该日志采集到kafka消息队列中,今天来介绍如何实时消费血缘日志并写入neo4j图数据库进行血缘的展现。

血缘记录Exactly Once

首先要保证filebeat采集的日志记录不会丢数据,因此需要在filebeat监控impala日志的yml配置文件中指定acks=-1,该参数保证kafka的Leader 在返回确认或错误响应之前,会等待所有同步副本都收到消息,因此吞吐量会降低,但血缘监控impala里的执行日志,一般来说我们在impala或hive中的脚本执行也不会有那么高频的查询提交。

filebeat.yml  [kafka参数部分]

#===========kafka output===============

output.kafka:

hosts:[“uatka01:9092″,”uatka02:9092″,”uatka03:9092”]

topic:wyk_filebeat_impala_lineage_new_demo

required_acks:-1

消费的时候也要保证每条数据都准确的被消费到而不是在某一条失败后仍提交offset。因此我们需要在消费kafka的时候配置参数:

enable_auto_commit=false

以及消费每条记录成功之后执行

consumer.commit()

在neo4j中创建唯一约束

这里我使用”库名.表名”作为表的节点唯一标识,”库名.表名.列名”作为列的节点唯一标识。

使用如下命令创建neo4j唯一约束:

CREATECONSTRAINTON(n:IMPALA_TABLE) ASSERT n.nameISUNIQUE;

CREATECONSTRAINTON(n:IMPALA_TABLE_COLUMN) ASSERT n.nameISUNIQUE;

效果验证

开启kafka server –>开启filebeat–> 运行消费者程序 –>开启impala命令行 –>执行DML –>验证效果 –>执行DML –>验证效果

1. 开启kafka sever:

(在kafka篇章里有具体介绍)

$KAFKA_HOME/bin/kafka-server-start.sh -daemon$KAFKA_HOME/config/server.properties

2. 启动filebeat

(参数请参考前面两篇文章):

nohup$FILEBEAT_HOME/filebeat –c$FILEBEAT_HOME/conf/filebeat_impala_lineage_prod.yml -e >$FILEBEAT_HOME/nohup_out.file 2>&1 &

3. 运行消费者程序

执行消费程序.实时解析kafka的数据并写入neo4j.

4. 进入impala命令行

这里是演示,所以我指定其中一台impala节点,同样在第二步里的filebeat也是在该impala daemon节点执行的

impala-shell -i uathd03

impala-shell -i 指定filebeat监控的impala节点

5.执行DML 创建视图

vw_lineage_test,逻辑来源于表dl_nccp.account以及dl_nccp.individual表,字段有account表的gid和decrypt_name,branch_name以及individual表的company_name。

createviewvw_lineage_testas

selectacc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name

fromdl_nccp.accountacc

innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted=’0’andacc.is_valid=’0′;

6. 验证效果

使用match命令查看所有到节点’vw_lineage_test’的关系。

MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name=’default.vw_lineage_test’RETURNp

7.执行DML 修改视图逻辑

新增一个来源表dl_nccp.contact以及该表的字段telephone。

alterviewvw_lineage_test2as

selectacc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone

fromdl_nccp.accountacc

innerjoindl_nccp.individual indonacc.gid=ind.gidandacc.is_deleted=’0’andacc.is_valid=’0′

innerjoindl_nccp.contact conacc.gid = c.gid

7. 验证效果

可以看到新增的contact表以及telephone字段的关系已经实时更新到neo4j内

MATCHp=()-[r]->(d:IMPALA_TABLE)whered.name=’default.vw_lineage_test’RETURNp

8. 最终效果

实现字段粒度的血缘分析以及表粒度血缘和业务来源库表的血缘 以及元数据的实时采集。

数据地图:

 元数据管理:

架构图:

如果想了解如何实现请参照前面几篇文章:

impala数据血缘与数据地图系列:

1. 解析impala与hive的血缘日志

2. 实时采集impala血缘日志推送到kafka

3. 实时消费血缘记录写入neo4j并验证

———————————Impala血缘 架构图———————————————————–

红色部分是用户会接触到的部分,绿色部分对于用户无感知。

解读:

1. impala是无主的MPP架构,因此用户每次SQL指定的impala节点就是主节点,当用户通过SQL或jdbc/odbc接口查询impala时,SQL命令首先 会发送到impala daemon节点,由该节点的QueryPlanner解析SQL成执行计划后发送给其他daemon节点分别计算各自的数据然后返回给该impala daemon节点。 所以我们只要在每台impala daemon节点部署filebeat并监控血缘日志即可。

2. 使用Filebeat监控impala血缘日志后发送到kafka集群指定的topic中;

3. 解析kafka内的血缘日志,将元数据(user,timestamp,id等信息),实体(表,字段),关系(表到表,字段到字段,字段到表)识别出来;

4. 将第三步里的结果存储进Neo4J;

5. 用户可以使用CQL或封装的接口对Neo4J里存储的impala血缘进行实时的查询;

功能介绍:

实时血缘:

建视图:逻辑如下

create view vw_lineage_test as

select acc.gid,acc.decrypt_name,ind.company_name ,acc.branch_name

from dl_nccp.account acc

inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted=’0′ and acc.is_valid=’0′;

修改视图逻辑:新增一个来源表contract以及该表的telephone字段

alter view vw_lineage_test as

select acc.gid,acc.decrypt_name,ind.company_name,acc.branch_name,c.telephone

from dl_nccp.account acc

inner join dl_nccp.individual ind on acc.gid=ind.gid and acc.is_deleted=’0′ and acc.is_valid=’0′

inner join dl_nccp.contact c on acc.gid = c.gid

全类型血缘:

目前已实现字段到表,字段到字段,表到表,表到库级别的全类型血缘关系:

技术元数据管理:

实时更新数据字典、ETL任务元数据:

影响分析:

指定节点向后进行影响分析:

血缘分析:

指定节点向前进行血缘分析:

深度查询:

可指定血缘的查询深度:

扫描二维码推送至手机访问。

版权声明:本文由西安泽虎代运营发布,如需转载请注明出处。

转载请注明出处https://0291.com.cn/post/57070.html

相关文章

三个维度分析新媒体抖音IP变现 没有千万粉丝也可以赚钱。

三个维度分析新媒体抖音IP变现 没有千万粉丝也可以赚钱。

这些账号看着比较简单,没有几千万粉丝,但是却可以赚很多钱?是因为粉丝。做抖音是为了什么?是为了,不赚钱不要做抖音!一定要变现!所以在做每一个账号的时候,一定要做到实现变现。一个账号一个月之内不变现就抛弃了,哪怕有100万粉丝都没有用。 在之初,有做一个抖音号,30多万粉丝,可是一个月一分钱也没...

友链链接SEO优化返回值是关键。

友链链接SEO优化返回值是关键。

目前,关于的优化有很多常识,比如必然的关联性,如何找到一个有成长潜力的友情链接,友情链接的数量不能太多等等,但是这些友情链接的培养是受作者的限制在一个框架内的,那就是在网上做友情链接基于SEO角度的网站优化。也许从一个新站诞生之日起,友链的影响力就很长,尤其是那些与你友链网站权重抗衡的人,那对于你...

新站整站优化从哪些方面着手布局。

新站整站优化从哪些方面着手布局。

岳阳SEO:新站整站优化从哪些方面着手布局。整站优化为顾客出示:网站宣传推广、互联网营销管理方法、网站的健全转变、网站中后期升级维护保养、网站的集约化实际操作等运营管理。网站的维护和营销推广以及关键。 整站优化基本原理是意见反馈高品质数据信息给百度搜索,从而提高网站排名、词量和总...

网站SEO伪原创文章内容如何增加收录。

网站SEO伪原创文章内容如何增加收录。

随着做网络优化的企业数量不断地增长,而做网站推广要想能够在搜索引擎中获得更多的流量与更多关键词排名,那么原创内容是必不可少的,此外也可以大量更新伪原创内容,来帮助网站获取排名,要增加网站排名,首先就要让网站有一个稳定的收录。那么网站SEO伪原创文章内容如何增加收录? 1、检查网站是否出现...

拼多多上现金提现是真的吗?揭秘背后的真相!

拼多多上现金提现是真的吗?揭秘背后的真相!

近年来,社交电商拼多多在我国迅速崛起,凭借其低价策略和微信社交传播,吸引了大量用户。然而,随着拼多多的火爆,有关“拼多多上现金提现是真的吗”的疑问也随之而来。今天,我就来为大家揭秘背后的真相,让你用更明智的眼光看待这个问题。 我们要明确拼多多的商业模式。拼多多是一家基于社交网络的电商...

小编分享想要比同行更具优势,公司网站如何做推广。

小编分享想要比同行更具优势,公司网站如何做推广。

作为一些消费者而言,有时购买产品,如果其品牌知名度更高,受众会更愿意倾向于选择购买。这时,对于一些企业来说,积极做网站推广,提高品牌的影响力就显得尤为重要。那么,想要比同行优势,公司网站推广?别急,今天小编将为你分享有关这方面的一些干货,快来看看吧。 1、网络推广,必须有自己的一套策...

现在,非常期待与您的又一次邂逅

我们努力让每一部企业宣传片和抖音短视频成为商业大片