Details

Type: Bug
Status: Resolved
Priority: Major
Resolution: Fixed
Affects Version/s: 2.0.0
Environment:
Spark @c553976

$ java -version
java version "1.8.0_20"
Java(TM) SE Runtime Environment (build 1.8.0_20-b26)
Java HotSpot(TM) 64-Bit Server VM (build 25.20-b23, mixed mode)

$ uname -a
Darwin mymac.local 14.5.0 Darwin Kernel Version 14.5.0: Tue Sep 1 21:23:09 PDT 2015; root:xnu-2782.50.1~1/RELEASE_X86_64 x86_64
Description
Given the following program:
// A simple Order <-> Items model
case class Order ( orderid: Int, customer: String )
case class Item ( orderid: Int, itemid: Int, amount: Float )

import spark.implicits._
val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// input data comes from CSV files
val INPUT_DIR: String = ""
val FILE_EXTENSION: String = ".tbl"
val ORDERS_FILE: String = INPUT_DIR + "orders" + FILE_EXTENSION
// Items are added as a stream so input is a directory, not a file
val ITEM_DIR: String = INPUT_DIR + "item"

import org.apache.spark.sql.types._
import org.apache.spark.sql._

val ItemSchema = StructType(
  StructField("orderid", IntegerType, false) ::
  StructField("itemid", IntegerType, true) ::
  StructField("amount", FloatType, true) :: Nil)

val OrderSchema = StructType(
  StructField("orderid", IntegerType, false) ::
  StructField("customer", StringType, true) :: Nil)

val csvOptions = Map("sep" -> "|")

val orders = sqlContext.read.format("csv").schema(OrderSchema).options(csvOptions).load(ORDERS_FILE).as[Order]
orders.registerTempTable("orders")

val itemsStream = sqlContext.readStream.format("csv").schema(ItemSchema).options(csvOptions).csv(ITEM_DIR).as[Item]
itemsStream.registerTempTable("itemsStream")

val sum_of_items_per_customer_streamed = sqlContext.sql(
  "SELECT customer, sum(amount) from orders d, itemsStream s where d.orderid = s.orderid group by customer")

// print each computed value
val outwriter = new ForeachWriter[Row] {
  def open(partitionId: Long, version: Long): Boolean = true
  def process(value: Row): Unit = if (value != null) print(value)
  def close(errorOrNull: Throwable): Unit = if (errorOrNull != null) print(errorOrNull)
}

sum_of_items_per_customer_streamed.writeStream.outputMode("complete").foreach(outwriter).start
and the following data sets:
- orders.tbl
orderid customer
1 foo
2 bar
3 foo
- items1.tbl
orderid itemid amount
1 1 1.0
1 2 1.0
1 3 1.0
2 1 1.0
2 2 1.0
3 1 1.0
- items2.tbl
orderid itemid amount
1 4 1.0
2 5 1.0
When I do the following actions:
- start bin/spark-shell
- :load complete-bug.scala
- cp items1.tbl item/
- cp items2.tbl item/
Then the following results are printed to the console (one line per batch):
[bar,2.0][foo,4.0]
[bar,1.0][foo,1.0]
I would expect the following:
[bar,2.0][foo,4.0]
[bar,3.0][foo,5.0]
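My understanding of complete output mode is that every trigger should emit the full aggregation over all input seen so far: after both files, foo's orders (1 and 3) have five items of 1.0 each and bar's order (2) has three, hence [foo,5.0] and [bar,3.0]. The second batch above instead reflects only the newly copied file. For reference, a minimal sketch of the cumulative behaviour on a plain streaming aggregation without the static join, run in spark-shell; MemoryStream, the in-memory sink, the query name "sums" and the (orderid, amount) pairs are made up for illustration and are not the .tbl data:

import org.apache.spark.sql.execution.streaming.MemoryStream
import spark.implicits._

// MemoryStream needs an implicit SQLContext in the shell
implicit val sqlCtx = spark.sqlContext

// stand-in for the item stream: (orderid, amount) pairs
val items = MemoryStream[(Int, Float)]
val sums = items.toDS().toDF("orderid", "amount").groupBy("orderid").sum("amount")

val query = sums.writeStream
  .outputMode("complete")
  .format("memory")      // results land in an in-memory table named after queryName
  .queryName("sums")
  .start()

// first batch (analogous to copying items1.tbl)
items.addData((1, 1.0f), (1, 1.0f), (2, 1.0f))
query.processAllAvailable()
spark.sql("SELECT * FROM sums").show()   // 1 -> 2.0, 2 -> 1.0

// second batch (analogous to copying items2.tbl)
items.addData((1, 1.0f))
query.processAllAvailable()
spark.sql("SELECT * FROM sums").show()   // complete mode keeps the running totals: 1 -> 3.0, 2 -> 1.0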
Issue Links
- is part of: SPARK-8360 Structured Streaming (aka Streaming DataFrames) (Resolved)
- relates to: SPARK-16264 Allow the user to use operators on the received DataFrame (Closed)