Copy table from one database to another in multiple threads in C++

There is a huge SQL table in a PostgreSQL database. I'd like to copy its contents to a different database (SQL Server) using C++. I have written a single-threaded application and it works fine.

I decided to use multiple threads to speed up reading and writing the data. In the code below, I execute a SELECT query to get the data from one DB and write it into a KeyValue data structure, which holds the data temporarily. Then I build an INSERT query for the other DB and execute it.

I have parallelized writing into KeyValue by dividing the data into multiple blocks.

void copy_table(dbName, ...) {
    // PostgreSQL
    Postgres* pg = new Postgres();
    int res = pg->Connect(dbName);
    
    // define number of blocks the table is to be divided in
    std::size_t blockNum = get_block_num(pg, dbName, copyTableName);

    // select all the data from table
    PGresult* pgres = nullptr;
    std::string pqQuery = "SELECT * FROM " + copyTableName;
    res = pg->Execute(pqQuery, &pgres);

    // write the attribute names into a KeyValue store
    int nFields = PQnfields(pgres);

    std::vector<std::string> keys;
    for (int i = 0; i < nFields; i++)
    {
        keys.push_back(std::string(PQfname(pgres, i)));
    }

    int nRows = PQntuples(pgres);

    // Create a KeyValue store of size nFields * nRows with keys
    KeyValue* keyValue = new KeyValue(nFields, nRows, keys);

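    // check hardware concurrency (hardware_concurrency() may return 0 if it cannot be determined)
    std::size_t threadNum = std::thread::hardware_concurrency();
    if (threadNum == 0)
        threadNum = 1;
    if (threadNum < blockNum)
        blockNum = threadNum;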

    // parse the data and write it into KeyValue store
    std::vector<std::thread> v_thread(blockNum);
    for (int i = 0; i < blockNum; ++i)
    {
        v_thread[i] = std::thread(get_value, i, i, nRows, nFields, blockNum, keyValue, pgres);
    }
    for (int i = 0; i < blockNum; ++i)
    {
        v_thread[i].join();
    }
    delete pg;

    // SQL Server
    OleDB* oleDBp = new OleDB(serverName, dataSource, userName, password);
    std::string query{};
    create_query(query, keyValue, values, replicaTableName, blockNum);
    int result = oleDBp->Connect();

    result = oleDBp->Execute(query);
    delete oleDBp;
    delete keyValue;
}

In get_value, each thread fills a different block of the KeyValue store with data.

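// get value by row
// thread routine function: copies the rows of one block into the KeyValue store
void get_value(int tid, int block_counter, int nRows, int nFields, std::size_t block_num,
    KeyValue* keyValue, PGresult* pgres)
{
    // rows [first, last) belong to this block; the last block also takes the
    // remainder when nRows is not divisible by block_num
    const int rowsPerBlock = nRows / static_cast<int>(block_num);
    const int first = block_counter * rowsPerBlock;
    const int last = (static_cast<std::size_t>(block_counter) + 1 == block_num)
        ? nRows : first + rowsPerBlock;

    for (int i = 0; i < nFields; i++) {
        // read rows corresponding to this block
        for (int j = first; j < last; j++) {
            char* value = PQgetvalue(pgres, j, i);
            keyValue->insert(value, i + j * nFields);
        }
    }
}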
  1. Is it possible to increase the performance of the SELECT query using multiple threads? My concern is that this requires more queries to be executed, which might increase latency.
  2. The same question applies to INSERT. In addition, are there any safety issues with the INSERT operation?
  3. Could you give some suggestions on how to improve the performance? The approach I used in get_value does not seem to help much.

Thanks in advance!

Answer

  • Start several threads or processes that each connect to both databases.

  • Start a REPEATABLE READ READ ONLY transaction on the source database in each thread.

  • Use the snapshot synchronization functions so that all sessions see the same snapshot of the source database and get consistent data.

  • Have each session perform a query like

    SELECT * FROM atable WHERE id % n = k;
    

    where n is the number of threads and k is the thread number.

  • If you start all queries at about the same time, you will get synchronized sequential scans, which will cause each block to be read from storage only once.

  • Make sure that you insert rows into the target database in transactions spanning about 1000 rows each.
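
The steps above can be sketched roughly as follows. This is only an illustration, not a complete copy tool: worker_statements and batch_ranges are hypothetical helper names, the snapshot ID is assumed to come from SELECT pg_export_snapshot() run in a coordinating session whose transaction stays open until every worker has imported it, and id is assumed to be a non-negative integer column (for negative values, id % n is negative in PostgreSQL, so those rows would need separate handling). Sending the statements through libpq or OLE DB is left out.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Statements that worker k (0 <= k < n) would send to the source database,
// in this order. snapshot_id is assumed to be the text returned by
// SELECT pg_export_snapshot() in another, still-open transaction.
std::vector<std::string> worker_statements(const std::string& snapshot_id,
                                           const std::string& table,
                                           unsigned n, unsigned k)
{
    assert(n > 0 && k < n);
    return {
        "BEGIN TRANSACTION ISOLATION LEVEL REPEATABLE READ, READ ONLY",
        // must run before any other query in the transaction
        "SET TRANSACTION SNAPSHOT '" + snapshot_id + "'",
        "SELECT * FROM " + table + " WHERE id % " + std::to_string(n) +
            " = " + std::to_string(k),
        "COMMIT",
    };
}

// Splits total_rows fetched rows into half-open ranges [first, last) of at
// most batch_size rows. Each range would be inserted into the target
// database inside its own transaction.
std::vector<std::pair<std::size_t, std::size_t>>
batch_ranges(std::size_t total_rows, std::size_t batch_size)
{
    assert(batch_size > 0);
    std::vector<std::pair<std::size_t, std::size_t>> ranges;
    for (std::size_t first = 0; first < total_rows; first += batch_size) {
        const std::size_t last = std::min(first + batch_size, total_rows);
        ranges.emplace_back(first, last);
    }
    return ranges;
}
```

With four workers, worker 1 would run SELECT * FROM atable WHERE id % 4 = 1 inside its snapshot transaction, and 2500 fetched rows with a batch size of 1000 would be written in three target transactions covering rows [0, 1000), [1000, 2000) and [2000, 2500).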

User contributions licensed under: CC BY-SA