Skip to content
Advertisement

Is there any better way to avoid data skew by join in Redshift?

The query SQL as below cause one node of Redshift cluster disk full

insert
into
  report.agg_info
  ( pd ,idate ,idate_str ,app_class ,app_superset ,aid ,pf ,is ,camp ,ua_camp_id ,country ,is_predict ,cohort_size ,new_users ,retained ,acc_re ,day_iap_rev ,iap_rev ,day_rev ,rev ) 
select
    p.pd ,
    p.idate ,
    p.idate_str ,
    p.app_class ,
    p.app_superset ,
    p.aid ,
    p.pf ,
    p.is ,
    p.camp ,
    p.ua_camp_id ,
    p.country ,
    1 as is_predict ,
    p.cohort_size ,
    p.new_users ,
    p.retained ,
    ar.acc_re ,
    p.day_iap_rev ,
    ar.iap_rev ,
    p.day_rev ,
    ar.rev
from
    tmp_predict p
join
    tmp_accumulate ar
        on p.pd = ar.pd
        and p.idate = ar.idate
        and p.aid = ar.aid
        and p.pf = ar.pf
        and p.is = ar.is
        and p.camp = ar.camp
        and p.ua_camp_id = ar.ua_camp_id
        and p.country = ar.country

And query plan is

XN Hash Join DS_DIST_BOTH (cost=11863664.64..218084556052252.12 rows=23020733790769 width=218)
    -> XN Seq Scan on tmp_predict p (cost=0.00..3954554.88 rows=395455488 width=188)
    -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=165)
      -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=165)

enter image description here

From the above image we know node-39 hold more data than other nodes. Because data is skewed by join.

To solve this issue, we try to use update instead of join

update
  report.agg_info
set
  acc_re = ar.acc_re,
  iap_rev = ar.iap_rev,
  rev = ar.rev
from
  tmp_accumulate ar
where
  report.agg_info.pd = ar.pd
  and report.agg_info.idate = ar.idate
  and report.agg_info.aid = ar.aid
  and report.agg_info.pf = ar.pf
  and report.agg_info.is = ar.is
  and report.agg_info.camp = ar.camp
  and report.agg_info.ua_camp_id = ar.ua_camp_id
  and report.agg_info.country = ar.country

Query plan

XN Hash Join DS_BCAST_INNER (cost=11863664.64..711819961371132.00 rows=91602 width=254)
    -> XN Seq Scan on agg_info (cost=0.00..2.70 rows=270 width=224)
    -> XN Hash (cost=3954554.88..3954554.88 rows=395455488 width=170)
      -> XN Seq Scan on tmp_accumulate ar (cost=0.00..3954554.88 rows=395455488 width=170)

enter image description here

The data is distributed evenly on all nodes according to the picture. However, there are more data in each nodes.

I want to know, is there any best practice of handling data skew by join in Redshift?

Advertisement

Answer

https://docs.aws.amazon.com/redshift/latest/dg/c-analyzing-the-query-plan.html

Look for the following broadcast operators where there are high-cost operations:
• DS_BCAST_INNER: Indicates the table is broadcast to all the compute nodes, which is fine for a small table but not ideal for a larger table.
• DS_DIST_ALL_INNER: Indicates that all of the workload is on a single slice.
• DS_DIST_BOTH: Indicates heavy redistribution.

DS_DIST_BOTH in your first query is redistributing both tables on a specific column. You haven’t included the column chosen in your EXPLAIN snippet but it’s probably the first column in the join.

DS_BCAST_INNER is broadcasting a complete copy of tmp_accumulate to every node. Both of these operations are quite expensive and slow.

Your join is very wide and it seems like the first column is quite skewed. You could try 2 approaches to resolve the skew and prevent the broadcast:

  1. Change the order of columns declared in the join to declare the most unique (or least skewed) column first. The first column will typically be used as the redistribution key. (NB: This would not work if the tables were already diststyle key.)
  2. Recommended. Since the join is so complex and skewed you could pre-calculate a hash value across these columns and then join on that value.
--Example of Pre-Calculated Hash
CREATE TEMP TABLE tmp_predict 
    DISTKEY(dist_hash)
AS SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
                      ||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
                      ||country::VARCHAR) dist_hash
         ,pd ,idate ,aid ,pf ,is ,camp ,ua_camp_id, country
         ,…
     FROM …
;
CREATE TEMP TABLE tmp_accumulate 
    DISTKEY(dist_hash)
AS SELECT FUNC_SHA1(pd||idate::VARCHAR||aid::VARCHAR||pf::VARCHAR
                      ||is::VARCHAR||camp::VARCHAR||ua_camp_id::VARCHAR
                      ||country::VARCHAR) dist_hash
         ,pd ,idate ,aid ,pf ,is ,camp ,ua_camp_id, country
         ,…
     FROM …
;
INSERT INTO report.agg_info
SELECT …
FROM    tmp_predict p
JOIN    tmp_accumulate ar
    ON  p.dist_hash = ar.dist_hash
;
3 People found this is helpful
Advertisement