Vincent's space on the web
Practicing my skills by trying out algo-trading.
At university, I did a minor in data intelligence that introduced me to the fascinating fields of Data Mining and Machine Learning. I found it incredible that we could make valid predictions about our physical world with algorithms. That motivated me to learn to program in Python, the language of the moment for data analysis.
While learning, I came across Python tutorials for programming "trader" bots on the stock market. I was already familiar with how the stock market works, and I had been wondering for a while whether the stock forums where people promote certain companies' shares (the infamous "stock pumpers") had any predictive value. Since all the data was publicly accessible, I set out to develop an algorithm that would use the forum data to predict stock price movements.
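The core of the idea fits in a few lines of pandas: count the forum posts per stock per day, then line those counts up with the next day's return to see whether the chatter leads the price. Here is a minimal sketch of that join, with made-up DataFrames whose column names are illustrative only (they are not the schema my scripts below actually produce):

import pandas as pd

# hypothetical inputs: one row per forum post, one row per daily close
posts = pd.DataFrame({"symbol": ["ABC", "ABC", "XYZ"],
                      "post_date": ["2020-10-29", "2020-10-29", "2020-10-30"]})
prices = pd.DataFrame({"symbol": ["ABC", "ABC", "XYZ", "XYZ"],
                       "date": ["2020-10-29", "2020-10-30", "2020-10-30", "2020-11-02"],
                       "close": [1.00, 1.10, 0.50, 0.45]})

# daily post count per symbol: the "chatter" signal
chatter = (posts.groupby(["symbol", "post_date"]).size()
                .rename("post_count").reset_index())

# next trading day's return: the target the chatter is supposed to predict
prices = prices.sort_values(["symbol", "date"])
prices["next_return"] = (prices.groupby("symbol")["close"].shift(-1)
                         / prices["close"] - 1)

# line up the signal with the target
df = prices.merge(chatter, left_on=["symbol", "date"],
                  right_on=["symbol", "post_date"], how="left")
df["post_count"] = df["post_count"].fillna(0)
print(df[["symbol", "date", "post_count", "next_return"]])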
Below is my raw code, unsuccessful so far, along with some explanations:
import os
from datetime import datetime

from get_stocklists import get_stocklists
from get_price_history import get_price_history
from get_posts import get_posts
from get_news_articles import get_news_articles


def stock_routine():
    """Run every scraper in sequence and log how long each one takes."""
    os.chdir("../python projects/stock picker")
    print("stock picker - data scraper start, it is now", datetime.now())
    fcts = {"get_stocklists": get_stocklists,        # working, 29-10-2020
            "get_price_history": get_price_history,  # working 30-10-2020
            "get_posts": get_posts,                  # working 30-10-2020
            "get_news_articles": get_news_articles}  # working 30-10-2020
    for fct_str, fct in fcts.items():
        start = datetime.now()
        print(fct_str, "start")
        try:
            fct()
        except Exception as e:  # a failing scraper should not stop the others
            print("could not scrape", fct_str, "-", e)
        print("done, it took:", datetime.now() - start)
    print("stock picker - data scraper done, it is now", datetime.now())


if __name__ == "__main__":
    stock_routine()
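This first script is the driver: it runs the four scrapers one after the other, times each one, and carries on if a scraper throws, so one bad page does not kill the whole routine. The next one, get_price_history.py, fills the price_history table from Stockwatch: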
import sqlite3

import pandas as pd
from bs4 import BeautifulSoup

from open_session import open_session


def get_price_history():
    """Scrape the missing daily price rows for every listed stock."""
    session = open_session()
    db = sqlite3.connect('stockhouse.db')
    cursor = db.cursor()
    # get the symbol (ticker) list from the database
    cursor.execute("select symbol, stock_exchange from stock_listing where listed = 1")
    existing_stocks = cursor.fetchall()
    # get each stock's most recent price history date
    cursor.execute("select ph_symbol, max(ph_date) from price_history group by ph_symbol")
    recent_price_history = dict(cursor.fetchall())
    export_table_data = pd.DataFrame(columns=["ph_date", "ph_symbol", "ph_open", "ph_high",
                                              "ph_low", "ph_close", "ph_change", "ph_volume",
                                              "ph_number_of_trades", "ph_bid", "ph_ask",
                                              "ph_stock_exchange"])
    # go over every stock and get the missing price data up until the last market close
    for symbol, stock_exchange in existing_stocks:
        if symbol in recent_price_history:  # historical data already on file: resume from the last row's date
            most_recent_date = recent_price_history[symbol]
        else:  # new stock: get the market close data since 2020-01-01
            most_recent_date = "2020-01-01"
        url1 = "https://www.stockwatch.com/Quote/Detail.aspx?snapshot=SX&symbol="
        url2 = "&region=C"
        r = session.get(url1 + symbol + url2)
        soup = BeautifulSoup(r.content, "html.parser")
        html_table_data = soup.find('table', {"id": "MainContent_Close1_Table1_Table1"})
        try:
            table_data = pd.read_html(str(html_table_data))[0].iloc[:, :12].drop(['Ex'], axis=1)  # html to df; slice and drop cols
            table_data = table_data[table_data['Date'] > most_recent_date]  # keep only the rows missing from the DB
            table_data.columns = ["ph_date", "ph_symbol", "ph_open", "ph_high", "ph_low",
                                  "ph_close", "ph_change", "ph_volume",
                                  "ph_number_of_trades", "ph_bid", "ph_ask"]
            cols = table_data.columns[2:]  # numeric columns
            table_data[cols] = table_data[cols].apply(pd.to_numeric, errors='coerce')  # keep non-numeric data out of numeric cols
            table_data.insert(2, 'ph_stock_exchange', stock_exchange)
            export_table_data = pd.concat([table_data, export_table_data], ignore_index=True)
        except Exception:  # malformed or missing table: skip this stock
            pass
    # drop the repeated header rows that pd.read_html picks up
    export_table_data = export_table_data[~export_table_data.ph_date.str.contains("Symbol")]
    export_table_data = export_table_data[~export_table_data.ph_date.str.contains("Consolidation")]
    print("new price_history lines:", export_table_data.shape[0])
    # stage the new rows in a temp table, then insert-or-ignore into price_history
    export_table_data.to_sql('temp_price_history', con=db, if_exists='replace', index=False)
    cursor.execute("insert or ignore into price_history select * from temp_price_history")
    cursor.execute("drop table temp_price_history")
    db.commit()
    db.close()


if __name__ == "__main__":
    get_price_history()
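The price scraper is incremental: for each listed stock it looks up the most recent date already stored in SQLite and only fetches the rows after it, then stages everything in a temporary table so that an insert or ignore can weed out duplicates. The next script, get_posts.py, is the heart of the project. It crawls the Stockhouse bullboards page by page, extracts each post's symbol, exchange, title, content, author and timestamp, and stores the new ones. When it runs into more than ten posts in a row that are already in the database, it doubles the page number to skip over stretches it has already covered: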
import re
import datetime
import sqlite3

import numpy as np
import pandas as pd
from bs4 import BeautifulSoup

from open_session import open_session


def get_posts():
    """Crawl the Stockhouse bullboards and store every new forum post."""
    session = open_session()
    db = sqlite3.connect('stockhouse.db')
    cursor = db.cursor()
    # symbols and post ids already in the database
    cursor.execute("select symbol from posts")
    stocklist = [i[0] for i in cursor.fetchall()]
    cursor.execute("select post_id from posts")
    existing_posts = [i[0] for i in cursor.fetchall()]
    insert_statements = []
    stock_exchanges = {"T": "TSX", "V": "TSXV", "C": "CSE"}
    accepted_prefixes = ["T", "V", "C"]
    url = "https://stockhouse.com/community/bullboards/"
    consecutive_matches = 0
    end_phrase = "Sorry, there are no posts meeting your filter criteria."
    end_of_pages = False
    page = 1
    while not end_of_pages:  # go over every page until the error page
        if consecutive_matches > 10:
            # a long run of already-seen posts: double the page number to jump ahead
            page = page * 2
            consecutive_matches = 0
            print("consecutive matches trigger, jump ahead to page:", page)
        if page % 100 == 0:
            print("page:", page)
        cookies = {'privacy-policy': '1,XXXXXXXXXXXXXXXXXXXXXX'}
        r = session.get(url + str(page), cookies=cookies)
        soup = BeautifulSoup(r.content, "html.parser")
        end_of_pages_check = soup.find('div', class_="bullboard-posts").text
        if end_phrase in end_of_pages_check:
            print("End of pages reached")
            end_of_pages = True
            continue
        raw_posts_on_page = soup.find_all('h3', {"class": "left"})  # h3 tags containing the posts' urls
        for i in raw_posts_on_page:
            href = i.find('a')['href']
            post_url = "https://stockhouse.com" + href
            r = session.get(post_url, cookies=cookies)
            soup = BeautifulSoup(r.content, "html.parser")
            # symbol
            try:
                raw_symbol = soup.find("div", {"class": "company-header"})
                raw_symbol = raw_symbol.find_all("span")[0].text.upper()
            except Exception:  # no company header on the page: skip the post
                continue
            # primary / alternate symbol
            alt_prefix = alternate_symbol = np.nan
            raw_alt_symbol = soup.find("h2")
            if raw_alt_symbol is not None and ("Primary Symbol" in raw_alt_symbol.text
                                               or "Alternate Symbol(s):" in raw_alt_symbol.text):
                try:
                    raw_alt_symbol = raw_alt_symbol.contents[2].find("a").text.upper()
                    alt_prefix = raw_alt_symbol.split(".")[0]
                    alternate_symbol = raw_alt_symbol.split(".")[1]
                except Exception:
                    pass
            # stock_exchange
            raw_stock_exchange = raw_symbol.split(".")[0]
            if raw_stock_exchange not in accepted_prefixes:
                # the page's own symbol is not on an accepted exchange;
                # fall back on the primary/alternate symbol's prefix
                if alt_prefix not in accepted_prefixes:
                    continue
                raw_stock_exchange = alt_prefix
            stock_exchange = stock_exchanges[raw_stock_exchange]
            # check if the symbol is in my stocklist
            if "." in raw_symbol:  # split only when there is an exchange prefix ("index out of range" otherwise)
                symbol = raw_symbol.split(".")[1]
            else:
                symbol = raw_symbol
            if symbol in stocklist:
                pass
            elif symbol + ".UN" in stocklist:  # trust units carry a .UN suffix
                symbol = symbol + ".UN"
            else:  # not in the stocklist, check the primary or alternate symbol
                try:
                    if alternate_symbol in stocklist:
                        pass
                    elif alternate_symbol + ".UN" in stocklist:
                        alternate_symbol = alternate_symbol + ".UN"
                except TypeError:  # alternate_symbol is nan
                    pass
            # post_id
            try:
                post_id = int(href.split("=")[1])
                if post_id in existing_posts:
                    consecutive_matches += 1
                    continue
                consecutive_matches = 0
                existing_posts.append(post_id)
            except (IndexError, ValueError):  # no usable post id in the url: skip it
                continue
# "post_title"
try:
# post_title = post_titles[post_title_counter].get_text(strip=True)
post_title = soup.find("div", {"class": "post-body"})
post_title = post_title.find("h3").text.strip()
# post_title_counter += 1
# print(post_title)
except:
# print("no post_title")
post_title = np.nan
# "post_content"
try:
post_content = soup.find("div", {"class": "post-content"})
post_content = post_content.get_text().strip()
except:
# print("no post_content")
post_content = np.nan
# "upvote_count"
# upvote_count = soup.select('div', {"class": "sh-inter sh-like"})
# upvote_count = upvote_count.get_text()
# "original_poster"
try:
original_poster = soup.find("a", {"id": "p_lt_zoneContent_SubContent_p_lt_zoneLeft_Stockhouse_CompanyBullboard_viewerForum_postElem_lnkProfileUrl"})
original_poster = original_poster.get_text()
# print(original_poster)
except:
original_poster = np.nan
# print("no original_poster")
# "post_datetime"
try:
post_datetime = soup.find('div', {"class": "post-detail-info"})
post_datetime = post_datetime.get_text().replace('\n', '').replace('\r', '').replace(' ', '')
if "minutes ago" in post_datetime:
post_datetime = datetime.datetime.now()
else:
pattern = r"[A-Z]{2}\d+"
post_datetime = post_datetime[:re.search(pattern, post_datetime).span()[0]]
post_datetime = (pd.to_datetime(post_datetime)).to_pydatetime()
# print(post_datetime)
except:
# print("couldn't get datetime, continue. Post_id:", post_id)
continue
insert_values = (post_id, symbol, alternate_symbol, stock_exchange, post_title, post_content, original_poster, post_datetime, post_url)
insert_statements.append(insert_values)
# print("insert values:", insert_values)
page += 1
print("new posts:", len(insert_statements))
cursor.executemany("""INSERT OR IGNORE INTO posts (post_id, symbol, alternate_symbol, stock_exchange, post_title, post_content, original_poster, post_datetime, post_url)\
VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)""", insert_statements)
db.commit()
db.close()
if __name__=="__main__":
get_posts()
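Finally, the modeling script. It reads a prepared df.csv (built in a step not shown here), drops the raw price and volume columns from the feature set, and pits a plain linear regression against a decision tree, comparing both on the root mean square error of their predictions: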
import os
os.chdir("../python projects/stock market predictive modeling")

import re
import string
import sqlite3
from math import sqrt
from datetime import datetime, timedelta
from collections import defaultdict, Counter

import numpy as np
import pandas as pd
from pandas.tseries.offsets import BDay
import matplotlib.pyplot as plt
import nltk
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeRegressor
from sklearn.metrics import mean_squared_error

pd.options.mode.chained_assignment = None
# download needed only once
# nltk.download("punkt")


def model():
    df = pd.read_csv('df.csv')
    df = df.set_index('id')
    # keep the raw price/volume columns and the bookkeeping columns out of the features
    features_to_remove = ['symbol', 'date', 'volume', 'open', 'high', 'low', 'close', 'change',
                          'number_of_trades', 'target_date', '1_cash_volume', '2_cash_volume',
                          '3_cash_volume', '4_cash_volume', '5_cash_volume', '6_cash_volume',
                          '7_cash_volume', '8_cash_volume', '9_cash_volume', '10_cash_volume',
                          '11_cash_volume', '12_cash_volume', '13_cash_volume', '14_cash_volume',
                          '15_cash_volume']
    features = [x for x in df.columns if x not in features_to_remove]
    X = df[features]
    y = df['target']
    # split the dataset into training and test sets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=324)
    # linear regression: fit a model to the training set
    regressor = LinearRegression()
    regressor.fit(X_train, y_train)
    # predict on the test set and evaluate with the root mean square error
    y_prediction = regressor.predict(X_test)
    rmse = sqrt(mean_squared_error(y_true=y_test, y_pred=y_prediction))
    print("Root Mean Square Error (RMSE):", rmse)
    results = y_test.to_frame(name='target')
    results['prediction'] = y_prediction
    results['diff'] = results['prediction'] - results['target']
    print(results)
    print(results['prediction'].describe())
    # decision tree regressor: fit a new model to the same training set
    regressor = DecisionTreeRegressor(max_depth=20)
    regressor.fit(X_train, y_train)
    y_prediction = regressor.predict(X_test)
    rmse = sqrt(mean_squared_error(y_true=y_test, y_pred=y_prediction))
    print("Root Mean Square Error (RMSE):", rmse)
    results['prediction'] = y_prediction
    print(results['prediction'].describe())


model()
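One way to put those RMSE numbers in perspective is to compare them against a dummy model that always predicts the training mean; a model that cannot beat it has learned nothing. A minimal sketch, assuming the same X_train/X_test/y_train/y_test split as above:

from math import sqrt

from sklearn.dummy import DummyRegressor
from sklearn.metrics import mean_squared_error

# baseline: always predict the mean of the training targets
baseline = DummyRegressor(strategy="mean")
baseline.fit(X_train, y_train)
baseline_rmse = sqrt(mean_squared_error(y_test, baseline.predict(X_test)))
print("baseline RMSE (predict the training mean):", baseline_rmse)
# a model with real predictive value should come in clearly under this number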