SPARK-19065

dropDuplicates uses the same expression id for Alias and Attribute and breaks attribute replacement

Details

    • Type: Bug
    • Status: Resolved
    • Priority: Major
    • Resolution: Fixed
    • Affects Version/s: 2.1.0
    • Fix Version/s: 2.1.1, 2.2.0
    • Component/s: Structured Streaming
    • Labels: None

    Description

      Right now, if you use .dropDuplicates in a stream, you get an exception because attribute replacement is broken: dropDuplicates gives the Alias it creates the same expression id as the Attribute it wraps, so expression ids are no longer unique.

      Here is an example:

      org.apache.spark.sql.AnalysisException: resolved attribute(s) accountName#34351,eventSource#34331,resources#34339,eventType#34333,readOnly#34335,date#34350,errorCode#34327,errorMessage#34328,userAgent#34344,eventVersion#34334,eventTime#34332,recipientAccountId#34336,sharedEventID#34341,timing#34349,apiVersion#34325,additionalEventData#34324,requestParameters#34338,sourceIPAddress#34342,serviceEventDetails#34343,timestamp#34323,awsRegion#34326,eventName#34330,responseElements#34340,filename#34347,requestID#34337,vpcEndpointId#34346,line#34348,userIdentity#34345 missing from requestID#34119,eventSource#34113,serviceEventDetails#34125,eventVersion#34116,userIdentity#34127,requestParameters#34120,accountName#34133,apiVersion#34107,eventTime#34114,additionalEventData#34106,line#34130,readOnly#34117,sourceIPAddress#34124,eventID#34329,errorCode#34109,resources#34121,timing#34131,userAgent#34126,eventType#34115,recipientAccountId#34118,errorMessage#34110,vpcEndpointId#34128,sharedEventID#34123,filename#34129,awsRegion#34108,responseElements#34122,date#34132,timestamp#34105,eventName#34112 in operator !Project [timestamp#34323, additionalEventData#34324, apiVersion#34325, awsRegion#34326, errorCode#34327, errorMessage#34328, eventID#34329, eventName#34330, eventSource#34331, eventTime#34332, eventType#34333, eventVersion#34334, readOnly#34335, recipientAccountId#34336, requestID#34337, requestParameters#34338, resources#34339, responseElements#34340, sharedEventID#34341, sourceIPAddress#34342, serviceEventDetails#34343, userAgent#34344, userIdentity#34345, vpcEndpointId#34346, ... 5 more fields];;
      
      !Project [timestamp#34323, additionalEventData#34324, apiVersion#34325, awsRegion#34326, errorCode#34327, errorMessage#34328, eventID#34329, eventName#34330, eventSource#34331, eventTime#34332, eventType#34333, eventVersion#34334, readOnly#34335, recipientAccountId#34336, requestID#34337, requestParameters#34338, resources#34339, responseElements#34340, sharedEventID#34341, sourceIPAddress#34342, serviceEventDetails#34343, userAgent#34344, userIdentity#34345, vpcEndpointId#34346, ... 5 more fields]
      +- Aggregate [eventID#34329], [first(timestamp#34323, false) AS timestamp#34105, first(additionalEventData#34324, false) AS additionalEventData#34106, first(apiVersion#34325, false) AS apiVersion#34107, first(awsRegion#34326, false) AS awsRegion#34108, first(errorCode#34327, false) AS errorCode#34109, first(errorMessage#34328, false) AS errorMessage#34110, eventID#34329, first(eventName#34330, false) AS eventName#34112, first(eventSource#34331, false) AS eventSource#34113, first(eventTime#34332, false) AS eventTime#34114, first(eventType#34333, false) AS eventType#34115, first(eventVersion#34334, false) AS eventVersion#34116, first(readOnly#34335, false) AS readOnly#34117, first(recipientAccountId#34336, false) AS recipientAccountId#34118, first(requestID#34337, false) AS requestID#34119, first(requestParameters#34338, false) AS requestParameters#34120, first(resources#34339, false) AS resources#34121, first(responseElements#34340, false) AS responseElements#34122, first(sharedEventID#34341, false) AS sharedEventID#34123, first(sourceIPAddress#34342, false) AS sourceIPAddress#34124, first(serviceEventDetails#34343, false) AS serviceEventDetails#34125, first(userAgent#34344, false) AS userAgent#34126, first(userIdentity#34345, false) AS userIdentity#34127, first(vpcEndpointId#34346, false) AS vpcEndpointId#34128, ... 5 more fields]
         +- Project [timestamp#34323, additionalEventData#34324, apiVersion#34325, awsRegion#34326, errorCode#34327, errorMessage#34328, eventID#34329, eventName#34330, eventSource#34331, eventTime#34332, eventType#34333, eventVersion#34334, readOnly#34335, recipientAccountId#34336, requestID#34337, requestParameters#34338, resources#34339, responseElements#34340, sharedEventID#34341, sourceIPAddress#34342, serviceEventDetails#34343, userAgent#34344, userIdentity#34345, vpcEndpointId#34346, ... 5 more fields]
            +- Relation[timestamp#34323,additionalEventData#34324,apiVersion#34325,awsRegion#34326,errorCode#34327,errorMessage#34328,eventID#34329,eventName#34330,eventSource#34331,eventTime#34332,eventType#34333,eventVersion#34334,readOnly#34335,recipientAccountId#34336,requestID#34337,requestParameters#34338,resources#34339,responseElements#34340,sharedEventID#34341,sourceIPAddress#34342,serviceEventDetails#34343,userAgent#34344,userIdentity#34345,vpcEndpointId#34346,... 5 more fields] parquet
      
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
      	at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
      	at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
      	at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)
      	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:48)
      	at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:68)
      	at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:67)
      	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan$lzycompute(IncrementalExecution.scala:60)
      	at org.apache.spark.sql.execution.streaming.IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:79)
      	at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:75)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
      	at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:516)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:508)
      	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:265)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:45)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:508)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:267)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:256)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:256)
      	at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:265)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:45)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:256)
      	at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
      	at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:251)
      	at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:186)
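The failure mode named in the title can be illustrated without a running Spark cluster. Below is a minimal, simplified sketch, not the real Catalyst API: `Attr`, `replace_refs`, `check_resolved`, and `run` are made-up stand-ins. It assumes only what the title and error state: expression ids are meant to be unique, the streaming engine rebinds the plan to each batch's fresh source attributes via an id-keyed map, and an Alias that reuses its child's id leaves the Project referencing an attribute the Aggregate no longer produces, which is the "resolved attribute(s) ... missing" error above.

```python
import itertools

_ids = itertools.count(1)

class Attr:
    """An attribute reference, identified by a globally unique expr_id."""
    def __init__(self, name, expr_id=None):
        self.name = name
        self.expr_id = next(_ids) if expr_id is None else expr_id
    def __repr__(self):
        return f"{self.name}#{self.expr_id}"

def replace_refs(attrs, mapping):
    """Rewrite attribute references via an expr_id-keyed map, the way the
    streaming engine rebinds a plan to each new micro-batch's source."""
    return [mapping.get(a.expr_id, a) for a in attrs]

def check_resolved(project_refs, aggregate_output):
    """The analyzer's sanity check: every attribute the Project references
    must actually be produced by the Aggregate below it."""
    produced = {a.expr_id for a in aggregate_output}
    return all(r.expr_id in produced for r in project_refs)

def run(alias_reuses_child_id):
    src = Attr("eventID")                        # source column, batch 0
    # dropDuplicates rewrites to Aggregate[first(eventID) AS eventID].
    # The bug: the generated Alias reused the child Attribute's expr_id
    # instead of taking a fresh one.
    agg_out = Attr("eventID", src.expr_id if alias_reuses_child_id else None)
    project = [Attr("eventID", agg_out.expr_id)]  # Project refers to agg output

    fresh = Attr("eventID")                      # batch 1: source gets a fresh id
    mapping = {src.expr_id: fresh}               # rebind old source attr -> new
    project = replace_refs(project, mapping)     # references get rewritten...
    # ...but the Aggregate's output id does not change, so with a shared id
    # the Project now asks for an attribute the Aggregate no longer produces.
    return check_resolved(project, [agg_out])

print(run(alias_reuses_child_id=True))   # False: plan unresolved, AnalysisException
print(run(alias_reuses_child_id=False))  # True: fresh alias id, plan stays consistent
```

With a fresh alias id, the id-keyed replacement map (built only from source attributes) never touches the Aggregate's output, so references and outputs stay in agreement across batches.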
      

          Activity

            Apache Spark added a comment:

            User 'zsxwing' has created a pull request for this issue:
            https://github.com/apache/spark/pull/16564

            Wenchen Fan added a comment:

            Issue resolved by pull request 16564
            https://github.com/apache/spark/pull/16564


            People

              Assignee: Shixiong Zhu
              Reporter: Michael Armbrust
              Votes: 0
              Watchers: 4
