Fledgling ETL Data Engineering Project

Justifications

First of all, why bother with this project? Secondly, why write an article about it?

Addressing point one, I worked on this project in the first place to take a step toward developing my data engineering skillset. I have no background in data engineering and the field is very new to me (you can read more about my story here). Learning theory is great, but it’s better when we can also accumulate first-hand experience and ingrain the lessons of the theory through action. This project really isn’t anything special, seriously, but it served as an opportunity to dip my toe into data engineering with minimal prerequisites.

Why write this article? Several reasons. Today, the more exposure you can bring to yourself, the better; we’re living in a time when attention is extremely valuable. By putting myself out there, I expose myself to favourable outcomes: professionals might be able to advise me, I can provide a snapshot of my current skills when I ask community members for help, potential future employers can see where I’ve come from and where I am currently, I can look back at my own journey, and finally it could help someone else who is where I was several weeks ago. Clearly I am no expert yet, but I’d like to believe that at least someone might benefit from this article!

The project

So, with all of that out of the way, what was the project?

Like all developers, my first step when investigating how to get started was to hit Google. The first query was ‘beginner data engineering project’. I looked through some Reddit threads and skimmed some YouTube videos until I finally found something that looked right for me: this video.

So I followed along, gained a little insight, and decided to adapt it and make my own project. Follow-along projects are easy to build, but you don’t really learn much from them. The right way (IMO) to build a project is to take what you’ve learned and re-use those skills to build something new; you must strive toward independent thought. I made up a fictional scenario to make the project feel more real.

Scenario

A company that provides ML training to beginners requires a real world data set to teach basic concepts and perform some learning exercises with.

They need a dataset which allows students to answer the question: does Twitter attention influence the price outcomes of a given stock ticker?

To answer this, ML devs may wish to take a variety of approaches: they could apply some NLP to the tweets for sentiment, or they might look at ‘unsigned’ sentiment (any tweet, disregarding its sentiment) to see whether attention of any kind has an influence.

Solution

To satisfy the requirements of the mock scenario, at least two kinds of data needed to be collected: price data for a stock ticker, and Twitter activity.

Twitter: Twitter provides two means of tweet extraction: a GET endpoint from which tweets are pulled, with a maximum of 100 tweets per request, and a streaming option in which tweets are pushed to the client as they arrive. Given that the goal was to produce a true beginner project, the GET endpoint was selected.

Stock Selection: NVIDIA ($NVDA) was selected because it is an actively traded stock with decent volatility. There is significant mention of the ticker on Twitter per 30-minute window, and the volume rarely exceeds the limit of 100 tweets (more on this later).

At this point a clearer picture could be formed: stock activity would be collected over a trailing 30-minute window, and Twitter data would be collected over the same trailing 30-minute window.

Extracting the Data

For Twitter: a developer account is required to obtain API credentials, which are then used to make requests. This is the request that was used to extract the Twitter data:

import datetime
import requests

# BEARER_TOKEN is the bearer token from your Twitter developer account
headers = {
    "Authorization": "Bearer {bearer}".format(bearer=BEARER_TOKEN)
}
# Round the current time down to the nearest half hour
today = datetime.datetime.now()
if today.minute >= 30:
    today = today.replace(minute=30, second=0, microsecond=0)
else:
    today = today.replace(minute=0, second=0, microsecond=0)
# Look back 30 minutes and format both timestamps as the ISO 8601 strings the API expects
thirty_minutes_ago = today - datetime.timedelta(minutes=30)
start = thirty_minutes_ago.strftime('%Y-%m-%dT%H:%M:%S.000Z')
end = today.strftime('%Y-%m-%dT%H:%M:%S.000Z')
r = requests.get('https://api.twitter.com/2/tweets/search/recent?query=nvda&tweet.fields=created_at&end_time={end}&start_time={start}&max_results=100'.format(start=start, end=end), headers=headers)

Example response:

[
  {'created_at': '2020-12-20T14:45:07.000Z', 'id': '1340669567048294401', 'text': 'NVDA < 50 RSI [14,1d] (47.65)\n2020-12-20 09:45:07'},
  {'created_at': '2020-12-20T11:23:24.000Z', 'id': '1340618800996233216', 'text': 'RT @alistairmilne: Year-to-date performance (USD):\n\nTesla $TSLA +730%\n#Bitcoin $BTC  +227%\nNVIDIA $NVDA +126%\nPayPal $PYPL +119%\nMicroStrat…'},
  {'created_at': '2020-12-20T11:19:20.000Z', 'id': '1340617777112117249', 'text': 'RT @alistairmilne: Year-to-date performance (USD):\n\nTesla $TSLA +730%\n#Bitcoin $BTC  +227%\nNVIDIA $NVDA +126%\nPayPal $PYPL +119%\nMicroStrat…'}
]

The code collects only tweets before the current time rounded down to the nearest half hour, with a look-back window of 30 minutes. This means that regardless of when the code is run, it will only ever collect tweets from hour:00 to hour:30 or from hour:30 to hour:00. This is important because of the constraints of the financial data.

For the financial data, a super easy to use API called yfinance was used; no key is required. OHLC data at 30-minute intervals was requested, and the API returns the data as a friendly pandas DataFrame.
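
As a rough illustration (the ticker and period arguments here are placeholders, not necessarily exactly what the project used):

import yfinance as yf

# Pull recent NVDA OHLC data in 30-minute bars
ohlc = yf.Ticker("NVDA").history(period="1d", interval="30m")
print(ohlc[["Open", "High", "Low", "Close"]])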

Validation, Cleaning, Transformation

The Twitter data needed to be cleaned: retweets were removed, duplicates were checked for (Twitter provides tweet IDs), and the tweets were checked to ensure they fell within the specified 30-minute time window. The timestamp of each tweet also needed to be converted to EST, the timezone used for the financial data. For each tweet, a string referencing the 30-minute time block was derived so that tweets could easily be cross-referenced with the financial data (useful for joins later). A cleaned row in the tweets df would contain the following columns:

tweet_id    tweet_body    thirty_min_close_nyc
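
A rough sketch of that cleaning step might look like the following, assuming the raw tweets from the API response are kept as a list of dicts like the example above; the helper and variable names are mine, not the project’s:

import pandas as pd

def clean_tweets(raw_tweets, window_start, window_end):
    # raw_tweets: list of dicts like the example response above
    # window_start / window_end: tz-aware pandas Timestamps bounding the 30-minute window
    df = pd.DataFrame(raw_tweets)

    # Drop retweets and duplicate tweet ids
    df = df[~df["text"].str.startswith("RT @")]
    df = df.drop_duplicates(subset="id")

    # Parse timestamps, keep only tweets inside the window,
    # and convert from UTC to New York time (the financial data's timezone)
    df["created_at"] = pd.to_datetime(df["created_at"], utc=True)
    df = df[(df["created_at"] >= window_start) & (df["created_at"] < window_end)]
    nyc_time = df["created_at"].dt.tz_convert("America/New_York")

    # Label each tweet with the close of its 30-minute block for later joins
    df["thirty_min_close_nyc"] = nyc_time.dt.ceil("30min").dt.strftime("%Y-%m-%d %H:%M")

    return df.rename(columns={"id": "tweet_id", "text": "tweet_body"})[
        ["tweet_id", "tweet_body", "thirty_min_close_nyc"]
    ]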

The financial data required little cleaning. Some derived columns, such as returns over the 30-minute block and volatility over the 30-minute block, were added to the df, along with a column containing a string referencing the 30-minute time block, matching the tweet data. A cleaned row in the financial data df would contain the following columns:

open high low close closetime returns_percent volatility_percent
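
One way those derived columns could be computed; the formulas here are my own assumptions (close-to-open change for returns, high-low range relative to the open for volatility), not necessarily the project’s exact definitions:

import pandas as pd

# ohlc is the 30-minute DataFrame from yfinance (as in the earlier snippet)
ohlc = ohlc.rename(columns=str.lower)

# Percentage change from open to close within each 30-minute bar
ohlc["returns_percent"] = (ohlc["close"] - ohlc["open"]) / ohlc["open"] * 100

# High-low range relative to the open, as a simple volatility proxy
ohlc["volatility_percent"] = (ohlc["high"] - ohlc["low"]) / ohlc["open"] * 100

# Close time of each bar, in the same string format used for the tweets
ohlc["closetime"] = (ohlc.index + pd.Timedelta(minutes=30)).strftime("%Y-%m-%d %H:%M")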

At the end of each cycle of extraction and transformation, the dfs were added to their respective tables in the database.

Storage

The data was initially stored locally in the project directory, but this was only temporary; the goal during deployment was to move the data to a cloud storage solution. Seeking to build familiarity with the AWS ecosystem, an Amazon RDS instance running PostgreSQL was selected and set up.

The psycopg2 package was used to create the respective tables from within Python:

CREATE TABLE IF NOT EXISTS tweets(
    tweet_id VARCHAR(200) PRIMARY KEY NOT NULL,
    tweet_body VARCHAR(300) NOT NULL,
    thirty_min_close_nyc VARCHAR(200) NOT NULL
);

CREATE TABLE IF NOT EXISTS stock_data(
    open NUMERIC NOT NULL,
    high NUMERIC NOT NULL,
    low NUMERIC NOT NULL,
    close NUMERIC NOT NULL,
    closetime VARCHAR(200) NOT NULL,
    returns_percent NUMERIC NOT NULL,
    volatility_percent NUMERIC NOT NULL
);
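
Running that DDL from Python with psycopg2 looks roughly like this; the connection details are placeholders for the RDS endpoint and credentials:

import psycopg2

# Placeholder connection details for the RDS instance
conn = psycopg2.connect(
    host="your-rds-endpoint.rds.amazonaws.com",
    port=5432,
    dbname="etl",
    user="postgres",
    password="...",
)
with conn.cursor() as cur:
    cur.execute(create_tweets_table_sql)   # the CREATE TABLE statements above
    cur.execute(create_stock_table_sql)
conn.commit()
conn.close()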

SQLAlchemy was used to append the df as rows to each table.
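
The append itself can be as simple as the following sketch (the connection string and DataFrame names are placeholders):

from sqlalchemy import create_engine

# Placeholder connection string pointing at the same RDS database
engine = create_engine("postgresql+psycopg2://postgres:password@your-rds-endpoint.rds.amazonaws.com:5432/etl")

# Append the cleaned rows to their respective tables
tweets_df.to_sql("tweets", engine, if_exists="append", index=False)
stock_df.to_sql("stock_data", engine, if_exists="append", index=False)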

Deployment

With the scripts now written, each one essentially does three things:
1. pull from Twitter/yfinance
2. clean & validate
3. push to the db
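
In skeleton form, each script boils down to something like this (the function names are illustrative, not the project’s actual ones):

def run_etl():
    raw = extract()          # pull the last 30-minute window from Twitter / yfinance
    clean = transform(raw)   # validate, clean, and derive the extra columns
    load(clean)              # append the rows to the RDS tables

if __name__ == "__main__":
    run_etl()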

The next step was to deploy this to a server with some type of scheduling system in place.

An AWS EC2 instance was provisioned following this guide. Once ready, Python was installed, the scripts’ dependencies were installed using pip, and the files were transferred via scp:

scp etl.zip ec2-user@[server].[region].compute.amazonaws.com:~

After transfer, the files were extracted.

Scheduling

Finally, the job needed to be scheduled to run regularly at 30 min intervals.

There were constraints though: stock data is not produced on weekends, nor outside of RTH (Regular Trading Hours), so the only time to request data should be Mon-Fri 09:30–16:00 EST. The server clock was set to UTC (EST + 5:00), so the cron times are adjusted accordingly.

The scripts were scheduled to run in crontab:

5,35 14-21 * * 1-5 python3 /home/ec2-user/etl-copy/twitter.py >/dev/null 2>&1
5,35 14-21 * * 1-5 python3 /home/ec2-user/etl-copy/finance.py >/dev/null 2>&1

These entries mean: at hour:05 and hour:35, for the hours 2pm-9pm UTC, Monday to Friday, run each Python script and discard any output (no log is stored).

Summary

That’s it!

We have built a system that pulls financial data and Twitter data in 30-minute blocks (rounded to the nearest half hour), cleans that data, stores it in a cloud db, and runs automatically.

Next we can wait for the data to build up and then run some queries to demonstrate what we’re gathering.

Here is the GitHub link if you want to build the same project.
