flink-connector-clickhouse 刺骨的言语ヽ痛彻心扉 2022-09-12 15:57 271阅读 0赞 # 情景介绍 目前要对flink进行多数据源适配工作,需要支持的有clickhouse,elasticsearch # 版本介绍 flink:1.13.1 elasticsearch:7.6.2 clickhouse:21.9.3.30 # 参考文献 # github flink全量案例demo:https://github.com/zhisheng17/flink-learning flink-connector连接器:https://github.com/DTStack/flinkx flink-sink-clickhouse:https://github.com/ivi-ru/flink-clickhouse-sink # csdn jdbc模式flink写入clickhouse:https://blog.csdn.net/weixin_32265569/article/details/112133937 中间件模式flink写入clickhouse:https://blog.csdn.net/weixin_32265569/article/details/112133937 注:以防原文链接丢失,此处综合两种方式的内容如下所示 flink-sink-clickhouse/elasticsearch 我亲测的github项目地址:https://github.com/ainusers/flink-adapter-datasource # jdbc模式 - flink写入clickhouse 1. clickhouse jdbc 依赖 <!-- 写入数据到clickhouse --> <dependency> <groupId>ru.yandex.clickhouse</groupId> <artifactId>clickhouse-jdbc</artifactId> <version>0.1.54</version> </dependency> 2. User实体类 package com.lei.domain; public class J_User { public int id; public String name; public int age; public J_User(int id, String name, int age) { this.id = id; this.name = name; this.age = age; } public static J_User of(int id, String name, int age) { return new J_User(id, name, age); } } 3. 
工具类 package com.lei.util; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; public class ClickHouseUtil { private static Connection connection; public static Connection getConn(String host, int port, String database) throws SQLException, ClassNotFoundException { Class.forName("ru.yandex.clickhouse.ClickHouseDriver"); String address = "jdbc:clickhouse://" + host + ":" + port + "/" + database; connection = DriverManager.getConnection(address); return connection; } public static Connection getConn(String host, int port) throws SQLException, ClassNotFoundException { return getConn(host,port,"default"); } public static Connection getConn() throws SQLException, ClassNotFoundException { return getConn("node-01",8123); } public void close() throws SQLException { connection.close(); } } 4. 自定义clickhouse-connector package com.lei.util; import com.lei.domain.J_User; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.PreparedStatement; public class J_MyClickHouseUtil extends RichSinkFunction<J_User> { Connection connection = null; String sql; public J_MyClickHouseUtil(String sql) { this.sql = sql; } @Override public void open(Configuration parameters) throws Exception { super.open(parameters); connection = ClickHouseUtil.getConn("node-01", 8123, "default"); } @Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } } @Override public void invoke(J_User user, Context context) throws Exception { PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setLong(1, user.id); preparedStatement.setString(2, user.name); preparedStatement.setLong(3, user.age); preparedStatement.addBatch(); long startTime = System.currentTimeMillis(); int[] ints = preparedStatement.executeBatch(); connection.commit(); long endTime = System.currentTimeMillis(); 
System.out.println("批量插入完毕用时:" + (endTime - startTime) + " -- 插入数据 = " + ints.length); } } 5. 主程序测试类 package com.lei.sinktest; import com.lei.domain.J_User; import com.lei.util.J_MyClickHouseUtil; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; /* 进入clickhouse-client use default; drop table if exists user_table; CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog(); */ public class J05_ClickHouseSinkTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); env.setParallelism(1); // source DataStream<String> inputStream = env.socketTextStream("localhost", 7777); // Transform 操作 SingleOutputStreamOperator<J_User> dataStream = inputStream.map(new MapFunction<String, J_User>() { @Override public J_User map(String data) throws Exception { String[] split = data.split(","); return J_User.of(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2])); } }); // sink String sql = "INSERT INTO default.user_table (id, name, age) VALUES (?,?,?)"; J_MyClickHouseUtil jdbcSink = new J_MyClickHouseUtil(sql); dataStream.addSink(jdbcSink); dataStream.print(); env.execute("clickhouse sink test"); } } 6. 创建clickhouse表 -- 进入clickhouse-client use default; drop table if exists user_table; CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog(); ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70][] ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 1][] # 中间件模式 - flink写入clickhouse 1. 
添加clickhouse-maven依赖 <dependency> <groupId>ru.ivi.opensource</groupId> <artifactId>flink-clickhouse-sink</artifactId> <version>1.2.0</version> </dependency> 2. User实体类 package com.lei.domain; public class J_User { public int id; public String name; public int age; public J_User(int id, String name, int age) { this.id = id; this.name = name; this.age = age; } public static J_User of(int id, String name, int age) { return new J_User(id, name, age); } // Java Bean 必须实现的方法,信息通过字符串进行拼接 public static String convertToCsv(J_User user) { StringBuilder builder = new StringBuilder(); builder.append("("); // add user.id builder.append(user.id); builder.append(", "); // add user.name builder.append("'"); builder.append(String.valueOf(user.name)); builder.append("', "); // add user.age builder.append(user.age); builder.append(" )"); return builder.toString(); } } 3. flink测试方法 package com.lei.sinktest; import com.lei.domain.J_User; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import ru.ivi.opensource.flinkclickhousesink.ClickHouseSink; import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseClusterSettings; import ru.ivi.opensource.flinkclickhousesink.model.ClickHouseSinkConst; import java.util.HashMap; import java.util.Map; import java.util.Properties; /* 进入clickhouse-client use default; drop table if exists user_table; CREATE TABLE default.user_table(id UInt16, name String, age UInt16 ) ENGINE = TinyLog(); */ public class J05_ClickHouseSinkTestByLib { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(); Map<String, String> globalParameters = new HashMap<>(); // ClickHouse cluster properties 
globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_HOSTS, "http://node-01:8123/"); //globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_USER, ...); //globalParameters.put(ClickHouseClusterSettings.CLICKHOUSE_PASSWORD, ...); // sink common globalParameters.put(ClickHouseSinkConst.TIMEOUT_SEC, "1"); globalParameters.put(ClickHouseSinkConst.FAILED_RECORDS_PATH, "d:/"); globalParameters.put(ClickHouseSinkConst.NUM_WRITERS, "2"); globalParameters.put(ClickHouseSinkConst.NUM_RETRIES, "2"); globalParameters.put(ClickHouseSinkConst.QUEUE_MAX_CAPACITY, "2"); globalParameters.put(ClickHouseSinkConst.IGNORING_CLICKHOUSE_SENDING_EXCEPTION_ENABLED, "false"); // set global paramaters ParameterTool parameters = ParameterTool.fromMap(globalParameters); env.getConfig().setGlobalJobParameters(parameters); env.setParallelism(1); // source DataStream<String> inputStream = env.socketTextStream("localhost", 7777); // Transform 操作 SingleOutputStreamOperator<String> dataStream = inputStream.map(new MapFunction<String, String>() { @Override public String map(String data) throws Exception { String[] split = data.split(","); J_User user = J_User.of(Integer.parseInt(split[0]), split[1], Integer.parseInt(split[2])); return J_User.convertToCsv(user); } }); // create props for sink Properties props = new Properties(); props.put(ClickHouseSinkConst.TARGET_TABLE_NAME, "default.user_table"); props.put(ClickHouseSinkConst.MAX_BUFFER_SIZE, "10000"); ClickHouseSink sink = new ClickHouseSink(props); dataStream.addSink(sink); dataStream.print(); env.execute("clickhouse sink test"); } } 4. 
结果验证 ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 2][] ![watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 3][] [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70]: /images/20220828/cd29aef0eaf4455fa1846ca9f368afd6.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 1]: /images/20220828/65d9d9a6e87b464f8622530ecb989dc3.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 2]: /images/20220828/5ba4eab2243741d9b6b800ef0f33bad1.png [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl8zMjI2NTU2OQ_size_16_color_FFFFFF_t_70 3]: /images/20220828/90e9a25dcb9a4cdf81b09c6eb2a98df1.png
还没有评论,来说两句吧...