What I’m trying to do is save the greatest row version in the sql_last_value parameter, as shown below in my log.conf file, after each time it executes.
This is my log.conf for Logstash:
    input {
      jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        # last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        parameters => { "sql_last_version" => 400 }
        schedule => "*/5 * * * * *"
        statement => "SELECT * FROM test where xmin::text::int >= :sql_last_version ORDER BY xmin::text::int "
      }
    }
    filter {
    }
    output {
      stdout { codec => json_lines }
    }
For instance, this is my table:

    test=# select *,xmin from test where xmin::text::int >= 400 order by xmin::text::int;
     id |   name   | xmin
    ----+----------+------
      1 | Robert   |  300
      2 | Jessica  |  400
      3 | Jennifer |  500
      4 | Jack     |  600
    (4 rows)
I’m expecting 600 to be stored as the last_version parameter.
I’m not sure whether I’m mistaken, and I’ve run out of ideas, so please help. I’m a total newbie to Logstash.
Thanks in advance.
Answer
The parameter is called sql_last_value, and here is how you can use it:
    input {
      jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        # last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        schedule => "*/5 * * * * *"
        use_column_value => true
        tracking_column => "xmin"
        statement => "SELECT * FROM test where xmin::text::int >= :sql_last_value ORDER BY xmin::text::int "
      }
    }
Here are the modifications I made:
- You don’t need the parameters setting.
- You were missing the use_column_value setting.
- You were missing the tracking_column setting.
- The SQL query needs to reference sql_last_value, not sql_last_version.
After the first run, 600 will be stored in sql_last_value and used in the next call to retrieve only records whose xmin value is >= 600 (which means the last record will be fetched again).
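
To make the re-fetch behaviour concrete, here is a small Python simulation of two scheduled runs against the example table. It is only a sketch of the tracking logic as described above, not Logstash code: the run_once helper and the ROWS list are hypothetical names made up for illustration, and the starting value of 0 assumes a numeric tracking column with no previously saved run.

```python
# Rows from the example table as (id, name, xmin) tuples.
ROWS = [
    (1, "Robert", 300),
    (2, "Jessica", 400),
    (3, "Jennifer", 500),
    (4, "Jack", 600),
]


def run_once(rows, sql_last_value):
    """Mimic one scheduled run (hypothetical helper, not a Logstash API).

    Applies the same filter and ordering as the SQL statement, then
    returns the fetched rows and the value that would be tracked next.
    """
    # WHERE xmin >= :sql_last_value ORDER BY xmin
    fetched = sorted(
        (row for row in rows if row[2] >= sql_last_value),
        key=lambda row: row[2],
    )
    # The tracked value becomes the tracking column of the last row
    # returned; it stays unchanged when nothing is returned.
    new_last_value = fetched[-1][2] if fetched else sql_last_value
    return fetched, new_last_value


# Assumption: the tracked value starts at 0 on the very first run.
first_rows, last_value = run_once(ROWS, 0)
second_rows, last_value_after_second = run_once(ROWS, last_value)

print(len(first_rows), last_value)   # 4 600
print(second_rows)                   # [(4, 'Jack', 600)]
```

The second run returns Jack's row again because the comparison is >=. If duplicates matter downstream, one option worth testing is a strict > comparison in the statement, keeping in mind that this could skip rows that share the same xmin as the last tracked value.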