Skip to main content
 首页 » 编程设计

Logstash jdbc插件实现数据增量更新

2022年07月19日132Leo_wl

Logstash jdbc插件实现数据增量更新

上节我们看了如何抽取关系型数据库数据至elasticsearch,但实际中我们需要实现增量更新,本文讨论如何实现增量更新。

更新elasticsearch数据

确保elasticsearch中数据为最新的,即原数据表数据更新后推送至elasticsearch中。一般有两种方法:

  1. 定时运行相同的logstash配置文件,则会一遍一遍发送数据————包括已改变的数据和未改变的数据。
  2. 仅发送表中已改变的数据。

下面我们分别探讨这两种方法,逐步掌握一些关键配置,最终实现数据高效地增量更新。

第一种方法——全量更新

第一种方法,配置文件大致如下:

input { 
    jdbc { 
        jdbc_driver_library => "config.d/ojdbc5.jar" 
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
        jdbc_connection_string => "jdbc:oracle:thin:@192.168.0.192:1521:orcl" 
        jdbc_user => "testuser" 
        jdbc_password => "yourpassword" 
        jdbc_paging_enabled => "true" 
        statement => "select comp_name,comp_type,unified_code,id as person_id from leg_info" 
        schedule => "0 5 * * * *" 
    } 
} 
output { 
    elasticsearch { 
        index => "leg_base_info" 
        document_id=> "%{person_id}" 
        hosts => ["http://localhost:9200"] 
    } 
} 

使用’schedule’选项实现周期运行。这里指定每5分钟运行一次。你可以根据业务需要进行定义,语法可以参考cron教程

那为什么要指定document_id选项呢?

在elasticsearch中,每个文档都会创建唯一ID,用于标识该文档。因为需要反复运行logstash配置文件,会重复创建相同的文档。通过使用document_id字段可以避免创建重复文档。这里我们告诉插件使用表的主键(id字段)作为文档ID。这样elasticsearch就不会对相同记录创建多个文档,已存在的文档会被覆盖。elasticsearch会保存新增的记录和更新的记录。

第二种方法——增量更新

为了在elasticsearch中实现增量更新,则需要数据库中有一列用作参考,示例表中有update_date字段表示该行数据最后更新时间。下面使用该列,配置文件如下:

input { 
    jdbc { 
        jdbc_driver_library => "config.d/ojdbc5.jar" 
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
        jdbc_connection_string => "jdbc:oracle:thin:@192.168.0.192:1521:orcl" 
        jdbc_user => "testuser" 
        jdbc_password => "yourpassword" 
        jdbc_paging_enabled => "true" 
        statement => "select comp_name,comp_type,unified_code,id as person_id from leg_info where update_date > :sql_last_value" 
        tracking_column_type => "timestamp" 
        schedule => "0 5 * * * *" 
    } 
} 
output { 
    elasticsearch { 
        index => "leg_base_info" 
        document_id=> "%{person_id}" 
        hosts => ["http://localhost:9200"] 
    } 
} 

这里我使用了新的选项 :sql_last_value. 其值用于过滤数据,初始为1970 1 月1 日 星期四。然后跳至当前时间,随后根据schedule 选项中指定的时间周期进行递增。

当运行上述logstash配置文件时,控制台输出大概如下:

select * from leg_info where update_date>’2019–07–20 13:08:00' 
select * from leg_info where update_date>’2019–07–20 13:13:00' 
select * from leg_info where update_date>’2019–07–20 13:18:00' 

sql_last_value参数值根据scheduler自动更新。sql_last_value 存储在哪儿?

其自动被保存在名为 .logstash_jdbc_last_run 的元数据文件中. windows 中的位置为 C:/Users/%username% , Linux 则相应的home文件夹中。

我们也可以配置仅抽取update_date大于最后推送记录update_date的数据,而不是根据scheduler增加 :sql_last_value 的值。如果update_date是在程序级别而不是数据库级别设置的,这非常合适(因为Logstash可以在记录保存到数据库时运行查询)。因此,我们可能会错过一些记录)。

下面的示例配置文件,使用use_column_value字段告诉logstash使用该列的值:

input { 
    jdbc { 
        jdbc_driver_library => "config.d/ojdbc5.jar" 
        jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver" 
        jdbc_connection_string => "jdbc:oracle:thin:@192.168.0.192:1521:orcl" 
        jdbc_user => "testuser" 
        jdbc_password => "yourpassword" 
        jdbc_paging_enabled => "true" 
        statement => "select comp_name,comp_type,unified_code,comp_address,id as person_id from leg_info where update_date > :sql_last_value" 
        use_column_value => true 
        tracking_column => "update_date" 
        tracking_column_type => "timestamp" 
        schedule => "0 5 * * * *" 
    } 
} 
output { 
    elasticsearch { 
        index => "leg_base_info" 
        document_id=> "%{person_id}" 
        hosts => ["http://localhost:9200"] 
    } 
} 

update_date是timestamp列。因此我们额外指定tracking_column_type选项。另外需通过use_column_value选项指定用于被跟踪的特定列。这样:sql_last_value在包括最后被推送记录的update_date字段值。因此,当运行该计划,将查询增量数据。

## 总结

本文我们讨论如何使用Logstash实现增量抽取关系型数据库数据至Elasticsearch中。


本文参考链接:https://blog.csdn.net/neweastsun/article/details/96581340
阅读延展