Description
On the .mllib side, the storageLevel of the input data is ignored: the data is recomputed (effectively cached twice) regardless of whether the caller already persisted it:
@Since("0.8.0")
def run(data: RDD[Vector]): KMeansModel = {
  // Treat every point as equally important (unit weight) and delegate to the
  // weighted implementation with no instrumentation attached.
  runWithWeight(data.map((_, 1.0)), None)
}
private[spark] def runWithWeight(
    data: RDD[(Vector, Double)],
    instr: Option[Instrumentation]): KMeansModel = {
  // Compute the squared norm together with the point in a single pass.
  // The original implementation built a separate `norms` RDD and zipped it
  // back against `data`, which traverses `data` twice when it is not cached.
  val zippedData = data.map { case (v, w) =>
    new VectorWithNorm(v, Vectors.norm(v, 2.0), w)
  }

  // Only cache the derived RDD when the caller has not already persisted the
  // input; otherwise we would duplicate the data in the cache.
  val handlePersistence = data.getStorageLevel == StorageLevel.NONE
  if (handlePersistence) {
    zippedData.persist(StorageLevel.MEMORY_AND_DISK)
  }

  val model = runAlgorithmWithWeight(zippedData, instr)

  // Unpersist only what we persisted ourselves; the original code called
  // unpersist() unconditionally, which is a no-op (with a warning) when the
  // RDD was never cached.
  if (handlePersistence) {
    zippedData.unpersist()
  }
  model
}