
CREATE STREAM TABLE

To create a stream table:

// DDL for creating a stream table
CREATE STREAM TABLE [IF NOT EXISTS] table_name
    ( column-definition [ , column-definition  ] * )
    USING kafka_stream | file_stream | twitter_stream | socket_stream
    OPTIONS (
    // multiple stream source specific options
      storagelevel 'cache-data-option',
      rowConverter 'rowconverter-class-name',
      subscribe 'comma-separated-topic-name',
      kafkaParams 'kafka-related-params',
      consumerKey 'consumer-key',
      consumerSecret 'consumer-secret',
      accessToken 'access-token',
      accessTokenSecret 'access-token-secret',
      hostname 'socket-streaming-hostname',
      port 'socket-streaming-port-number',
      directory 'file-streaming-directory'
    )

For more information on column-definition, refer to Column Definition For Column Table.

Refer to these sections for more information on Creating Table, Creating Sample Table, Creating External Table and Creating Temporary Table.

Description

Creates a stream table backed by a streaming data source. If a table with the same name already exists in the database and IF NOT EXISTS is not specified, an exception is thrown.
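As a minimal sketch of the IF NOT EXISTS form, the statement below creates a socket_stream table only if no table named socketStreamTable exists yet, so running it a second time should not raise the exception described above. It assumes a connection and streaming context have already been set up as in the Example section. The table name, the localhost:9999 endpoint, and the converter class com.example.CsvLineToRowsConverter are hypothetical placeholders, not part of SnappyData.

```sql
// Hypothetical table name, socket endpoint, and converter class
snappy> create stream table if not exists socketStreamTable (id long, text string)
        using socket_stream options (hostname 'localhost', port '9999',
        rowConverter 'com.example.CsvLineToRowsConverter');
```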

USING <data source> The streaming source for this table: kafka_stream, file_stream, twitter_stream, or socket_stream.

storageLevel Storage level used to cache the received data (for example, MEMORY_AND_DISK_SER_2). Each level makes a different trade-off between memory usage and CPU efficiency.

rowConverter Fully qualified name of the class that converts the unstructured streaming data into a set of rows.

subscribe Comma-separated list of Kafka topics to subscribe to.

kafkaParams Kafka configuration parameters, such as metadata.broker.list and bootstrap.servers.

directory HDFS directory to monitor for new files.

hostname Hostname of the socket server to connect to for receiving data.

port Port of the socket server to connect to for receiving data.

consumerKey Consumer Key (API Key) for your Twitter account.

consumerSecret Consumer Secret key for your Twitter account.

accessToken Access token for your Twitter account.

accessTokenSecret Access token secret for your Twitter account.
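The Kafka-specific options can be combined roughly as follows. This is a sketch only: the topic names, broker address, group id, and converter class com.example.KafkaMessageToRowsConverter are hypothetical. Two further points are assumptions you should confirm for your SnappyData version: that kafkaParams is written as key->value pairs separated by semicolons, and that storagelevel accepts standard Spark storage level names such as MEMORY_AND_DISK_SER_2.

```sql
// Hypothetical topics, broker, group id, and converter class
// Assumed kafkaParams encoding: 'key->value;key->value'
snappy> create stream table kafkaStreamTable (id long, text string)
        using kafka_stream options (
          storagelevel 'MEMORY_AND_DISK_SER_2',
          rowConverter 'com.example.KafkaMessageToRowsConverter',
          subscribe 'topic1,topic2',
          kafkaParams 'bootstrap.servers->localhost:9092;group.id->snappyGroup');
```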

Note

You need to register at https://apps.twitter.com/ to obtain the consumerKey, consumerSecret, accessToken, and accessTokenSecret credentials.

Example

// Create a connection
snappy> connect client 'localhost:1527';

// Initialize streaming with batchInterval of 2 seconds
snappy> streaming init 2secs;

// Create a stream table (replace the empty strings with your Twitter credentials)
snappy> create stream table streamTable (id long, text string, fullName string, country string,
        retweets int, hashtag string) using twitter_stream options (consumerKey '', consumerSecret '',
        accessToken '', accessTokenSecret '', rowConverter 'org.apache.spark.sql.streaming.TweetToRowsConverter');

// Start the streaming
snappy> streaming start;

// Run ad-hoc queries on the current batch of data in streamTable
snappy> select id, text, fullName from streamTable where text like '%snappy%';

// Drop the streamTable
snappy> drop table streamTable;

// Stop the streaming
snappy> streaming stop;
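The same lifecycle (connect, initialize, create, start, query, drop, stop) should apply to the other sources. The sketch below uses file_stream. The HDFS path hdfs://namenode:8020/data/incoming and the converter class com.example.CsvLineToRowsConverter are hypothetical placeholders. Files that land in the monitored directory after streaming starts are expected to show up in subsequent batches.

```sql
snappy> connect client 'localhost:1527';
snappy> streaming init 2secs;

// Hypothetical HDFS directory and converter class
snappy> create stream table fileStreamTable (id long, text string)
        using file_stream options (directory 'hdfs://namenode:8020/data/incoming',
        rowConverter 'com.example.CsvLineToRowsConverter');

snappy> streaming start;

// Query the current batch of data read from new files
snappy> select count(*) from fileStreamTable;

snappy> drop table fileStreamTable;
snappy> streaming stop;
```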