Skip to content

Getting Started with Snappy JDBC Streaming Connector

Databases like Microsoft SQL Server provide a mechanism to capture changed data. The Change Data Capture (CDC) functionality can be used to source event streams from JDBC sources to SnappyData streaming. Once the data is ingested in SnappyData, it can be used for both real-time analysis and batch analysis.

In this quickstart, we talk about how SnappyData's smart connector application can use the JDBC streaming connector to pull changed data from Microsoft SQL Server and ingest it into SnappyData tables.

Start the SnappyData Cluster

Run the sbin/snappy-start-all.sh script to start the SnappyData cluster

$ <SnappyData_home>/sbin/snappy-start-all.sh

Dependencies

SnappyData core and SnappyData jdbc streaming connector maven dependencies would be needed for your application.

<dependency>
  <groupId>io.snappydata</groupId>
  <artifactId>snappydata-jdbc-stream-connector_2.11</artifactId>
  <version>1.3.1</version>
  <scope>compile</scope>
</dependency>
<dependency>
  <groupId>io.snappydata</groupId>
  <artifactId>snappydata-core_2.11</artifactId>
  <version>1.3.1</version>
  <scope>compile</scope>
</dependency>

Also add cloudera and atlassian repositories to the set of Maven repositories to be searched:

  <repositories>
    <repository>
      <id>cloudera-repo</id>
      <name>cloudera repo</name>
      <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
    </repository>
    <repository>
      <id>atlassian-repo</id>
      <name>atlassian repo</name>
      <url>https://packages.atlassian.com/maven-3rdparty</url>
    </repository>
    ...
  </repositories>

Configuring the JDBC Source

Spark(SnappyData) Streaming polls the JDBC source and reads the data that is newer than the offset last read. The offset management is done by streaming but the offset has to be defined by the JDBC source. An offset has to be a monotonically increasing column inside the source table. Following few properties of the JDBC source have to be configured.

  • table: Table from which the data would be sourced.
  • offsetColumn: the designated column for offset. For e.g. in SQLServer CDC tables __$start_lsn should be used.
  • getNextOffset: function to get the next offset. This can be a JDBC function or query.
  • offsetToStrFunc and strToOffsetFunc - functions to convert offset to and from a string. This is needed to internally manipulate an offset.
  • offsetIncFunc - If getNextOffset is not available, a function to increment the offset

In this quickstart since we are using spark-shell, use the following command which loads the required dependencies of Snappy and SQL Server.

$ <SnappyData-product-home>/bin/spark-shell --master local[*]
    --conf snappydata.connection=localhost:1527
    --packages com.microsoft.sqlserver:mssql-jdbc:6.1.0.jre8
    --jars <SnappyData-product-home>/connectors/snappydata-jdbc-stream-connector_2.11-1.3.1-HF-1.jar
Run the following commands to create a Stream reader for JDBC source

import org.apache.spark.sql._
val snc = new SnappySession(spark.sparkContext);

// query to read the next offset from the table
// it finds the maxEvents from the last offset
val getNextOffset = "select master.dbo.fn_varbintohexstr(max(__$start_lsn)) nextLSN from " +
     " ( select __$start_lsn, sum(count(1)) over (order by __$start_lsn) runningCount from $table " +
     " where __$start_lsn > master.dbo.fn_cdc_hexstrtobin('$currentOffset') " +
     " group by __$start_lsn) x where runningCount <= $maxEvents"

// Properties needed to configure the JDBC Source
val props = Map("driver" -> "com.microsoft.sqlserver.jdbc.SQLServerDriver",
                "url" -> "jdbc:sqlserver://somehost.com:1433",
                "user" -> "xxx",
                "password" -> "xxx",
                "dbtable" -> "tengb.cdc.dbo_customer_CT", // CDC table for Customer
                "maxEvents" -> "100",
                "jdbc.offsetColumn" -> "__$start_lsn",
                "jdbc.getNextOffset" -> getNextOffset,
                "jdbc.offsetToStrFunc" -> "master.dbo.fn_varbintohexstr",
                "jdbc.strToOffsetFunc" -> "master.dbo.fn_cdc_hexstrtobin",
                "jdbc.offsetIncFunc" -> "master.dbo.fn_cdc_increment_lsn")

// Creates a stream reader
val reader = snc.readStream.format("jdbcStream").options(props).load

Configuring the Sink

The events generated by the reader i.e. JDBC source can be sent to console sink.

reader.writeStream.outputMode("append").format("console").start

For ingesting the events in a SnappyData table, one needs to implement a SnappyStoreSink to ingest the events inside SnappyData.

class Mysink extends org.apache.spark.sql.streaming.jdbc.SnappyStreamSink {
  def process(snappySession: org.apache.spark.sql.SnappySession, sinkProps: java.util.Properties,
              batchId: Long, df: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]): Unit = {
    df.write().format("row").insertInto("snappytable")
  }
}

return reader.writeStream()
        .format("snappystore")
        .option("sink", Mysink.class.getName())
        .option("checkpointLocation",
            Utils.createTempDir("/data/wrk/w/snappydata/abc-temp", "abc-spark")
            Utils.createTempDir("/data/wrk/w/snappydata/temp", "snappy-sink")
                .getCanonicalPath())
        .option("tableName", tableName)
        .start();

The above code needs to be in a jar though.

You can find the working code for a SnappyData Smart Connector Streaming application that sources data from a JDBC source here.