Flink-Kafka-MySQL 喜欢ヅ旅行 2021-12-24 00:27 250阅读 0赞 **2018年开始处理大数据相关的业务,Flink作为流处理新秀,在实时计算领域发挥着越来越大作用,本文主要整理在以往开发中Flink使用Kafka作为数据源,计算处理之后,再将数据存到MySQL的处理过程。** ![在这里插入图片描述][watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIzMDA2Mw_size_16_color_FFFFFF_t_70]

**前置条件** 启动zookeeper,启动kafka

**业务系统发送消息到Kafka,使之作为数据源**

import com.alibaba.fastjson.JSON;
import com.example.flinkdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Publishes test {@code User} records, serialized as JSON strings, to the
 * "flink-kafka" topic so they can act as the data source for the Flink job.
 */
@Slf4j
public class KafkaSender {

    private static final String kafkaTopic = "flink-kafka";
    private static final String brokerAddress = "192.168.100.10:9092";
    private static Properties properties;

    /** Builds the producer configuration (broker address + String serializers). */
    private static void init() {
        properties = new Properties();
        properties.put("bootstrap.servers", brokerAddress);
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    }

    public static void main(String[] args) throws InterruptedException {
        init();
        sendUrlToKafka(); // send messages to Kafka
    }

    /**
     * Sends 100,000 JSON-encoded User messages to {@link #kafkaTopic}.
     *
     * <p>FIX: the original used a raw-typed {@code KafkaProducer} and never
     * closed it, so buffered records and network resources could be lost on
     * exit; try-with-resources now guarantees flush + close. The unused local
     * {@code currentMills} was removed.
     */
    private static void sendUrlToKafka() {
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(properties)) {
            User user = new User();
            for (int i = 0; i < 100000; i++) {
                user.setId(i + "");
                user.setName("test-flink-kafka-mysql" + i);
                user.setAge(i + "");
                // 确保发送的消息都是string类型
                String msgContent = JSON.toJSONString(user);
                ProducerRecord<String, String> record =
                        new ProducerRecord<>(kafkaTopic, null, null, msgContent);
                producer.send(record);
                log.info("send msg:" + msgContent);
            }
            producer.flush();
        }
    }
}

**Flink从Kafka取数据进行计算**

import com.alibaba.fastjson.JSON;
import com.example.flinkdemo.flink1.MySQLSink;
import com.example.flinkdemo.model.User;
import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import java.util.Properties; /** * kafka作为数据源,消费kafka中的消息 */ public class KafkaDatasouceForFlinkJob { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.put("bootstrap.servers","192.168.100.10:9092"); properties.put("zookeeper.connect","192.168.100.10:2181"); properties.put("group.id","metric-group"); properties.put("auto.offset.reset","latest"); properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer"); DataStreamSource<String> dataStreamSource = env.addSource( new FlinkKafkaConsumer010<String>( // topic "flink-kafka" , new SimpleStringSchema(), properties ) ).setParallelism(1); DataStream<Tuple3<String, String, String>> sourceStreamTra = dataStreamSource.filter(new FilterFunction<String>() { @Override public boolean filter(String value) throws Exception { return StringUtils.isNotBlank(value); } }).map(new MapFunction<String, Tuple3<String, String, String>>() { private static final long serialVersionUID = 1L; @Override public Tuple3<String, String, String> map(String value) throws Exception { User user = JSON.parseObject(value,User.class); return new Tuple3<String, String, String>(user.getId(),user.getName(),user.getAge()); } }); //传入数据库 sourceStreamTra.addSink(new MySQLSink()); 
env.execute("Flink add kafka data source"); } } **数据库存储类** import java.sql.DriverManager; import java.sql.Connection; import java.sql.PreparedStatement; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; public class MySQLSink extends RichSinkFunction<Tuple3<String, String, String>> { private static final long serialVersionUID = 1L; private Connection connection; private PreparedStatement preparedStatement; String username = "root"; String password = "123456"; String drivername = "com.mysql.jdbc.Driver"; String dburl = "jdbc:mysql://localhost:3306/cmcc?serverTimezone=GMT"; @Override public void invoke(Tuple3<String, String, String> value) throws Exception { Class.forName(drivername); connection = DriverManager.getConnection(dburl, username, password); String sql = "replace into user(id,name,age) values(?,?,?)"; preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(1, value.f0); preparedStatement.setString(2, value.f1); preparedStatement.setString(3, value.f2); preparedStatement.executeUpdate(); if (preparedStatement != null) { preparedStatement.close(); } if (connection != null) { connection.close(); } } } **pom依赖** <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <scope>runtime</scope> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--引入flink 相关依赖--> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-java --> <dependency> 
<groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-runtime --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-runtime_2.12</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table_2.11</artifactId> <version>1.7.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-jdbc_2.11</artifactId> <version>1.7.2</version> </dependency> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connectr-kafka --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>0.9.1</version> </dependency> <!--Kafka--> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>1.0.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.10_2.11</artifactId> <version>1.7.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.7.2</version> 
</dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.12</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.22</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-shaded-guava</artifactId> <version>18.0-5.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>0.9.1</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka</artifactId> <version>0.9.1</version> </dependency>``` [watermark_type_ZmFuZ3poZW5naGVpdGk_shadow_10_text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L3dlaXhpbl80MzIzMDA2Mw_size_16_color_FFFFFF_t_70]: /images/20211223/f12579df2333442495c79062cf72f719.png
还没有评论,来说两句吧...