diff --git a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java b/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java index 32b5a46..cf1c484 100644 --- a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java +++ b/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSource.java @@ -20,6 +20,7 @@ import java.util.HashMap; import java.util.Map; +import java.util.Arrays; import org.apache.flume.Context; import org.apache.flume.Event; @@ -60,6 +61,8 @@ public class TwitterSource extends AbstractSource private String accessTokenSecret; private String[] keywords; + private long[] users; + private String[] languages; /** The actual Twitter stream. It's set up to collect raw JSON data */ private TwitterStream twitterStream; @@ -86,6 +89,27 @@ public void configure(Context context) { } } + String usersString = context.getString(TwitterSourceConstants.USERS_KEY, ""); + if (usersString.trim().length() == 0) { + users = new long[0]; + } else { + String[] userStrings = usersString.split(","); + users = new long[userStrings.length]; + for (int i = 0; i < userStrings.length; i++) { + users[i] = Long.parseLong(userStrings[i].trim()); + } + } + + String languageString = context.getString(TwitterSourceConstants.LANGUAGES_KEY, ""); + if (languageString.trim().length() == 0) { + languages = new String[0]; + } else { + languages = languageString.split(","); + for (int i = 0; i < languages.length; i++) { + languages[i] = languages[i].trim(); + } + } + ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setOAuthConsumerKey(consumerKey); cb.setOAuthConsumerSecret(consumerSecret); @@ -140,13 +164,31 @@ public void onStallWarning(StallWarning warning) {} twitterStream.addListener(listener); // Set up a filter to pull out industry-relevant tweets - if (keywords.length == 0) { + if ((keywords.length == 0) && (users.length == 0)) { logger.debug("Starting up Twitter sampling..."); twitterStream.sample(); } else { logger.debug("Starting up Twitter filtering..."); - FilterQuery query = new FilterQuery().track(keywords); + FilterQuery query = new FilterQuery(); + if (keywords.length > 0) { + query = query.track(keywords); + logger.debug("Creating query with keyword tracking:"); + logger.debug(Arrays.toString(keywords)); + } + + if (users.length > 0){ + query = query.follow(users); + logger.debug("Creating query with user following:"); + logger.debug(Arrays.toString(users)); + } + + if (languages.length > 0) { + query = query.language(languages); + logger.debug("Creating query with language filter:"); + logger.debug(Arrays.toString(languages)); + } + twitterStream.filter(query); } super.start(); diff --git a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java b/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java index 331a4f7..4e16640 100644 --- a/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java +++ b/flume-sources/src/main/java/com/cloudera/flume/source/TwitterSourceConstants.java @@ -29,4 +29,6 @@ public class TwitterSourceConstants { public static final long DEFAULT_BATCH_SIZE = 1000L; public static final String KEYWORDS_KEY = "keywords"; + public static final String USERS_KEY = "users"; + public static final String LANGUAGES_KEY = "language"; }