100 Days of Learning: Day 12 – Using SQLAlchemy as an ORM for our Lego database

Here is my Log book

Git repository for the functions.

Remember: faas-cli logs is your best friend. You can also tail the logs with "faas-cli logs NAME —tail"

Using SQLAlchemy

First I need to read the tutorial on how this actually works. I know SQLAlchemy is very popular but for some reason I have just never used it. I will also be referencing the docs on mariadb.

I also found this tutorial good enough to learn the basic SQLAlchemy ORM that I needed for this project.

# First trying this on my local machine
(venv)$ pip install sqlalchemy
(venv)$ python

>>> from sqlalchemy import create_engine
>>> from sqlalchemy import text
>>> engine = create_engine("mariadb+mariadbconnector://user:pass@some_mariadb/dbname?charset=utf8mb4")
>>> with engine.connect() as conn:
...     result = conn.execute(text("select 'hello world'"))
...     print(result.all())

Ok that worked. I continued to explore what the most minimal ORM code I would need to write to have a model named LegoSet that will map to the table in MariaDB. I will spare you all the steps I took to establish this.

I created a shell script named up.sh to do the faas-cli up with the build options required.

#!/bin/bash
faas-cli up --build-option dev -f legodb.yml

Don’t forget to update the OpenFaaS requirements.txt file.

(venv)$ pip freeze > legodb/requirements.txt
(venv)$ cat legodb/requirements.txt
greenlet==1.0.0
mariadb==1.0.6
SQLAlchemy==1.4.2

Creating the LegoSet model class

Create a new file named models.py in the same directory as handler.py.

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

We will need to be able to import this file into handler.py. There is a good example on OpenFaaS of how relative imports work in Python.

First I tested that the local import and SQLAlchemy imports worked before moving on.

# Modified handler.py
...
import mariadb
# Added this line to import the LegoSet model class
from .models import LegoSet

Build and test

$ ./up.sh
$ curl -s http://192.168.64.4:9999/function/legodb/legosets | jq
# Still works (it did take some trial and error to get here)

Getting all the Lego sets from the database

Edit models.py

from sqlalchemy import create_engine, Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

Base = declarative_base()

class LegoSet(Base):
    __tablename__ = 'legosets'
    id = Column('pkID', Integer, primary_key=True, nullable=False)
    legoID = Column('LegoID', Integer, unique=True, nullable=False)
    description = Column('Description',String(200))
    productURL = Column('ProductURL',String(4096))
    imageURL = Column('ImageURL',String(4096))

def database_session(host, port, user, password, schema):
    engine = create_engine(f"mariadb+mariadbconnector://{user}:{password}@{host}:{port}/{schema}?charset=utf8mb4")
    
    Session = sessionmaker(bind=engine)
    Base.metadata.create_all(engine)
    
    session = Session()
    return session

def get_all_legosets(session):
    legosets = session.query(LegoSet).all()
    return legosets

Edit handler.py

import os
import json
from .models import LegoSet, database_session, get_all_legosets

# GET /legosets : Returns the list of lego sets
# POST /legoset : Add a new lego set to the database

def handle(event, context):
    response = {'statusCode': 400}

    if event.method == 'GET':
        if event.path == '/legosets':
            legosets = get_list_of_legosets()
            response = {'statusCode': 200, 
                'body': legosets,
                'headers': {'Content-Type': 'application/json'}
                }
    elif event.method == 'POST':
        if event.path == '/legoset':
            response = add_new_legoset(event.body)
    
    return response

def load_secret(name):
    filepath = os.path.join('/var/openfaas/secrets/', name)
    with open(filepath) as f:
        secret = f.read()
    return secret

def create_database_session():
    host = load_secret('database-host')
    user = load_secret('database-user')
    password = load_secret('database-password')
    schema = os.environ.get('database-name')
    port = int(os.environ.get('database-port', '3306'))

    session = database_session(host, port, user, password, schema)
    return session

def get_list_of_legosets():
    session = create_database_session()
    legosets = get_all_legosets(session)
    result = []

    for legoset in legosets:
        result.append({
            'legoID': legoset.legoID,
            'description': legoset.description,
            'productURL': legoset.productURL,
            'imageURL': legoset.imageURL
        })

    session.close()
    return {"sets": result}

def add_new_legoset(body):
    response = None
    try:
        inputJSON = json.loads(body.decode('utf8').replace("'", '"'))
        response = {
            'statusCode': 200,
            'body': {'received': inputJSON},
            'headers': {'Content-Type': 'application/json'}
        }
    except ValueError:
        response = {
            'statusCode': 400,
            'body': {'reason': 'Invalid JSON'},
            'headers': {'Content-Type': 'application/json'}
         }
    return response

To verify that this does actually work as expected, I added an extra Lego set using MySQLWorkbench.

Build and test.

$ curl -s http://192.168.64.4:9999/function/legodb/legosets | jq
{
  "sets": [
    {
      "description": "Pirates of Barracuda Bay",
      "imageURL": "https://live.staticflickr.com/65535/49698377257_be3f773feb_b.jpg",
      "legoID": 21322,
      "productURL": "https://www.lego.com/en-gb/product/pirates-of-barracuda-bay-21322"
    },
    {
      "description": "Medieval Blacksmith",
      "imageURL": "https://images.brickset.com/sets/AdditionalImages/21325-1/21325-1_95850.jpg",
      "legoID": 21325,
      "productURL": "https://www.lego.com/en-gb/product/medieval-blacksmith-21325"
    },
    {
      "description": "Mars Research Shuttle",
      "imageURL": "https://www.lego.com/cdn/cs/set/assets/blt32796f487437e9db/60226_alt1.jpg",
      "legoID": 60226,
      "productURL": "https://www.lego.com/en-gb/product/mars-research-shuttle-60226"
    }
  ]
}

Time to add new Lego sets using our model

Edit model.py

# Add this at the bottom
def create_legoset(json):
    legoID = json.get('legoID', None)
    if legoID is None:
        return None
    description = json.get('description', None)
    if description is None:
        return None
    productURL = json.get('productURL', None)
    imageURL = json.get('imageURL', None)

    legoset = LegoSet(legoID=legoID, description=description, productURL=productURL, imageURL=imageURL)
    return legoset

Edit handler.py

# Change the import
from .models import LegoSet, database_session, get_all_legosets, create_legoset

# Replace add_new_legoset
def add_new_legoset(body):
    response = None
    try:
        inputJSON = json.loads(body.decode('utf8').replace("'", '"'))
        legoset = create_legoset(inputJSON)
        if legoset is None:
            raise ValueError()
        else:
            session = create_database_session()
            session.add(legoset)
            session.commit()
            session.refresh(legoset)
            newID = legoset.id
            session.close()

            response = {
                'statusCode': 200,
                'body': {'pkID': newID},
                'headers': {'Content-Type': 'application/json'}
        }
    except ValueError:
        response = {
            'statusCode': 400,
            'body': {'reason': 'Invalid JSON'},
            'headers': {'Content-Type': 'application/json'}
         }
    return response

Build and test

# Add a new lego set to the database
$ curl -s -H "Content-Type: application/json" -d '{"legoID":6929,"description":"Star Fleet Voyager","productURL":"https://brickset.com/sets/6929-1/Star-Fleet-Voyager","imageURL":"https://images.brickset.com/sets/images/6929-1.jpg"}' "http://192.168.64.4:9999/function/legodb/legoset" | jq
{"pkID":8}

# Verify
$ curl -s http://192.168.64.4:9999/function/legodb/legosets | jq
...
    {
      "description": "Star Fleet Voyager",
      "imageURL": "https://images.brickset.com/sets/images/6929-1.jpg",
      "legoID": 6929,
      "productURL": "https://brickset.com/sets/6929-1/Star-Fleet-Voyager"
    }
...

Next up

We now have a MariaDB instance running on AWS RDS and an OpenFaaS function that serves a RESTful API that allows us to get the list of Lego sets as well as being able to add a new set to the database.

Somethings that we could add (but I will leave that for the reader):

  • Protect the API using a secret token
  • Catch and handle database errors
  • Sanitise the database inputs
  • Unit-testing

As it stands right now, our function is not really shareable just yet. Meaning it works on my machine but won’t work on someone else’s machine. Tomorrow I will look at what is required to make this a "install, follow recipe and forget".

I also want to play around with having a function that will run on a schedule to go and fetch the imageURL and store the image data as a blob inside of the database.