Details
- Type: Sub-task
- Status: Resolved
- Priority: Major
- Resolution: Fixed
- Affects Version/s: 2.3.0
- Labels: None
Environment
Python (using virtualenv):

$ python --version
Python 3.6.5

Modules installed:

arrow==0.12.1 backcall==0.1.0 bleach==2.1.3 chardet==3.0.4 decorator==4.3.0 entrypoints==0.2.3 findspark==1.2.0 html5lib==1.0.1 ipdb==0.11 ipykernel==4.8.2 ipython==6.3.1 ipython-genutils==0.2.0 ipywidgets==7.2.1 jedi==0.12.0 Jinja2==2.10 jsonschema==2.6.0 jupyter==1.0.0 jupyter-client==5.2.3 jupyter-console==5.2.0 jupyter-core==4.4.0 MarkupSafe==1.0 mistune==0.8.3 nbconvert==5.3.1 nbformat==4.4.0 notebook==5.5.0 numpy==1.14.3 pandas==0.22.0 pandocfilters==1.4.2 parso==0.2.0 pbr==3.1.1 pexpect==4.5.0 pickleshare==0.7.4 progressbar2==3.37.1 prompt-toolkit==1.0.15 ptyprocess==0.5.2 pyarrow==0.9.0 Pygments==2.2.0 python-dateutil==2.7.2 python-utils==2.3.0 pytz==2018.4 pyzmq==17.0.0 qtconsole==4.3.1 Send2Trash==1.5.0 simplegeneric==0.8.1 six==1.11.0 SQLAlchemy==1.2.7 stevedore==1.28.0 terminado==0.8.1 testpath==0.3.1 tornado==5.0.2 traitlets==4.3.2 virtualenv==15.1.0 virtualenv-clone==0.2.6 virtualenvwrapper==4.7.2 wcwidth==0.1.7 webencodings==0.5.1 widgetsnbextension==3.2.1

Java:

$ java -version
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 1.8.0_171-b11)
Java HotSpot(TM) 64-Bit Server VM (build 25.171-b11, mixed mode)

System:

$ lsb_release -a
No LSB modules are available.
Distributor ID: Ubuntu
Description:    Ubuntu 16.04.4 LTS
Release:        16.04
Codename:       xenial
Description
I am working on Wikipedia page views (see task T188041 on Wikimedia's Phabricator). For simplicity, let's say the data look like this:
<lang> <page> <timestamp> <views>
For each combination of (lang, page, day(timestamp)) I need to map each hour of the day to a letter:

00:00 -> A
01:00 -> B
...

and concatenate each letter with the number of views for that hour. So, if a page got 5 views at 00:00 and 7 views at 01:00, the encoding for that day would be:
A5B7
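To make the encoding concrete, here is a minimal pure-pandas sketch of it in isolation (the two-row DataFrame is a made-up example, not part of the repro script below):

import pandas as pd

# 0 -> 'A', 1 -> 'B', ..., 23 -> 'X'
hour_to_letter = [chr(ord('A') + h) for h in range(24)]

# Hypothetical rows for one (lang, page, day) group: 5 views at 00:00, 7 at 01:00.
day = pd.DataFrame({'hour': [0, 1], 'views': [5, 7]})

encoded = ''.join(hour_to_letter[h] + str(v)
                  for h, v in zip(day['hour'], day['views']))
print(encoded)  # A5B7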
I have written a grouped-map pandas UDF called concat_hours. However, the function is mixing up the columns and I am not sure what is going on. Here is a minimal, complete example that reproduces the issue on my system (the details of my environment are above).
#!/usr/bin/env python3
# coding: utf-8

input_data = b"""en Albert_Camus 20071210-000000 150
en Albert_Camus 20071210-010000 148
en Albert_Camus 20071210-020000 197
en Albert_Camus 20071211-200000 145
en Albert_Camus 20071211-210000 131
en Albert_Camus 20071211-220000 154
en Albert_Camus 20071211-230001 142
en Albert_Caquot 20071210-020000 1
en Albert_Caquot 20071210-020000 1
en Albert_Caquot 20071210-040001 1
en Albert_Caquot 20071211-060000 1
en Albert_Caquot 20071211-080000 1
en Albert_Caquot 20071211-150000 3
en Albert_Caquot 20071211-210000 1"""

import tempfile

# Write the sample data to a temporary file so it can be read back as CSV.
fp = tempfile.NamedTemporaryFile()
fp.write(input_data)
fp.seek(0)

import findspark
findspark.init()

import pyspark
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, IntegerType, TimestampType
from pyspark.sql import functions

sc = pyspark.SparkContext(appName="udf_example")
sqlctx = pyspark.SQLContext(sc)

schema = StructType([StructField("lang", StringType(), False),
                     StructField("page", StringType(), False),
                     StructField("timestamp", TimestampType(), False),
                     StructField("views", IntegerType(), False)])

df = sqlctx.read.csv(fp.name, header=False, schema=schema,
                     timestampFormat="yyyyMMdd-HHmmss", sep=' ')

df.count()
df.dtypes
df.show()

# Output schema for the grouped-map UDF.
new_schema = StructType([StructField("lang", StringType(), False),
                         StructField("page", StringType(), False),
                         StructField("day", StringType(), False),
                         StructField("enc", StringType(), False)])

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd

# Hour of day (0-23) mapped to a letter A..X.
hour_to_letter = ['A','B','C','D','E','F','G','H','I','J','K','L','M','N','O',
                  'P','Q','R','S','T','U','V','W','X']

@pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
def concat_hours(x):
    view_hours = x['hour'].tolist()
    view_views = x['views'].tolist()

    view_hours_letters = [hour_to_letter[h] for h in view_hours]

    # Encode each (hour, views) pair as e.g. 'A150' and join them in hour order.
    encoded_views = [l + str(h)
                     for l, h in sorted(zip(view_hours_letters, view_views))]
    encoded_views_string = ''.join(encoded_views)

    # return pd.DataFrame({'enc': x.page, 'day': x.lang, 'lang': x.day,
    #                      'page': encoded_views_string}, index=[x.index[0]])
    return pd.DataFrame({'page': x.page, 'lang': x.lang, 'day': x.day,
                         'enc': encoded_views_string}, index=[x.index[0]])

from pyspark.sql import functions

grouped_df = (df.select(['lang',
                         'page',
                         functions.date_format('timestamp', 'yyyy-MM-dd')
                                  .alias('day'),
                         functions.hour('timestamp').alias('hour'),
                         'views'])
                .groupby(['lang', 'page', 'day']))

grouped_df = (grouped_df.apply(concat_hours)
                        .dropDuplicates())

grouped_df.show()
This is what I am getting:
$ ./udf_example.py
2018-05-20 05:13:23 WARN Utils:66 - Your hostname, inara resolves to a loopback address: 127.0.1.1; using 10.109.49.111 instead (on interface wlp2s0)
2018-05-20 05:13:23 WARN Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2018-05-20 05:13:23 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2018-05-20 05:13:24 WARN Utils:66 - Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
+----+-------------+-------------------+-----+
|lang|         page|          timestamp|views|
+----+-------------+-------------------+-----+
|  en| Albert_Camus|2007-12-10 00:00:00|  150|
|  en| Albert_Camus|2007-12-10 01:00:00|  148|
|  en| Albert_Camus|2007-12-10 02:00:00|  197|
|  en| Albert_Camus|2007-12-11 20:00:00|  145|
|  en| Albert_Camus|2007-12-11 21:00:00|  131|
|  en| Albert_Camus|2007-12-11 22:00:00|  154|
|  en| Albert_Camus|2007-12-11 23:00:01|  142|
|  en|Albert_Caquot|2007-12-10 02:00:00|    1|
|  en|Albert_Caquot|2007-12-10 02:00:00|    1|
|  en|Albert_Caquot|2007-12-10 04:00:01|    1|
|  en|Albert_Caquot|2007-12-11 06:00:00|    1|
|  en|Albert_Caquot|2007-12-11 08:00:00|    1|
|  en|Albert_Caquot|2007-12-11 15:00:00|    3|
|  en|Albert_Caquot|2007-12-11 21:00:00|    1|
+----+-------------+-------------------+-----+

+----------+----------------+---+-------------+
|      lang|            page|day|          enc|
+----------+----------------+---+-------------+
|2007-12-10|    A150B148C197| en| Albert_Camus|
|2007-12-11|        G1I1P3V1| en|Albert_Caquot|
|2007-12-10|          C1C1E1| en|Albert_Caquot|
|2007-12-11|U145V131W154X142| en| Albert_Camus|
+----------+----------------+---+-------------+
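Comparing the two tables, the result columns are shifted bodily: lang holds the day values, page holds the encodings, day holds the languages, and enc holds the page titles. This is consistent with the grouped-map result being matched to the declared schema by position rather than by name (SPARK-23929, linked below): with pandas 0.22, as in this environment, a DataFrame built from a plain dict orders its columns alphabetically. A minimal sketch of that pandas behaviour, under that assumption:

import pandas as pd  # pandas==0.22.0, as listed in the environment above

# With pandas < 0.23, a DataFrame constructed from a dict sorts its columns
# alphabetically, ignoring the order in which the keys were written:
pdf = pd.DataFrame({'page': ['Albert_Camus'], 'lang': ['en'],
                    'day': ['2007-12-10'], 'enc': ['A150B148C197']})
print(list(pdf.columns))  # ['day', 'enc', 'lang', 'page']

Spark then assigns those four columns positionally to (lang, page, day, enc), which yields exactly the shifted table shown above.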
Of course what I am expecting is:
+----+-------------+----------+----------------+
|lang|         page|       day|             enc|
+----+-------------+----------+----------------+
|  en|Albert_Caquot|2007-12-11|        G1I1P3V1|
|  en|Albert_Caquot|2007-12-10|          C1C1E1|
|  en| Albert_Camus|2007-12-10|    A150B148C197|
|  en| Albert_Camus|2007-12-11|U145V131W154X142|
+----+-------------+----------+----------------+
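For what it's worth, here is an untested sketch of a workaround under the by-position hypothesis, reusing new_schema, hour_to_letter, pandas_udf, and PandasUDFType from the script above: explicitly reorder the result columns to match the declared schema before returning.

@pandas_udf(new_schema, PandasUDFType.GROUPED_MAP)
def concat_hours(x):
    letters = [hour_to_letter[h] for h in x['hour']]
    encoded = ''.join(l + str(v) for l, v in sorted(zip(letters, x['views'])))
    result = pd.DataFrame({'lang': x.lang, 'page': x.page, 'day': x.day,
                           'enc': encoded}, index=[x.index[0]])
    # Reorder the columns to match new_schema, so that position-based
    # assignment puts each value in the intended field.
    return result[[field.name for field in new_schema.fields]]

The commented-out return in the script above achieves the same effect by shuffling the dict keys so that alphabetical ordering happens to line up with the schema, which is how I first noticed the positional behaviour.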
Issue Links
- duplicates
  - SPARK-23929 pandas_udf schema mapped by position and not by name (Resolved)
- is duplicated by
  - SPARK-23929 pandas_udf schema mapped by position and not by name (Resolved)
- relates to
  - SPARK-24444 Improve pandas_udf GROUPED_MAP docs to explain column assignment (Resolved)
- links to