Labrecquev
English

labrecquev.ca

L'espace de Vincent sur le web


Web Scraping des données boursières avec Python


Pratiquer mes aptitudes en essayant l'algo-trading.

À l'université, j'ai fait une mineure en intelligence des données qui m'a introduit aux champs fascinants du Data Mining et du Machine Learning. Je trouvais incroyable qu'on puisse émettre des prédictions valables sur notre monde physique avec des algorithmes. Ça m'a motivé à apprendre à programmer en Python, le langage de l'heure en analyse de données.

Durant mon apprentissage, je suis tombé sur des tutoriels Python pour programmer des robots "traders" sur la bourse. J'étais déjà familier avec les rouages de la bourse, et je me demandais depuis un moment si les forums boursiers où les gens promeuvent certaines actions de compagnies (les fameux "stock pumpers") avaient une valeur prédictive quelconque. Les données étant toutes accessibles, je me suis lancé dans le développement d'un algorithme prédicteur de mouvements des cours des actions qui utiliserait les données sur les forums.

Ci-dessous mon code brut infructueux à ce jour, et des explications:

Module principal: boucle de routine. Importation et exécution des modules
        import os
from datetime import datetime
from get_stocklists import get_stocklists
from get_price_history import get_price_history
from get_posts import get_posts
from get_news_articles import get_news_articles

def stock_routine():
    """Run every scraper in sequence and report how long each one took.

    Changes the working directory to the stock-picker project folder, then
    calls ``get_stocklists``, ``get_price_history``, ``get_posts`` and
    ``get_news_articles`` in that order. A failure in one scraper is printed
    (with its cause) and does not prevent the remaining scrapers from running.
    """
    os.chdir("../python projects/stock picker")
    print("stock picker - data scraper start, it is now", datetime.now())

    fcts = {"get_stocklists": get_stocklists,  # working, 29-10-2020
            "get_price_history": get_price_history,  # working 30-10-2020
            "get_posts": get_posts,  # working 30-10-2020
            "get_news_articles": get_news_articles}  # working 30-10-2020

    for fct_str, fct in fcts.items():
        start = datetime.now()
        print(fct_str, "start")
        try:
            fct()
        except Exception as exc:
            # Catch Exception rather than everything, so Ctrl-C / SystemExit
            # still stop the routine, and show why the scraper failed.
            print("could not scrape", fct_str, "-", repr(exc))
        print("done, it took:", datetime.now() - start)
    print("stock picker - data scraper done, it is now", datetime.now())
    
# Run the full scraping routine only when this file is executed as a script.
if __name__ == "__main__":
    stock_routine()
        
    
Extraction des données historiques et quotidiennes des actions pour le TSX et le CSE (les bourses de Toronto et Vancouver) et sauvegarde dans une base de données SQLITE
        import sqlite3
import pandas as pd
from bs4 import BeautifulSoup
from open_session import open_session

def get_price_history():
    """Scrape the price rows missing from ``price_history`` for every listed stock.

    For each symbol flagged ``listed = 1`` in ``stock_listing``, the stockwatch.com
    quote page is fetched, its close-history table is parsed, and only the rows
    dated after the most recent ``ph_date`` already stored for that symbol are
    kept. All new rows are staged in ``temp_price_history`` and merged into
    ``price_history`` with ``insert or ignore``.

    A symbol whose page has no history table, or whose table cannot be parsed,
    is skipped; parse failures are printed instead of being silently ignored.
    The database connection is always closed, even when scraping fails.
    """
    # Kept from the original: both frames stay visible at module level.
    global table_data, export_table_data
    from io import StringIO  # read_html expects a file-like object, not literal HTML

    default_start_date = "2020-01-01"  # lower bound used for symbols with no stored history
    url1 = "https://www.stockwatch.com/Quote/Detail.aspx?snapshot=SX&symbol="
    # Was "®ion=C": the "&reg" of "&region" had been decoded as an HTML entity.
    url2 = "&region=C"
    price_columns = ["ph_date", "ph_symbol", "ph_open", "ph_high", "ph_low", "ph_close",
                     "ph_change", "ph_volume", "ph_number_of_trades", "ph_bid", "ph_ask"]

    session = open_session()

    db = sqlite3.connect('stockhouse.db')
    try:
        cursor = db.cursor()

        # get symbol (tickers) list from database
        cursor.execute("""select symbol, stock_exchange from stock_listing where listed = 1""")
        existing_stocks = cursor.fetchall()

        # get stocks' most recent price history line
        cursor.execute("""select ph_symbol, max(ph_date) from price_history group by ph_symbol""")
        recent_price_history = dict(cursor.fetchall())

        export_table_data = pd.DataFrame(columns=price_columns + ["ph_stock_exchange"])

        # go over every stock, and get the missing price data up until the last market close
        for symbol, stock_exchange in existing_stocks:
            # Fall back to the default when the symbol is new or its stored date is NULL.
            most_recent_date = recent_price_history.get(symbol) or default_start_date

            r = session.get(url1 + symbol + url2)
            soup = BeautifulSoup(r.content, "html.parser")
            html_table_data = soup.find('table', {"id": "MainContent_Close1_Table1_Table1"})
            if html_table_data is None:
                continue  # no history table on this page: nothing to import

            try:
                # html to df; slice and drop cols
                table_data = pd.read_html(StringIO(str(html_table_data)))[0].iloc[:, :12].drop(['Ex'], axis=1)
                # keep only the rows I don't have in the DB
                table_data = table_data[table_data['Date'] > most_recent_date].copy()

                table_data.columns = price_columns
                cols = table_data.columns[2:]  # numeric columns

                # prevent non-numeric data to be inserted in numeric cols
                table_data[cols] = table_data[cols].apply(pd.to_numeric, errors='coerce')
                table_data.insert(2, 'ph_stock_exchange', stock_exchange)
            except Exception as exc:
                # Best effort per symbol: report the problem and move on.
                print("could not parse price history for", symbol, "-", repr(exc))
                continue

            export_table_data = pd.concat([table_data, export_table_data], ignore_index=True)

        # Drop repeated header rows and consolidation notices embedded in the scraped tables.
        # na=False keeps the mask boolean when a date is missing.
        noise_rows = export_table_data.ph_date.str.contains("Symbol|Consolidation", na=False)
        export_table_data = export_table_data[~noise_rows]

        print("new price_history lines:", export_table_data.shape[0])
        # Stage the rows in a scratch table, then merge them without duplicating existing ones.
        # NOTE(review): "select *" relies on the staged column order matching price_history - confirm.
        export_table_data.to_sql('temp_price_history', con=db, if_exists='replace', index=False)

        cursor.execute("insert or ignore into price_history select * from temp_price_history")
        cursor.execute("drop table temp_price_history")

        db.commit()
    finally:
        db.close()
    
# Run the price-history scraper only when this file is executed as a script.
if __name__=="__main__":
    get_price_history()
        
    
Extraction des publications sur les forums et sauvegarde dans la base de données
        import re
import datetime
import pandas as pd
import numpy as np
import sqlite3
from bs4 import BeautifulSoup
from open_session import open_session
    
# Marker that follows the timestamp in the post-detail text (two capitals then digits).
_POST_DATETIME_SUFFIX = re.compile(r"[A-Z]{2}\d+")


def _node_text(node, strip=False):
    """Return the text of a parsed HTML node, or NaN when the node is missing."""
    if node is None:
        return np.nan
    text = node.get_text()
    return text.strip() if strip else text


def _parse_alternate_symbol(soup):
    """Return the ticker linked in the primary/alternate symbol heading, or NaN."""
    heading = soup.find("h2")
    if heading is None:
        return np.nan
    heading_text = heading.text
    if "Primary Symbol" not in heading_text and "Alternate Symbol(s):" not in heading_text:
        return np.nan
    try:
        linked_symbol = heading.contents[2].find("a").text.upper()
        return linked_symbol.split(".")[1]
    except (AttributeError, IndexError, TypeError):
        return np.nan


def _parse_post_datetime(soup):
    """Return the post's timestamp as a ``datetime``, or None when it cannot be read."""
    try:
        details = soup.find('div', {"class": "post-detail-info"}).get_text()
        details = details.replace('\n', '').replace('\r', '').replace('  ', '')
        if "minutes ago" in details:
            return datetime.datetime.now()
        # Keep only the text that precedes the suffix marker.
        # NOTE(review): the cut starts at the two capital letters, so an AM/PM
        # marker directly before the digits would be dropped - confirm the format.
        cut = _POST_DATETIME_SUFFIX.search(details).start()
        parsed = pd.to_datetime(details[:cut])
    except (AttributeError, TypeError, ValueError):
        return None
    if pd.isna(parsed):
        return None
    return parsed.to_pydatetime()


def get_posts():
    """Scrape new bullboard posts from stockhouse.com into the ``posts`` table.

    Listing pages are walked from page 1 until the site reports that no posts
    are left. Every post linked from a listing page is fetched and parsed; posts
    whose symbol prefix is not ``T``, ``V`` or ``C``, whose id is already known,
    or whose id/timestamp cannot be read are skipped. After more than ten
    consecutive already-known posts the page number is doubled to jump ahead.

    All new posts are inserted in one batch at the end with ``INSERT OR IGNORE``.
    The database connection is always closed, even when scraping fails.
    """
    session = open_session()

    stock_exchanges = {"T": "TSX", "V": "TSXV", "C": "CSE"}
    url = "https://stockhouse.com/community/bullboards/"
    end_phrase = "Sorry, there are no posts meeting your filter criteria."
    cookies = {'privacy-policy': '1,XXXXXXXXXXXXXXXXXXXXXX'}

    db = sqlite3.connect('stockhouse.db')
    try:
        cursor = db.cursor()

        # NOTE(review): the known symbols are read from posts, not stock_listing - confirm intent.
        cursor.execute("select symbol from posts")
        stocklist = {row[0] for row in cursor.fetchall()}

        # Sets give constant-time membership tests inside the scraping loop.
        cursor.execute("""select post_id from posts""")
        existing_posts = {row[0] for row in cursor.fetchall()}

        insert_statements = []
        consecutive_matches = 0
        page = 1

        while True:  # go over every page until the "no posts" page
            if consecutive_matches > 10:
                page = page * 2
                consecutive_matches = 0
                print("consecutive matches trigger, jump ahead to page:", page)
            if page % 100 == 0:
                print("page: ", page)

            r = session.get(url + str(page), cookies=cookies)
            soup = BeautifulSoup(r.content, "html.parser")
            board = soup.find('div', class_="bullboard-posts")
            if board is None:
                # Unexpected layout: stop paging but still save what was collected.
                print("bullboard-posts container not found on page", page, "- stopping")
                break
            if end_phrase in board.text:
                print("End of pages reached")
                break

            # h3 tags containing the posts' urls
            for heading in soup.find_all('h3', {"class": "left"}):
                link = heading.find('a')
                href = link.get('href') if link is not None else None
                if not href:
                    continue
                post_url = "https://stockhouse.com" + href
                post_response = session.get(post_url, cookies=cookies)
                post_soup = BeautifulSoup(post_response.content, "html.parser")

                # symbol
                try:
                    header = post_soup.find("div", {"class": "company-header"})
                    raw_symbol = header.find_all("span")[0].text.upper()
                except (AttributeError, IndexError):
                    continue

                alternate_symbol = _parse_alternate_symbol(post_soup)

                # stock_exchange: only the prefixes mapped in stock_exchanges are kept
                symbol_parts = raw_symbol.split(".")
                raw_stock_exchange = symbol_parts[0]
                if raw_stock_exchange not in stock_exchanges:
                    continue
                stock_exchange = stock_exchanges[raw_stock_exchange]

                # A bare prefix has no ticker part after the dot.
                symbol = symbol_parts[1] if len(symbol_parts) > 1 else raw_symbol

                # Prefer the ".UN" spelling when that is the one already known.
                if symbol not in stocklist:
                    if symbol + ".UN" in stocklist:
                        symbol = symbol + ".UN"
                    elif (isinstance(alternate_symbol, str)
                          and alternate_symbol not in stocklist
                          and alternate_symbol + ".UN" in stocklist):
                        alternate_symbol = alternate_symbol + ".UN"

                # post_id
                try:
                    post_id = int(href.split("=")[1])
                except (IndexError, ValueError):
                    continue
                if post_id in existing_posts:
                    consecutive_matches += 1
                    continue
                consecutive_matches = 0
                existing_posts.add(post_id)

                # post_title
                post_body = post_soup.find("div", {"class": "post-body"})
                title_node = post_body.find("h3") if post_body is not None else None
                post_title = _node_text(title_node, strip=True)

                # post_content
                post_content = _node_text(post_soup.find("div", {"class": "post-content"}), strip=True)

                # original_poster
                original_poster = _node_text(post_soup.find("a", {"id": "p_lt_zoneContent_SubContent_p_lt_zoneLeft_Stockhouse_CompanyBullboard_viewerForum_postElem_lnkProfileUrl"}))

                # post_datetime
                post_datetime = _parse_post_datetime(post_soup)
                if post_datetime is None:
                    continue

                insert_statements.append((post_id, symbol, alternate_symbol, stock_exchange, post_title,
                                          post_content, original_poster, post_datetime, post_url))

            page += 1

        print("new posts:", len(insert_statements))
        cursor.executemany(
            "INSERT OR IGNORE INTO posts (post_id, symbol, alternate_symbol, stock_exchange, "
            "post_title, post_content, original_poster, post_datetime, post_url) "
            "VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?)",
            insert_statements)
        db.commit()
    finally:
        db.close()
    
# Run the forum-post scraper only when this file is executed as a script.
if __name__=="__main__":
    get_posts()
        
    
En utilisant les données boursières et les publications sur les forums (variables prédictives du modèle), j'ai tenté de bâtir un modèle de régression linéaire en utilisant le module scikit-learn de Python. J'essayais de prédire le mouvement du prix des actions dans n jours (variable cible du modèle). La précision de ce premier modèle est décevante, car elle prédit moins bien que la chance aléatoire. Les prochaines étapes seront de nettoyer mes données, modifier et mieux sélectionner mes variables prédictives, essayer différentes cibles, et essayer différents algorithmes prédictifs autres que la régression linéaire.
        import os
os.chdir("../python projects/stock market predictive modeling")
import sqlite3
import pandas as pd
pd.options.mode.chained_assignment = None
from datetime import timedelta
from datetime import datetime
from pandas.tseries.offsets import BDay
import numpy as np
import matplotlib.pyplot as plt
from math import sqrt
from collections import defaultdict
from collections import Counter
import string
import nltk
import re
# download need only once
#nltk.download("punkt")



def _evaluate_regressor(label, regressor, X_train, X_test, y_train, y_test):
    """Fit one regressor, then print its test RMSE and a summary of its predictions."""
    from sklearn.metrics import mean_squared_error

    regressor.fit(X_train, y_train)
    y_prediction = regressor.predict(X_test)

    # Evaluate accuracy using Root Mean Square Error
    rmse = sqrt(mean_squared_error(y_true=y_test, y_pred=y_prediction))
    print(label)
    print("Root Mean Square Error (RMSE):", rmse)

    # y_test is a Series, so the comparison is built in a separate frame
    # instead of adding 'prediction' / 'diff' entries to y_test itself.
    results = pd.DataFrame({"target": y_test, "prediction": y_prediction}, index=y_test.index)
    results["diff"] = results["prediction"] - results["target"]
    print(y_prediction.shape)
    print(results)
    print(results["prediction"].describe())


def model():
    """Train and evaluate two regressors on ``df.csv`` and print their test RMSE.

    Reads ``df.csv`` (indexed by its ``id`` column), uses every column except
    the excluded ones as features and ``target`` as the value to predict,
    holds out 25% of the rows for testing, then fits a linear regression and
    a depth-20 decision tree on the same split.

    The original listing referenced an ``ad_duration`` column that is not part
    of the target Series; those summary prints were dropped.
    """
    # The original used these names without importing them anywhere.
    from sklearn.linear_model import LinearRegression
    from sklearn.model_selection import train_test_split
    from sklearn.tree import DecisionTreeRegressor

    df = pd.read_csv('df.csv')
    df = df.set_index('id')

    features_to_remove = {
        'symbol', 'date', 'volume', 'open', 'high', 'low', 'close', 'change',
        'number_of_trades', 'target_date', '1_cash_volume', '2_cash_volume', '3_cash_volume',
        '4_cash_volume', '5_cash_volume', '6_cash_volume', '7_cash_volume', '8_cash_volume',
        '9_cash_volume', '10_cash_volume', '11_cash_volume', '12_cash_volume', '13_cash_volume',
        '14_cash_volume', '15_cash_volume',
        # The value being predicted must not also be a predictor (target leakage).
        'target',
    }

    features = [column for column in df.columns if column not in features_to_remove]

    X = df[features]
    y = df['target']

    # Split the Dataset into Training and Test Datasets
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.25, random_state=324)

    # Both models are evaluated on the same held-out rows.
    _evaluate_regressor("Linear Regression", LinearRegression(),
                        X_train, X_test, y_train, y_test)
    _evaluate_regressor("Decision Tree Regressor", DecisionTreeRegressor(max_depth=20),
                        X_train, X_test, y_train, y_test)
    
# Runs the modeling pipeline as soon as this file is executed (or imported).
model()