I want to load the Result Set of a select query in dataframe Spark.
I’m using the following code :
x
public static void func (Dataset <Row> df){
df.repartition(20); //one connection per partition, see below
df.foreachPartition((Iterator<Row> t) -> {
Connection conn = DriverManager.getConnection("url",
"root", "");
conn.setAutoCommit(true);
Statement statement = conn.createStatement();
final int batchSize = 100000;
int i = 0;
while (t.hasNext()) {
Row row = t.next();
try {
ResultSet query = statement.executeQuery("SELECT * FROM zones WHERE zones.id IN ("
+"'" + row.getAs("idZones")
+ "'"+ ") ");
} catch (SQLException e) {
e.printStackTrace();
} finally {
}
}
statement.close();
conn.close();
});
}
There is any posibility to load the ResultSet in a dataframe ?
I need your help
Thank you .
Advertisement
Answer
If I understand your question correctly, you want to load a SQL table in a data frame. To do that, you need to do the following things:
- Create an object of
sparkSession
. - Put your JDBC connection in a
Properties
object. - Load the SQL table by the read method.
- You can apply your relevant filter based on the loaded data frame.
Please find below the code as a sample to do so.
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import java.util.Properties;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ReadFromSQLTable {
public static void main(String[] args) {
String applicationName = ReadFromSQLTable.class.getName();
SparkConf sparkConf = new SparkConf().setAppName(applicationName).setMaster("local[2]");
// using Dataset<Row>
SparkSession sparkSession = SparkSession
.builder()
.config(sparkConf)
.getOrCreate();
Properties connectionProperties = new Properties();
connectionProperties.put("user", "root"); // user name of your SQL database
connectionProperties.put("password", "password"); // password of SQL
connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");
// Name of the database that i am interacting with is `test`. You will find this as part of URL.
// Name of table that I want to load is the `employee`
Dataset<Row> employeeDetail = sparkSession.read().jdbc("jdbc:mysql://127.0.0.1:3306/test",
"employee", connectionProperties);
log.error("Printing table detail");
employeeDetail.show(); // to show the dataset loaded on the console
long count = employeeDetail.count();
System.out.println("The count is = " + count);
Dataset<Row> employeeDetail2 = employeeDetail.filter("employee_number < 2");
employeeDetail2.show();
}
You can apply any kind of operation like a filter, select, or any other SQL operation on these dataframe.
I am running this code in my local system. I have added the comment in the code so that you can understand it easily. Do let me know if you have any doubt.
I hope this should give you a fair idea about how to begin loading the SQL table as dataframe.