
Load the ResultSet of a query into a dataframe using Spark / Java

I want to load the ResultSet of a select query into a Spark dataframe.

I'm using the following code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Iterator;

import org.apache.spark.api.java.function.ForeachPartitionFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public static void func(Dataset<Row> df) {
    df = df.repartition(20); // repartition returns a new Dataset; one connection per partition, see below

    df.foreachPartition((ForeachPartitionFunction<Row>) (Iterator<Row> t) -> {
        Connection conn = DriverManager.getConnection("url", "root", "");
        conn.setAutoCommit(true);
        Statement statement = conn.createStatement();

        while (t.hasNext()) {
            Row row = t.next();
            try (ResultSet query = statement.executeQuery(
                    "SELECT * FROM zones WHERE zones.id IN ('"
                            + row.getAs("idZones") + "')")) {
                // this is where I am stuck: how do I get this ResultSet into a dataframe?
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }

        statement.close();
        conn.close();
    });
}

Is there any possibility to load the ResultSet into a dataframe?

I need your help.

Thank you.


Answer

If I understand your question correctly, you want to load a SQL table into a data frame. To do that, you need to do the following things:

  1. Create a SparkSession object.
  2. Put your JDBC connection settings in a Properties object.
  3. Load the SQL table with the read method.
  4. Apply any relevant filters to the loaded data frame.

Please find below sample code that does this.

import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

import java.util.Properties;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ReadFromSQLTable {
    public static void main(String[] args) {
        String applicationName = ReadFromSQLTable.class.getName();
        SparkConf sparkConf = new SparkConf().setAppName(applicationName).setMaster("local[2]");
        // using Dataset<Row>
        SparkSession sparkSession = SparkSession
                .builder()
                .config(sparkConf)
                .getOrCreate();

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", "root");         // user name of your SQL database
        connectionProperties.put("password", "password"); // password of your SQL database
        connectionProperties.setProperty("driver", "com.mysql.cj.jdbc.Driver");

        // The database I am interacting with is `test`; you will find it as part of the URL.
        // The table I want to load is `employee`.
        Dataset<Row> employeeDetail = sparkSession.read().jdbc("jdbc:mysql://127.0.0.1:3306/test",
                "employee", connectionProperties);

        log.info("Printing table detail");
        employeeDetail.show(); // print the loaded dataset to the console

        long count = employeeDetail.count();
        System.out.println("The count is = " + count);

        Dataset<Row> employeeDetail2 = employeeDetail.filter("employee_number < 2");
        employeeDetail2.show();

        sparkSession.stop();
    }
}

You can apply any kind of operation, like filter, select, or any other SQL operation, on this dataframe.
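For example, you can register the loaded dataframe as a temporary view and run plain SQL against it. A small sketch, assuming the `employee` table loaded above has an `employee_number` column (as the filter in the sample suggests); the view name is just illustrative:

// Register the loaded dataframe as a temporary view and query it with SQL.
employeeDetail.createOrReplaceTempView("employee_view"); // "employee_view" is an illustrative name

Dataset<Row> highNumbers = sparkSession.sql(
        "SELECT * FROM employee_view WHERE employee_number >= 2");
highNumbers.show();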

I am running this code on my local system. I have added comments in the code so that you can understand it easily. Do let me know if you have any doubts.

I hope this gives you a fair idea of how to begin loading a SQL table as a dataframe.
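One more note: if you only need the rows matching your idZones values (as in your original snippet) rather than the whole table, Spark's JDBC reader also accepts a parenthesized subquery in place of a table name, so the database does the filtering. A minimal sketch, reusing the connection properties from above; the id values here are placeholders for illustration only:

// The "table" argument may be a subquery with an alias; MySQL runs the filter.
// 'id1' and 'id2' are placeholder values, not real data.
String zonesSubquery = "(SELECT * FROM zones WHERE zones.id IN ('id1', 'id2')) AS zones_subset";

Dataset<Row> zones = sparkSession.read().jdbc("jdbc:mysql://127.0.0.1:3306/test",
        zonesSubquery, connectionProperties);
zones.show();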
