How to Perform a Colocated Join¶
When two tables are partitioned on columns and colocated, it forces partitions having the same values for those columns in both tables to be located on the same SnappyData server. Colocating the data of two tables based on a partitioning column's value is a best practice if you frequently perform queries on those tables that join on that column. When colocated tables are joined on the partitioning columns, the join happens locally on the node where data is present, without the need of shuffling the data.
Code Example: ORDERS table is colocated with CUSTOMER table
A partitioned table can be colocated with another partitioned table by using the "COLOCATE_WITH" attribute in the table options.
For example, in the code snippet below, the ORDERS table is colocated with the CUSTOMER table. The complete source for this example can be found in the file ColocatedJoinExample.scala
Get a SnappySession:
val spark: SparkSession = SparkSession
.builder
.appName("ColocatedJoinExample")
.master("local[*]")
.getOrCreate
val snSession = new SnappySession(spark.sparkContext)
Create Table Customer:
snSession.sql("CREATE TABLE CUSTOMER ( " +
"C_CUSTKEY INTEGER NOT NULL," +
"C_NAME VARCHAR(25) NOT NULL," +
"C_ADDRESS VARCHAR(40) NOT NULL," +
"C_NATIONKEY INTEGER NOT NULL," +
"C_PHONE VARCHAR(15) NOT NULL," +
"C_ACCTBAL DECIMAL(15,2) NOT NULL," +
"C_MKTSEGMENT VARCHAR(10) NOT NULL," +
"C_COMMENT VARCHAR(117) NOT NULL)" +
"USING COLUMN OPTIONS (PARTITION_BY 'C_CUSTKEY')")
snSession.sql("CREATE TABLE ORDERS ( " +
"O_ORDERKEY INTEGER NOT NULL," +
"O_CUSTKEY INTEGER NOT NULL," +
"O_ORDERSTATUS CHAR(1) NOT NULL," +
"O_TOTALPRICE DECIMAL(15,2) NOT NULL," +
"O_ORDERDATE DATE NOT NULL," +
"O_ORDERPRIORITY CHAR(15) NOT NULL," +
"O_CLERK CHAR(15) NOT NULL," +
"O_SHIPPRIORITY INTEGER NOT NULL," +
"O_COMMENT VARCHAR(79) NOT NULL) " +
"USING COLUMN OPTIONS (PARTITION_BY 'O_CUSTKEY', " +
"COLOCATE_WITH 'CUSTOMER' )")
Perform a Colocate join: