找回密码
 立即注册
首页 业界区 安全 SeaTunnel(2.3.12)核心能力总结

SeaTunnel(2.3.12)核心能力总结

上官银柳 昨天 14:25
seatunnel:抽取数据的引擎


  • SeaTunnel Zeta(本地引擎)
  • Spark(集群)
  • Flink(集群)
seatunnel:执行命令

参数解释备注JAVA_OPTS='-Xmx2g -Xms2g'设置本次运行的内存大小-m local本地模式运行

  • SeaTunnel Zeta(本地引擎)
  1. sh /……/seatunnel-2.3.12/bin/seatunnel.sh --config /……/my.conf -i JAVA_OPTS='-Xmx2g -Xms2g' -m local
复制代码

  • Spark引擎
  • Flink引擎
source:大表分片(分片采集)


  • seatunnel:天生支持分片,和以前版本的做法不一样了
  • partition_num:官方将要弃用
  • 现在分片的配置:只需要两个参数:partition_column、split.size
  • partition_num 通常指期望将数据划分成的分片数量,而 split.size 可能指每个分片预期处理的数据量(如行数)。工具在实际运行时,会根据表的总数据量和 split.size 来动态计算实际的分片数。
  • 版本分界与配置对比
    特性维度v1.x 及 v2.0.0 早期v2.0.0 及之后的新版本分片方式手动策略:需在 query 配置中明确写出 WHERE 条件,并配套使用 split 参数自动分片:通过 partition_column 和 split.size 等参数声明,引擎自动计算配置示例需写 "query": "SELECT * FROM table WHERE id BETWEEN ? AND ?", 并配 "split": {"id": {"range": [1, 100, 200]}}只需配 "partition_column": "id" 和 "split.size": 10核心思想用户驱动:用户需了解数据分布并计算分片点引擎驱动:用户声明意图,引擎负责优化执行文档参考旧版 Wiki 或 v1.x 文档你提供的当前最新官方文档
  1.     # 并行读取配置
  2.     # 分片的字段:支持:String、Number(int, bigint, decimal, ...)、Date
  3.     partition_column = "id"
  4.     # 表的分割大小(行数):每个分片的数据行(默认8096行)。最后分片数=表的总行数 / split.size
  5.     split.size = 10000
  6.     # 分片数,匹配并行度parallelism(2.3.12已不推荐配置了,用split.size来代替)
  7.     # partition_num = 5
复制代码
sink:schema_save_mode、data_save_mode

参数类型是否必须默认值解释schema_save_modeEnum否CREATE_SCHEMA_WHEN_NOT_EXIST在同步任务开启之前,根据目标端现有表结构选择不同处理方案。data_save_modeEnum否APPEND_DATA在同步任务开启之前,根据目标端现有数据选择不同处理方案。

  • schema_save_mode [枚举]
在同步任务开启之前,根据目标端现有表结构选择不同处理方案。
选项介绍:
RECREATE_SCHEMA :当表不存在时将创建,保存时删除并重建。
CREATE_SCHEMA_WHEN_NOT_EXIST :当表不存在时创建,保存时跳过。
ERROR_WHEN_SCHEMA_NOT_EXIST :当表不存在时报告错误。
IGNORE :忽略对表的处理。

  • data_save_mode [枚举]
在同步任务开启之前,根据目标端现有数据选择不同处理方案。
选项介绍:
DROP_DATA:保留数据库结构并删除数据。
APPEND_DATA:保留数据库结构,保留数据。
CUSTOM_PROCESSING:用户定义处理。必须配置:custom_sql参数
ERROR_WHEN_DATA_EXISTS:当存在数据时报告错误。
sink:前置sql执行


  • data_save_mode = "CUSTOM_PROCESSING"
  • custom_sql

    • 当 data_save_mode 选择 CUSTOM_PROCESSING 时,您应该填写 CUSTOM_SQL 参数。此参数通常填入可执行的 SQL。SQL 将在同步任务之前执行。

SQL 将在同步任务之前执行。

  • 可以实现:同步删除(执行前置update、truncate的sql等)
  • 注意:这个sql要执行。

    • 必须设置:generate_sink_sql=true。才会执行custom_sql。(只有自动生成sql的时候,这个才会执行)
    • 不能使用:query的sql

sink:开启postgresql的upsert


  • seatunnel自动生成:INSERT INTO …… ON CONFLICT ("主键") DO UPDATE SET …… 的sql

    • 插入数据的时候:

      • 如果主键不重复就插入;
      • 如果主键重复,数据有变化就更新,没变化就不管;


  1. # 生成类似:INSERT INTO …… ON CONFLICT ("主键") DO UPDATE SET …… 的sql
  2. enable_upsert = true
  3. # 判断值唯一的健:此选项用于支持在自动生成 SQL 时进行 insert,delete 和 update 操作。
  4. primary_keys = ["id"]
复制代码
sink:query(目标的query的sql不仅可以写insert,还可以写update或者其他sql)


  • 可以实现:只更新,不新增数据
  1.     query = """UPDATE "public"."t_8_100w_imp_st_ds_demo6_zl_jgx_sjcjzzzd"
  2. SET "user_name"=?, "sex"=?, "decimal_f"=?, "phone_number"=?, "age"=?, "create_time"=?, "description"=?, "address"=?
  3. WHERE "id"=?;"""
复制代码
seatunnel:数据流的概念


  • 数据流的概念:seatunnel中有数据流的概念,通过:plugin_output、plugin_input 实现数据流向

    • 数据流的名字都是自己取的,唯一就行

  1. # demo4-1-mysql2mysql-qxzh-st-107.conf
  2. env {
  3.    ……
  4. }
  5. source {
  6.   jdbc {
  7.     ……
  8.     plugin_output = "source_data"
  9.     ……
  10.   }
  11. }
  12. # 清洗转换
  13. transform {
  14.   # 1. 字段映射:sql中做了,实际生成中不在这里处理。直接在source的query的sql中处理了就行
  15.   FieldRename {
  16.     plugin_input = "source_data"
  17.     plugin_output = "FieldRename_data"
  18.      ……
  19.   }
  20.   # 2. 手机号脱敏:13812341234 -> 138****1234
  21.   Replace {
  22.     plugin_input = "FieldRename_data"
  23.     plugin_output = "Replace_phone_number_data"
  24.      ……
  25.   }
  26.   # 4. 性别转换:1->男,2->女
  27.   # sql的方式替换(演示成功),这种方式,还不如写到source.query的sql中
  28.   Sql {
  29.     plugin_input = "Replace_phone_number_data"
  30.     plugin_output = "Sql_sex_data"
  31.      ……
  32.   }
  33.   # 6. 地址默认值:空地址设为'未知'
  34.   Sql {
  35.     plugin_input = "Sql_sex_data"
  36.     plugin_output = "Sql_address_data"
  37.   }
  38. }
  39. sink {
  40.   jdbc {
  41.     ……
  42.     # 接收的最终数据集
  43.     plugin_input = "Sql_address_data"
  44.     ……
  45.   }
  46. }
复制代码
transform:清洗转换


  • transform中会用到:数据流 (transform如果只有一个组件,可以不用数据流,有两个及以上必须用数据流的概念)
  • 清洗转换方式:

    • 清洗转换方式1:使用seatunnel的transform插件(不推荐,能力有限,大多数用Sql插件实现)
    • 清洗转换方式2:直接在源头的query的sql中搞定(seatunnel的设计倾向)

  • 清洗转换规则:
  1. 字段映射:name → user_name(不用特殊处理:sql中使用as,出来的字段是目标库的字段名就行)
  2. 数据清洗:手机号脱敏 138****1234
  3. 类型转换:年龄字段:字符串转数字(本身是字符串的数字:可以直接保存,不用特殊操作。如果转换错误会报错)
  4. 值转换:性别字段:1=>男;2=>女
  5. 数据过滤:只保留 age>25 的记录
  6. 默认值设置:地址:空地址设为'未知'
复制代码

  • 清洗转换方式1:conf中的transform插件

    • 完整示例见:https://www.cnblogs.com/kakarotto-chen/p/19336931#_label1_6

  1. env {
  2.   ……
  3. }
  4. source {
  5.   jdbc {
  6.     ……
  7.   }
  8. }
  9. # 清洗转换
  10. transform {
  11.   # 1. 字段映射:sql中做了,实际生成中不在这里处理。直接在source的query的sql中处理了就行
  12.   FieldRename {
  13.     plugin_input = "source_data"
  14.     plugin_output = "FieldRename_data"
  15.     specific = [
  16.       {
  17.         field_name = "name"
  18.         target_name = "user_name"
  19.       }
  20.     ]
  21.   }
  22.   # 还可以用:FieldMapper插件,来映射字段
  23.   # 2. 手机号脱敏:13812341234 -> 138****1234
  24.   Replace {
  25.     plugin_input = "FieldRename_data"
  26.     plugin_output = "Replace_phone_number_data"
  27.     replace_field = "phone_number"
  28.     # 正则匹配:第4位到第7位(共11位手机号)
  29.     pattern = "(\\d{3})\\d{4}(\\d{4})"
  30.     replacement = "$1****$2"
  31.     is_regex = true
  32.     replace_first = true
  33.   }
  34.   # 还可以用Sql插件来做脱敏,用这个方式做,还不如直接写到source中query的sql中直接转换
  35.   #Sql {
  36.   #  query = "select id,user_name,sex,decimal_f,CONCAT(LEFT(phone_number, 3), '****', RIGHT(phone_number, 4)) AS phone_number,age,create_time,description,address FROM dual"
  37.   #}
  38.   # 3. 年龄转换:字符串转整数(实际生产中,不用转换,也没有内置的转换插件,可以直接保存成功)
  39.   # 4. 性别转换:1->男,2->女
  40.   # sql的方式替换(演示成功),这种方式,还不如写到source.query的sql中
  41.   Sql {
  42.     plugin_input = "Replace_phone_number_data"
  43.     plugin_output = "Sql_sex_data"
  44.     # 注意:1、使用sql插件,字段必须和source中的字段一致;2、表名可以固定:dual
  45.     query = "SELECT id,user_name,CASE sex WHEN 1 THEN '男' WHEN 2 THEN '女' ELSE '未知' END AS sex,decimal_f,phone_number,age,create_time,description,address FROM dual"
  46.   }
  47.   # Replace的连续替换方案会报错:因为源头sex是int类型,目标sex是varchar类型,Replace1的时候,用的是源头表的sex的int类型,会报错:转换错误,所以转换只能用Sql或者自己写插件
  48.   # 第一个Replace:将"1"替换为"男"
  49.   #Replace {
  50.   #  plugin_input = "Replace_phone_number_data"
  51.   #  plugin_output = "Replace_sex_1_data"
  52.   #  replace_field = "sex"
  53.   #  pattern = 1
  54.   #  replacement = "男"
  55.   #  is_regex = false
  56.   #  # 当 is_regex=false 时,不需要 replace_first 参数
  57.   #}
  58.   # 第二个Replace:将"2"替换为"女"
  59.   #Replace {
  60.   #  plugin_input = "Replace_sex_1_data"
  61.   #  plugin_output = "Replace_sex_2_data"
  62.   #  replace_field = "sex"
  63.   #  pattern = 2
  64.   #  replacement = "女"
  65.   #  is_regex = false
  66.   #  # 当 is_regex=false 时,不需要 replace_first 参数
  67.   #}
  68.   # 5. 数据过滤:只保留 age > 25 的记录。
  69.   # 注意:不能用:Filter,Filter是过滤字段是否要不要的,不是过滤值的。只有使用Sql插件
  70.   # (只能用这种方式)注意:实际生成中,数据过滤不在这里做,在source.Jdbc.query的sql中的where过滤做(效率高)
  71.   # 注意:age在源头表中的类型是varchar,目标库age的类型是int。这里转换类型会报错
  72.   # Sql插件使用的是SeaTunnel内置的SQL解析与执行引擎,它并非完整的数据库,因此在SQL语法支持(特别是类型转换函数)上远不如真实的MySQL。
  73.   #Sql {
  74.   #  plugin_input = "Sql_sex_data"
  75.   #  plugin_output = "Sql_age_data"
  76.   #  # 注意:1、使用sql插件,字段必须和source中的字段一致;2、表名可以固定:dual
  77.   #  query = "SELECT id,user_name,sex,decimal_f,phone_number,CAST(age AS SIGNED) as age,create_time,description,address FROM dual where age > 25"
  78.   #}
  79.   # 6. 地址默认值:空地址设为'未知'
  80.   Sql {
  81.     plugin_input = "Sql_sex_data"
  82.     plugin_output = "Sql_address_data"
  83.     query = "SELECT id,user_name,sex,decimal_f,phone_number,age,create_time,description,case when address is null then '未知' else address end as address FROM dual"
  84.   }
  85.   # 注意:Replace的正则无法匹配null,会直接跳过,所以不能用Replace
  86.   # 第一步:将 NULL 值替换为特殊标记字符串
  87.   #Replace {
  88.   #  plugin_input = "Sql_sex_data"
  89.   #  plugin_output = "Replace_address_1_data"
  90.   #  replace_field = "address"
  91.   #  pattern = "Null"
  92.   #  replacement = "未知"
  93.   #  is_regex = false
  94.   #}
  95. }
  96. sink {
  97.   jdbc {
  98.     ……
  99.   }
  100. }
复制代码

  • 清洗转换方式2:直接在source的query的sql中实现数据过滤和转换

    • 完整示例见:https://www.cnblogs.com/kakarotto-chen/p/19336931#_label1_6    (2.6.3)

  1. # demo4-3-mysql2mysql-qxzh-st-107.conf
  2. env {
  3.   # 并行度(线程数)
  4.   execution.parallelism = 5
  5.   # 任务模式:BATCH:批处理模式;STREAMING:流处理模式
  6.   job.mode = "BATCH"
  7. }
  8. source {
  9.   jdbc {
  10.     ……
  11.     # 使用sql来做清洗转换
  12.     query = "select id,name as user_name,CASE sex WHEN '1' THEN '男' WHEN '2' THEN '女' ELSE '未知' END AS sex,decimal_f,CONCAT(LEFT(phone_number, 3), '****', RIGHT(phone_number, 4)) AS phone_number,CAST(age AS SIGNED) as age,create_time,description,case when address is null then '未知' else address end as address from t_8_100w where age > 25"
  13.     ……
  14.   }
  15. }
  16. # 清洗转换(简单的清洗转换,直接在source的query的sql中处理了就行)
  17. transform {
  18.   # 1. 字段映射:sql中做了,实际生成中不在这里处理。直接在source的query的sql中处理了就行
  19.   # 还可以用:FieldMapper插件,来映射字段
  20.   # 2. 手机号脱敏:13812341234 -> 138****1234
  21.   # 3. 年龄转换:字符串转整数(实际生产中,不用转换,也没有内置的转换插件,可以直接保存成功)
  22.   # 4. 性别转换:1->男,2->女
  23.   # 5. 数据过滤:只保留 age > 25 的记录。
  24.   # 6. 地址默认值:空地址设为'未知'
  25. }
  26. sink {
  27.   jdbc {
  28.     ……
  29.   }
  30. }
复制代码
sink:自动建表、控制自动建表的字段

自动建表

触发条件:

  • 目标表未建表
  • schema_save_mode=CREATE_SCHEMA_WHEN_NOT_EXIST(默认就是,可以不设置)
  • 启用自动生成sql:
控制自动建表的字段


  • 自动建表的基础上,还可以对建表的字段做映射和修改操作,来控制表的创建
  • 有空再做demo
一个任务:多源头、多目标采集


  • 有空再做demo
1

来源:程序园用户自行投稿发布,如果侵权,请联系站长删除
免责声明:如果侵犯了您的权益,请联系站长,我们会及时删除侵权内容,谢谢合作!

相关推荐

您需要登录后才可以回帖 登录 | 立即注册