Description
The Twitter streaming API is very powerful and provides a lot of support for twitter.com-side filtering of status objects. Whenever possible we want to let Twitter do as much of the work for us as it can.
Currently the Spark Twitter API only allows you to configure a small subset of the possible filters:
String[] filters = {"tag1", "tag2"};
JavaDStream<Status> tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
The current implementation does:
private[streaming]
class TwitterReceiver(
twitterAuth: Authorization,
filters: Seq[String],
storageLevel: StorageLevel
) extends Receiver[Status](storageLevel) with Logging {
. . .
val query = new FilterQuery
if (filters.size > 0) {
  query.track(filters.mkString(","))
  newTwitterStream.filter(query)
} else {
  newTwitterStream.sample()
}
...
Rather than constructing the FilterQuery object in TwitterReceiver.onStart(), we should be able to pass in a FilterQuery object that the caller has already configured.
Looks like an easy fix. See the source code links below.
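To make the proposal concrete, here is a rough sketch of the shape I have in mind. It does not use twitter4j or Spark: SketchFilterQuery and SketchReceiver are hypothetical stand-ins for FilterQuery and TwitterReceiver, and onStart() just returns a string naming the endpoint it would call. The existing keyword-only call is kept as a convenience that builds the query for you.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for twitter4j.FilterQuery, reduced to two filter kinds.
final class SketchFilterQuery {
    final List<String> track = new ArrayList<>();
    final List<double[]> corners = new ArrayList<>();

    SketchFilterQuery track(String... keywords) {
        track.addAll(Arrays.asList(keywords));
        return this;
    }

    // Each corner is {longitude, latitude}; two corners describe one bounding box.
    SketchFilterQuery locations(double[]... boxCorners) {
        corners.addAll(Arrays.asList(boxCorners));
        return this;
    }

    boolean isEmpty() {
        return track.isEmpty() && corners.isEmpty();
    }
}

// Hypothetical stand-in for the receiver: it is handed a query instead of building one.
final class SketchReceiver {
    private final SketchFilterQuery query; // null means "no filtering requested"

    SketchReceiver(SketchFilterQuery query) {
        this.query = query;
    }

    // The current keyword-only entry point, expressed on top of the new constructor.
    static SketchReceiver fromKeywords(String... filters) {
        return new SketchReceiver(
            filters.length > 0 ? new SketchFilterQuery().track(filters) : null);
    }

    // Same branch as today's onStart(): filter when there is a query, otherwise sample.
    String onStart() {
        if (query != null && !query.isEmpty()) {
            return "filter(track=" + query.track + ", corners=" + query.corners.size() + ")";
        }
        return "sample()";
    }
}
```

With this shape, a caller that needs location filtering builds the query itself and hands it over, while callers that only pass keywords keep working unchanged.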
kind regards
Andy
https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L60
https://github.com/apache/spark/blob/master/external/twitter/src/main/scala/org/apache/spark/streaming/twitter/TwitterInputDStream.scala#L89
$$$$$$$$$ 2/2/16
Attached is my Java implementation for this problem. Feel free to reuse it however you like. In the main() of my Spark Streaming app I have the following code:
FilterQuery query = config.getFilterQuery().fetch();
if (query != null) {
    // TODO https://issues.apache.org/jira/browse/SPARK-13065
    tweets = TwitterFilterQueryUtils.createStream(ssc, twitterAuth, query);
} /*else
spark native api
String[] filters = {"tag1", "tag2"};
tweets = TwitterUtils.createStream(ssc, twitterAuth, filters);
causes
val query = new FilterQuery
if (filters.size > 0)
*/
Attachments
Issue Links
- duplicates SPARK-2788 Add location filtering to Twitter streams (Resolved)