org.apache.spark.sql.execution

Hokusai

class Hokusai[T] extends AnyRef

Implements the algorithms and data structures from "Hokusai -- Sketching Streams in Real Time", by Sergiy Matusevych, Alexander Smola, Amr Ahmed. http://www.auai.org/uai2012/papers/231.pdf

Aggregates state, so this is a mutable class.

Since we are all still learning Scala, I thought I'd explain the use of implicits in this file. TimeAggregation takes an implicit constructor parameter:

TimeAggregation[T]()(implicit val cmsMonoid: CMSMonoid[T])

The purpose of that is:

+ In Algebird, a CMSMonoid[T] is a factory for creating instances of CMS[T].
+ TimeAggregation occasionally needs to make new CMS instances, so it uses the factory.
+ Because the factory is an implicit (in the curried parameter list), the outer context of the TimeAggregation can create the factory and make sure it is in scope.
+ Hokusai[T] will be the "outer context", so it can handle that for TimeAggregation.
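Here is a minimal sketch of the same pattern that does not depend on Algebird. SimpleSketch, SketchFactory, TimeAggregationSketch and OuterContext are hypothetical stand-ins for CMS, CMSMonoid, TimeAggregation and Hokusai. The outer class defines an implicit factory once, and the compiler supplies the curried implicit parameter when it constructs the inner class.

```scala
// Hypothetical stand-ins, not the Algebird or SnappyData API:
// SketchFactory plays the role of CMSMonoid[T], SimpleSketch the role of CMS[T].
final case class SimpleSketch[T](counts: Map[T, Long])

trait SketchFactory[T] {
  def zero: SimpleSketch[T] // a fresh, empty sketch
}

// The factory arrives through an implicit, curried constructor parameter.
class TimeAggregationSketch[T]()(implicit val factory: SketchFactory[T]) {
  // Whenever a new sketch is needed, ask the factory for one.
  def newSketch(): SimpleSketch[T] = factory.zero
}

// Plays Hokusai's "outer context" role: it defines the implicit factory once...
class OuterContext[T] {
  implicit val factory: SketchFactory[T] = new SketchFactory[T] {
    def zero: SimpleSketch[T] = SimpleSketch(Map.empty[T, Long])
  }

  // ...so the compiler supplies it here without it being passed explicitly.
  val timeAggregation = new TimeAggregationSketch[T]()
}
```

The factory can also be passed explicitly, as in `new TimeAggregationSketch[Long]()(someFactory)`, which is convenient in tests.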

TODO

1. Decide whether the underlying CMS should be mutable (saves memory) or functional (Algebird). I'm afraid that with the functional approach, given how many sketches we keep, every merge of two CMS instances creates a third, which wastes memory and may use too much of it. If we go with a mutable CMS, we have to either make stream-lib's CMS serializable or write our own.

2. Clean up the intrusion of Algebird shenanigans into the code (implicit factories, etc.).

3. Decide on an API for managing data and time: do we increment time in a separate operation, or add a time parameter to addData()?

4. Decide whether this data structure should be mutable or functional. The current version is mutable.
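The memory concern in TODO 1 can be shown with a small count-min sketch over Long keys. This sketch is for illustration only. It is not stream-lib's, Algebird's or Hokusai's CountMinSketch, and MiniCms, mergeInPlace and merged are hypothetical names. `merged` follows the functional style: both inputs are left untouched and a third table of depth × width counters is allocated. `mergeInPlace` follows the mutable style: counters are added into the existing table and nothing new is allocated.

```scala
import scala.util.hashing.MurmurHash3

// Illustrative count-min sketch over Long keys (hypothetical; not the CountMinSketch Hokusai uses).
final class MiniCms(val depth: Int, val width: Int) {
  // depth rows of width counters each, all starting at zero
  val table: Array[Array[Long]] = Array.fill(depth, width)(0L)

  // Row-specific hash: MurmurHash3 over the key's decimal string, seeded with the row index.
  private def bucket(row: Int, key: Long): Int =
    Math.floorMod(MurmurHash3.stringHash(key.toString, row), width)

  def add(key: Long, count: Long = 1L): Unit =
    for (row <- 0 until depth) table(row)(bucket(row, key)) += count

  // Count-min estimate: the smallest counter the key maps to; it never undercounts.
  def estimate(key: Long): Long =
    (0 until depth).map(row => table(row)(bucket(row, key))).min

  // Mutable style: add `other`'s counters into this table; nothing new is allocated.
  def mergeInPlace(other: MiniCms): this.type = {
    require(depth == other.depth && width == other.width, "sketch shapes differ")
    for (row <- 0 until depth; col <- 0 until width)
      table(row)(col) += other.table(row)(col)
    this
  }

  // Functional style: both inputs stay untouched and a third sketch is allocated.
  def merged(other: MiniCms): MiniCms =
    new MiniCms(depth, width).mergeInPlace(this).mergeInPlace(other)
}
```

Element-wise addition is a valid merge only because both sketches share the same depth, width and per-row hash functions. That holds here because the hashes depend only on the row index.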

Linear Supertypes
AnyRef, Any

Instance Constructors

  1. new Hokusai(cmsParams: CMSParams, windowSize: Long, epoch0: Long, taList: MutableList[CountMinSketch[T]], itemList: MutableList[CountMinSketch[T]], intervalTracker: IntervalTracker, intitalInterval: Long, mBarInitializer: Option[CountMinSketch[T]])(implicit arg0: ClassTag[T])

Type Members

  1. class ItemAggregation extends AnyRef

  2. class TimeAggregation extends AnyRef

    Data Structures and Algorithms to maintain Time Aggregation from the paper. Time is modeled as a non-decreasing integer starting at 0.

    The type parameter T is the key type and needs to be numeric. Long is typical. Short can reduce the size of the resulting data structure, at the cost of a higher chance of collisions. BigInt is another possibility.

    From Theorem 4 in the paper:

    At time t, the sketch M_j (m(j) in the example below) contains statistics for the period [t - δ - 2^j, t - δ), where δ = t mod 2^j. That is, it covers the 2^j time units ending at t - δ.
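    Here is a minimal sketch of Theorem 4 as a function. It assumes that time is counted in whole intervals from 0 and that sketch j first exists once t ≥ 2^j. That second condition is read off the example table, not stated in the theorem. Theorem4 and coveredInterval are hypothetical names. The function returns the half-open interval [t - δ - 2^j, t - δ) covered by the j-th sketch.

    ```scala
    object Theorem4 {
      // Interval [start, end) covered by sketch j at time t, or None if it does not exist yet.
      def coveredInterval(t: Long, j: Int): Option[(Long, Long)] = {
        val span  = 1L << j   // 2^j time units
        val delta = t % span  // delta = t mod 2^j
        val end   = t - delta
        val start = end - span
        if (start >= 0) Some((start, end)) else None // sketch j appears once t >= 2^j
      }
    }
    ```

    For example, coveredInterval(5, 1) returns Some((2, 4)), which matches m(1) = [2, 4) at t = 5.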

    The following shows an example of how data ages through m() as t starts at 0 and increases:

    === t = 0
      t=0  j=0 m is EMPTY
    === t = 1
      t=1  j=0 m(0)=[0, 1) # secs in m(0): 1
    === t = 2
      t=2  j=0 m(0)=[1, 2) # secs in m(0): 1
      t=2  j=1 m(1)=[0, 2) # secs in m(1): 2
    === t = 3
      t=3  j=0 m(0)=[2, 3) # secs in m(0): 1
      t=3  j=1 m(1)=[0, 2) # secs in m(1): 2
    === t = 4
      t=4  j=0 m(0)=[3, 4) # secs in m(0): 1
      t=4  j=1 m(1)=[2, 4) # secs in m(1): 2
      t=4  j=2 m(2)=[0, 4) # secs in m(2): 4
    === t = 5
      t=5  j=0 m(0)=[4, 5) # secs in m(0): 1
      t=5  j=1 m(1)=[2, 4) # secs in m(1): 2
      t=5  j=2 m(2)=[0, 4) # secs in m(2): 4
    === t = 6
      t=6  j=0 m(0)=[5, 6) # secs in m(0): 1
      t=6  j=1 m(1)=[4, 6) # secs in m(1): 2
      t=6  j=2 m(2)=[0, 4) # secs in m(2): 4
    === t = 7
      t=7  j=0 m(0)=[6, 7) # secs in m(0): 1
      t=7  j=1 m(1)=[4, 6) # secs in m(1): 2
      t=7  j=2 m(2)=[0, 4) # secs in m(2): 4
    === t = 8
      t=8  j=0 m(0)=[7, 8) # secs in m(0): 1
      t=8  j=1 m(1)=[6, 8) # secs in m(1): 2
      t=8  j=2 m(2)=[4, 8) # secs in m(2): 4
      t=8  j=3 m(3)=[0, 8) # secs in m(3): 8
    === t = 9
      t=9  j=0 m(0)=[8, 9) # secs in m(0): 1
      t=9  j=1 m(1)=[6, 8) # secs in m(1): 2
      t=9  j=2 m(2)=[4, 8) # secs in m(2): 4
      t=9  j=3 m(3)=[0, 8) # secs in m(3): 8
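    A small simulation reproduces the table above. The update rule was inferred from the table, not transcribed from the paper's pseudocode. It models each sketch only by the interval it covers, and it models merging two sketches as taking the union of their intervals. AgingSchedule, tick and stateAt are hypothetical names, and the numIntervals cap is not modeled. At each tick, the new one-unit sketch enters m(0). Every level j with t mod 2^j = 0 is then overwritten by a carry, and the carry absorbs the sketch it displaced.

    ```scala
    object AgingSchedule {
      type Interval = (Long, Long) // half-open [start, end)

      // Stand-in for merging two sketches: the union of the (adjacent) intervals they cover.
      private def merge(a: Interval, b: Interval): Interval =
        (math.min(a._1, b._1), math.max(a._2, b._2))

      // Advance from time t - 1 to time t.
      def tick(m: Vector[Interval], t: Long): Vector[Interval] = {
        var levels = m
        var carry: Interval = (t - 1, t) // the sketch for the unit that just ended
        var j = 0
        // Level j is rewritten only when t is a multiple of 2^j; level 0 always is.
        while (t % (1L << j) == 0) {
          if (j < levels.length) {
            val displaced = levels(j)
            levels = levels.updated(j, carry)
            carry = merge(carry, displaced)
          } else {
            levels = levels :+ carry // t is a power of two: a new, coarser level appears
          }
          j += 1
        }
        levels
      }

      // m(0), m(1), ... at time t, starting from an empty m at t = 0.
      def stateAt(t: Long): Vector[Interval] =
        (1L to t).foldLeft(Vector.empty[Interval])((m, step) => tick(m, step))
    }
    ```

    Any carry left when the loop stops, for example [2, 6) at t = 6, is dropped. This matches the table, where no m(j) holds that interval.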

    numIntervals: the number of sketches to keep in the exponential backoff. The last one holds a sketch of all data. The default value is 16.

  3. class TimeAndItemAggregation extends AnyRef

Value Members

  1. final def !=(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  2. final def ##(): Int

    Definition Classes
    AnyRef → Any
  3. final def ==(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  4. def accummulate[K](data: Iterable[K], extractor: (K) ⇒ (T, Long, Long)): Unit

  5. def accummulate(data: Map[T, Long]): Unit

  6. def accummulate(data: Seq[T]): Unit

  7. def addEpochData(data: Map[T, Long]): Unit

  8. def addEpochData(data: Map[T, Long], time: Long): Unit

  9. def addEpochData(data: Seq[T]): Unit

  10. def addTimestampedData(data: ArrayBuffer[KeyFrequencyWithTimestamp[T]]): Unit

  11. final def asInstanceOf[T0]: T0

    Definition Classes
    Any
  12. def clone(): AnyRef

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  13. val cmsParams: CMSParams

  14. def createZeroCMS(intervalFromLast: Int): CountMinSketch[T]

  15. val epoch0: Long

  16. final def eq(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  17. def equals(arg0: Any): Boolean

    Definition Classes
    AnyRef → Any
  18. def finalize(): Unit

    Attributes
    protected[java.lang]
    Definition Classes
    AnyRef
    Annotations
    @throws( classOf[java.lang.Throwable] )
  19. final def getClass(): Class[_]

    Definition Classes
    AnyRef → Any
  20. def hashCode(): Int

    Definition Classes
    AnyRef → Any
  21. def increment(): Unit

  22. final def isInstanceOf[T0]: Boolean

    Definition Classes
    Any
  23. var mBar: CountMinSketch[T]

  24. val mergeCreator: (Array[CountMinSketch[T]]) ⇒ CountMinSketch[T]

  25. final def ne(arg0: AnyRef): Boolean

    Definition Classes
    AnyRef
  26. final def notify(): Unit

    Definition Classes
    AnyRef
  27. final def notifyAll(): Unit

    Definition Classes
    AnyRef
  28. def queryAtTime(epoch: Long, key: T): Option[Approximate]

  29. def queryBetweenTime(epochFrom: Long, epochTo: Long, key: T): Option[Approximate]

  30. def queryTillLastNIntervals(lastNIntervals: Int, item: T): Option[Approximate]

  31. def queryTillTime(epoch: Long, key: T): Option[Approximate]

  32. final def synchronized[T0](arg0: ⇒ T0): T0

    Definition Classes
    AnyRef
  33. val taPlusIa: TimeAndItemAggregation

  34. val timeEpoch: TimeEpoch

  35. def toString(): String

    Definition Classes
    Hokusai → AnyRef → Any
  36. final def wait(): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  37. final def wait(arg0: Long, arg1: Int): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  38. final def wait(arg0: Long): Unit

    Definition Classes
    AnyRef
    Annotations
    @throws( ... )
  39. val windowSize: Long
