CS375: Software Engineering - Cloud Architectures
Activity Goals
The goals of this activity are:
- To facilitate software integration and scale by migrating to a cloud-based architecture
The Activity
Directions
Consider the activity models and answer the questions provided. First reflect on these questions on your own briefly, before discussing and comparing your thoughts with your group. Appoint one member of your group to discuss your findings with the class, and the rest of the group should help that member prepare their response. Answer each question individually from the activity, and compare with your group to prepare for our whole-class discussion. After class, think about the questions in the reflective prompt and respond to those individually in your notebook. Report out on areas of disagreement or items for which you and your group identified alternative approaches. Write down and report out questions you encountered along the way for group discussion.
Model 1: Case Study: The IoT Sensor Framework
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from flask import Flask, jsonify, abort, make_response, request | |
from flask_cors import CORS | |
# Module-level state read by the request handlers below. ws_start()
# overwrites every one of these except ws_name.
ws_database = ws_crypto = None
ws_flask_host = ws_flask_port = None
ws_name = None
ws_do_debug = False

# The Flask application, wrapped with flask_cors.CORS.
app = Flask('Webserver')
CORS(app)
# call manually with abort(404)
@app.errorhandler(404)
def ws_web_not_found(error):
    """Answer HTTP 404 with a JSON body of the form {'error': 'Not found ...'}.

    The error is interpolated into a plain string before it reaches
    jsonify(): the exception object itself is not JSON serializable, so
    passing it through unformatted would make this handler fail in turn.

    :param error: the exception Flask hands to the error handler
    :return: a Flask response with status 404
    """
    # (error,) keeps the %-formatting correct even if error is a tuple
    message = 'Not found %s' % (error,)
    return make_response(jsonify({'error': message}), 404)
@app.errorhandler(400)
def ws_web_bad_request(error):
    """Answer HTTP 400 with a JSON body of the form {'error': 'Bad request ...'}.

    The error is interpolated into a plain string before it reaches
    jsonify(): the exception object itself is not JSON serializable, so
    passing it through unformatted would make this handler fail in turn.

    :param error: the exception Flask hands to the error handler
    :return: a Flask response with status 400
    """
    # (error,) keeps the %-formatting correct even if error is a tuple
    message = 'Bad request %s' % (error,)
    return make_response(jsonify({'error': message}), 400)
@app.route('/api/audit', methods=['GET'])
def ws_web_audit():
    """GET /api/audit: return the database's audit records as {'data': ...}."""
    audit_rows = ws_database.get_audit()
    body = jsonify({'data': audit_rows})
    return make_response(body, 200)
@app.route('/api/iot/maxtime', methods=['GET'])
@app.route('/api/rssi/maxtime', methods=['GET'])
def ws_iot_web_max_rel_time():
    """GET .../maxtime: return the database's get_max_rel_time() result.

    The same handler serves both the /api/iot and /api/rssi prefixes.
    """
    max_time = ws_database.get_max_rel_time()
    body = jsonify({'data': max_time})
    return make_response(body, 200)
@app.route('/api/iot/seconds/<int:lastnsec>', methods=['POST'])
@app.route('/api/iot/<int:starttime>/<int:endtime>', methods=['POST'])
@app.route('/api/iot/<int:starttime>', methods=['POST'])
@app.route('/api/iot', methods=['POST', 'PUT'])
@app.route('/api/rssi/seconds/<int:lastnsec>', methods=['POST'])
@app.route('/api/rssi/<int:starttime>/<int:endtime>', methods=['POST'])
@app.route('/api/rssi/<int:starttime>', methods=['POST'])
@app.route('/api/rssi', methods=['POST', 'PUT'])
def ws_iot_web(starttime=-1, endtime=-1, lastnsec=-1):
    """Dispatch an /api/iot or /api/rssi request according to its HTTP method.

    PUT stores data, with Content-Type 'application/json' and a body such as
        { "data": { "db_password": "abc123", "relative_time": 500,
                    "interrogator_time": 1395140359123456, "freeform": "..." } }
    (ws_iot_add_data passes relative_time and interrogator_time through int()).

    POST acts as a GET with a body, with Content-Type 'application/json' and
        { "data": { "db_password": "str" } }

    :param starttime: lower time bound from the URL, -1 when absent
    :param endtime: upper time bound from the URL, -1 when absent
    :param lastnsec: look-back window in seconds from the URL, -1 when absent
    :return: the response produced by the selected handler
    """
    method = request.method
    if method == 'POST':
        return ws_iot_get_all_data(starttime, endtime, lastnsec)
    if method == 'PUT':
        return ws_iot_add_data()
    # any other method is a bad request
    abort(400)
def ws_iot_get_all_data(starttime=-1, endtime=-1, lastnsec=-1):
    """Fetch stored rows and return them as {'data': [...]} with status 200.

    The database password is read from the optional JSON body
    ``{"data": {"db_password": ...}}`` and defaults to '' when the body is
    missing, is not JSON, or has no "data" member. A "data" member that is
    present but is not a JSON object is rejected with HTTP 400.

    Query selection (a value of -1 means "not supplied"):
      * lastnsec given          -> fetch_last_n_sec(lastnsec)
      * neither start nor end   -> fetch_all()
      * start only              -> fetch_since(starttime)
      * otherwise               -> fetch_between_window(starttime, endtime)

    :param starttime: lower time bound, or -1
    :param endtime: upper time bound, or -1
    :param lastnsec: look-back window in seconds, or -1
    :return: a Flask response with status 200
    """
    # silent=True returns None instead of raising for an absent, non-JSON or
    # unparsable body, so the empty-password fallback below is reachable.
    payload = request.get_json(silent=True)

    db_pw = ''
    if isinstance(payload, dict) and 'data' in payload:
        credentials = payload['data']
        if not isinstance(credentials, dict):
            abort(400)
        db_pw = credentials.get('db_password', "")

    if lastnsec != -1:
        data = ws_database.fetch_last_n_sec(lastnsec, db_pw)
    elif starttime == -1 and endtime == -1:
        data = ws_database.fetch_all(db_pw=db_pw)
    elif endtime == -1:
        data = ws_database.fetch_since(starttime, db_pw)
    else:
        data = ws_database.fetch_between_window(starttime, endtime, db_pw)
    return make_response(jsonify({'data': data}), 200)
def getwithdefault(d, key, default):
    """Return d[key] when key is contained in d, otherwise default."""
    return d[key] if key in d else default
def ws_iot_add_data():
    """Queue one or more rows from the JSON body for insertion; reply 201.

    The body is either a single object or a list of objects, each shaped as
    ``{"data": {"db_password": ..., "relative_time": ..., "interrogator_time":
    ..., "freeform": ...}}``. Missing members default to "" (db_password,
    freeform) or -1 (relative_time, interrogator_time); both times are
    converted with int().

    Every row is validated and converted before any row is handed to the
    database, so a malformed row rejects the whole request with HTTP 400 and
    leaves nothing partially inserted.

    :return: a Flask response with status 201 reporting how many objects
        were accepted
    """
    # silent=True turns a missing or unparsable body into None, which is
    # reported as a 400 below.
    payload = request.get_json(silent=True)
    if ws_do_debug:
        print(payload)
    if not payload or not isinstance(payload, (dict, list)):
        abort(400)
    insert_list = [payload] if isinstance(payload, dict) else payload

    # Pass 1: validate and convert; nothing is queued yet.
    parsed_rows = []
    for row in insert_list:
        if not isinstance(row, dict) or not isinstance(row.get('data'), dict):
            abort(400)
        fields = row['data']
        try:
            relative_time = int(fields.get('relative_time', "-1"))
            interrogator_time = int(fields.get('interrogator_time', "-1"))
        except (TypeError, ValueError, OverflowError):
            # non-numeric timestamps are a client error, not a server fault
            abort(400)
        parsed_rows.append((relative_time, interrogator_time,
                            fields.get('freeform', ""),
                            fields.get('db_password', "")))

    # Pass 2: every row is known to be well-formed.
    for relative_time, interrogator_time, freeform, db_pw in parsed_rows:
        ws_database.insert_row(relative_time, interrogator_time, freeform, db_pw=db_pw)
    return make_response(jsonify({'success': str(len(insert_list)) + ' object(s) created'}), 201)
def ws_start(crypto, database, flask_host='0.0.0.0', flask_port=5000, do_debug=False, use_reloader=False):
    """Record the service configuration in module globals and run the Flask app.

    :param crypto: object providing get_ssl_context()
    :param database: database object used by the request handlers
    :param flask_host: interface to bind
    :param flask_port: port to listen on
    :param do_debug: passed to Flask as ``debug``; also enables request printing
    :param use_reloader: passed straight to app.run()
    """
    global ws_crypto, ws_database, ws_flask_host, ws_flask_port, ws_do_debug

    ws_crypto, ws_database = crypto, database
    ws_flask_host, ws_flask_port = flask_host, flask_port
    ws_do_debug = do_debug

    run_options = dict(
        debug=ws_do_debug,
        port=ws_flask_port,
        host=ws_flask_host,
        ssl_context=ws_crypto.get_ssl_context(),
        threaded=False,  # multithreaded web server cannot share database connection
        use_reloader=use_reloader,
    )
    app.run(**run_options)
# References | |
# http://blog.miguelgrinberg.com/post/designing-a-restful-api-with-python-and-flask | |
# @app.route('/todo/api/v1/tasks', methods = ['POST']) with request.json | |
# @app.route('/todo/api/v1/tasks/<int:task_id>', methods = ['GET']) uses variable task_id | |
# http://flask.pocoo.org/snippets/111/ |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sqlite3 | |
import base64 | |
import os | |
import numpy | |
from database import Database | |
import threading | |
import queue | |
from time import sleep | |
import json | |
class SqliteDatabase(Database):
    """SQLite-backed Database whose inserts are written by a background thread.

    insert_row() only enqueues; the dispatcher thread started in __init__
    drains the queue and writes batches. The freeform column is stored
    encrypted through the SQL functions encrypt()/decrypt() registered on
    every connection.

    NOTE(review): db_encrypt/db_decrypt read self.db_password, which is
    assigned both by the dispatcher thread and by every fetch_* call. A fetch
    running while the dispatcher is writing can therefore change the password
    used for an in-flight insert. Not changed here; confirm how callers
    serialize access.

    NOTE(review): the dispatcher loop has no exit condition and the thread is
    not a daemon, so it keeps running for the life of the process.
    """

    # Shared head of every fetch_* query; _ROW_KEYS names its columns in order.
    _SELECT_ROWS = ("SELECT id, absolute_timestamp, relative_timestamp, interrogator_timestamp, "
                    "decrypt(freeform, interrogator_timestamp) FROM IOTD ")
    _ROW_KEYS = ('id', 'absolute_timestamp', 'relative_timestamp', 'interrogator_timestamp', 'freeform')

    def __init__(self, crypto, db_path='database.db', flush=False, dispatchsleep=0):
        """Initialise the base Database and start the insert dispatcher thread.

        :param crypto: crypto helper; forwarded to Database.__init__
        :param db_path: path of the SQLite file
        :param flush: forwarded to Database.__init__; init_database() drops
            the table when self.flush is truthy
        :param dispatchsleep: seconds the dispatcher sleeps after each batch
        """
        Database.__init__(self, crypto, db_path=db_path, flush=flush)
        self.dispatchsleep = dispatchsleep
        self.insertion_queue = queue.Queue()
        self.dispatcher_thread = threading.Thread(
            target=self.dispatcher, args=())
        self.dispatcher_thread.start()

    def dispatcher(self):
        """Thread body: drain the insertion queue forever, writing in batches."""
        conn = self.open_db_connection()
        try:
            c = conn.cursor()
            while True:
                # Block for the first item, then group rows by password.
                queuelist = dict()
                (row, db_pw) = self.insertion_queue.get(block=True)
                queuelist.setdefault(db_pw, []).append(row)
                # http://stackoverflow.com/questions/156360/get-all-items-from-thread-queue
                # while we're here, try to pick up any more items that were inserted into the queue
                while True:
                    try:
                        (row, db_pw) = self.insertion_queue.get_nowait()
                    except queue.Empty:
                        break
                    queuelist.setdefault(db_pw, []).append(row)
                for db_pw, rowlist in queuelist.items():
                    # encrypt() reads self.db_password while the insert runs
                    self.db_password = db_pw
                    # combine many rows together into a single insert, each with its own password
                    # the additional interrogatortime entries are for the encryption function which requires a counter to synchronize stream encryption and decryption; this time should be to the microsecond (6 places after the decimal for seconds) to ensure uniqueness, but can be less precise if the interrogator resolution is lower. relative_time is expected in microseconds, and both relativetime and interrogatortime are assumed to be whole numbers (i.e. epoch time)
                    c.executemany('INSERT INTO IOTD (relative_timestamp, interrogator_timestamp, freeform) VALUES (?,?,encrypt(?,?))', rowlist)
                    conn.commit()
                if self.dispatchsleep > 0:
                    # if desired, sleep the dispatcher for a short time to queue up some inserts and give the producer some CPU time
                    sleep(self.dispatchsleep)
        finally:
            # reached only if the loop raises; releases the connection
            conn.close()

    def close_db_connection(self, thread='main'):
        """Block until the insertion queue is observed empty.

        :param thread: unused
        """
        while self.insertion_queue.qsize() > 0:
            sleep(5+2*self.dispatchsleep)  # wait for dispatchers to finish

    def __del__(self):
        self.close_db_connection()

    def open_db_connection(self):
        """Open a new connection with encrypt()/decrypt() registered.

        The connection is not stored because each thread requires its own.
        The caller is responsible for closing it.

        :return: an open sqlite3 connection
        """
        conn = sqlite3.connect(self.db_path)
        try:
            os.chmod(self.db_path, 0o600)
            sqlite3.enable_callback_tracebacks(True)  # for user defined function exceptions
            conn.create_function('encrypt', 2, self.db_encrypt)
            conn.create_function('decrypt', 2, self.db_decrypt)
            self.init_database(conn)
        except Exception:
            # do not leak the connection when setup fails
            conn.close()
            raise
        return conn  # don't forget to conn.close()

    def db_encrypt(self, s, counter):
        """SQL function encrypt(s, counter): pad, encrypt and base64-encode s.

        :param s: value to encrypt; int and float are converted with str()
        :param counter: counter value for the cipher
        :return: base64-encoded ciphertext
        """
        # keep only the rightmost MAX_COUNTER_DIGITS characters of the counter
        counter = int(str(counter)[-self.crypto.MAX_COUNTER_DIGITS:])
        val = str(s) if isinstance(s, (int, float)) else s
        aes = self.crypto.get_db_aes(self.db_password, counter)
        padded = self.crypto.pad(val)
        return base64.b64encode(aes.encrypt(padded))

    def db_decrypt(self, s, counter):
        """SQL function decrypt(s, counter): the inverse of db_encrypt.

        :param s: base64-encoded ciphertext
        :param counter: counter value used when the row was encrypted
        :return: the decoded plaintext string
        """
        # keep only the rightmost MAX_COUNTER_DIGITS characters of the counter
        counter = int(str(counter)[-self.crypto.MAX_COUNTER_DIGITS:])
        aes = self.crypto.get_db_aes(self.db_password, counter)
        dec = aes.decrypt(base64.b64decode(s))
        return self.crypto.unpad(dec).decode()

    def init_database(self, conn):
        """Create the IOTD table if needed, dropping it first when flush is set."""
        if self.flush:
            self.flush_database(conn)
            # This runs for every new connection, including one per fetch.
            # Clear the flag so the table is dropped once rather than before
            # every read.
            self.flush = False
        c = conn.cursor()
        c.execute('''CREATE TABLE IF NOT EXISTS IOTD(id INTEGER PRIMARY KEY, absolute_timestamp DATETIME DEFAULT CURRENT_TIMESTAMP, relative_timestamp BIGINT, interrogator_timestamp BIGINT, freeform TEXT)''')
        conn.commit()

    def flush_database(self, conn):
        """Drop the IOTD table."""
        c = conn.cursor()
        c.execute('''DROP TABLE IF EXISTS IOTD''')
        conn.commit()

    # get max data time in the db
    def get_max_rel_time(self):
        """Return [{'max_relative_timestamp': value}] for the IOTD table."""
        conn = self.open_db_connection()
        try:
            c = conn.cursor()
            return [{'max_relative_timestamp': row[0]}
                    for row in c.execute("SELECT MAX(relative_timestamp) FROM IOTD")]
        finally:
            conn.close()

    def insert_row(self, relativetime, interrogatortime, freeform, db_pw=''):
        """Enqueue one row for the dispatcher; freeform is stored as JSON text.

        :param relativetime: value for relative_timestamp
        :param interrogatortime: value for interrogator_timestamp; also the
            encryption counter
        :param freeform: payload, serialised with json.dumps
        :param db_pw: password the row is encrypted with
        """
        # the additional interrogatortime entries are for the encryption function which requires a counter to synchronize stream encryption and decryption; this time should be to the microsecond (6 places after the decimal for seconds) to ensure uniqueness, but can be less precise if the interrogator resolution is lower. relative_time is expected in microseconds, and both relativetime and interrogatortime are assumed to be whole numbers (i.e. epoch time)
        # counter entries (i.e., interrogatortime) go after the field being entered into the row tuple
        freeformjson = json.dumps(freeform)
        row = (relativetime, interrogatortime, freeformjson, interrogatortime)
        self.insertion_queue.put((row, db_pw))  # to be read by dispatcher

    def _fetch_rows(self, db_pw, clause, params=()):
        """Run the shared SELECT plus clause and return one dict per row.

        The connection is closed even when the query or decrypt() raises.
        """
        self.db_password = db_pw  # read by decrypt() while the query runs
        conn = self.open_db_connection()
        try:
            c = conn.cursor()
            return [dict(zip(self._ROW_KEYS, row))
                    for row in c.execute(self._SELECT_ROWS + clause, params)]
        finally:
            conn.close()

    def fetch_all(self, db_pw=''):
        """Return every row, ordered by interrogator_timestamp ascending."""
        return self._fetch_rows(db_pw, "ORDER BY interrogator_timestamp ASC")

    def fetch_last_window(self, windowsize, db_pw=''):
        """Return the windowsize rows with the highest interrogator_timestamp, newest first."""
        return self._fetch_rows(
            db_pw, "ORDER BY interrogator_timestamp DESC LIMIT ?", (windowsize,))

    def fetch_since(self, since, db_pw=''):
        """Return rows whose relative_timestamp is >= since."""
        return self._fetch_rows(
            db_pw,
            "WHERE relative_timestamp >= ? ORDER BY interrogator_timestamp ASC",
            (since,))

    def fetch_between_window(self, start, end, db_pw=''):
        """Return rows whose relative_timestamp lies in [start, end]."""
        return self._fetch_rows(
            db_pw,
            "WHERE relative_timestamp >= ? AND relative_timestamp <= ? ORDER BY interrogator_timestamp ASC",
            (start, end))

    def fetch_last_n_sec(self, n, db_pw=''):
        """Return rows whose absolute_timestamp falls within the last n seconds."""
        return self._fetch_rows(
            db_pw,
            "WHERE absolute_timestamp >= datetime(?, ?) ORDER BY interrogator_timestamp ASC",
            ('now', '-' + str(n) + ' seconds'))

    # no auditing in sqlite
    def flush_audit(self, conn):
        pass

    def get_audit(self):
        return []

    def db_log(self, text):
        pass
# References: | |
# http://www.pythoncentral.io/introduction-to-sqlite-in-python/ | |
# https://docs.python.org/2/library/sqlite3.html | |
# http://stackoverflow.com/questions/14461851/how-to-have-an-automatic-timestamp-in-sqlite | |
# http://pymotw.com/2/sqlite3/ |
Questions
- What are some of the limitations of a SQLite database, given that it is primarily a file-based system?
- How might you migrate this to a scalable in-memory database? What code would need to be rewritten?
- How might you migrate this system to a cloud solution?