Skip to content

Building SnappyData Applications using Spark API

SnappySession Usage

Create Columnar Tables using API

Other than the create and drop table operations, all of the APIs used below are based on the Spark SQL Data Source APIs.

     // Scala: build a small DataFrame and store it in a column table.
     // BUCKETS is the number of partitions to use in the SnappyStore
     val props = Map("BUCKETS" -> "8")

     case class Data(COL1: Int, COL2: Int, COL3: Int)

     val data = Seq(Seq(1, 2, 3), Seq(7, 8, 9), Seq(9, 2, 3), Seq(4, 2, 3), Seq(5, 6, 7))
     // one partition per input row; each inner Seq becomes one Data record
     val rdd = spark.sparkContext.parallelize(data, data.length).map(s => Data(s(0), s(1), s(2)))

     val dataDF = snappy.createDataFrame(rdd)

     // drop the column table if it already exists
     snappy.dropTable("COLUMN_TABLE", ifExists = true)

     // "column" is the table format (that is row or column)
     // dataDF.schema provides the schema for table
     snappy.createTable("COLUMN_TABLE", "column", dataDF.schema, props)
     // append dataDF into the table
     dataDF.write.insertInto("COLUMN_TABLE")

     val results = snappy.sql("SELECT * FROM COLUMN_TABLE")
     println("contents of column table are:")
     // collect() brings the rows to the driver before printing; calling foreach
     // directly on the DataFrame would run println on the executors instead
     results.collect().foreach(println)
     // Java: build a DataFrame from Row objects and store it in a column table.
     // "buckets" is passed to createTable as a table property
     Map<String, String> props1 = new HashMap<>();
     props1.put("buckets", "16");

     JavaRDD<Row> jrdd = jsc.parallelize(Arrays.asList(
         RowFactory.create(1, 2, 3),
         RowFactory.create(7, 8, 9),
         RowFactory.create(9, 2, 3),
         RowFactory.create(4, 2, 3),
         RowFactory.create(5, 6, 7)));

     // three non-nullable integer columns
     StructField[] fields = {
         new StructField("col1", DataTypes.IntegerType, false, Metadata.empty()),
         new StructField("col2", DataTypes.IntegerType, false, Metadata.empty()),
         new StructField("col3", DataTypes.IntegerType, false, Metadata.empty())
     };
     StructType schema = new StructType(fields);

     Dataset<Row> df = snappy.createDataFrame(jrdd, schema);

     // drop the column table if it already exists
     snappy.dropTable("COLUMN_TABLE", true);

     // "column" is the table format (that is row or column)
     // df.schema() provides the schema for the table
     snappy.createTable("COLUMN_TABLE", "column", df.schema(), props1, false);
     // append df into the table
     df.write().insertInto("COLUMN_TABLE");

     Dataset<Row> results = snappy.sql("SELECT * FROM COLUMN_TABLE");
     System.out.println("contents of column table are:");
     // fetch the selected columns as a list, then print each row
     Dataset<Row> selected = results.select("col1", "col2", "col3");
     for (Row r : selected.collectAsList()) {
       System.out.println(r);
     }
    # Python: build a DataFrame from tuples and store it in a column table
    from pyspark.sql.types import *

    data = [(1,2,3),(7,8,9),(9,2,3),(4,2,3),(5,6,7)]
    rdd = sc.parallelize(data)
    # three integer columns named col1, col2 and col3
    fields = [StructField(name, IntegerType()) for name in ("col1", "col2", "col3")]
    schema = StructType(fields)

    dataDF = snappy.createDataFrame(rdd, schema)

    # drop the column table if it already exists
    snappy.dropTable("COLUMN_TABLE", True)
    # "column" is the table format (that is row or column);
    # dataDF.schema provides the schema for the table
    snappy.createTable("COLUMN_TABLE", "column", dataDF.schema, True, buckets="16")

    # append dataDF into the table
    dataDF.write.insertInto("COLUMN_TABLE")
    results1 = snappy.sql("SELECT * FROM COLUMN_TABLE")

    print("contents of column table are:")
    selected = results1.select("col1", "col2", "col3")
    selected.show()

The optional BUCKETS attribute specifies the number of partitions or buckets to use. In SnappyStore, when data migrates between nodes (for example, when the cluster is expanded), a bucket is the smallest unit that can be moved around. For more details about the properties (the 'props1' map in the above example) and the createTable API, refer to the documentation for row and column tables.

Create Row Tables using API, Update the Contents of Row Table

    // Scala: create a row format table called ROW_TABLE
    snappy.dropTable("ROW_TABLE", ifExists = true)
    // "row" is the table format
    // dataDF.schema provides the schema for table
    val props2 = Map.empty[String, String]
    snappy.createTable("ROW_TABLE", "row", dataDF.schema, props2)

    // append dataDF into the table
    dataDF.write.insertInto("ROW_TABLE")

    val results2 = snappy.sql("select * from ROW_TABLE")
    println("contents of row table are:")
    // collect() brings the rows to the driver before printing; calling foreach
    // directly on the DataFrame would run println on the executors instead
    results2.collect().foreach(println)

    // row tables can be mutated
    // for example update "ROW_TABLE" and set col3 to 99 where
    // criteria "col3 = 3" is true using update API
    snappy.update("ROW_TABLE", "COL3 = 3", org.apache.spark.sql.Row(99), "COL3")

    val results3 = snappy.sql("SELECT * FROM ROW_TABLE")
    println("contents of row table are after setting col3 = 99 are:")
    results3.collect().foreach(println)

    // update rows using sql update statement
    snappy.sql("UPDATE ROW_TABLE SET COL1 = 100 WHERE COL3 = 99")
    val results4 = snappy.sql("SELECT * FROM ROW_TABLE")
    println("contents of row table are after setting col1 = 100 are:")
    results4.collect().foreach(println)

SnappyStreamingContext Usage

SnappyData extends Spark Streaming so that stream definitions can be written declaratively using SQL, and these streams can be analyzed using static and dynamic SQL.

     import org.apache.spark.sql._
     import org.apache.spark.streaming._
     import scala.collection.mutable
     import org.apache.spark.rdd._
     import org.apache.spark.sql.types._
     import scala.collection.immutable.Map

     // Scala: queue a few RDDs as a stream and append every batch to a column table.
     // Seconds(1) is a one-second batch interval (Duration(1) would be one millisecond)
     val snsc = new SnappyStreamingContext(spark.sparkContext, Seconds(1))
     // field names match the ShowCaseSchemaStream case class, because the
     // stream's DataFrames are saved into this table below
     val schema = StructType(List(StructField("loc", IntegerType), StructField("text", StringType)))

     case class ShowCaseSchemaStream(loc: Int, text: String)

     // recreate the target table from scratch
     snsc.snappyContext.dropTable("streamingExample", ifExists = true)
     snsc.snappyContext.createTable("streamingExample", "column", schema, Map.empty[String, String], false)

     // builds an RDD with one ShowCaseSchemaStream per integer in start..end (inclusive)
     def rddList(start: Int, end: Int) = spark.sparkContext.parallelize(start to end).map(i => ShowCaseSchemaStream(i, s"Text$i"))

     val dstream = snsc.queueStream[ShowCaseSchemaStream](
                     mutable.Queue(rddList(1, 10), rddList(10, 20), rddList(20, 30)))

     val schemaDStream = snsc.createSchemaDStream(dstream)

     // append the DataFrame of every batch to the streamingExample table
     schemaDStream.foreachDataFrame { df =>
       df.write.format("column").
         mode(SaveMode.Append).
         options(Map.empty[String, String]).
         saveAsTable("streamingExample")
     }

     snsc.start()
     // give the streaming job a few batch intervals to run before querying;
     // the call returns after the timeout while the context keeps running
     snsc.awaitTerminationOrTimeout(5000)
     snsc.sql("select count(*) from streamingExample").show()
     // Java: queue a few RDDs as a stream and insert every batch into a column table.
     // Schema of the target table: two non-nullable columns, "id" and "text"
     StructType schema = new StructType(new StructField[]{
         new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
         new StructField("text", DataTypes.StringType, false, Metadata.empty())
     });

     // recreate the target table with no extra table properties
     Map<String, String> props = Collections.emptyMap();
     jsnsc.snappySession().dropTable("streamingExample", true);
     jsnsc.snappySession().createTable("streamingExample", "column", schema, props, false);

     // ShowCaseSchemaStream is a JavaBean that must be defined by the application
     Queue<JavaRDD<ShowCaseSchemaStream>> rddQueue = new LinkedList<>();
     rddQueue.add(rddList(jsc, 1, 10));
     rddQueue.add(rddList(jsc, 10, 20));
     rddQueue.add(rddList(jsc, 20, 30));

     // The rddList helper method is defined as follows:
    /* private static JavaRDD<ShowCaseSchemaStream> rddList(JavaSparkContext jsc, int start, int end){
        List<ShowCaseSchemaStream> objs = new ArrayList<>();
          for(int i= start; i<=end; i++){
            objs.add(new ShowCaseSchemaStream(i, String.format("Text %d",i)));
          }
        return jsc.parallelize(objs);
     }*/

     // turn the queued RDDs into a stream, then into a SchemaDStream of ShowCaseSchemaStream beans
     JavaDStream<ShowCaseSchemaStream> dStream = jsnsc.queueStream(rddQueue);
     SchemaDStream schemaDStream = jsnsc.createSchemaDStream(dStream, ShowCaseSchemaStream.class);

     // insert the DataFrame of every batch into the streamingExample table
     schemaDStream.foreachDataFrame(new VoidFunction<Dataset<Row>>() {
       @Override
       public void call(Dataset<Row> df) {
         df.write().insertInto("streamingExample");
       }
     });

     jsnsc.start();

     // NOTE(review): this query runs right after start(); it may execute before
     // any batch has been written - confirm whether a wait is needed here
     jsnsc.sql("select count(*) from streamingExample").show();
    # Python: queue a few RDDs as a stream and append every batch to a column table
    import time

    from pyspark.streaming.snappy.context import SnappyStreamingContext
    from pyspark.sql.types import *

    def rddList(start, end):
      # one (number, "Text<number>") tuple per integer in range(start, end)
      return sc.parallelize(range(start, end)).map(lambda i: (i, "Text" + str(i)))

    def saveFunction(df):
       # append one batch DataFrame to the streamingExample column table
       df.write.format("column").mode("append").saveAsTable("streamingExample")

    schema = StructType([StructField("loc", IntegerType()),
                         StructField("text", StringType())])

    # batch interval of 1 second
    snsc = SnappyStreamingContext(sc, 1)

    dstream = snsc.queueStream([rddList(1,10), rddList(10,20), rddList(20,30)])

    # recreate the target table
    snsc._snappycontext.dropTable("streamingExample", True)
    snsc._snappycontext.createTable("streamingExample", "column", schema)

    schemadstream = snsc.createSchemaDStream(dstream, schema)
    schemadstream.foreachDataFrame(saveFunction)
    snsc.start()
    # give the stream time to process a batch before querying the table
    time.sleep(1)
    snsc.sql("select count(*) from streamingExample").show()