The SQL query below causes the disk on one node of our Redshift cluster to fill up:
insert into report.agg_info
  (pd, idate, idate_str, app_class, app_superset, aid, pf, is, camp, ua_camp_id,
   country, is_predict, cohort_size, new_users, retained, acc_re, day_iap_rev,
   iap_rev, day_rev, rev)
select p.pd, p.idate, p.idate_str, p.app_class, p.app_superset, p.aid, p.pf,
       p.is, p.camp, p.ua_camp_id, p.country, 1 as is_predict, p.cohort_size,
       p.new_users, p.retained, ar.acc_re, p.day_iap_rev, ar.iap_rev,
       p.day_rev, ar.rev
from tmp_predict p
join tmp_accumulate ar
  on  p.pd = ar.pd
  and p.idate = ar.idate
  and p.aid = ar.aid
  and p.pf = ar.pf
  and p.is = ar.is
  and p.camp = ar.camp
  and p.ua_camp_id = ar.ua_camp_id
  and p.country = ar.country
The query plan is:
XN Hash Join DS_DIST_BOTH (cost=11863664.64..218084556052252.12 rows=23020733790769 width=218)
  -> XN Seq Scan on tmp_predict p (cost=0.00..3954554.88 rows=395455488 width=188)
  -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=165)
     -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=165)
From the cluster performance chart (the image is not included here), we can see that node 39 holds far more data than the other nodes, because the data is skewed by the join.
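Since the chart cannot be shown here, here is a minimal sketch of how to quantify that skew yourself, using the STV_BLOCKLIST and STV_TBL_PERM system tables (the table names come from the query above; everything else is generic):

-- Count 1 MB disk blocks per slice for the two join inputs.
-- A skewed table shows a few slices holding far more blocks than the rest.
select trim(t.name) as table_name,
       b.slice,
       count(*) as blocks_mb
from stv_blocklist b
join stv_tbl_perm t
  on  b.tbl = t.id
  and b.slice = t.slice
where trim(t.name) in ('tmp_predict', 'tmp_accumulate')
group by 1, 2
order by 3 desc;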
To solve this issue, we tried an UPDATE instead of the INSERT … SELECT with a join:
update report.agg_info
set acc_re  = ar.acc_re,
    iap_rev = ar.iap_rev,
    rev     = ar.rev
from tmp_accumulate ar
where report.agg_info.pd = ar.pd
  and report.agg_info.idate = ar.idate
  and report.agg_info.aid = ar.aid
  and report.agg_info.pf = ar.pf
  and report.agg_info.is = ar.is
  and report.agg_info.camp = ar.camp
  and report.agg_info.ua_camp_id = ar.ua_camp_id
  and report.agg_info.country = ar.country
The query plan:
XN Hash Join DS_BCAST_INNER (cost=11863664.64..711819961371132.00 rows=91602 width=254)
  -> XN Seq Scan on agg_info (cost=0.00..2.70 rows=270 width=224)
  -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=170)
     -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=170)
According to the chart (also omitted), the data is now distributed evenly across all nodes. However, every node now holds more data than before, because the inner table is broadcast to each of them.
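To watch per-node disk usage directly rather than from the console chart, a minimal sketch against the STV_PARTITIONS system table (generic; nothing here is specific to this query):

-- Disk usage per node; used and capacity are in 1 MB blocks.
select owner as node,
       sum(used) as used_mb,
       sum(capacity) as capacity_mb,
       round(100.0 * sum(used) / sum(capacity), 1) as pct_used
from stv_partitions
group by owner
order by owner;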
I want to know: is there any best practice for handling data skew caused by joins in Redshift?
Answer
https://docs.aws.amazon.com/redshift/latest/dg/c-analyzing-the-query-plan.html
Look for the following broadcast operators where there are high-cost operations:
• DS_BCAST_INNER: Indicates the table is broadcast to all the compute nodes, which is fine for a small table but not ideal for a larger table.
• DS_DIST_ALL_INNER: Indicates that all of the workload is on a single slice.
• DS_DIST_BOTH: Indicates heavy redistribution.
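To confirm that one of these operations is what fills the disk, you can also check whether any step of the executed query went disk-based. A minimal sketch using the SVL_QUERY_SUMMARY system view (replace the query ID placeholder with your own):

-- Steps with is_diskbased = 't' spilled to disk; look at the hash and join steps.
select query, seg, step, label, rows, workmem, is_diskbased
from svl_query_summary
where query = 123456  -- placeholder: your query ID, e.g. from STL_QUERY
order by seg, step;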
DS_DIST_BOTH in your first query is redistributing both tables on a specific column. You haven't included the chosen column in your EXPLAIN snippet, but it's probably the first column in the join.
DS_BCAST_INNER is broadcasting a complete copy of tmp_accumulate to every node. Both of these operations are quite expensive and slow.
Your join is very wide, and it seems the first column is quite skewed. You could try two approaches to resolve the skew and prevent the broadcast:
- Change the order of columns declared in the join so that the most unique (or least skewed) column comes first; the first column will typically be used as the redistribution key (see the sketch after this list). (NB: This would not work if the tables were already DISTSTYLE KEY.)
- Recommended. Since the join is so complex and skewed, you could pre-calculate a hash value across these columns and then join on that value (full example below).
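For the first approach, a minimal sketch of the reordered join. Note that ua_camp_id is only an assumed candidate for the most unique column; substitute whichever column is least skewed in your data:

-- Assumption: ua_camp_id is the most unique / least skewed column.
-- Listing it first makes it the likely redistribution key.
select p.pd, p.idate, ar.acc_re, ar.iap_rev, ar.rev  -- plus the remaining columns
from tmp_predict p
join tmp_accumulate ar
  on  p.ua_camp_id = ar.ua_camp_id
  and p.pd = ar.pd
  and p.idate = ar.idate
  and p.aid = ar.aid
  and p.pf = ar.pf
  and p.is = ar.is
  and p.camp = ar.camp
  and p.country = ar.country;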
--Example of Pre-Calculated Hash

CREATE TEMP TABLE tmp_predict DISTKEY(dist_hash) AS
SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
                 ||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
                 ||country::VARCHAR) dist_hash
     , pd, idate, aid, pf, is, camp, ua_camp_id, country
     , …
FROM … ;

CREATE TEMP TABLE tmp_accumulate DISTKEY(dist_hash) AS
SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
                 ||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
                 ||country::VARCHAR) dist_hash
     , pd, idate, aid, pf, is, camp, ua_camp_id, country
     , …
FROM … ;

INSERT INTO report.agg_info
SELECT …
FROM tmp_predict p
JOIN tmp_accumulate ar
  ON p.dist_hash = ar.dist_hash ;
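One caveat worth adding to this pattern (my note, not part of the original answer): plain concatenation can collide when adjacent values run together ('ab'||'c' hashes the same as 'a'||'bc'), and if any column is NULL the entire concatenation becomes NULL. Consider putting a delimiter between the columns and wrapping nullable columns in COALESCE before hashing.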