The SQL query below caused the disk on one node of our Redshift cluster to fill up:
insert
into
report.agg_info
( pd ,idate ,idate_str ,app_class ,app_superset ,aid ,pf ,is ,camp ,ua_camp_id ,country ,is_predict ,cohort_size ,new_users ,retained ,acc_re ,day_iap_rev ,iap_rev ,day_rev ,rev )
select
p.pd ,
p.idate ,
p.idate_str ,
p.app_class ,
p.app_superset ,
p.aid ,
p.pf ,
p.is ,
p.camp ,
p.ua_camp_id ,
p.country ,
1 as is_predict ,
p.cohort_size ,
p.new_users ,
p.retained ,
ar.acc_re ,
p.day_iap_rev ,
ar.iap_rev ,
p.day_rev ,
ar.rev
from
tmp_predict p
join
tmp_accumulate ar
on p.pd = ar.pd
and p.idate = ar.idate
and p.aid = ar.aid
and p.pf = ar.pf
and p.is = ar.is
and p.camp = ar.camp
and p.ua_camp_id = ar.ua_camp_id
and p.country = ar.country
The query plan is:
XN Hash Join DS_DIST_BOTH (cost=11863664.64..218084556052252.12 rows=23020733790769 width=218)
  -> XN Seq Scan on tmp_predict p (cost=0.00..3954554.88 rows=395455488 width=188)
  -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=165)
        -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=165)
From the image above we can see that node-39 holds more data than the other nodes, because the data is skewed by the join.
To solve this issue, we tried using update instead of join:
update
report.agg_info
set
acc_re = ar.acc_re,
iap_rev = ar.iap_rev,
rev = ar.rev
from
tmp_accumulate ar
where
report.agg_info.pd = ar.pd
and report.agg_info.idate = ar.idate
and report.agg_info.aid = ar.aid
and report.agg_info.pf = ar.pf
and report.agg_info.is = ar.is
and report.agg_info.camp = ar.camp
and report.agg_info.ua_camp_id = ar.ua_camp_id
and report.agg_info.country = ar.country
The query plan is:
XN Hash Join DS_BCAST_INNER (cost=11863664.64..711819961371132.00 rows=91602 width=254)
  -> XN Seq Scan on agg_info (cost=0.00..2.70 rows=270 width=224)
  -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=170)
        -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=170)
According to the picture, the data is now distributed evenly across all nodes. However, every node now holds more data.
Is there a best practice for handling data skew caused by a join in Redshift?
Answer
https://docs.aws.amazon.com/redshift/latest/dg/c-analyzing-the-query-plan.html
Look for the following broadcast operators where there are high-cost operations:
• DS_BCAST_INNER: Indicates the table is broadcast to all the compute nodes, which is fine for a small table but not ideal for a larger table.
• DS_DIST_ALL_INNER: Indicates that all of the workload is on a single slice.
• DS_DIST_BOTH: Indicates heavy redistribution.
DS_DIST_BOTH in your first query is redistributing both tables on a specific column. You haven’t included the column chosen in your EXPLAIN snippet but it’s probably the first column in the join.
DS_BCAST_INNER in your second query is broadcasting a complete copy of tmp_accumulate to every node. Both of these operations are quite expensive and slow.
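One way to test the guess that the leading join column is skewed is to count rows per value of that column. The sketch below assumes pd is the column Redshift chose for redistribution (the EXPLAIN snippet does not confirm this) and that the temp tables still exist in the current session. Both queries scan the full table, so they will not be cheap on roughly 395 million rows.

```sql
-- Assumption: pd is the redistribution column. Swap in any other join
-- column to compare. A handful of values with very large counts suggests
-- that hashing on this column sends a disproportionate share of rows
-- to the same slice.
SELECT pd,
       COUNT(*) AS row_count
FROM tmp_accumulate
GROUP BY pd
ORDER BY row_count DESC
LIMIT 20;

-- Rough comparison of how many distinct values some of the join columns
-- have. A column with few distinct values is a poor redistribution key.
SELECT COUNT(DISTINCT pd)         AS pd_values,
       COUNT(DISTINCT idate)      AS idate_values,
       COUNT(DISTINCT aid)        AS aid_values,
       COUNT(DISTINCT ua_camp_id) AS ua_camp_id_values,
       COUNT(DISTINCT country)    AS country_values
FROM tmp_accumulate;
```

A high distinct count alone does not rule out skew (one value can still dominate), so the first query is the more direct check.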
Your join is very wide and it seems like the first column is quite skewed. You could try 2 approaches to resolve the skew and prevent the broadcast:
- Change the order of columns declared in the join to declare the most unique (or least skewed) column first. The first column will typically be used as the redistribution key. (NB: This would not work if the tables were already diststyle key.)
- Recommended. Since the join is so complex and skewed you could pre-calculate a hash value across these columns and then join on that value.
--Example of Pre-Calculated Hash
CREATE TEMP TABLE tmp_predict
DISTKEY(dist_hash)
AS SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
||country::VARCHAR) dist_hash
,pd ,idate ,aid ,pf ,is ,camp ,ua_camp_id, country
,…
FROM …
;
CREATE TEMP TABLE tmp_accumulate
DISTKEY(dist_hash)
AS SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
||country::VARCHAR) dist_hash
,pd ,idate ,aid ,pf ,is ,camp ,ua_camp_id, country
,…
FROM …
;
INSERT INTO report.agg_info
SELECT …
FROM tmp_predict p
JOIN tmp_accumulate ar
ON p.dist_hash = ar.dist_hash
;
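After rebuilding the temp tables with DISTKEY(dist_hash), it may be worth confirming that the rows are actually spread evenly before running the join. This is a minimal sketch, assuming your user can read the system tables STV_TBL_PERM and STV_SLICES (regular users typically see only their own tables) and that the table name is unique among the tables visible to you:

```sql
-- Rows stored per slice for the rebuilt temp table, with the owning node.
-- Roughly equal row_count values across slices indicate an even distribution.
SELECT s.node,
       p.slice,
       SUM(p.rows) AS row_count
FROM stv_tbl_perm p
JOIN stv_slices s
  ON s.slice = p.slice
WHERE TRIM(p.name) = 'tmp_accumulate'   -- name is a padded CHAR column
GROUP BY s.node, p.slice
ORDER BY s.node, p.slice;
```

Because the final join compares only dist_hash, two different key combinations that concatenate to the same string (for example 'ab' followed by 'c' versus 'a' followed by 'bc') would produce the same hash. If that is possible in your data, one option is to put a delimiter between the concatenated columns.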