CS375: Software Engineering - Cloud Architectures


Activity Goals

The goals of this activity are:
  1. To facilitate software integration and scale by migrating to a cloud-based architecture

The Activity

Directions

Consider the activity models and answer the questions provided. First reflect on these questions on your own briefly, before discussing and comparing your thoughts with your group. Appoint one member of your group to discuss your findings with the class, and the rest of the group should help that member prepare their response. Answer each question individually from the activity, and compare with your group to prepare for our whole-class discussion. After class, think about the questions in the reflective prompt and respond to those individually in your notebook. Report out on areas of disagreement or items for which you and your group identified alternative approaches. Write down and report out questions you encountered along the way for group discussion.

Model 1: Case Study: The IoT Sensor Framework

from flask import Flask, jsonify, abort, make_response, request
from flask_cors import CORS
# Module-level state shared by the request handlers below.
# ws_start() fills in the crypto/database/host/port/debug values before the
# server begins accepting requests.
ws_crypto = None        # crypto provider; ws_start() asks it for the SSL context
ws_database = None      # storage backend queried by the handlers
ws_flask_host = None    # interface the server binds to
ws_flask_port = None    # TCP port the server listens on
ws_name = None          # not assigned anywhere in the visible code
ws_do_debug = False     # when true, incoming PUT payloads are printed

# The Flask application object; CORS(app) applies flask_cors to it.
app = Flask('Webserver')
CORS(app)
# call manually with abort(404)
@app.errorhandler(404)
def ws_web_not_found(error):
    """Render a 404 error as a JSON body of the form ``{"error": "Not found ..."}``.

    :param error: the exception Flask passes to the handler.
    :return: a Flask response with status code 404.
    """
    # Format the exception into the message. The previous code built a
    # (format, exception) tuple, which handed the raw exception object to
    # jsonify() instead of a string.
    message = 'Not found %s' % (error,)
    return make_response(jsonify({'error': message}), 404)
@app.errorhandler(400)
def ws_web_bad_request(error):
    """Render a 400 error as a JSON body of the form ``{"error": "Bad request ..."}``.

    :param error: the exception Flask passes to the handler.
    :return: a Flask response with status code 400.
    """
    # Format the exception into the message. The previous code built a
    # (format, exception) tuple, which handed the raw exception object to
    # jsonify() instead of a string.
    message = 'Bad request %s' % (error,)
    return make_response(jsonify({'error': message}), 400)
@app.route('/api/audit', methods=['GET'])
def ws_web_audit():
    """GET handler: return the database's audit records as ``{"data": ...}`` with status 200."""
    audit_rows = ws_database.get_audit()
    body = jsonify({'data': audit_rows})
    return make_response(body, 200)
@app.route('/api/iot/maxtime', methods=['GET'])
@app.route('/api/rssi/maxtime', methods=['GET'])
def ws_iot_web_max_rel_time():
    """GET handler: return the result of ``get_max_rel_time()`` as ``{"data": ...}`` with status 200.

    Both the ``/api/iot`` and ``/api/rssi`` prefixes are served by this handler.
    """
    newest = ws_database.get_max_rel_time()
    body = jsonify({'data': newest})
    return make_response(body, 200)
@app.route('/api/iot/seconds/<int:lastnsec>', methods=['POST'])
@app.route('/api/iot/<int:starttime>/<int:endtime>', methods=['POST'])
@app.route('/api/iot/<int:starttime>', methods=['POST'])
@app.route('/api/iot', methods=['POST', 'PUT'])
@app.route('/api/rssi/seconds/<int:lastnsec>', methods=['POST'])
@app.route('/api/rssi/<int:starttime>/<int:endtime>', methods=['POST'])
@app.route('/api/rssi/<int:starttime>', methods=['POST'])
@app.route('/api/rssi', methods=['POST', 'PUT'])
def ws_iot_web(starttime=-1, endtime=-1, lastnsec=-1):
    """Entry point for the data routes; dispatches on the HTTP method.

    PUT stores data. Send ``Content-Type: application/json`` and a body like::

        { "data": { "db_password": "abc123", "relative_time": 500,
                    "interrogator_time": 1395140359123456, "freeform": "..." } }

    ``relative_time`` and ``interrogator_time`` are converted with ``int()``
    by ws_iot_add_data, so they must be whole numbers.

    POST acts as a GET with a body, so that the password can be supplied::

        { "data": { "db_password": "abc123" } }

    :param starttime: lower bound taken from the URL, or -1 when absent.
    :param endtime: upper bound taken from the URL, or -1 when absent.
    :param lastnsec: "last N seconds" value taken from the URL, or -1 when absent.
    """
    verb = request.method
    if verb == 'POST':
        return ws_iot_get_all_data(starttime, endtime, lastnsec)
    if verb == 'PUT':
        return ws_iot_add_data()
    # Any other method is answered with 400.
    abort(400)
def ws_iot_get_all_data(starttime=-1, endtime=-1, lastnsec=-1):
    """Fetch stored rows and return them as ``{"data": [...]}`` with status 200.

    The database password is read from ``data.db_password`` in the JSON body.
    If the body, or its ``data`` member, is missing or is not a JSON object,
    an empty password is used.

    Which query runs depends on the arguments (-1 means "not supplied"):

    * ``lastnsec`` given: rows from the last ``lastnsec`` seconds
    * neither ``starttime`` nor ``endtime``: every row
    * only ``starttime``: rows since ``starttime``
    * otherwise: rows between ``starttime`` and ``endtime``

    :param starttime: lower bound, or -1.
    :param endtime: upper bound, or -1.
    :param lastnsec: number of trailing seconds to return, or -1.
    :return: a Flask response carrying the rows.
    """
    # Read the body once, and only call .get() on real JSON objects. A body
    # such as {"data": "abc"} or ["data"] used to raise and produce a 500.
    payload = request.json
    fields = payload.get('data') if isinstance(payload, dict) else None
    db_pw = fields.get('db_password', "") if isinstance(fields, dict) else ''

    if lastnsec != -1:
        data = ws_database.fetch_last_n_sec(lastnsec, db_pw)
    elif starttime == -1 and endtime == -1:
        data = ws_database.fetch_all(db_pw=db_pw)
    elif endtime == -1:
        data = ws_database.fetch_since(starttime, db_pw)
    else:
        data = ws_database.fetch_between_window(starttime, endtime, db_pw)
    return make_response(jsonify({'data': data}), 200)
def getwithdefault(d, key, default):
    """Return ``d[key]`` when ``key in d`` holds, otherwise ``default``.

    :param d: container supporting ``in`` and item lookup.
    :param key: key to look up.
    :param default: value returned when the key is absent.
    """
    return d[key] if key in d else default
def ws_iot_add_data():
    """Store one record, or a list of records, taken from the JSON request body.

    The body is either a single object or a list of objects, each of the form
    ``{"data": {"db_password": ..., "relative_time": ..., "interrogator_time": ...,
    "freeform": ...}}``. Missing fields default to an empty password, an empty
    ``freeform`` value and -1 for both times.

    Every record is validated and converted before anything is handed to the
    database, so a malformed record rejects the whole request with 400 rather
    than leaving part of the batch inserted.

    :return: a 201 response reporting how many objects were created.
    :raises: aborts with 400 when the body is empty, when it is neither an
        object nor a list, when a record has no ``data`` object, or when a
        time value cannot be converted to an integer.
    """
    payload = request.json
    if ws_do_debug:
        print(payload)
    if not payload:
        abort(400)
    if isinstance(payload, dict):
        insert_list = [payload]
    elif isinstance(payload, list):
        insert_list = payload
    else:
        abort(400)

    # First pass: validate and convert without touching the database.
    parsed_rows = []
    for row in insert_list:
        if not isinstance(row, dict) or not isinstance(row.get('data'), dict):
            abort(400)
        fields = row['data']
        try:
            relative_time = int(fields.get('relative_time', "-1"))
            interrogator_time = int(fields.get('interrogator_time', "-1"))
        except (TypeError, ValueError):
            # A non-numeric timestamp is a client error, not a server fault.
            abort(400)
        parsed_rows.append((relative_time,
                            interrogator_time,
                            fields.get('freeform', ""),
                            fields.get('db_password', "")))

    # Second pass: every record is known to be well-formed.
    for relative_time, interrogator_time, freeform, db_pw in parsed_rows:
        ws_database.insert_row(relative_time, interrogator_time, freeform, db_pw=db_pw)
    return make_response(jsonify({'success': str(len(insert_list)) + ' object(s) created'}), 201)
def ws_start(crypto, database, flask_host='0.0.0.0', flask_port=5000, do_debug=False, use_reloader=False):
    """Record the collaborators in module state and run the Flask server over SSL.

    :param crypto: object providing ``get_ssl_context()``.
    :param database: storage backend used by the request handlers.
    :param flask_host: interface to bind to.
    :param flask_port: port to listen on.
    :param do_debug: passed to Flask as ``debug``; also enables payload printing.
    :param use_reloader: passed through to ``app.run``.
    """
    global ws_crypto, ws_database, ws_flask_host, ws_flask_port, ws_do_debug
    ws_crypto, ws_database = crypto, database
    ws_flask_host, ws_flask_port = flask_host, flask_port
    ws_do_debug = do_debug

    ssl_context = ws_crypto.get_ssl_context()
    # threaded=False: a multithreaded web server cannot share the database connection.
    app.run(
        host=ws_flask_host,
        port=ws_flask_port,
        debug=ws_do_debug,
        ssl_context=ssl_context,
        threaded=False,
        use_reloader=use_reloader,
    )
# References
# http://blog.miguelgrinberg.com/post/designing-a-restful-api-with-python-and-flask
# @app.route('/todo/api/v1/tasks', methods = ['POST']) with request.json
# @app.route('/todo/api/v1/tasks/<int:task_id>', methods = ['GET']) uses variable task_id
# http://flask.pocoo.org/snippets/111/

import sqlite3
import base64
import os
import numpy
from database import Database
import threading
import queue
from time import sleep
import json
class SqliteDatabase(Database):
    """SQLite storage for IoT readings whose ``freeform`` column is stored encrypted.

    insert_row() only queues work; a background dispatcher thread drains the
    queue and performs the INSERTs on its own connection. Encryption and
    decryption run inside SQLite through the user-defined SQL functions
    ``encrypt`` and ``decrypt``, which read the current password from
    ``self.db_password``. That attribute is written by both the dispatcher
    thread and the fetch methods, so ``self._password_lock`` is held whenever
    the password is set and used.
    """

    # Column list shared by every fetch_* query, and the matching result keys.
    _SELECT_ROWS = ("SELECT id, absolute_timestamp, relative_timestamp, interrogator_timestamp, "
                    "decrypt(freeform, interrogator_timestamp) FROM IOTD")
    _ROW_KEYS = ('id', 'absolute_timestamp', 'relative_timestamp', 'interrogator_timestamp', 'freeform')

    def __init__(self, crypto, db_path='database.db', flush=False, dispatchsleep=0):
        """Initialise the base class, then start the insert dispatcher thread.

        :param crypto: crypto provider, forwarded to ``Database.__init__``.
        :param db_path: path of the SQLite file.
        :param flush: when true, the table is dropped and recreated once.
        :param dispatchsleep: seconds the dispatcher sleeps after each batch (0 = none).
        """
        # Created first so it exists for any method the base initialiser may call.
        self._password_lock = threading.RLock()
        Database.__init__(self, crypto, db_path=db_path, flush=flush)
        self.dispatchsleep = dispatchsleep
        self.insertion_queue = queue.Queue()
        self.dispatcher_thread = threading.Thread(
            target=self.dispatcher, args=())
        self.dispatcher_thread.start()

    def dispatcher(self):
        """Thread body: loop forever, writing queued rows in batches grouped by password."""
        conn = self.open_db_connection()
        try:
            c = conn.cursor()
            while 1:
                queuelist = dict()
                # Block until at least one row is available...
                (row, db_pw) = self.insertion_queue.get(block=True)
                queuelist.setdefault(db_pw, []).append(row)
                # http://stackoverflow.com/questions/156360/get-all-items-from-thread-queue
                # ...then pick up anything else that is already waiting.
                while 1:
                    try:
                        (row, db_pw) = self.insertion_queue.get_nowait()
                    except queue.Empty:
                        break
                    queuelist.setdefault(db_pw, []).append(row)
                for db_pw, rowlist in queuelist.items():
                    # Hold the lock while the password is in use, so a
                    # concurrent fetch cannot swap it in the middle of a batch.
                    with self._password_lock:
                        self.db_password = db_pw
                        # One multi-row insert per password. The fourth value in each
                        # row is the counter passed to encrypt() (see insert_row).
                        c.executemany('INSERT INTO IOTD (relative_timestamp, interrogator_timestamp, freeform) VALUES (?,?,encrypt(?,?))', rowlist)
                        conn.commit()
                if self.dispatchsleep > 0:
                    # if desired, sleep the dispatcher for a short time to queue up some inserts and give the producer some CPU time
                    sleep(self.dispatchsleep)
        finally:
            # Reached only if the loop is left through an exception.
            conn.close()

    def close_db_connection(self, thread='main'):
        """Block until the insertion queue is empty.

        :param thread: unused by this implementation.
        """
        while self.insertion_queue.qsize() > 0:
            sleep(5+2*self.dispatchsleep)  # wait for dispatchers to finish

    def __del__(self):
        # __init__ may have failed before the queue was created.
        if hasattr(self, 'insertion_queue'):
            self.close_db_connection()

    def open_db_connection(self):
        """Open and return a new connection with the encrypt/decrypt SQL functions registered.

        The connection is not stored because each thread requires its own;
        the caller is responsible for closing it.
        """
        conn = sqlite3.connect(self.db_path)
        os.chmod(self.db_path, 0o600)
        sqlite3.enable_callback_tracebacks(True)  # for user defined function exceptions
        conn.create_function('encrypt', 2, self.db_encrypt)
        conn.create_function('decrypt', 2, self.db_decrypt)
        self.init_database(conn)
        return conn

    def db_encrypt(self, s, counter):
        """SQL function ``encrypt(value, counter)``: return the base64 of the encrypted, padded value.

        :param s: value to encrypt; ints and floats are converted to ``str`` first.
        :param counter: stream counter; only its rightmost ``MAX_COUNTER_DIGITS`` digits are used.
        """
        # The counter must fit in MAX_COUNTER_DIGITS digits, so keep the rightmost ones.
        counter = int(str(counter)[-self.crypto.MAX_COUNTER_DIGITS:])
        val = str(s) if isinstance(s, (int, float)) else s
        aes = self.crypto.get_db_aes(self.db_password, counter)
        padded = self.crypto.pad(val)
        enc = aes.encrypt(padded)
        return base64.b64encode(enc)

    def db_decrypt(self, s, counter):
        """SQL function ``decrypt(value, counter)``: reverse db_encrypt and return text.

        :param s: base64-encoded ciphertext.
        :param counter: stream counter; only its rightmost ``MAX_COUNTER_DIGITS`` digits are used.
        """
        # The counter must fit in MAX_COUNTER_DIGITS digits, so keep the rightmost ones.
        counter = int(str(counter)[-self.crypto.MAX_COUNTER_DIGITS:])
        aes = self.crypto.get_db_aes(self.db_password, counter)
        b64dec = base64.b64decode(s)
        dec = aes.decrypt(b64dec)
        unpaddec = self.crypto.unpad(dec)
        return unpaddec.decode()

    def init_database(self, conn):
        """Create the IOTD table if needed, dropping it first when a flush was requested."""
        if self.flush:
            # Flush only once. This method runs for every new connection, so
            # leaving the flag set would drop the table on every fetch.
            self.flush = False
            self.flush_database(conn)
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS IOTD(id INTEGER PRIMARY KEY, absolute_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, relative_timestamp BIGINT, interrogator_timestamp BIGINT, freeform TEXT)''')
        conn.commit()

    def flush_database(self, conn):
        """Drop the IOTD table if it exists."""
        c = conn.cursor()
        c.execute('''DROP TABLE IF EXISTS IOTD''')
        conn.commit()

    # get max data time in the db
    def get_max_rel_time(self):
        """Return ``[{'max_relative_timestamp': value}]`` for the largest relative_timestamp stored."""
        conn = self.open_db_connection()
        try:
            c = conn.cursor()
            data = [{'max_relative_timestamp': row[0]}
                    for row in c.execute("SELECT MAX(relative_timestamp) FROM IOTD")]
        finally:
            conn.close()
        return data

    def insert_row(self, relativetime, interrogatortime, freeform, db_pw=''):
        """Queue one row for insertion by the dispatcher thread.

        ``freeform`` is serialised with ``json.dumps``. ``interrogatortime``
        appears twice in the queued tuple: once as a column value and once,
        after the field being encrypted, as the counter argument of encrypt().
        The counter synchronises stream encryption and decryption. The original
        notes ask for microsecond resolution to keep it unique, and expect both
        time values to be whole numbers.
        """
        freeformjson = json.dumps(freeform)
        row = (relativetime, interrogatortime, freeformjson, interrogatortime)
        self.insertion_queue.put((row, db_pw))  # to be read by dispatcher

    def _fetch_rows(self, query, params=(), db_pw=''):
        """Run a SELECT built on ``_SELECT_ROWS`` and return one dict per row."""
        # The lock spans both setting the password and iterating the rows,
        # because decrypt() reads the password while rows are produced.
        with self._password_lock:
            self.db_password = db_pw
            conn = self.open_db_connection()
            try:
                c = conn.cursor()
                data = [dict(zip(self._ROW_KEYS, row)) for row in c.execute(query, params)]
            finally:
                # Close the connection even when the query or decrypt() raises.
                conn.close()
        return data

    def fetch_all(self, db_pw=''):
        """Return every row, ordered by interrogator_timestamp ascending."""
        return self._fetch_rows(
            self._SELECT_ROWS + " ORDER BY interrogator_timestamp ASC",
            db_pw=db_pw)

    def fetch_last_window(self, windowsize, db_pw=''):
        """Return the ``windowsize`` rows with the highest interrogator_timestamp, newest first."""
        return self._fetch_rows(
            self._SELECT_ROWS + " ORDER BY interrogator_timestamp DESC LIMIT ?",
            (windowsize,), db_pw)

    def fetch_since(self, since, db_pw=''):
        """Return rows whose relative_timestamp is >= ``since``, ordered by interrogator_timestamp."""
        return self._fetch_rows(
            self._SELECT_ROWS + " WHERE relative_timestamp >= ? ORDER BY interrogator_timestamp ASC",
            (since,), db_pw)

    def fetch_between_window(self, start, end, db_pw=''):
        """Return rows whose relative_timestamp lies in [``start``, ``end``], ordered by interrogator_timestamp."""
        return self._fetch_rows(
            self._SELECT_ROWS + " WHERE relative_timestamp >= ? AND relative_timestamp <= ? ORDER BY interrogator_timestamp ASC",
            (start, end), db_pw)

    def fetch_last_n_sec(self, n, db_pw=''):
        """Return rows whose absolute_timestamp falls within the last ``n`` seconds."""
        return self._fetch_rows(
            self._SELECT_ROWS + " WHERE absolute_timestamp >= datetime(?, ?) ORDER BY interrogator_timestamp ASC",
            ('now', '-' + str(n) + ' seconds'), db_pw)

    # no auditing in sqlite
    def flush_audit(self, conn):
        """No-op: this backend keeps no audit log."""
        pass

    def get_audit(self):
        """Return an empty list: this backend keeps no audit log."""
        return []

    def db_log(self, text):
        """No-op: this backend keeps no audit log."""
        pass
# References:
# http://www.pythoncentral.io/introduction-to-sqlite-in-python/
# https://docs.python.org/2/library/sqlite3.html
# http://stackoverflow.com/questions/14461851/how-to-have-an-automatic-timestamp-in-sqlite
# http://pymotw.com/2/sqlite3/

Questions

  1. What are some of the limitations of a SQLite database, given that it is primarily a file-based system?
  2. How might you migrate this to a scalable in-memory database? What code would need to be rewritten?
  3. How might you migrate this system to a cloud solution?

Submission

I encourage you to submit your answers to the questions (and ask your own questions!) using the Class Activity Questions discussion board. You may also respond to questions or comments made by others, or ask follow-up questions there. Answer any reflective prompt questions in the Reflective Journal section of your OneNote Classroom personal section. You can find the link to the class notebook on the syllabus.