
How to save database versioning as a sql_last_version variable in Logstash

I’m trying to save the greatest row version in the sql_last_value parameter each time the pipeline in my log.conf file, shown below, runs.

This is my log.conf for Logstash:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        # last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        parameters => { "sql_last_version" => 400 }
        schedule => "*/5 * * * * *"
        statement => "SELECT * FROM test where xmin::text::int >= :sql_last_version ORDER BY xmin::text::int "
    }
}

filter{
    
}

output {   
    stdout {
        codec => json_lines
    }
}

For instance, this is my table:

test=# select *,xmin from test where xmin::text::int >= 400 order by xmin::text::int;
 id |   name   | xmin 
----+----------+------
  1 | Robert   |  300
  2 | Jessica  |  400
  3 | Jennifer |  500
  4 | Jack     |  600
(4 rows)

I expect 600 to be stored as the sql_last_version parameter.

I’m not sure whether I’m mistaken, and I’ve got no ideas here, so please help. I’m a total newbie to Logstash.

Thanks in advance.


Answer

The parameter is called sql_last_value and here is how you can use it:

input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        # last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        schedule => "*/5 * * * * *"
        use_column_value => true
        tracking_column => "xmin"
        statement => "SELECT * FROM test where xmin::text::int >= :sql_last_value ORDER BY xmin::text::int "
    }
}

Here are the modifications I made:

  • You don’t need the parameters setting
  • You’re missing the use_column_value setting
  • You’re missing the tracking_column setting
  • The SQL query needs to reference sql_last_value, not sql_last_version
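
One caveat worth checking: in PostgreSQL, SELECT * does not return system columns such as xmin, so the tracking column may have to be selected explicitly before the plugin can find it in the result set. Below is a minimal sketch of that variant, assuming the same table and connection settings as above. The alias row_version is a hypothetical name chosen for illustration, and tracking_column_type is spelled out even though numeric is its default.

```
input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        schedule => "*/5 * * * * *"
        use_column_value => true
        # row_version is a hypothetical alias; it must match the alias in the SELECT list
        tracking_column => "row_version"
        tracking_column_type => "numeric"
        # xmin is listed explicitly because SELECT * omits PostgreSQL system columns
        statement => "SELECT *, xmin::text::int AS row_version FROM test WHERE xmin::text::int >= :sql_last_value ORDER BY xmin::text::int"
    }
}
```

With this variant each event also carries a row_version field, which could be dropped in the filter section if it is not wanted downstream.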

After the first run, 600 will be stored in sql_last_value and used in the next call to retrieve only records whose xmin value is >= 600, which means the last record will be fetched again.
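
If re-reading the last row on every run is not wanted, one option is a strict comparison. The sketch below rests on two assumptions beyond the answer: that no new rows can appear later with a version equal to the stored value, and that xmin has to be selected explicitly (here under the hypothetical alias row_version) for the plugin to see it. It also sets last_run_metadata_path to the path commented out in the question, so it is clear which file holds the stored value.

```
input {
    jdbc {
        jdbc_connection_string => "jdbc:postgresql://postgres:5432/test"
        jdbc_driver_library => "/home/postgresql-42.2.18.jar"
        jdbc_driver_class => "org.postgresql.Driver"
        jdbc_user => "postgres"
        jdbc_password => "postgres"
        # file where the plugin records sql_last_value between runs
        last_run_metadata_path => "/usr/share/logstash/.logstash_jdbc_last_run"
        schedule => "*/5 * * * * *"
        use_column_value => true
        # row_version is a hypothetical alias for the xmin system column
        tracking_column => "row_version"
        # the strict > skips the row whose version was stored by the previous run
        statement => "SELECT *, xmin::text::int AS row_version FROM test WHERE xmin::text::int > :sql_last_value ORDER BY xmin::text::int"
    }
}
```

On the very first run sql_last_value starts at 0 for a numeric tracking column, so the whole table is read once. Setting clean_run => true discards the stored value and starts over.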
