Spark / SPARK-12944

CrossValidator doesn't accept a Pipeline as an estimator


Details

    • Type: Bug
    • Status: Closed
    • Priority: Minor
    • Resolution: Done
    • Affects Version: 1.6.0
    • Fix Version: None
    • Components: ML, PySpark
    • Labels: None
    • Environment: spark-1.6.0-bin-hadoop2.6, Python 3.4.4 :: Anaconda 2.4.1

    Description

      Pipeline is supposed to act as an Estimator, but CrossValidator currently throws an error when a Pipeline is passed as its estimator.

      from pyspark.ml import Pipeline
      from pyspark.ml.feature import Tokenizer, HashingTF
      from pyspark.ml.classification import NaiveBayes
      from pyspark.ml.evaluation import MulticlassClassificationEvaluator
      from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
      
      # Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and nb.
      tokenizer = Tokenizer(inputCol="text", outputCol="words")
      hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
      nb = NaiveBayes()
      pipeline = Pipeline(stages=[tokenizer, hashingTF, nb])
      
      
      paramGrid = ParamGridBuilder().addGrid(nb.smoothing, [0, 1]).build()  # note: grid values are Python ints
      
      cv = CrossValidator(estimator=pipeline, 
                          estimatorParamMaps=paramGrid, 
                          evaluator=MulticlassClassificationEvaluator(), 
                          numFolds=4)
      
      cvModel = cv.fit(training_df)
      

      A sample dataset can be found here:
      https://github.com/dreyco676/nlp_spark/blob/master/data.zip
      The file can be converted to a DataFrame with:

      # Load precleaned training set
      training_rdd = sc.textFile("data/clean_training.txt")
      parts_rdd = training_rdd.map(lambda l: l.split("\t"))
      # Filter out rows that do not have exactly three columns
      guarantee_col_rdd = parts_rdd.filter(lambda l: len(l) == 3)
      typed_rdd = guarantee_col_rdd.map(lambda p: (p[0], p[1], float(p[2])))
      # Create DataFrame
      training_df = sqlContext.createDataFrame(typed_rdd, ["id", "text", "label"])
      

      Calling cv.fit(training_df) throws the following stack trace:

      ---------------------------------------------------------------------------
      Py4JJavaError                             Traceback (most recent call last)
      <ipython-input-3-34e9e27acada> in <module>()
           17                     numFolds=4)
           18 
      ---> 19 cvModel = cv.fit(training_df)
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
           67                 return self.copy(params)._fit(dataset)
           68             else:
      ---> 69                 return self._fit(dataset)
           70         else:
           71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/tuning.py in _fit(self, dataset)
          237             train = df.filter(~condition)
          238             for j in range(numModels):
      --> 239                 model = est.fit(train, epm[j])
          240                 # TODO: duplicate evaluator to take extra params from input
          241                 metric = eva.evaluate(model.transform(validation, epm[j]))
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
           65         elif isinstance(params, dict):
           66             if params:
      ---> 67                 return self.copy(params)._fit(dataset)
           68             else:
           69                 return self._fit(dataset)
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in _fit(self, dataset)
          211                     dataset = stage.transform(dataset)
          212                 else:  # must be an Estimator
      --> 213                     model = stage.fit(dataset)
          214                     transformers.append(model)
          215                     if i < indexOfLastEstimator:
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/pipeline.py in fit(self, dataset, params)
           67                 return self.copy(params)._fit(dataset)
           68             else:
      ---> 69                 return self._fit(dataset)
           70         else:
           71             raise ValueError("Params must be either a param map or a list/tuple of param maps, "
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit(self, dataset)
          130 
          131     def _fit(self, dataset):
      --> 132         java_model = self._fit_java(dataset)
          133         return self._create_model(java_model)
          134 
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
          126         :return: fitted Java model
          127         """
      --> 128         self._transfer_params_to_java()
          129         return self._java_obj.fit(dataset._jdf)
          130 
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _transfer_params_to_java(self)
           80         for param in self.params:
           81             if param in paramMap:
      ---> 82                 pair = self._make_java_param_pair(param, paramMap[param])
           83                 self._java_obj.set(pair)
           84 
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/ml/wrapper.py in _make_java_param_pair(self, param, value)
           71         java_param = self._java_obj.getParam(param.name)
           72         java_value = _py2java(sc, value)
      ---> 73         return java_param.w(java_value)
           74 
           75     def _transfer_params_to_java(self):
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
          811         answer = self.gateway_client.send_command(command)
          812         return_value = get_return_value(
      --> 813             answer, self.gateway_client, self.target_id, self.name)
          814 
          815         for temp_arg in temp_args:
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/pyspark/sql/utils.py in deco(*a, **kw)
           43     def deco(*a, **kw):
           44         try:
      ---> 45             return f(*a, **kw)
           46         except py4j.protocol.Py4JJavaError as e:
           47             s = e.java_exception.toString()
      /Users/dreyco676/spark-1.6.0-bin-hadoop2.6/python/lib/py4j-0.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
          306                 raise Py4JJavaError(
          307                     "An error occurred while calling {0}{1}{2}.\n".
      --> 308                     format(target_id, ".", name), value)
          309             else:
          310                 raise Py4JError(
      Py4JJavaError: An error occurred while calling o113.w.
      : java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Double
      	at scala.runtime.BoxesRunTime.unboxToDouble(BoxesRunTime.java:119)
      	at org.apache.spark.ml.param.DoubleParam.w(params.scala:223)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
      	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:381)
      	at py4j.Gateway.invoke(Gateway.java:259)
      	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
      	at py4j.commands.CallCommand.execute(CallCommand.java:79)
      	at py4j.GatewayConnection.run(GatewayConnection.java:209)
      	at java.lang.Thread.run(Thread.java:745)
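
      The trace bottoms out in DoubleParam.w with "java.lang.Integer cannot be cast to java.lang.Double": the grid values [0, 1] are Python ints, which Py4J hands to the JVM as Integers, while nb.smoothing appears to be a DoubleParam. Independent of the Pipeline question, a possible mitigation (a sketch, not verified against this build) is to use Python floats in the grid:

      ```python
      # Sketch (assumption, based on the stack trace: nb.smoothing is a DoubleParam).
      # Python floats cross Py4J as java.lang.Double, which DoubleParam.w can unbox;
      # Python ints arrive as java.lang.Integer and fail the cast.
      smoothing_grid = [0.0, 1.0]  # was [0, 1]
      assert all(isinstance(v, float) for v in smoothing_grid)

      # With pyspark available, the grid would then be built as in the repro:
      # paramGrid = ParamGridBuilder().addGrid(nb.smoothing, smoothing_grid).build()
      ```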
      

      The workaround is to run the Transformers outside of the Pipeline and cross-validate only the final estimator, but this defeats the purpose of Pipelines.
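
      A sketch of that workaround (untested; it assumes the tokenizer, hashingTF, nb, and paramGrid objects from the repro above): apply the feature Transformers by hand and give CrossValidator the bare estimator instead of the Pipeline:

      ```python
      def fit_without_pipeline(training_df, tokenizer, hashingTF, nb, paramGrid):
          """Workaround sketch: apply the feature Transformers manually, then
          cross-validate only the final estimator (no Pipeline involved)."""
          from pyspark.ml.evaluation import MulticlassClassificationEvaluator
          from pyspark.ml.tuning import CrossValidator

          # Run the Transformers outside the Pipeline
          words_df = tokenizer.transform(training_df)
          features_df = hashingTF.transform(words_df)

          # CrossValidator now wraps the bare estimator, not a Pipeline
          cv = CrossValidator(estimator=nb,
                              estimatorParamMaps=paramGrid,
                              evaluator=MulticlassClassificationEvaluator(),
                              numFolds=4)
          return cv.fit(features_df)
      ```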


            People

              Assignee: Unassigned
              Reporter: dreyco676 (John Hogue)
              Votes: 0
              Watchers: 4
