I have a Scala/Spark question. I'm using Spark 2.1.1, and I have a DataFrame that looks like this:
client | transaction | amount | machine |
---|---|---|---|
0000001 | transaction1 | -0.010000 | user000000001 |
0000002 | transaction2 | 0.010000 | user000000001 |
0000002 | transaction2 | 0.010000 | user000000002 |
0000002 | transaction2 | 0.010000 | user000000003 |
0000003 | transaction3 | -0.010000 | user000000004 |
0000003 | transaction3 | -0.010000 | user000000002 |
0000003 | transaction3 | -0.010000 | user000000003 |
0000003 | transaction3 | -0.010000 | user000000011 |
0000001 | transaction4 | 0.010000 | user000000011 |
I also have another DataFrame, a subset of the first:
client | transaction | amount | machine |
---|---|---|---|
0000002 | transaction2 | 0.010000 | user000000001 |
0000002 | transaction2 | 0.010000 | user000000002 |
0000002 | transaction2 | 0.010000 | user000000003 |
0000003 | transaction3 | -0.010000 | user000000004 |
0000003 | transaction3 | -0.010000 | user000000002 |
0000003 | transaction3 | -0.010000 | user000000003 |
0000003 | transaction3 | -0.010000 | user000000011 |
How can I filter the first DataFrame using the second one? I don’t know whether there is a subtract operation that can use two fields as the filter condition, or some way to perform a join/union with two different conditions. Why use two fields as the condition? If you analyze the table, you can see that transaction2 and transaction3 are repeated n times with different machine identifiers. I need to keep only the rows whose machine matches the machine of a non-repeated transaction. In other words, I need a table like this:
client | transaction | amount | machine |
---|---|---|---|
0000001 | transaction1 | -0.010000 | user000000001 |
0000002 | transaction2 | 0.010000 | user000000001 |
0000003 | transaction3 | -0.010000 | user000000011 |
0000001 | transaction4 | 0.010000 | user000000011 |
I would greatly appreciate your help and guidance with this!
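For reference, here is a minimal sketch that reproduces the sample data above. It assumes a `SparkSession` is already in scope as `spark`; the variable names `dataframe` and `subset` are just illustrative:

```scala
import spark.implicits._

// Sample data matching the first table: client, transaction, amount, machine
val dataframe = Seq(
  ("0000001", "transaction1", -0.01, "user000000001"),
  ("0000002", "transaction2",  0.01, "user000000001"),
  ("0000002", "transaction2",  0.01, "user000000002"),
  ("0000002", "transaction2",  0.01, "user000000003"),
  ("0000003", "transaction3", -0.01, "user000000004"),
  ("0000003", "transaction3", -0.01, "user000000002"),
  ("0000003", "transaction3", -0.01, "user000000003"),
  ("0000003", "transaction3", -0.01, "user000000011"),
  ("0000001", "transaction4",  0.01, "user000000011")
).toDF("client", "transaction", "amount", "machine")

// The subset shown above happens to contain exactly the repeated transactions
val subset = dataframe.filter($"transaction".isin("transaction2", "transaction3"))
```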
Answer
If you want to subtract the subset dataframe from the first dataframe, you can use a left anti join, as follows:
```scala
dataframe.join(subset, dataframe.columns, "left_anti")
```
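As a side note, since the join condition covers every column, `dataframe.except(subset)` should produce the same subtraction here; be aware, though, that `except` behaves like SQL's EXCEPT DISTINCT and also de-duplicates the surviving rows, so the left anti join is the safer general choice.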
Given your input dataframe and your subset, you will get:
```
+-------+------------+------+-------------+
|client |transaction |amount|machine      |
+-------+------------+------+-------------+
|0000001|transaction1|-0.01 |user000000001|
|0000001|transaction4|0.01  |user000000011|
+-------+------------+------+-------------+
```
Then you can take the machine column and use an inner join to filter your first dataframe down to the matching rows. The complete code would be as follows:
```scala
dataframe.join(subset, dataframe.columns, "left_anti")
  .select("machine")
  .join(dataframe, Seq("machine"))
```
And you will get your expected result:
```
+-------------+-------+------------+------+
|machine      |client |transaction |amount|
+-------------+-------+------------+------+
|user000000001|0000001|transaction1|-0.01 |
|user000000001|0000002|transaction2|0.01  |
|user000000011|0000003|transaction3|-0.01 |
|user000000011|0000001|transaction4|0.01  |
+-------------+-------+------------+------+
```
However, in your case, I don’t think you need to build the subset dataframe at all; you can get your result using only the first dataframe, as follows:
```scala
import org.apache.spark.sql.functions.{col, count, first}

dataframe.groupBy("transaction")
  .agg(count("transaction").as("total"), first("machine").as("machine"))
  .filter(col("total") === 1)
  .select("machine")
  .join(dataframe, Seq("machine"))
```
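A note on the use of `first("machine")`: because `filter(col("total") === 1)` keeps only the groups that contain a single row, `first` is deterministic here; for groups with several machines it could return any of them, but those groups are dropped anyway.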