Bulk Upload in Google Cloud Spanner

A newer version of this article can be found at http://www.googlecloudspanner.com/2017/09/bulk-upload-in-google-cloud-spanner.html.

Google Cloud Spanner is Google's horizontally scalable, globally consistent relational database service. Cloud Spanner claims to be the first and only relational database service that is both strongly consistent and horizontally scalable. Cloud Spanner supports transactions and SQL queries and could, in theory, be used with any application currently running on another relational database. This tutorial shows how you can convert an existing relational database and bulk upload all data to a new Google Cloud Spanner database using an open source JDBC driver.

Topics

This tutorial contains the following topics:

  1. Convert an existing relational database to Cloud Spanner

  2. Bulk upload data to Cloud Spanner using multiple parallel workers

  3. Connect to a Cloud Spanner database with JDBC and use the following features (not supported by the JDBC driver supplied by Google):

    1. Transactions

    2. DDL statements (CREATE TABLE)

    3. DML statements (INSERT INTO, DELETE FROM)

How it Works

This bulk upload example uses JDBC to connect to both the source database (in this example PostgreSQL) and the destination Google Cloud Spanner database. The JDBC driver supplied by Google does not support DDL and DML statements, so this example uses the open source JDBC driver from https://github.com/olavloite/spanner-jdbc instead.
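To give an idea of how this looks in code, the snippet below is a minimal sketch that opens a plain JDBC connection to both databases. The connection URLs are the same ones used later in this tutorial; your project, instance, database and key file will of course differ, and the sketch assumes that both JDBC drivers are on the classpath.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class ConnectionExample
{
    public static void main(String[] args) throws SQLException
    {
        // Source database (PostgreSQL in this example)
        String urlSource = "jdbc:postgresql://localhost:5432/databaseName?user=username&password=secret";
        // Destination database (Google Cloud Spanner through the open source JDBC driver)
        String urlDestination = "jdbc:cloudspanner://localhost;Project=test-jdbc-161317;"
                + "Instance=converted;Database=databaseName;PvtKeyPath=pathToKeyFile.json";

        try (Connection source = DriverManager.getConnection(urlSource);
                Connection destination = DriverManager.getConnection(urlDestination))
        {
            // Both connections can now be used with standard JDBC calls
            System.out.println("Connected to source and destination");
        }
    }
}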

Table conversion

The converter reads all tables in the source database and generates Google Cloud Spanner compliant DDL statements (CREATE TABLE ...) for the creation of the tables. The converter can be run in both SkipExisting and DropAndRecreate mode; it defaults to SkipExisting. The table creation process runs as a single-threaded process in autocommit mode. The main code can be found in TableConverter.
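The exact DDL that the converter generates depends on the source schema and on the configured data type mappings, but the hypothetical snippet below illustrates the general shape: a Cloud Spanner style CREATE TABLE statement executed through the JDBC driver. Note that Cloud Spanner DDL places the PRIMARY KEY clause after the column list and uses its own data types such as INT64, STRING(n) and BYTES(n). The table and column names here are made up for illustration.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

public class CreateTableExample
{
    public static void main(String[] args) throws SQLException
    {
        String urlDestination = "jdbc:cloudspanner://localhost;Project=test-jdbc-161317;"
                + "Instance=converted;Database=databaseName;PvtKeyPath=pathToKeyFile.json";
        try (Connection destination = DriverManager.getConnection(urlDestination);
                Statement statement = destination.createStatement())
        {
            // Hypothetical table; the converter generates statements of this
            // general form from the source schema and the data type mappings
            statement.execute("CREATE TABLE customer (id INT64 NOT NULL, "
                    + "name STRING(100), uuid BYTES(16)) PRIMARY KEY (id)");
        }
    }
}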

Data conversion

The converter then iterates over all the tables in the source database and copies the data from the source to the destination. This is done by generating DML statements (INSERT INTO ...) for each row. Whether this process is executed single-threaded or by multiple workers in parallel depends on the size of the table; the threshold defaults to 1,000 records. If the source table contains more records, the upload is handled by multiple workers. This example project defaults to 10 upload workers running in parallel. Each worker opens a new connection to Cloud Spanner and runs its own transactions. Transactions are not supported by the JDBC driver supplied by Google, but the JDBC driver used by this example does support them.
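The division of work over the workers can be sketched as follows. This is a simplified illustration, not the exact code of the example project: each worker gets a begin offset and a number of records to copy, so that the workers together cover the whole table. The actual copying is done by the worker code shown later in this tutorial.

public class WorkerPartitionSketch
{
    public static void main(String[] args)
    {
        int numberOfWorkers = 10;
        long totalRecords = 25_000; // e.g. the record count of the source table

        // Divide the records as evenly as possible over the workers
        long recordsPerWorker = (totalRecords + numberOfWorkers - 1) / numberOfWorkers;

        for (int worker = 0; worker < numberOfWorkers; worker++)
        {
            long beginOffset = worker * recordsPerWorker;
            long numberOfRecordsToCopy = Math.max(0,
                    Math.min(recordsPerWorker, totalRecords - beginOffset));
            // Each worker copies the range [beginOffset, beginOffset + numberOfRecordsToCopy)
            // on its own connection and in its own transactions
            System.out.println("Worker " + worker + ": offset " + beginOffset
                    + ", records " + numberOfRecordsToCopy);
        }
    }
}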

Step 1: Create a Cloud Spanner test database

This tutorial assumes that you already have a Google Cloud Spanner account and an empty test database. If you do not have these yet, follow the steps in one of my other tutorials to set them up: https://olavloite.github.io/2017/03/11/Google-Cloud-Spanner-with-Spring-Boot-JPA-and-Hibernate.html

Step 2: Get and Run the Example Project

I have created a sample project that will be used for this tutorial. Clone the project from https://github.com/olavloite/spanner-jdbc-converter. It is a Maven project with a dependency on the JDBC driver for Google Cloud Spanner mentioned above. Then follow these steps:

  1. Find the file converter.properties.example in the root of the project, make a copy of it and name the copy converter.properties.

  2. Have a look at converter.properties. It contains three entries:

    1. TableConverter.convertMode=SkipExisting: This indicates that existing tables in the Cloud Spanner database should not be dropped and recreated, but skipped during conversion. If you want to drop and recreate tables, change the value to DropAndRecreate.

    2. DataConverter.convertMode=SkipExisting: Same as TableConverter.convertMode, but now for the data in the tables. If SkipExisting is set, no data will be converted if the destination table contains 1 or more records (no data comparison is done). If set to DropAndRecreate, all data in the destination table will always be deleted and recreated from the source database.

    3. TableConverter.specificColumnMapping.uuid=BYTES(16): Using this configuration you can specify a specific data type mapping for specific columns. The converter contains a default list of data type conversions, which can be overridden with this property. This mapping specifies that all columns named uuid (regardless of their source data type) should be converted to BYTES(16). If you want a specific mapping for one column of one table, specify it as TableConverter.specificColumnMapping.tableName.columnName.

  3. Run the Java application (the main class is Converter.class). It takes the following three arguments (see the example invocation after this list):

    1. The JDBC connection string of the source database. In my case this is "jdbc:postgresql://localhost:5432/databaseName?user=username&password=secret"

    2. The JDBC connection string of the destination (Cloud Spanner) database. In my case this is "jdbc:cloudspanner://localhost;Project=test-jdbc-161317;Instance=converted;Database=databaseName;PvtKeyPath=pathToKeyFile.json"

    3. The path to the converter.properties file. If you run the test application in Eclipse and keep the converter.properties file in the root of the project, the value of this argument is simply "converter.properties"
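If you prefer to start the converter from your own code, for example from a small launcher class or a test, the hypothetical wrapper below passes the three arguments described above to the converter's main class. The class RunConverter is made up for this example, and the import of Converter is omitted because it depends on the package used in the example project.

// Hypothetical launcher; add an import for the Converter class of the
// example project (the package depends on the project you cloned)
public class RunConverter
{
    public static void main(String[] args) throws Exception
    {
        Converter.main(new String[] {
                // 1. JDBC connection string of the source database
                "jdbc:postgresql://localhost:5432/databaseName?user=username&password=secret",
                // 2. JDBC connection string of the destination Cloud Spanner database
                "jdbc:cloudspanner://localhost;Project=test-jdbc-161317;Instance=converted;"
                        + "Database=databaseName;PvtKeyPath=pathToKeyFile.json",
                // 3. Path to the converter.properties file
                "converter.properties" });
    }
}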

The application will scan your source database for all tables and then try to create these in your Cloud Spanner database. After creating all tables, the application will convert the data of the converted tables. Data conversion is done using one of two strategies, depending on the size of the table:

  1. Tables with 1,000 or fewer records are converted using a simple single-threaded conversion process. All records are converted in one transaction.

  2. Tables with more than 1,000 records are converted by 10 parallel workers. Each worker is assigned a proportionate number of the records in the table to convert. The workers then convert the data in batches of 1,000 records, each batch being a single transaction.

The code snippet below shows the main part of the worker. Note how the worker uses standard JDBC functionality to access Cloud Spanner. Both transactions and statements are standard JDBC calls.

@Override
public void run()
{
    try (Connection source = DriverManager.getConnection(urlSource);
            Connection destination = DriverManager.getConnection(urlDestination))
    {
        log.info(name + ": " + sourceTable + ": Starting copying " + numberOfRecordsToCopy + " records");

        destination.setAutoCommit(false);
        String sql = "INSERT INTO " + destinationTable + " (" + cols.getColumnNames() + ") VALUES \n";
        sql = sql + "(" + cols.getColumnParameters() + ")";
        PreparedStatement statement = destination.prepareStatement(sql);

        int lastRecord = beginOffset + numberOfRecordsToCopy;
        int recordCount = 0;
        int currentOffset = beginOffset;
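        // Copy this worker's slice of the source table in batches;
        // each batch is committed as a separate transaction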
        while (true)
        {
            int limit = Math.min(batchSize, lastRecord - currentOffset);
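            // Build the paging query for this batch by filling in the
            // placeholders of the select format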
            String select = selectFormat.replace("$COLUMNS", cols.getColumnNames());
            select = select.replace("$TABLE", sourceTable);
            select = select.replace("$PRIMARY_KEY", cols.getPrimaryKeyColumns());
            select = select.replace("$BATCH_SIZE", String.valueOf(limit));
            select = select.replace("$OFFSET", String.valueOf(currentOffset));
            try (ResultSet rs = source.createStatement().executeQuery(select))
            {
                while (rs.next())
                {
                    int index = 1;
                    for (Integer type : cols.columnTypes)
                    {
                        Object object = rs.getObject(index);
                        statement.setObject(index, object, type);
                        index++;
                    }
                    if (useJdbcBatching)
                        statement.addBatch();
                    else
                        statement.executeUpdate();
                    recordCount++;
                }
                if (useJdbcBatching)
                    statement.executeBatch();
            }
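            // Commit after every batch so that a single transaction never
            // exceeds Cloud Spanner's mutation limit (see below)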
            destination.commit();
            log.info(name + ": " + sourceTable + ": Records copied so far: " + recordCount + " of "
                    + numberOfRecordsToCopy);
            currentOffset = currentOffset + batchSize;
            if (recordCount >= numberOfRecordsToCopy)
                break;
        }
    }
    catch (SQLException e)
    {
        log.severe("Error during data copy: " + e.getMessage());
        throw new RuntimeException(e);
    }
    log.info(name + ": Finished copying");
}

Also note that transactions are committed after each batch and not after copying the entire table. This is not a programming error, but a necessity, as Google Cloud Spanner does not allow transactions to contain more than 20,000 mutations. A mutation of one row with five columns counts as five mutations, so with the default batch size of 1,000 records a single batch stays within this limit as long as the table has no more than 20 columns.

Summary

The open source Google Cloud Spanner JDBC driver used in this tutorial allows you to work with Cloud Spanner as if it were (almost) any other JDBC-compliant relational database, including DDL and DML statements, (prepared) JDBC statements and transactions. Cloud Spanner itself has some limitations when it comes to bulk update statements: insert and update statements can only operate on one row at a time. The JDBC driver can also be used to develop applications using JPA / Hibernate in combination with Google Cloud Spanner. An example can be found here: https://olavloite.github.io/2017/03/11/Google-Cloud-Spanner-with-Spring-Boot-JPA-and-Hibernate.html