Uploaded image for project: 'Spark'
  1. Spark
  2. SPARK-19383

Spark Sql Fails with Cassandra 3.6 and later PER PARTITION LIMIT option

    XMLWordPrintableJSON

Details

    • Bug
    • Status: Closed
    • Minor
    • Resolution: Not A Problem
    • 2.0.2
    • None
    • SQL

    Description

      Attempting to use version 2.0.0-M3 of the datastax/spark-cassandra-connector to select the most recent version of each partition key using the Cassandra 3.6 and later PER PARTITION LIMIT option fails. I've tried using all the Cassandra Java RDD's and Spark Sql with and without partition key equality constraints. All attempts have failed due to syntax errors and/or start/end bound restriction errors.

      The BrentDorsey/cassandra-spark-job repo contains working code that demonstrates the error. Clone the repo, create the keyspace and table locally and supply connection information then run main.

      Spark Dataset .where & Spark Sql Errors:

      errors
      ERROR [2017-01-27 06:35:19,919] (main) org.per.partition.limit.test.spark.job.Main: getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan failed.
      org.apache.spark.sql.catalyst.parser.ParseException: 
      mismatched input 'PARTITION' expecting <EOF>(line 1, pos 67)
      
      == SQL ==
      TOKEN(item_uuid) > TOKEN(6616b548-4fd1-4661-a938-0af3c77357f7) PER PARTITION LIMIT 1
      -------------------------------------------------------------------^^^
      
      	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
      	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
      	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
      	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseExpression(ParseDriver.scala:43)
      	at org.apache.spark.sql.Dataset.where(Dataset.scala:1153)
      	at org.per.partition.limit.test.spark.job.Main.getSparkDatasetPerPartitionLimitTestWithTokenGreaterThan(Main.java:349)
      	at org.per.partition.limit.test.spark.job.Main.run(Main.java:128)
      	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
      	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
      	at org.per.partition.limit.test.spark.job.Main.main(Main.java:72)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
      ERROR [2017-01-27 06:35:20,238] (main) org.per.partition.limit.test.spark.job.Main: getSparkSqlDatasetPerPartitionLimitTest failed.
      org.apache.spark.sql.catalyst.parser.ParseException: 
      extraneous input ''' expecting {'(', 'SELECT', 'FROM', 'ADD', 'AS', 'ALL', 'DISTINCT', 'WHERE', 'GROUP', 'BY', 'GROUPING', 'SETS', 'CUBE', 'ROLLUP', 'ORDER', 'HAVING', 'LIMIT', 'AT', 'OR', 'AND', 'IN', NOT, 'NO', 'EXISTS', 'BETWEEN', 'LIKE', RLIKE, 'IS', 'NULL', 'TRUE', 'FALSE', 'NULLS', 'ASC', 'DESC', 'FOR', 'INTERVAL', 'CASE', 'WHEN', 'THEN', 'ELSE', 'END', 'JOIN', 'CROSS', 'OUTER', 'INNER', 'LEFT', 'SEMI', 'RIGHT', 'FULL', 'NATURAL', 'ON', 'LATERAL', 'WINDOW', 'OVER', 'PARTITION', 'RANGE', 'ROWS', 'UNBOUNDED', 'PRECEDING', 'FOLLOWING', 'CURRENT', 'ROW', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'VIEW', 'REPLACE', 'INSERT', 'DELETE', 'INTO', 'DESCRIBE', 'EXPLAIN', 'FORMAT', 'LOGICAL', 'CODEGEN', 'CAST', 'SHOW', 'TABLES', 'COLUMNS', 'COLUMN', 'USE', 'PARTITIONS', 'FUNCTIONS', 'DROP', 'UNION', 'EXCEPT', 'INTERSECT', 'TO', 'TABLESAMPLE', 'STRATIFY', 'ALTER', 'RENAME', 'ARRAY', 'MAP', 'STRUCT', 'COMMENT', 'SET', 'RESET', 'DATA', 'START', 'TRANSACTION', 'COMMIT', 'ROLLBACK', 'MACRO', 'IF', '+', '-', '*', 'DIV', '~', 'PERCENT', 'BUCKET', 'OUT', 'OF', 'SORT', 'CLUSTER', 'DISTRIBUTE', 'OVERWRITE', 'TRANSFORM', 'REDUCE', 'USING', 'SERDE', 'SERDEPROPERTIES', 'RECORDREADER', 'RECORDWRITER', 'DELIMITED', 'FIELDS', 'TERMINATED', 'COLLECTION', 'ITEMS', 'KEYS', 'ESCAPED', 'LINES', 'SEPARATED', 'FUNCTION', 'EXTENDED', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'LAZY', 'FORMATTED', TEMPORARY, 'OPTIONS', 'UNSET', 'TBLPROPERTIES', 'DBPROPERTIES', 'BUCKETS', 'SKEWED', 'STORED', 'DIRECTORIES', 'LOCATION', 'EXCHANGE', 'ARCHIVE', 'UNARCHIVE', 'FILEFORMAT', 'TOUCH', 'COMPACT', 'CONCATENATE', 'CHANGE', 'CASCADE', 'RESTRICT', 'CLUSTERED', 'SORTED', 'PURGE', 'INPUTFORMAT', 'OUTPUTFORMAT', DATABASE, DATABASES, 'DFS', 'TRUNCATE', 'ANALYZE', 'COMPUTE', 'LIST', 'STATISTICS', 'PARTITIONED', 'EXTERNAL', 'DEFINED', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'REPAIR', 'RECOVER', 'EXPORT', 'IMPORT', 'LOAD', 'ROLE', 'ROLES', 'COMPACTIONS', 'PRINCIPALS', 'TRANSACTIONS', 'INDEX', 'INDEXES', 'LOCKS', 'OPTION', 'ANTI', 'LOCAL', 'INPATH', 'CURRENT_DATE', 'CURRENT_TIMESTAMP', STRING, BIGINT_LITERAL, SMALLINT_LITERAL, TINYINT_LITERAL, INTEGER_VALUE, DECIMAL_VALUE, SCIENTIFIC_DECIMAL_VALUE, DOUBLE_LITERAL, BIGDECIMAL_LITERAL, IDENTIFIER, BACKQUOTED_IDENTIFIER}(line 1, pos 36)
      
      == SQL ==
      SELECT item_uuid, time_series_date, 'item_uri FROM perPartitionLimitTests PER PARTITION LIMIT 1
      ------------------------------------^^^
      
      	at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:197)
      	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:99)
      	at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:45)
      	at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parsePlan(ParseDriver.scala:53)
      	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:582)
      	at org.per.partition.limit.test.spark.job.Main.getSparkSqlDatasetPerPartitionLimitTest(Main.java:367)
      	at org.per.partition.limit.test.spark.job.Main.run(Main.java:129)
      	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
      	at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
      	at org.per.partition.limit.test.spark.job.Main.main(Main.java:72)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:497)
      	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
      

      Attachments

        Activity

          People

            Unassigned Unassigned
            bdorsey Brent Dorsey
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

            Dates

              Created:
              Updated:
              Resolved: