Centos7.8 如何快速地将Hive中的数据导入ClickHouse

比眉伴天荒 2023-01-20 09:34 12阅读 0赞

1. 前景概要

ClickHouse是面向OLAP的分布式列式DBMS。我们部门目前已经把所有数据分析相关的日志数据存储至ClickHouse这个优秀的数据仓库之中,当前日数据量达到了300亿。
针对存储在Hive中的结构化数据,我们应该怎么操作呢?

1.1. Hive to ClickHouse

假定我们的数据已经存储在Hive中,我们需要读取Hive表中的数据并筛选出我们关心的字段,或者对字段进行转换,最后将对应的字段写入ClickHouse的表中。

1.2. Hive Schema

我们在Hive中存储的数据表结构如下,存储的是很常见的Nginx日志

  1. CREATE TABLE `nginx_msg_detail`(
  2. `hostname` string,
  3. `domain` string,
  4. `remote_addr` string,
  5. `request_time` float,
  6. `datetime` string,
  7. `url` string,
  8. `status` int,
  9. `data_size` int,
  10. `referer` string,
  11. `cookie_info` string,
  12. `user_agent` string,
  13. `minute` string)
  14. PARTITIONED BY (
  15. `date` string,
  16. `hour` string)

1.3. ClickHouse Schema

我们的ClickHouse建表语句如下,我们的表按日进行分区

  1. CREATE TABLE cms.cms_msg
  2. (
  3. date Date,
  4. datetime DateTime,
  5. url String,
  6. request_time Float32,
  7. status String,
  8. hostname String,
  9. domain String,
  10. remote_addr String,
  11. data_size Int32
  12. ) ENGINE = MergeTree PARTITION BY date ORDER BY (date, hostname) SETTINGS index_granularity = 16384

2. Waterdrop with ClickHouse

接下来会给大家介绍,我们如何通过Waterdrop将Hive中的数据写入ClickHouse中。
Waterdrop的环境准备以及安装步骤这里就不一一赘述了,具体安装步骤可以参考上一篇文章

2.1. Waterdrop Pipeline

我们仅需要编写一个Waterdrop Pipeline的配置文件即可完成数据的导入。
配置文件包括四个部分,分别是Spark、Input、filter和Output。

2.1.1 Spark

这一部分是Spark的相关配置,主要配置Spark执行时所需的资源大小。

  1. spark {
  2. // 这个配置必需填写
  3. spark.sql.catalogImplementation = "hive"
  4. spark.app.name = "Waterdrop"
  5. spark.executor.instances = 2
  6. spark.executor.cores = 1
  7. spark.executor.memory = "1g"
  8. }

2.1.2 Input

这一部分定义数据源,如下是从Hive文件中读取text格式数据的配置案例。

  1. input {
  2. hive {
  3. pre_sql = "select * from access.nginx_msg_detail"
  4. table_name = "access_log"
  5. }
  6. }

看,很简单的一个配置就可以从Hive中读取数据了。其中pre_sql是从Hive中读取数据SQL,table_name是将读取后的数据,注册成为Spark中临时表的表名,可为任意字段。

需要注意的是,必须保证hive的metastore是在服务状态。

在Cluster、Client、Local模式下运行时,必须把hive-site.xml文件置于提交任务节点的$HADOOP_CONF目录下

2.1.3 Filter

在Filter部分,这里我们配置一系列的转化,我们这里把不需要的minute和hour字段丢弃。当然我们也可以在读取Hive的时候通过pre_sql不读取这些字段

  1. filter {
  2. remove {
  3. source_field = ["minute", "hour"]
  4. }
  5. }

2.1.4 Output

最后我们将处理好的结构化数据写入ClickHouse

  1. output {
  2. clickhouse {
  3. host = "your.clickhouse.host:8123"
  4. database = "waterdrop"
  5. table = "nginx_log"
  6. fields = ["date", "datetime", "hostname", "url", "http_code", "request_time", "data_size", "domain"]
  7. username = "username"
  8. password = "password"
  9. }
  10. }

3. 运行Waterdrop

我们将上述四部分配置组合成为我们的配置文件config/batch.conf

  1. vim config/batch.conf
  2. spark {
  3. spark.app.name = "Waterdrop"
  4. spark.executor.instances = 2
  5. spark.executor.cores = 1
  6. spark.executor.memory = "1g"
  7. // 这个配置必需填写
  8. spark.sql.catalogImplementation = "hive"
  9. }
  10. input {
  11. hive {
  12. pre_sql = "select * from access.nginx_msg_detail"
  13. table_name = "access_log"
  14. }
  15. }
  16. filter {
  17. remove {
  18. source_field = ["minute", "hour"]
  19. }
  20. }
  21. output {
  22. clickhouse {
  23. host = "your.clickhouse.host:8123"
  24. database = "waterdrop"
  25. table = "access_log"
  26. fields = ["date", "datetime", "hostname", "uri", "http_code", "request_time", "data_size", "domain"]
  27. username = "username"
  28. password = "password"
  29. }
  30. }

执行命令,指定配置文件,运行Waterdrop,即可将数据写入ClickHouse。这里我们以本地模式为例。

  1. ./bin/start-waterdrop.sh --config config/batch.conf -e client -m 'local[2]'

在这篇文章中,我们介绍了如何使用Waterdrop将Hive中的数据导入ClickHouse中。仅仅通过一个配置文件便可快速完成数据的导入,无需编写任何代码,十分简单。

发表评论

表情:
评论列表 (有 0 条评论,12人围观)

还没有评论,来说两句吧...

相关阅读