Applies the given function to each input row, appending the encoded result at the end of the row.
An optimized version of AppendColumnsExec that can be executed directly on deserialized objects.
True count is greater than the lower bound and less than the max, with the given probability.
True count is greater than the lower bound and less than the max, with the given probability.
Helper trait which defines methods that are shared by both LocalLimitExec and GlobalLimitExec.
Provides support in a SQLContext for caching query results and automatically using these cached results when subsequent queries are executed.
Provides support in a SQLContext for caching query results and automatically using these cached results when subsequent queries are executed. Data is cached using byte buffers stored in an InMemoryRelation. This relation is automatically substituted into query plans that return the sameResult as the originally cached query.
Internal to Spark SQL.
Holds a cached logical plan and its data
Co-groups the data from left and right children, and calls the function with each group and 2 iterators containing all elements in the group from left and right side.
Co-groups the data from left and right children, and calls the function with each group and 2 iterators containing all elements in the group from left and right side. The result of this function is flattened before being output.
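A hedged sketch of the public Dataset API shape that normally plans to this operator; the session and data here are illustrative only:
{{{
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cogroup-sketch").getOrCreate()
import spark.implicits._

val left = Seq((1, "a"), (1, "b"), (2, "c")).toDS()
val right = Seq((1, "x"), (3, "y")).toDS()

// For every key the function receives two iterators, one per side, and its
// result is flattened into the output Dataset.
val cogrouped = left.groupByKey(_._1).cogroup(right.groupByKey(_._1)) {
  (key, lIter, rIter) => Iterator((key, lIter.size, rIter.size))
}
cogrouped.show()
}}}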
Iterates over GroupedIterators and returns the cogrouped data, i.e.
Iterates over GroupedIterators and returns the cogrouped data, i.e. each record is a grouping key with its associated values from all GroupedIterators. Note: we assume the output of each GroupedIterator is ordered by the grouping key.
Physical plan for returning a new RDD that has exactly numPartitions partitions.
Physical plan for returning a new RDD that has exactly numPartitions partitions.
Similar to coalesce defined on an RDD, this operation results in a narrow dependency, e.g.
if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead each of
the 100 new partitions will claim 10 of the current partitions. If a larger number of partitions
is requested, it will stay at the current number of partitions.
However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you would like (e.g. one node in the case of numPartitions = 1). To avoid this, see ShuffleExchange: it adds a shuffle step, but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
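A hedged sketch (using the public Spark API, not this codebase) contrasting a narrow coalesce with an explicit shuffle; the session and numbers are illustrative:
{{{
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("coalesce-sketch").getOrCreate()
// 1,000,000 ids spread across 1000 partitions.
val df = spark.range(0L, 1000000L, 1L, 1000)

// Narrow dependency: 1000 -> 100 partitions, no shuffle; each new partition
// claims roughly 10 of the existing ones.
val narrowed = df.coalesce(100)

// Drastic reduction: with coalesce the whole computation would run as one task.
// repartition inserts an exchange, keeping the upstream work parallel.
val shuffled = df.repartition(1)

println(narrowed.rdd.getNumPartitions)   // 100
println(shuffled.rdd.getNumPartitions)   // 1
}}}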
A Partitioner that might group together one or more partitions from the parent.
Catch exceptions in code generation of SnappyData plans and fall back to Spark plans as a last resort (including non-code-generated paths).
An interface for those physical operators that support codegen.
Allow invoking produce/consume calls on executor without requiring a SparkContext.
Find the chained plans that support codegen, collapse them together as WholeStageCodegen.
Take the first limit elements and collect them to a single partition.
Take the first limit elements and collect them to a single partition. This operator will be used when a logical Limit operation is the final operator in a logical plan, which happens when the user is collecting results back to the driver.
This extends Spark's describe to add support for CHAR and VARCHAR types.
Takes the input row from child and turns it into an object using the given deserializer expression.
Takes the input row from child and turns it into an object using the given deserializer expression. The output of this operator is a single-field safe row containing the deserialized object.
Extended information for an ExprCode variable to also hold the variable having the dictionary reference and its index when dictionary encoding is being used.
Like Spark's DropTableCommand but checks for non-existent table case upfront to avoid unnecessary warning logs from Spark's DropTableCommand.
Efficient SparkPlan with code generation support to consume an RDD that has an ExpressionEncoder.
The base class for subqueries that are used in SparkPlan.
A wrapper plan to immediately execute the child plan without having to do an explicit collect.
A wrapper plan to immediately execute the child plan without having to do an explicit collect. Only use for plans returning small results.
Apply all of the GroupExpressions to every input row, hence we will get multiple output rows for an input row.
Apply all of the GroupExpressions to every input row, hence we will get multiple output rows for an input row.
The group of expressions; all of the group expressions should output the same schema specified by the parameter output.
The output Schema
Child operator
Logical plan node for scanning data from an RDD.
Physical plan node for scanning data from an RDD.
An interface for relations that are backed by files.
An interface for relations that are backed by files. When a class implements this interface, the list of paths that it returns will be returned to a user who calls inputPaths on any DataFrame that queries this relation.
Physical plan node for scanning data from HadoopFsRelations.
Physical plan node for scanning data from HadoopFsRelations.
The file-based relation to scan.
Output attributes of the scan.
Output schema of the scan.
Predicates to use for partition pruning.
Data source filters to use for filtering data within partitions.
identifier for the table in the metastore.
Physical plan for Filter.
Groups the input rows together and calls the R function with each group and an iterator containing all elements in the group.
Groups the input rows together and calls the R function with each group and an iterator containing all elements in the group. The result of this function is flattened before being output.
Applies a Generator to a stream of input rows, combining the output of each into a new stream of rows.
Applies a Generator to a stream of input rows, combining the output of each into a new stream of rows. This operation is similar to a flatMap in functional programming with one important additional feature, which allows the input rows to be joined with their output.
the generator expression
when true, each output row is implicitly joined with the input tuple that produced it.
when true, each input row will be output at least once, even if the output of the given generator is empty. outer has no effect when join is false.
the qualified output attributes of the generator of this node, which are constructed in the analysis phase and cannot be changed, as the parent node is already bound to them.
Take the first limit elements of the child's single output partition.
Iterates over a presorted set of rows, chunking it up by the grouping expression.
Iterates over a presorted set of rows, chunking it up by the grouping expression. Each call to next will return a pair containing the current group and an iterator that will return all the elements of that group. Iterators for each group are lazily constructed by extracting rows from the input iterator. As such, full groups are never materialized by this class.
Example input:
  Input: [a, 1], [b, 2], [b, 3]
  Grouping: x#1
  InputSchema: x#1, y#2
Result:
  First call to next(): ([a], Iterator([a, 1]))
  Second call to next(): ([b], Iterator([b, 2], [b, 3]))
Note, the class does not handle the case of an empty input for simplicity of implementation. Use the factory to construct a new instance.
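A minimal, illustrative sketch of the documented semantics using plain Scala collections (the real class builds each group's iterator lazily from the input iterator rather than buffering as done here):
{{{
// The input must be presorted by the grouping key (the first tuple element here).
val input = Seq(("a", 1), ("b", 2), ("b", 3))

val groups: Iterator[(String, List[(String, Int)])] =
  new Iterator[(String, List[(String, Int)])] {
    private val it = input.iterator.buffered
    override def hasNext: Boolean = it.hasNext
    override def next(): (String, List[(String, Int)]) = {
      val key = it.head._1
      val buf = scala.collection.mutable.ListBuffer.empty[(String, Int)]
      while (it.hasNext && it.head._1 == key) buf += it.next()
      key -> buf.toList
    }
  }

groups.foreach(println)
// (a,List((a,1)))
// (b,List((b,2), (b,3)))
}}}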
Implements the algorithms and data structures from "Hokusai -- Sketching Streams in Real Time", by Sergiy Matusevych, Alexander Smola, Amr Ahmed.
Implements the algorithms and data structures from "Hokusai -- Sketching Streams in Real Time", by Sergiy Matusevych, Alexander Smola, Amr Ahmed. http://www.auai.org/uai2012/papers/231.pdf
Aggregates state, so this is a mutable class.
Since we are all still learning Scala, I thought I'd explain the use of implicits in this file. TimeAggregation takes an implicit constructor parameter: TimeAggregation[T]()(implicit val cmsMonoid: CMSMonoid[T]). The purpose for that is:
+ In Algebird, a CMSMonoid[T] is a factory for creating instances of CMS[T].
+ TimeAggregation needs to occasionally make new CMS instances, so it will use the factory.
+ By making it an implicit (and in the curried param), the outer context of the TimeAggregation can create/ensure that the factory is there.
+ Hokusai[T] will be the "outer context" so it can handle that for TimeAggregation.
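A hedged sketch of the implicit-factory pattern described above, using stand-in types rather than the actual Algebird API:
{{{
// Stand-in types: RealCMS / RealCMSMonoid play the role of CMS / CMSMonoid here.
trait RealCMS[T] { def add(item: T): RealCMS[T] }
trait RealCMSMonoid[T] { def create(item: T): RealCMS[T] }   // the "factory"

// The curried implicit constructor parameter lets the outer context supply the factory.
class TimeAggregationSketch[T]()(implicit val cmsMonoid: RealCMSMonoid[T]) {
  def newSketch(item: T): RealCMS[T] = cmsMonoid.create(item)
}

// The outer context only has to have the factory in implicit scope.
class HokusaiSketch[T](implicit cmsMonoid: RealCMSMonoid[T]) {
  val aggregation = new TimeAggregationSketch[T]()   // factory resolved implicitly
}

object ImplicitFactoryDemo {
  implicit val intMonoid: RealCMSMonoid[Int] = new RealCMSMonoid[Int] {
    def create(item: Int): RealCMS[Int] = new RealCMS[Int] {
      def add(i: Int): RealCMS[Int] = this   // no-op stand-in
    }
  }
  val hokusai = new HokusaiSketch[Int]   // picks up intMonoid implicitly
}
}}}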
TODO 1. Decide if the underlying CMS should be mutable (save memory) or functional (algebird) I'm afraid that with the functional approach, and having so many, every time we merge two CMS, we create a third and that is wasteful of memory or may take too much memory. If we go with a mutable CMS, we have to either make stream-lib's serializable, or make our own.
2. Clean up intrusion of algebird shenanigans in the code (implicit factories etc)
3. Decide on API for managing data and time. Do we increment time in a separate operation or add a time parameter to addData()?
4. Decide if we want to be mutable or functional in this datastruct. Current version is mutable.
A subquery that checks whether the value of child is in the result of a query or not.
InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
InputAdapter is used to hide a SparkPlan from a subtree that supports codegen.
This is the leaf node of a tree with WholeStageCodegen that is used to generate code that consumes an RDD iterator of InternalRow.
Allow execution of adhoc scala code on the Lead node.
Allow execution of adhoc scala code on the Lead node. Creates a new Scala interpreter for a Snappy Session, but cached for the life of the session. Subsequent invocations of the 'interpret' command will reuse the cached interpreter, allowing any variables (e.g. a dataframe) to be preserved across invocations. State will not be preserved during Lead node failover.
The application is injected with (1) the SnappySession in a variable called 'session' and (2) the options in a variable called 'intp_options'.
To return values, set a variable called 'intp_return' to a Seq[Row].
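A hedged sketch of a snippet that could be submitted to the interpreter; 'session' and 'intp_return' are the injected/returned names described above, while the query itself is illustrative:
{{{
// Snippet as it would be submitted to the 'interpret' command. 'session' is the
// injected SnappySession described above; the query itself is just an example.
import org.apache.spark.sql.Row

val df = session.sql("SELECT id, id * 2 AS doubled FROM range(5)")
// Variables such as `df` remain cached in the session's interpreter across invocations.
val intp_return: Seq[Row] = df.collect().toSeq
}}}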
Take the first limit elements of each child partition, but do not collect or shuffle them.
Physical plan node for scanning data from a local collection.
Logical plan node for scanning data from an RDD of InternalRow.
Applies the given function to each input object.
Applies the given function to each input object. The output of its child must be a single-field row containing the input object.
This operator is kind of a safe version of ProjectExec: as its output is a custom object, we need to use a safe row to contain it.
Groups the input rows together and calls the function with each group and an iterator containing all elements in the group.
Groups the input rows together and calls the function with each group and an iterator containing all elements in the group. The result of this function is flattened before being output.
Applies the given function to the input object iterator.
Applies the given function to the input object iterator. The output of its child must be a single-field row containing the input object.
Base class for SparkPlan implementations that have only the code-generated version and use the same for the non-code-generated case.
Base class for SparkPlan implementations that have only the code-generated version and use the same for the non-code-generated case. For that case, this prevents recursive calls into code generation in case it fails for some reason.
Physical version of ObjectConsumer.
Provides helper methods for generated code to use ObjectHashSet with a generated class (having key and value columns as corresponding java type fields).
Provides helper methods for generated code to use ObjectHashSet with a generated class (having key and value columns as corresponding java type fields). This implementation saves the entire overhead of UnsafeRow conversion for both key type (like in BytesToBytesMap) and value type (like in BytesToBytesMap and VectorizedHashMapGenerator).
It has been carefully optimized to minimize memory reads/writes, with minimalistic code to fit better in CPU instruction cache. Unlike the other two maps used by HashAggregateExec, this has no limitations on the key or value column types.
The basic idea is that all of the key and value columns will be individual fields in a generated java class having corresponding java types. Storing a column value in the map is a simple matter of assigning the incoming variable to the corresponding field of the class object, and access likewise reads from that field of the class. Nullability information is crammed into long bit-mask fields, of which as many as required are generated (instead of the unnecessary overhead of something like a BitSet).
Hashcode and equals methods are generated for the key column fields. Having both key and value fields in the same class object helps in cutting down generated code as well as in cache locality, and saves at least one memory access for each row. In testing, this alone has been shown to improve performance by ~25% in simple group by queries. Furthermore, this class also provides inline hashcode and equals methods so that incoming register variables in generated code can be used directly (instead of stuffing them into a lookup key that will again read those fields inside). The class hashcode method is supposed to be used only internally by rehashing, and that too is just a field cached in the class object that is filled in during the initial insert (from the inline hashcode).
For memory management this uses a simple approach of starting with an estimated size, then improving that estimate in future rehashes, where a rehash will also collect the actual size of the current entries. If the rehash indicates that no memory is available, then it will fall back to dumping the current map into the MemoryManager and creating a new one, with the merge being done by an external sorter in a manner similar to how UnsafeFixedWidthAggregationMap handles the situation. The caller can instead decide to dump the entire map in that scenario, such as when using it for a HashJoin.
Overall this map is 5-10X faster than UnsafeFixedWidthAggregationMap and 2-4X faster than VectorizedHashMapGenerator. It is generic enough to be used for both group by aggregation as well as for HashJoins.
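An illustrative, hand-written equivalent of the kind of entry class the generator might emit for a single String key column and a Long value column; the class and field names are hypothetical, not actual generated code:
{{{
// Hand-written equivalent of a hypothetical generated entry class for a map
// keyed on one String column with one Long aggregate value.
final class SketchMapEntry(val keyCol1: String, var valueCol1: Long,
    var nullMask: Long,       // bit-mask holding nullability of the columns
    val cachedHash: Int) {    // filled in at insert time from the inline hashcode

  // The class-level hashcode is only used internally during rehashing and simply
  // returns the cached value; lookups use inline hashcode/equals in generated code.
  override def hashCode(): Int = cachedHash

  // Equality is defined over the key column fields only.
  override def equals(other: Any): Boolean = other match {
    case o: SketchMapEntry =>
      (this.keyCol1 eq o.keyCol1) || (this.keyCol1 != null && this.keyCol1 == o.keyCol1)
    case _ => false
  }
}
}}}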
Physical version of ObjectProducer.
This rule optimizes the execution of queries that can be answered by looking only at partition-level metadata.
This rule optimizes the execution of queries that can be answered by looking only at partition-level metadata. This applies when all the columns scanned are partition columns, and the query has an aggregate operator that satisfies the following conditions:
1. the aggregate expression is on partition columns, e.g. SELECT col FROM tbl GROUP BY col;
2. the aggregate function is on partition columns with DISTINCT, e.g. SELECT col1, count(DISTINCT col2) FROM tbl GROUP BY col1;
3. the aggregate function is on partition columns and has the same result with or without the DISTINCT keyword, e.g. SELECT col1, Max(col2) FROM tbl GROUP BY col1.
A plan node that does nothing but lie about the output of its child.
A plan node that does nothing but lie about the output of its child. Used to splice a (hopefully structurally equivalent) tree from a different optimization sequence into an already resolved tree.
Plans scalar subqueries that are present in the given SparkPlan.
Physical plan for Project.
The primary workflow for executing relational queries using Spark.
The primary workflow for executing relational queries using Spark. Designed to allow easy access to the intermediate phases of query execution for developers.
While this is not a public class, we should avoid changing the function names for the sake of changing them, because a lot of developers use the feature for debugging.
Base RDD KryoSerializable class that will serialize minimal RDD fields.
Physical plan node for scanning data from an RDD of InternalRow.
Physical plan for range (generating a range of 64 bit numbers).
Created by vivekb on 21/10/16.
Find out duplicated exchanges in the spark plan, then use the same exchange for all the references.
Physical plan node for scanning data from a relation.
An internal iterator interface which presents a more restrictive API than scala.collection.Iterator.
An internal iterator interface which presents a more restrictive API than scala.collection.Iterator.
One major departure from the Scala iterator API is the fusing of the hasNext() and next() calls: Scala's iterator allows users to call hasNext() without immediately advancing the iterator to consume the next row, whereas RowIterator combines these calls into a single advanceNext() method.
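A minimal sketch of the fused-call contract using a stand-in trait (not the real RowIterator class):
{{{
// Stand-in trait capturing the fused contract: advanceNext() both advances the
// iterator and reports whether a row is available; getRow returns that row.
trait RowIteratorLike[T] {
  def advanceNext(): Boolean
  def getRow: T
}

// Adapting a plain Scala Iterator (whose hasNext does not advance) to the fused shape.
def fromScala[T](it: Iterator[T]): RowIteratorLike[T] = new RowIteratorLike[T] {
  private var current: T = _
  override def advanceNext(): Boolean =
    if (it.hasNext) { current = it.next(); true } else false
  override def getRow: T = current
}

// Consumption needs a single call per step instead of a hasNext()/next() pair.
val rows = fromScala(Iterator(1, 2, 3))
while (rows.advanceNext()) println(rows.getRow)
}}}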
Physical plan for sampling the dataset.
Physical plan for sampling the dataset.
Lower-bound of the sampling probability (usually 0.0)
Upper-bound of the sampling probability. The expected fraction sampled will be ub - lb.
Whether to sample with replacement.
the random seed
the SparkPlan
A subquery that will return only one row and one column.
A subquery that will return only one row and one column.
This is the physical copy of ScalarSubquery to be used inside SparkPlan.
Takes the input object from child and turns it into an unsafe row using the given serializer expression.
Takes the input object from child and turns it into an unsafe row using the given serializer expression. The output of its child must be a single-field row containing the input object.
Changes the name of the "database" column to "schemaName" over Spark's ShowTablesCommand.
Changes the name of the "database" column to "schemaName" over Spark's ShowTablesCommand. Also, when hive compatibility is turned on, this does not include the schema name or "isTemporary", in order to return a hive-compatible result.
This is a specialized version of org.apache.spark.rdd.ShuffledRDD that is optimized for shuffling rows instead of Java key-value pairs.
This is a specialized version of org.apache.spark.rdd.ShuffledRDD that is optimized for shuffling rows instead of Java key-value pairs. Note that something like this should eventually be implemented in Spark core, but that is blocked by some more general refactorings to shuffle interfaces / internals.
This RDD takes a ShuffleDependency (dependency), and an optional array of partition start indices as input arguments (specifiedPartitionStartIndices).
The dependency has the parent RDD of this RDD, which represents the dataset before shuffle (i.e. map output). Elements of this RDD are (partitionId, Row) pairs. Partition ids should be in the range [0, numPartitions - 1]. dependency.partitioner is the original partitioner used to partition map output, and dependency.partitioner.numPartitions is the number of pre-shuffle partitions (i.e. the number of partitions of the map output).
When specifiedPartitionStartIndices is defined, specifiedPartitionStartIndices.length will be the number of post-shuffle partitions. For this case, the i-th post-shuffle partition includes specifiedPartitionStartIndices[i] to specifiedPartitionStartIndices[i+1] - 1 (inclusive).
When specifiedPartitionStartIndices is not defined, there will be dependency.partitioner.numPartitions post-shuffle partitions. For this case, a post-shuffle partition is created for every pre-shuffle partition.
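A worked example, with hypothetical values, of how the start indices translate into pre-shuffle partition ranges:
{{{
// Hypothetical values: 10 pre-shuffle partitions coalesced into 3 post-shuffle ones.
val numPreShufflePartitions = 10
val specifiedPartitionStartIndices = Array(0, 3, 7)

// Post-shuffle partition i covers pre-shuffle partitions
// [startIndices(i), startIndices(i + 1) - 1]; the last one runs to the end.
specifiedPartitionStartIndices.indices.foreach { i =>
  val start = specifiedPartitionStartIndices(i)
  val end =
    if (i + 1 < specifiedPartitionStartIndices.length) specifiedPartitionStartIndices(i + 1) - 1
    else numPreShufflePartitions - 1
  println(s"post-shuffle partition $i <- pre-shuffle partitions [$start, $end]")
}
// post-shuffle partition 0 <- pre-shuffle partitions [0, 2]
// post-shuffle partition 1 <- pre-shuffle partitions [3, 6]
// post-shuffle partition 2 <- pre-shuffle partitions [7, 9]
}}}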
Alternative to Spark's CacheTableCommand that shows the plan being cached in the GUI rather than count() plan for InMemoryRelation.
Custom Sort plan.
Custom Sort plan. Currently this enables lazy sorting, i.e. sort only when the iterator is consumed the first time. Useful for SMJ when the left side is empty. Useful only for non-code-generated plans, since the latter are already lazy (SortExec checks for "needToSort", so sorting happens only on the first processNext).
This is a TaskContext listener (for both success and failure of the task) that handles startup, commit and rollback of snapshot transactions for the task.
This is a TaskContext listener (for both success and failure of the task) that handles startup, commit and rollback of snapshot transactions for the task. It also provides a common connection that can be shared by all plans executing in the task. In conjunction with the apply methods of the companion object, it ensures that only one instance of this listener is attached in a TaskContext which is automatically removed at the end of the task execution.
This is the preferred way for all plans that need connections and/or snapshot transactions so that handling transaction start/commit for any level of plan nesting etc can be dealt with cleanly for the entire duration of the task. Additionally cases where an EXCHANGE gets inserted between two plans are also handled as expected where separate transactions and connections will be used for the two plans. Both generated code and non-generated code (including RDD.compute) should use the apply methods of the companion object to obtain an instance of the listener, then use its connection() method to obtain the connection.
One of the overloads of the apply method also allows one to send a custom connection creator instead of using the default one, but it is also assumed to return SnappyData connection only (either embedded or thin) for snapshot transactions to work. Typical usage of custom creator is for smart connector RDDs to use direct URLs without load-balance to the preferred hosts for the buckets being targeted instead of the default creator that will always use the locator.
Performs (external) sorting.
Performs (external) sorting.
when true performs a global sort of all partitions by shuffling the data first if necessary.
Method for configuring periodic spilling in unit tests. If set, will spill every frequency records.
The base class for physical operators.
The base class for physical operators.
The naming convention is that physical operators end with "Exec" suffix, e.g. ProjectExec.
:: DeveloperApi :: Stores information about a SQL SparkPlan.
:: DeveloperApi :: Stores information about a SQL SparkPlan.
Builder that converts an ANTLR ParseTree into a LogicalPlan/Expression/TableIdentifier.
Concrete parser for Spark SQL statements.
Converts a logical plan into zero or more SparkPlans.
Converts a logical plan into zero or more SparkPlans. This API is exposed for experimenting with the query planner and is not designed to be stable across spark releases. Developers writing libraries should instead consider using the stable APIs provided in org.apache.spark.sql.sources
Perform stratified sampling given a Query-Column-Set (QCS).
Perform stratified sampling given a Query-Column-Set (QCS). This variant can also use a fixed fraction to be sampled instead of fixed number of total samples since it is also designed to be used with streaming data.
A stratified sampling implementation that uses a fraction and initial cache size.
A stratified sampling implementation that uses a fraction and initial cache size. Latter is used as the initial reservoir size per stratum for reservoir sampling. It primarily tries to satisfy the fraction of the total data repeatedly filling up the cache as required (and expanding the cache size for bigger reservoir if required in next rounds). The fraction is attempted to be satisfied while ensuring that the selected rows are equally divided among the current stratum (for those that received any rows, that is).
Created by vivekb on 14/10/16.
Created by vivekb on 14/10/16.
A stratified sampling implementation that uses an error limit with confidence on a numerical column to sample as much as required to maintain the expected error within the limit.
A stratified sampling implementation that uses an error limit with confidence on a numerical column to sample as much as required to maintain the expected error within the limit. An optional initial cache size can be specified that is used as the initial reservoir size per stratum for reservoir sampling. The error limit is attempted to be honoured for each of the strata independently and the sampling rate increased or decreased accordingly. It uses standard closed-form estimation of the sampling error, increasing or decreasing the sampling as required (and expanding the cache size for a bigger reservoir if required in the next rounds).
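A hedged sketch (not the actual implementation) of the standard closed-form estimate this description refers to, i.e. the sample size needed so the confidence-interval half-width around a stratum's mean stays within the error limit:
{{{
// Required sample size n for a stratum so that z * stddev / sqrt(n) <= errorLimit,
// i.e. n >= (z * stddev / errorLimit)^2, where z is the normal quantile for the
// requested confidence (about 1.96 for 95%). Values below are illustrative.
def requiredSampleSize(stddev: Double, errorLimit: Double, z: Double = 1.96): Long = {
  val n = math.pow(z * stddev / errorLimit, 2)
  math.max(1L, math.ceil(n).toLong)
}

println(requiredSampleSize(stddev = 50.0, errorLimit = 2.0))   // 2401
}}}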
A simple reservoir based stratified sampler that will use the provided reservoir size for every stratum present in the incoming rows.
An extension to StratumReservoir to also track total samples seen since last time slot and short fall from previous rounds.
For each stratum (i.e.
For each stratum (i.e. a unique set of values for QCS), keep a set of meta-data including number of samples collected, total number of rows in the stratum seen so far, the QCS key, reservoir of samples etc.
Physical plan for a subquery.
Base class for bulk insert/mutation operations for column and row tables.
Take the first limit elements as defined by the sortOrder, and do projection if needed.
Take the first limit elements as defined by the sortOrder, and do projection if needed. This is logically equivalent to having a Limit operator after a SortExec operator, or having a ProjectExec operator between them. This could have been named TopK, but Spark's top operator does the opposite in ordering so we name it TakeOrdered to avoid confusion.
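An illustrative query shape (public API, hypothetical names) that typically plans to this kind of operator:
{{{
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().master("local[*]").appName("topk-sketch").getOrCreate()
val df = spark.range(1000).selectExpr("id", "id % 7 AS bucket")

// Sort + limit (+ projection): logically Limit over Sort, executed as a single
// top-K style operator rather than a full sort followed by a limit.
val topN = df.orderBy(col("bucket").desc, col("id")).limit(10).select("id")
topN.explain()
}}}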
Extends Spark's ScalarSubquery to avoid emitting a constant in generated code, instead passing it as a reference object using TokenLiteral to enable generated code re-use.
Physical plan for unioning two plans, without a distinct.
Physical plan for unioning two plans, without a distinct. This is UNION ALL in SQL.
Serializer for serializing UnsafeRows during shuffle.
Serializer for serializing UnsafeRows during shuffle. Since UnsafeRows are already stored as bytes, this serializer simply copies those bytes to the underlying output stream. When deserializing a stream of rows, instances of this serializer mutate and return a single UnsafeRow instance that is backed by an on-heap byte array.
Note that this serializer implements only the Serializer methods that are used during shuffle, so certain SerializerInstance methods will throw UnsupportedOperationException.
WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java function.
WholeStageCodegen compiles a subtree of plans that support codegen together into a single Java function.
Here is the call graph to generate the Java source (plan A supports codegen, but plan B does not):
   WholeStageCodegen       Plan A               FakeInput        Plan B
  =========================================================================
  -> execute()
      |
   doExecute() --------->   inputRDDs() -------> inputRDDs() ------> execute()
      |
   +-----------------> produce()
                           |
                        doProduce()  -------> produce()
                                                  |
                                               doProduce()
                                                  |
                        doConsume() <--------- consume()
                           |
   doConsume()  <--------  consume()
SparkPlan A should override doProduce() and doConsume().
doCodeGen() will create a CodeGenContext, which will hold a list of variables for input, used to generate code for BoundReference.
A global way to obtain a pooled DataSource with a given set of pool and connection properties.
A global way to obtain a pooled DataSource with a given set of pool and connection properties.
Supports Tomcat-JDBC pool and HikariCP.
Makes use of dictionary indexes for strings if any.
Makes use of dictionary indexes for strings if any. Depends only on the presence of dictionary per batch of rows (where the batch must be substantially greater than its dictionary for optimization to help).
For single column hash maps (groups or joins), it can be turned into a flat indexed array instead of a map. Create an array of class objects as stored in ObjectHashSet having the length same as dictionary so that dictionary index can be used to directly lookup the array. Then for the first lookup into the array for a dictionary index, lookup the actual ObjectHashSet for the key to find the map entry object and insert into the array. An alternative would be to pre-populate the array by making one pass through the dictionary, but it may not be efficient if many of the entries in the dictionary get filtered out by query predicates and never need to consult the created array.
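A hedged sketch of the single-column case just described; MapEntry and lookupFullMap are illustrative stand-ins for the generated class and the ObjectHashSet lookup:
{{{
// MapEntry stands in for the generated key/value class; lookupFullMap stands in
// for the string-keyed ObjectHashSet lookup.
final class MapEntry(val key: String, var aggregate: Long)

class DictionaryIndexedCache(dictionary: Array[String],
    lookupFullMap: String => MapEntry) {

  // One slot per dictionary entry, populated lazily on first use of that index.
  private val entries = new Array[MapEntry](dictionary.length)

  def entryForDictionaryIndex(dictIndex: Int): MapEntry = {
    var entry = entries(dictIndex)
    if (entry eq null) {
      // First time this dictionary index is seen: do the real (string) hash lookup
      // once, then cache the map entry so later rows use the integer index directly.
      entry = lookupFullMap(dictionary(dictIndex))
      entries(dictIndex) = entry
    }
    entry
  }
}
}}}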
For multiple column hash maps having one or more dictionary indexed columns, there is slightly more work. Instead of an array as in single column case, create a new hash map where the key columns values are substituted by dictionary index value. However, the map entry will remain identical to the original map so to save space add the additional index column to the full map itself. As new values are inserted into this hash map, lookup the full hash map to locate its map entry, then point to the same map entry in this new hash map too. Thus for subsequent look-ups the new hash map can be used completely based on integer dictionary indexes instead of strings.
An alternative approach can be to just store the hash code arrays separately for each of the dictionary columns indexed identical to dictionary. Use this to lookup the main map which will also have additional columns for dictionary indexes (that will be cleared at the start of a new batch). On first lookup for key columns where dictionary indexes are missing in the map, insert the dictionary index in those additional columns. Then use those indexes for equality comparisons instead of string.
The multiple column dictionary optimization will be useful for only string dictionary types where cost of looking up a string in hash map is substantially higher than integer lookup. The single column optimization can improve performance for other dictionary types though its efficacy for integer/long types will be reduced to avoiding hash code calculation. Given this, the additional overhead of array maintenance may not be worth the effort (and could possibly even reduce overall performance in some cases), hence this optimization is currently only for string type.
Helper functions for physical operators that work with user defined objects.
Common security related calls.
This companion class is primarily to ensure that only a single listener is attached in a TaskContext (e.g.
This companion class is primarily to ensure that only a single listener is attached in a TaskContext (e.g. delta buffer + column table scan, or putInto may try to attach twice).
Physical execution operators for join operations.
The physical execution component of Spark SQL. Note that this is a private package. All classes in catalyst are considered an internal API to Spark SQL and are subject to change between minor releases.