I would like to format the SQL queries inside my existing PySpark files.
This is what my existing source file looks like:
from flow import flow

f = flow(["xxx"], ["xxxxxxxx"])  # this is a comment

f.spark.sql(
    """ select dealer_number location_path_id, '2099-12-31' location_path_end_date, dealer_to_salespoint(dealer_number) sales_point_id, true can_rollup_owner, dealer_number entity, 5 as location_level, calendar_date, 'Sales' period_type, coalesce(m.model,'OTH') model_id, 'daily' as cadence, cpo_coverage_code, cpo_contract_status, 'cpo' feed_name from ( select *, row_number() over (partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status order by a.calendar_date asc, a.filekey desc) rn from ( select `DEALER NUMBER` dealer_number, `CONTRACT STATUS` cpo_contract_status,`COVERAGE CODE` cpo_coverage_code, `CONTRACT NUMBER` cpo_contract_number, `vehicle model` cpo_vehicle_model, to_date(`CONTRACT TRANSACTION DATE`) calendar_date, filekey, * from cpo_v1 ) a ) f where f.rn = 1 """
)
And this is how I want it to look:
from flow import flow

f = flow(["xxx"], ["abc"], filename=True)

f.spark.sql(
    """
    select
        dealer location_path_id,
        '2099-12-31' location_path_end_date,
        dealer_to_salespoint(dealer_number) sales_point_id,
        true can_rollup_owner,
        dealer_number entity,
        5 as location_level,
        calendar_date,
        'Sales' period_type,
        coalesce(m.model,'OTH') model_id,
        'daily' as cadence,
        cpo_coverage_code,
        cpo_contract_status,
        'cpo' feed_name
    from (
        select
            *,
            row_number() over (
                partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status
                order by a.calendar_date asc, a.filekey desc
            ) rn
        from (
            select
                `DEALER NUMBER` dealer_number,
                `CONTRACT STATUS` cpo_contract_status,
                `COVERAGE CODE` cpo_coverage_code,
                `CONTRACT NUMBER` cpo_contract_number,
                `vehicle model` cpo_vehicle_model,
                to_date(`CONTRACT TRANSACTION DATE`) calendar_date,
                filekey,
                *
            from cpo_v1
        ) a
    ) f
    left join (
        select
            model,
            alternate_model_name
        from models_v1
        lateral view explode(
            nvl2(
                alternate_modelname,
                split(model_name || ',' || alternate_modelname, ","),
                split(model_name, ",")
            )
        ) as alternate_model_name
    ) m
        on lower(split(f.cpo_vehicle_model,' ')[0]) = lower(m.alternate_model_name)
    where f.rn = 1
    """
).createOrReplaceTempView("xxx")

f.save_view("xxx")
I have already tried black and other VS Code extensions for formatting my codebase, but no luck, since the SQL code is treated as a plain Python string. Please suggest a workaround.
P.S.: My existing codebase has 700+ such files.
Answer
One possible option is to use sql-formatter.
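The package is published on PyPI, so (assuming a standard Python setup) pip install sql-formatter should be all that is needed; it exposes the format_sql function used below.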
Let’s say we have a test.py file:
from flow import flow

f = flow(["xxx"], ["xxxxxxxx"])

f.spark.sql(
    """ select dealer_number location_path_id, '2099-12-31' location_path_end_date, dealer_to_salespoint(dealer_number) sales_point_id, true can_rollup_owner, dealer_number entity, 5 as location_level, calendar_date, 'Sales' period_type, coalesce(m.model,'OTH') model_id, 'daily' as cadence, cpo_coverage_code, cpo_contract_status, 'cpo' feed_name from ( select *, row_number() over (partition by a.dealer_number, a.cpo_contract_number, a.cpo_contract_status order by a.calendar_date asc, a.filekey desc) rn from ( select `DEALER NUMBER` dealer_number, `CONTRACT STATUS` cpo_contract_status,`COVERAGE CODE` cpo_coverage_code, `CONTRACT NUMBER` cpo_contract_number, `vehicle model` cpo_vehicle_model, to_date(`CONTRACT TRANSACTION DATE`) calendar_date, filekey, * from cpo_v1 ) a ) f where f.rn = 1 """
)
We can create a script that reads the file as a string, finds the queries by searching for triple-quoted strings ("""), runs them through the formatter, and writes the replacements back:
import re

from sql_formatter.core import format_sql

with open("test.py", "r") as f_in:
    text = f_in.read()

# re.DOTALL lets '.' match the newlines inside the triple-quoted strings, and
# the non-greedy '.*?' keeps each query in its own match; the formatted SQL is
# wrapped back in triple quotes so the file stays valid Python
text = re.sub(
    r'"""(.*?)"""',
    lambda m: '"""\n' + format_sql(m.group(1)) + '\n"""',
    text,
    flags=re.DOTALL,
)

with open("test.py", "w") as f_out:
    f_out.write(text)
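The snippet above handles a single file. Since the codebase has 700+ such files, the same substitution can be looped over a directory tree. Here is a minimal sketch using pathlib; the root path "." and the *.py glob are assumptions to adjust to the actual repository layout:

import re
from pathlib import Path

from sql_formatter.core import format_sql

# one match per triple-quoted string; DOTALL so '.' also matches newlines
PATTERN = re.compile(r'"""(.*?)"""', flags=re.DOTALL)

def format_sql_strings(path: Path) -> None:
    text = path.read_text()
    formatted = PATTERN.sub(
        lambda m: '"""\n' + format_sql(m.group(1)) + '\n"""',
        text,
    )
    if formatted != text:  # only rewrite files that actually changed
        path.write_text(formatted)

# "." is a placeholder for the root of the codebase
for py_file in Path(".").rglob("*.py"):
    format_sql_strings(py_file)

One caveat: the regex matches every triple-quoted string, docstrings included, so it is safest to run this on a clean working tree and review the diff before committing.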