Twitter data analysis through Spark Streaming

This is my first project with Spark Streaming, and I decided to stream and analyze Twitter data. Twitter data is a rich source of information about almost any topic. It can be used in many ways, such as finding trends for a specific keyword, focusing on users in a specific location, targeting a specific device, or reaching people with shared interests.

I found this article most helpful in figuring out how to extract, filter, and process data from the Twitter API.

In this project, I am using Twitter data to do the following analysis:

  • Compare the popular hashtag words
  • Compare the number of tweets by country
  • Compare the popularity of the devices used to tweet, for example iPhone, Android, etc.

In the first part, we will see how to connect to the Twitter Streaming API and get the data. In the second part, we will see how to structure the data for analysis, and in the last part, we will see how to visualize it.

Getting Data stream from Twitter

In order to access Twitter's streaming tweets, the following steps need to be completed:

  • Create a Twitter account and register an app on TwitterApps.
  • Go to your newly created app and generate keys and access tokens.
  • Copy your Twitter app credentials: API key, API secret, access token, and access token secret.
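To keep these credentials out of the source file, one option is to read them from environment variables. This is a minimal sketch; the variable names (`TWITTER_ACCESS_TOKEN`, etc.) are my own choice, not from the script below, and the string fallbacks are just the same placeholders the script uses:

```python
import os

# Hypothetical variable names -- set these in your shell before running, e.g.
#   export TWITTER_ACCESS_TOKEN='...'
# If a variable is unset, we fall back to the placeholder string.
ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN', 'ACCESS_TOKEN')
ACCESS_SECRET = os.environ.get('TWITTER_ACCESS_SECRET', 'ACCESS_SECRET')
CONSUMER_KEY = os.environ.get('TWITTER_CONSUMER_KEY', 'CONSUMER_KEY')
CONSUMER_SECRET = os.environ.get('TWITTER_CONSUMER_SECRET', 'CONSUMER_SECRET')

print(ACCESS_TOKEN)
```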

Part 1. Connecting to Twitter Stream to get tweets

This simple program gets tweets from the Twitter API using Python and passes them to the Spark Streaming instance.

Let’s create a file called connect_app.py with the following code.

Import the libraries we'll use, connect to Twitter, and start streaming the tweets. The Twitter API gives us a lot of data about each tweet in a JSON structure. We are interested in several fields of the tweet, each accessed by its key (for example, full_tweet['text'] for the tweet body). We wrap the tweet = json.loads(line) statement in a try/except clause so the script can skip over a line of bad data and continue processing the remaining tweets.
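As a small standalone sketch of that try/except pattern (the sample lines below are made up for illustration), malformed lines are skipped and valid ones are parsed:

```python
import json

# Hypothetical sample of streamed lines: two valid tweet JSON objects
# and one line of bad data in between.
lines = [
    '{"text": "hello #spark", "user": {"screen_name": "alice"}}',
    'not json at all',
    '{"text": "streaming is fun"}',
]

texts = []
for line in lines:
    try:
        tweet = json.loads(line)
        texts.append(tweet['text'])
    except (json.JSONDecodeError, KeyError):
        # skip the bad line and keep going, as in the streaming script
        continue

print(texts)  # -> ['hello #spark', 'streaming is fun']
```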

In [ ]:
import socket
import sys
import requests
import requests_oauthlib
import json
from bs4 import BeautifulSoup


# Include your Twitter account details
ACCESS_TOKEN = 'ACCESS_TOKEN'
ACCESS_SECRET = 'ACCESS_SECRET'
CONSUMER_KEY = 'CONSUMER_KEY'
CONSUMER_SECRET = 'CONSUMER_SECRET'
my_auth = requests_oauthlib.OAuth1(CONSUMER_KEY, CONSUMER_SECRET, ACCESS_TOKEN, ACCESS_SECRET)


def get_tweets():
    url = 'https://stream.twitter.com/1.1/statuses/filter.json'	
    query_data = [('language', 'en'), ('locations', '-130,-20,100,50'),('track','iphone')]
    query_url = url + '?' + '&'.join([str(t[0]) + '=' + str(t[1]) for t in query_data])
    response = requests.get(query_url, auth=my_auth, stream=True)
    print(query_url, response)
    return response




def send_tweets_to_spark(http_resp, tcp_connection):
    for line in http_resp.iter_lines():
        try:
            full_tweet = json.loads(line)
            tweet_text = full_tweet['text']
            print("Tweet Text: " + tweet_text)
            print("------------------------------------------")
            tweet_screen_name = "SN:" + full_tweet['user']['screen_name']
            print("SCREEN NAME IS : " + tweet_screen_name)
            print("------------------------------------------")
            # the 'source' field is an HTML anchor, e.g. <a href="...">Twitter for iPhone</a>
            source = full_tweet['source']
            soup = BeautifulSoup(source, 'html.parser')
            tweet_source = ''
            for anchor in soup.find_all('a'):
                print("Tweet Source: " + anchor.text)
                tweet_source = anchor.text
            source_device = tweet_source.replace(" ", "")
            device = "TS" + source_device.replace("Twitter", "")
            print("SOURCE IS : " + device)
            print("------------------------------------------")
            tweet_country_code = "CC" + full_tweet['place']['country_code']
            print("COUNTRY CODE IS : " + tweet_country_code)
            print("------------------------------------------")
            data = tweet_text + ' ' + tweet_country_code + ' ' + tweet_screen_name + ' ' + device + '\n'
            tcp_connection.send(data.encode('utf-8'))

        except Exception:
            # skip tweets with missing fields or malformed JSON and keep streaming
            continue
   

TCP_IP = 'localhost'
TCP_PORT = 9009  # must be an integer; the Spark app below reads from this port
conn = None
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

s.bind((TCP_IP, TCP_PORT))
s.listen(1)

print("Waiting for TCP connection...")
conn, addr = s.accept()

print("Connected... Starting getting tweets.")
resp = get_tweets()
send_tweets_to_spark(resp, conn)
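The device-string cleanup above can be tried on its own. This sketch uses a stdlib regex instead of BeautifulSoup so it has no external dependencies, and the sample `source` value is made up for illustration:

```python
import re

# hypothetical sample of a tweet's 'source' field
source = '<a href="http://twitter.com/download/iphone" rel="nofollow">Twitter for iPhone</a>'

# pull out the anchor text, as anchor.text does in the script above
match = re.search(r'>([^<]+)</a>', source)
tweet_source = match.group(1) if match else ''

# same cleanup as the script: drop spaces and the word "Twitter", prefix "TS"
source_device = tweet_source.replace(" ", "")
device = "TS" + source_device.replace("Twitter", "")
print(device)  # -> TSforiPhone
```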

Part 2. Building Spark Streaming Application

This program will do real-time processing for the tweets that are streamed and help us to do the analysis.

In this program we create a streaming context ssc from the SparkContext sc with a batch interval of two seconds, so the transformations run on all streams received every two seconds. We define a checkpoint here to allow periodic RDD checkpointing; this is mandatory for our app because we use stateful transformations. Then we define our main DStream dataStream, which connects to the socket server we created before on port 9009 and reads the tweets from that port. Each record in the DStream is one tweet.

Let’s create a file called stream_app.py with the following code.

In [ ]:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests


def aggregate_tags_count(new_values, total_sum):
    return sum(new_values) + (total_sum or 0)

def get_sql_context_instance(spark_context):
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']



def process_rdd(time, rdd):
    print("----------- %s -----------" % str(time))
    try:
        # get the spark sql singleton context from the current context
        sql_context = get_sql_context_instance(rdd.context)

        # convert the RDD to a Row RDD
        row_rdd = rdd.map(lambda w: Row(word=w[0], word_count=w[1]))

        # create a DataFrame from the Row RDD
        hashtags_df = sql_context.createDataFrame(row_rdd)

        # register the dataframe as a table
        hashtags_df.registerTempTable("hashtags")

        # get the top 10 hashtags from the table using SQL, print them, and save them to CSV
        hashtag_counts_df = sql_context.sql("select word, word_count from hashtags where word like '#%' order by word_count desc limit 10")
        hashtag_counts_df.show()
        hashtag_counts_df.coalesce(1).write.mode('overwrite').option("header", "true").csv("/Users/girishdurgaiah/hashtag_file.csv")

        # top 10 country codes (CC prefix)
        country_counts_df = sql_context.sql("select word as country_code, word_count as tweet_count from hashtags where word like 'CC%' order by word_count desc limit 10")
        country_counts_df.show()
        country_counts_df.coalesce(1).write.mode('overwrite').option("header", "true").csv("/Users/girishdurgaiah/country_file.csv")

        # top 10 devices (TS prefix)
        device_df = sql_context.sql("select word as device, word_count as device_count from hashtags where word like 'TS%' order by word_count desc limit 10")
        device_df.show()
        device_df.coalesce(1).write.mode('overwrite').option("header", "true").csv("/Users/girishdurgaiah/device_file.csv")

    except Exception as e:
        # log the error instead of silently failing the whole batch
        print("Error processing RDD: %s" % e)



# create spark configuration
conf = SparkConf()
conf.setAppName("TwitterStreamApp")

# create spark context with the above configuration
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")

# create the Streaming Context from the above spark context with interval size 2 seconds
ssc = StreamingContext(sc, 2)

# setting a checkpoint to allow RDD recovery
ssc.checkpoint("checkpoint_TwitterApp")

# read data from port 9009
dataStream = ssc.socketTextStream("localhost", 9009)


# split each tweet into words
words = dataStream.flatMap(lambda line: line.split(" "))
              
# map each word to a pair of (word, 1); hashtag, country-code (CC) and
# device (TS) tokens are all counted the same way
hashtags = words.map(lambda x: (x, 1))

# adding the count of each hashtag to its last count
tags_totals = hashtags.updateStateByKey(aggregate_tags_count)

# do processing for each RDD generated in each interval
tags_totals.foreachRDD(process_rdd)


# start the streaming computation
ssc.start()

# wait for the streaming to finish
ssc.awaitTermination()
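The stateful counting done by updateStateByKey can be sketched in plain Python. This is a toy emulation, not Spark code: for each micro-batch, aggregate_tags_count receives a key's new counts plus its running total (None the first time a key appears), and the batch contents are made up for illustration:

```python
def aggregate_tags_count(new_values, total_sum):
    # same logic as the streaming app: add this batch's counts to the running total
    return sum(new_values) + (total_sum or 0)

# toy emulation of updateStateByKey across two micro-batches
state = {}
batches = [
    [('#spark', 1), ('#spark', 1), ('CCUS', 1)],        # batch 1
    [('#spark', 1), ('CCUS', 1), ('TSforiPhone', 1)],   # batch 2
]
for batch in batches:
    # group this batch's counts by key, as Spark does before the update call
    new_values = {}
    for word, count in batch:
        new_values.setdefault(word, []).append(count)
    for word, values in new_values.items():
        state[word] = aggregate_tags_count(values, state.get(word))

print(state)  # -> {'#spark': 3, 'CCUS': 2, 'TSforiPhone': 1}
```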

Part 3. Visualization of the Twitter Data

This program will display the graphs for the various analyses we did with the Twitter streaming data.

Let’s create a file called visual_app.py with the following code.

In [4]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

df = pd.read_csv('/hashtag.csv')

df.describe()
Out[4]:
       word_count
count   10.000000
mean   169.600000
std    131.658818
min     56.000000
25%     62.750000
50%    129.500000
75%    229.750000
max    426.000000
In [5]:
import matplotlib.pyplot as plt; plt.rcdefaults()
import numpy as np
import matplotlib.pyplot as plt
 
objects = df.word
y_pos = np.arange(len(objects))
count = df.word_count
 
plt.barh(y_pos, count, align='center', alpha=0.5, color="navy")
plt.yticks(y_pos, objects)
plt.xlabel('Count')
plt.title('Top Ten Hashtag Tweets')
 
plt.show()
In [6]:
df = pd.read_csv('/country.csv')

df.describe()
Out[6]:
       tweet_count
count    10.000000
mean    552.100000
std    1494.362774
min      33.000000
25%      44.250000
50%      61.000000
75%     140.750000
max    4802.000000
In [7]:
objects = df.country_code
y_pos = np.arange(len(objects))
count = df.tweet_count
 
plt.barh(y_pos, count, align='center', alpha=0.5, color="brown")
plt.yticks(y_pos, objects)
plt.xlabel('Count')
plt.title('Tweets from Top 10 countries')
 
plt.show()
In [8]:
df = pd.read_csv('/device.csv')

df.describe()
Out[8]:
       device_count
count     10.000000
mean     581.400000
std      938.354849
min       11.000000
25%       41.000000
50%      148.500000
75%      740.000000
max     3058.000000
In [9]:
objects = df.device
y_pos = np.arange(len(objects))
count = df.device_count
 
plt.barh(y_pos, count, align='center', alpha=0.5, color="yellow")
plt.yticks(y_pos, objects)
plt.xlabel('Count')
plt.title('Tweets originating from devices')
 
plt.show()

While learning and completing this project, I learned that Twitter's rich search functionality, with all its nuances, is one of its best features. With a better understanding and a little practice, we can turn Twitter search into a powerful tool for connecting people around the world. For businesses, it offers a variety of targeting options such as gender, language, interest, and geography.