Spring Boot中整合Flink CDC

news/2025/2/25 14:11:37

Flink CDC(Change Data Capture)是Flink的一种数据实时获取的扩展,用于捕获数据库中的数据变化,并且通过实时流式处理机制来操作这些变化的数据,在Flink CDC中通过Debezium提供的数据库变更监听器来实现对MySQL数据库的监听操作,通过与Spring Boot技术的集成可以更加高效的实现数据实时同步的操作。

下面我们就来介绍一下如何在Spring Boot中集成Flink CDC。

环境搭建

首先我们可以通过Docker容器技术来构建一个MySQL的数据库容器如下所示。

docker run --name mysql -e MYSQL_ROOT_PASSWORD=root -d -p 3306:3306 mysql:8.0

然后我们可以连接数据库然后创建用于测试的数据库表结构,如下所示。

CREATE DATABASE testdb;
USE testdb;

CREATE TABLE employee (
    id INT AUTO_INCREMENT PRIMARY KEY,
    name VARCHAR(255),
    age INT
);

INSERT INTO employee (name, age) VALUES ('John', 28), ('Alice', 30), ('Bob', 25);

搭建好MySQL数据库服务之后,接下来我们可以通过Docker启动Flink服务,如下所示。

docker run -d -p 8081:8081 --name flink-jobmanager flink:latest
docker run -d --link flink-jobmanager --name flink-taskmanager flink:latest taskmanager

在Spring Boot项目中集成Flink CDC

准备好服务之后,接下来我们就来构建一个Spring Boot的项目用来连接Flink CDC。如下所示,首先需要在项目的POM文件中添加Flink CDC和其他所需的依赖

<dependencies>
    <!-- Spring Boot dependencies -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-jpa</artifactId>
    </dependency>

    <!-- Flink dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.16.0</version>  <!-- 根据需要调整版本 -->
    </dependency>

    <!-- Flink CDC dependencies -->
    <dependency>
        <groupId>com.ververica</groupId>
        <artifactId>flink-connector-debezium-mysql_2.11</artifactId>
        <version>1.16.0</version>
    </dependency>

    <!-- MySQL JDBC driver -->
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.25</version>
    </dependency>
</dependencies>

接下来就需要将Flink CDC连接到MySQL数据库并监听数据变动,需要在Spring Boot的配置文件中添加Flink CDC连接参数,如下所示。

spring.datasource.url=jdbc:mysql://localhost:3306/testdb?useSSL=false&serverTimezone=UTC
spring.datasource.username=root
spring.datasource.password=root

Flink CDC作业实现

接下来就是需要创建一个Flink作业来捕获数据库的变更情况并进行相关的逻辑处理,如下所示。

public class FlinkCDCJob {

    public static void main(String[] args) throws Exception {
        // 1. 创建流处理环境
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. 配置Flink CDC的Debezium源
        DebeziumSourceFunction<String> sourceFunction = DebeziumSourceFunction
            .<String>builder()
            .hostname("localhost")
            .port(3306)
            .username("root")
            .password("root")
            .databaseList("testdb")
            .tableList("testdb.employee")
            .startupMode(DebeziumSourceFunction.StartupMode.LATEST_OFFSET)
            .deserializer(new JsonNodeDeserializationSchema())
            .build();

        // 3. 创建CDC数据流
        DataStream<String> stream = env.addSource(sourceFunction);

        // 4. 打印数据到控制台
        stream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                return "CDC 数据:" + value;
            }
        }).print();

        // 5. 执行作业
        env.execute("Flink CDC Example");
    }
}

根据上面的代码实现,DebeziumSourceFunction用来配置一个数据库的连接,然后制定好需要监听的数据库以及数据库表,然后我们可以启动项目然后可以尝试往MySQL数据库的employee表中插入、更新或者是删除数据,这个时候我们就可以看到控制台中有对应的数据变化监听打印信息。

监听到数据变化情况之后,接下来,我们可以通过Flink的实时流处理操作将数据推送到Kafka、ElasticSearch等数据存储中。

总结

在上面介绍中,我们介绍了如何在Spring Boot中整合Flink CDC来实现数据库数据变化的实时捕获监听操作,在实际实现中,我们可以根据具体的业务需求对操作进行进一步的扩展,例如可以将CDC数据写入Kafka、Hadoop、Elasticsearch等实时数据平台,构建更强大的数据流处理系统。


http://www.niftyadmin.cn/n/5865587.html

相关文章

Docker 部署 OnlyOffice 文档服务器

Docker 部署 OnlyOffice 文档服务器 前言一、准备工作二、设置变量和目录结构三、创建并运行 OnlyOffice 容器四、访问 OnlyOffice 文档服务器五、配置和管理总结 前言 OnlyOffice 是一个强大的开源文档编辑平台&#xff0c;支持文档、表格、演示文稿等文件格式的编辑。通过 D…

网络安全之Web后端Python

目录 一、安装使用PyCharm及Python 基础语法 1.PyCharm &#xff08;1&#xff09;安装python &#xff08;2&#xff09;安装PyCharm 2.Python基础语法 &#xff08;1&#xff09;打印&#xff08;输出&#xff09; &#xff08;2&#xff09;注释 &#xff08;3&#…

MyBatis在Spring配置文件中注册

Spring集成Mybatis的配置文件中&#xff0c; 1.引入jdbc.properties,是为了注册数据源。 2.注册数据源是为了引入SqlSessionFactoryBean。 3.SqlSessionFactoryBean才是真正Spring与Mybatis的桥梁&#xff0c;引入SqlSessionFactoryBean是为了操作Mapper。 4.所以第四步&am…

angular登录页

说明:登录 logindialog 效果图&#xff1a; step1: import { Component } from angular/core; import {FormGroup, FormControl, Validators, FormsModule, ReactiveFormsModule} from angular/forms; import { MatDialog } from angular/material/dialog; import { AlertDia…

FFmpeg进化论:从av_register_all手动注册到编译期自动加载的技术跃迁

介绍 音视频开发都知道 FFmpeg,因此对 av_register_all 这个 API 都很熟悉,但ffmpeg 4.0 版本开始就已经废弃了,是旧版本中用于全局初始化的重要接口。 基本功能 核心作用:av_register_all() 用于注册所有封装器(muxer)、解封装器(demuxer)和协议处理器(protocol),…

什么是 OCP 数据库专家

OCP 即 Oracle Certified Professional&#xff0c;Oracle 认证专业人员&#xff0c;代表持证人在 Oracle 数据库领域具备专业的技能和知识。获得 OCP 数据库专家认证意味着你在 Oracle 数据库管理、开发、优化等方面达到了较高的水平&#xff0c;能够独立承担复杂的数据库相关…

**模式的好处 (设计模式)

what’s up !? 这样整理下发现更容易理解设计模式了 学习嘛&#xff0c;就是拿着 rua 横着rua 竖着rua 前面rua 后面rua 【’ _ ’ 】 目录 简单工厂模式工厂模式抽象工厂模式单例模式建造者模式原型模式代理模式适配器 模式桥梁 模式装饰 模式门面 模式 &#xff08;也叫 外…

前端(layui表单对应行颜色、登陆页面、轮播)

1&#xff1a;动态获取数据根据数据的不同改变对应行颜色&#xff08;JavaScript&#xff09; done: function (res, curr, count) {console.log(res);// 检查返回的数据是否包含code字段&#xff0c;并且code为0if (res.code "0") {// 数据加载成功console.log(…