How to pass parameter in PostgresOperator Airflow using for loop

I am using PostgresOperator, and I want to pass a table-name suffix to my SQL query so that when it queries the data, the suffix is read dynamically from each iteration of a for loop:

for country in countries:

    matchTimeStamp = ShortCircuitOperator(
        task_id='Match_Updated_dates_{}'.format(country),
        provide_context=True,
        python_callable=match_dates,
        op_kwargs={'key1': country},
        default_args=default_args,
    )

As you can see, I passed .format(country) into task_id. I want to do the same by passing the country name into the SQL statement below, but Airflow doesn't seem to accept it. Please suggest the correct way; note that I call .format(country) at the end of my SQL statement.

import_redshift_table = PostgresOperator(
    task_id='copy_data_from_redshift_{}'.format(country),
    postgres_conn_id='postgres_default',
    sql='''
        unload ('select * from angaza_public_{}.accounts')
        to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
        credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
        DELIMITER ','
    HEADER
    PARALLEL OFF
'''.format(country))

----- Update: I was able to find a solution ----- I passed country a second time, as .format(country, country), so that both {} placeholders are filled:

    import_redshift_table = PostgresOperator(
        task_id='copy_data_from_redshift_{}'.format(country),
        postgres_conn_id='postgres_default',
        sql='''
                unload ('select * from angaza_public_{}.accounts')
                to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{}.csv'
                credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
                DELIMITER ','
            HEADER
            PARALLEL OFF
        '''.format(country, country))
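Why the first attempt failed can be reproduced with plain Python, independent of Airflow: str.format fills positional {} placeholders left to right, so a template with two {} and only one argument raises an IndexError. A minimal sketch (the country value and bucket path here are made-up examples):

```python
# str.format fills positional {} placeholders in order.
template = "unload ('select * from angaza_public_{}.accounts') to 's3://bucket/anagaza_{}.csv'"

country = 'kenya'  # hypothetical example value

# One argument for two placeholders fails:
try:
    template.format(country)
except IndexError:
    print('IndexError: not enough arguments for two {} placeholders')

# Passing the argument twice fills both placeholders:
sql = template.format(country, country)
print(sql)
```

This is exactly what adding the second argument in .format(country, country) fixes; an f-string avoids the duplication entirely.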


Answer

Your .format(country) supplies only one argument for the two {} placeholders in the template, which raises an IndexError and makes it not work. Also, I think an f-string is more readable than .format. With this, it would work:

import_redshift_table = PostgresOperator(
    task_id=f'copy_data_from_redshift_{country}',
    postgres_conn_id='postgres_default',  # this is not necessary if it's the default
    sql=f"""
        unload ('select * from angaza_public_{country}.accounts')
        to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{country}.csv'
        credentials 'aws_access_key_id=AWSDD****HHJJJJ;aws_secret_access_key=ABCDEFDHPASSEORD/JmlGjyEQMVOBme'
        DELIMITER ','
    HEADER
    PARALLEL OFF
"""
)

By the way, it is good practice to run UNLOAD from Redshift using an IAM role instead of access keys, so that credentials don't appear in the logs.
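As a sketch of what that could look like, the credentials clause can be replaced with Redshift's iam_role clause, which names a role attached to the cluster instead of embedding keys. The role ARN below is a made-up placeholder, not a real role:

```python
country = 'kenya'  # hypothetical example value

# UNLOAD authorized via an IAM role attached to the Redshift cluster;
# the ARN is a placeholder you would replace with your own role.
sql = f"""
    unload ('select * from angaza_public_{country}.accounts')
    to 's3://mygluecrawlerbucket/angaza_accounts/to_be_processed/anagaza_{country}.csv'
    iam_role 'arn:aws:iam::123456789012:role/my-redshift-unload-role'
    DELIMITER ','
    HEADER
    PARALLEL OFF
"""

# No access key or secret appears anywhere in the rendered statement:
assert 'aws_secret_access_key' not in sql
print(sql)
```

With this, the SQL that Airflow renders and logs contains no secret material at all.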

User contributions licensed under: CC BY-SA