I want to load the result set of a SELECT query into a Spark DataFrame.
I'm using the following code:
public static void func(Dataset<Row> df) {
    // repartition returns a new Dataset; the result must be kept, or the call has no effect
    Dataset<Row> repartitioned = df.repartition(20); // one connection per partition, see below
    repartitioned.foreachPartition((Iterator<Row> t) -> {
        Connection conn = DriverManager.getConnection("url", "root", "");
        conn.setAutoCommit(true);
        Statement statement = conn.createStatement();
        while (t.hasNext()) {
            Row row = t.next();
            try {
                ResultSet query = statement.executeQuery(
                        "SELECT * FROM zones WHERE zones.id IN ('" + row.getAs("idZones") + "')");
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
        statement.close();
        conn.close();
    });
}
Is there any possibility to load the ResultSet into a dataframe?
I need your help.
Thank you.
Answer
If I understand your question correctly, you want to load a SQL table into a data frame. To do that, you need to do the following things:
- Create a SparkSession object.
- Put your JDBC connection settings in a Properties object.
- Load the SQL table with the read method.
- Apply any relevant filter on the loaded data frame.
Please find below a code sample that does this.
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReadFromSQLTable {

    public static void main(String[] args) {
        String applicationName = ReadFromSQLTable.class.getName();
        SparkConf sparkConf = new SparkConf().setAppName(applicationName).setMaster("local[2]");

        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");             // user name of your SQL database
        connectionProperties.put("password", "password");     // password of SQL
        connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");

        // The name of the database that I am interacting with is `test`. You will find this as part of the URL.
        // The name of the table that I want to load is `employee`.
        Dataset<Row> employeeDetail = sparkSession.read()
                .jdbc("jdbc:mysql://127.0.0.1:3306/test", "employee", connectionProperties);

        log.error("Printing table detail");
        employeeDetail.show();   // show the loaded dataset on the console

        long count = employeeDetail.count();
        System.out.println("The count is = " + count);

        Dataset<Row> employeeDetail2 = employeeDetail.filter("employee_number < 2");
        employeeDetail2.show();
    }
}
You can apply any kind of operation, like a filter, a select, or any other SQL operation, on this dataframe.
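To illustrate, here is a minimal sketch of such operations on an in-memory stand-in for the loaded table (the Employee bean, its column names, and its values are made up for this example; with the JDBC-loaded dataframe the calls are the same):

```java
import java.util.Arrays;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import static org.apache.spark.sql.functions.col;

public class DataFrameOperations {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]").appName("DataFrameOperations").getOrCreate();

        // Small in-memory stand-in for the JDBC-loaded employee table above.
        Dataset<Row> employees = spark.createDataFrame(
                Arrays.asList(new Employee(1, "Alice"), new Employee(2, "Bob"), new Employee(3, "Carol")),
                Employee.class);

        employees.select("name").show();              // projection
        employees.filter(col("id").lt(3)).show();     // filter with the column API
        employees.createOrReplaceTempView("employee");
        spark.sql("SELECT name FROM employee WHERE id < 3").show(); // plain SQL via a temp view

        spark.stop();
    }

    // Simple JavaBean so Spark can infer the schema via reflection.
    public static class Employee implements java.io.Serializable {
        private final int id;
        private final String name;
        public Employee(int id, String name) { this.id = id; this.name = name; }
        public int getId() { return id; }
        public String getName() { return name; }
    }
}
```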
I am running this code on my local system. I have added comments in the code so that you can understand it easily. Do let me know if you have any doubts.
I hope this gives you a fair idea of how to begin loading a SQL table as a dataframe.
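One more note on your original code: instead of running one JDBC query per row, the whole SELECT can be pushed down to MySQL by passing a parenthesised subquery with an alias to jdbc() in place of the table name; the query's result set then comes back directly as a dataframe. A sketch, assuming the zones table and idZones value from your question (the database name, credentials, and alias are placeholders):

```java
import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class LoadZonesSubset {
    // JDBC accepts a parenthesised subquery with an alias wherever a table name goes.
    static String subqueryFor(String idZones) {
        return "(SELECT * FROM zones WHERE zones.id IN ('" + idZones + "')) AS zones_subset";
    }

    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[2]").appName("LoadZonesSubset").getOrCreate();

        Properties props = new Properties();
        props.put("user", "root");
        props.put("password", "");
        props.setProperty("driver", "com.mysql.cj.jdbc.Driver");

        // The subquery runs on the MySQL side; its result set is loaded as a dataframe.
        Dataset<Row> zones = spark.read()
                .jdbc("jdbc:mysql://127.0.0.1:3306/test", subqueryFor("someId"), props);
        zones.show();

        spark.stop();
    }
}
```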