Merge remote-tracking branch 'rpi/main'

This commit is contained in:
Aadi Desai 2022-03-07 23:29:56 +00:00
commit 41f06a6533
No known key found for this signature in database
GPG key ID: CFFFE425830EF4D9
8 changed files with 616 additions and 0 deletions

161
.gitignore vendored Normal file
View file

@ -0,0 +1,161 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
cover/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
db.sqlite3-journal
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
.pybuilder/
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
# For a library or package, you might want to ignore these files since the code is
# intended to run in multiple environments; otherwise, check them in:
# .python-version
# pipenv
# According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
# However, in case of collaboration, if having platform-specific dependencies or dependencies
# having no cross-platform support, pipenv may install dependencies that don't work, or not
# install all needed dependencies.
#Pipfile.lock
# poetry
# Similar to Pipfile.lock, it is generally recommended to include poetry.lock in version control.
# This is especially recommended for binary packages to ensure reproducibility, and is more
# commonly ignored for libraries.
# https://python-poetry.org/docs/basic-usage/#commit-your-poetrylock-file-to-version-control
#poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Celery stuff
celerybeat-schedule
celerybeat.pid
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pytype static type analyzer
.pytype/
# Cython debug symbols
cython_debug/
# PyCharm
# JetBrains specific template is maintainted in a separate JetBrains.gitignore that can
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
#.idea/
# VSCode
.vscode/
# Matlab files
.m
# Secrets file
.secrets.yml

13
README.md Normal file
View file

@ -0,0 +1,13 @@
# ELEC60013 Embedded Systems: Coursework 1
- Raspberry Pi Zero W IoT Device
- Competing product: Fi Collar
- Our features can outcompete:
- Battery Life
- Tracking Accuracy?
- Much Larger Network
- Health Metrics
- Can do advanced features too:
- Sleep Tracking
- Step Counts
- Escape Detection? As soon as out of range of owner device?

73
hci.py Normal file
View file

@ -0,0 +1,73 @@
import base64
import subprocess
from time import sleep
from struct import pack
class HCIBroadcaster:
def __init__(self, b64key):
self.key = base64.b64decode(b64key)
def _advertisement_template(self):
adv = ""
adv += "1e" # length (30)
adv += "ff" # manufacturer specific data
adv += "4c00" # company ID (Apple)
adv += "1219" # offline finding type and length
adv += "00" # state
for _ in range(22): # key[6:28]
adv += "00"
adv += "00" # first two bits of key[0]
adv += "00" # hint
return bytearray.fromhex(adv)
def _bytes_to_strarray(self, bytes_, with_prefix=False):
if with_prefix:
return [hex(b) for b in bytes_]
else:
return [format(b, "x") for b in bytes_]
def _run_hci_cmd(self, cmd, hci="hci0", wait=1):
cmd_ = ["hcitool", "-i", hci, "cmd"]
cmd_ += cmd
print(cmd_)
subprocess.run(cmd_)
if wait > 0:
sleep(wait)
def start_advertising(self, interval_ms=2000):
key = self.key
addr = bytearray(key[:6])
addr[0] |= 0b11000000
adv = self._advertisement_template()
adv[7:29] = key[6:28]
adv[29] = key[0] >> 6
print(f"key ({len(key):2}) {key.hex()}")
print(f"address ({len(addr):2}) {addr.hex()}")
print(f"payload ({len(adv):2}) {adv.hex()}")
# Set BLE address
self._run_hci_cmd(
["0x3f", "0x001"] + self._bytes_to_strarray(addr, with_prefix=True)[::-1]
)
subprocess.run(["systemctl", "restart", "bluetooth"])
sleep(1)
# Set BLE advertisement payload
self._run_hci_cmd(
["0x08", "0x0008"] + [format(len(adv), "x")] + self._bytes_to_strarray(adv)
)
# Set BLE advertising mode
interval_enc = pack("<h", interval_ms)
hci_set_adv_params = ["0x08", "0x0006"]
hci_set_adv_params += self._bytes_to_strarray(interval_enc)
hci_set_adv_params += self._bytes_to_strarray(interval_enc)
hci_set_adv_params += ["03", "00", "00", "00", "00", "00", "00", "00", "00"]
hci_set_adv_params += ["07", "00"]
self._run_hci_cmd(hci_set_adv_params)
# Start BLE advertising
self._run_hci_cmd(["0x08", "0x000a"] + ["01"], wait=0)

136
lis3dh.py Normal file
View file

@ -0,0 +1,136 @@
"""Library for interacting with LIS3DH triple-axis accelerometer."""
from importlib.resources import read_text
from re import S
from tabnanny import check
import smbus2
from time import sleep
# Register addresses
_CTRL_REG1 = bytes([0x20])
_CTRL_REG2 = bytes([0x21])
_CTRL_REG3 = bytes([0x22])
_CTRL_REG4 = bytes([0x23])
_CTRL_REG5 = bytes([0x24])
_CTRL_REG6 = bytes([0x25])
_REF_REG = bytes([0x26])
_STATUS_REG = bytes([0x27])
_OUT_X_L = bytes([0x28])
_OUT_X_H = bytes([0x29])
_OUT_Y_L = bytes([0x2A])
_OUT_Y_H = bytes([0x2B])
_OUT_Z_L = bytes([0x2C])
_OUT_Z_H = bytes([0x2D])
_INT1_CFG = bytes([0x30])
_INT1_SRC = bytes([0x31])
_INT1_THS = bytes([0x32])
_INT1_DURATION = bytes([0x33])
_INT2_CFG = bytes([0x34])
_INT2_SRC = bytes([0x35])
_INT2_THS = bytes([0x36])
_INT2_DURATION = bytes([0x37])
_CLICK_CFG = bytes([0x38])
# Config flags
SAMPLERATE_1HZ = bytes([0x17])
SAMPLERATE_10HZ = bytes([0x27])
SAMPLERATE_25HZ = bytes([0x37])
HP_DISABLE = bytes([0x00])
CTRL_REG3_V = bytes([0x40])
CTRL_REG4_V = bytes([0x00]) # sensitivity set to 2g
CTRL_REG5_V = bytes([0x08])
INT1_THS_V = bytes([0x16]) # free-fall threshold at 350 mg
INT1_DURATION_V = bytes([0x03])
INT1_CFG_V = bytes([0x95])
EMPTY = bytes([0x00])
class LIS3DH:
def __init__(self, i2cBus, samplerate=10, i2cAddress=0x18):
sleep(0.005)
self.i2c = i2cBus
self.addr = i2cAddress
self.samplerate = samplerate
i2cBus.pec = True # enable smbus2 Packet Error Checking
sample_modes = {
0:0x0, 1:0x1, 10:0x2, 25:0x3, 50:0x4,
100:0x5, 200:0x6, 400:0x7
}
# Check if user-entered values are correct
if samplerate in sample_modes:
self.samplerate = sample_modes[samplerate]
else:
raise Exception("Invalid sample rate.")
# Configure all registers in +-2g mode
c0 = smbus2.i2c_msg.write(self.addr, [0x20,(sample_modes[samplerate]<<4)|0xF]) # Initialise in low power mode
c1 = smbus2.i2c_msg.write(self.addr, [0x21,0x0A])
c2 = smbus2.i2c_msg.write(self.addr, [0x22,0x40])
c3 = smbus2.i2c_msg.write(self.addr, [0x23,0x00])
c4 = smbus2.i2c_msg.write(self.addr, [0x24,0x0A])
c5 = smbus2.i2c_msg.write(self.addr, [0x25,0x20])
c6 = smbus2.i2c_msg.write(self.addr, [0x2E,0x00])
c7 = smbus2.i2c_msg.write(self.addr, [0x30,0x95])
c8 = smbus2.i2c_msg.write(self.addr, [0x32,0x16])
c9 = smbus2.i2c_msg.write(self.addr, [0x33,0x03])
c10 = smbus2.i2c_msg.write(self.addr, [0x34,0x3F])
c11 = smbus2.i2c_msg.write(self.addr, [0x36,0x32])
c12 = smbus2.i2c_msg.write(self.addr, [0x24,0x0A]) # Configure 0x24 again
self.i2c.i2c_rdwr(c0,c1,c2,c3,c4,c5,c6,c7,c8,c9,c10,c11,c12)
def resetint1(self) -> bool:
'''Read INT1_SRC to reset it after an interrupt event.'''
int1_src_loc = smbus2.i2c_msg.write(self.addr, [0x31])
read_int1_src = smbus2.i2c_msg.read(self.addr, 1)
self.i2c.i2c_rdwr(int1_src_loc,read_int1_src)
if read_int1_src.buf[0] != None:
return True
else:
return False
def resetint2(self) -> bool:
'''Read INT2_SRC to reset it after an interrupt event.'''
int2_src_loc = smbus2.i2c_msg.write(self.addr, [0x35])
read_int2_src = smbus2.i2c_msg.read(self.addr, 1)
self.i2c.i2c_rdwr(int2_src_loc,read_int2_src)
if read_int2_src.buf[0] != None:
return True
else:
return False
def readAll(self) -> list:
'''Read acceleration data from all axes. Returns values as a list [X,Y,Z].'''
check_status = smbus2.i2c_msg.write(self.addr, [0x27])
x = smbus2.i2c_msg.read(self.addr, 1)
y = smbus2.i2c_msg.read(self.addr, 1)
z = smbus2.i2c_msg.read(self.addr, 1)
prepare_x = smbus2.i2c_msg.write(self.addr, [0x29])
prepare_y = smbus2.i2c_msg.write(self.addr, [0x2B])
prepare_z = smbus2.i2c_msg.write(self.addr, [0x2D])
status = smbus2.i2c_msg.read(self.addr, 1)
self.i2c.i2c_rdwr(check_status, status)
while status.buf[0][0] & 0b1111 != 0b1111: # Wait for data to be available
sleep(0.001)
self.i2c.i2c_rdwr(check_status, status)
if status.buf[0][0] & 0b1111 == 0b1111: # If data is available, read
self.i2c.i2c_rdwr(prepare_x, x)
self.i2c.i2c_rdwr(prepare_y, y)
self.i2c.i2c_rdwr(prepare_z, z)
X = int.from_bytes(x.buf[0],"big")
Y = int.from_bytes(y.buf[0],"big")
Z = int.from_bytes(z.buf[0],"big")
# Convert from binary 2s complement to useful data
new_values = []
for D in [X,Y,Z]:
MSB = D >> 7
if MSB == 1:
res = (-128 + (D - 128))*self.resolution/128
else:
res = (D*self.resolution)/128
new_values.append(res)
return new_values
else:
return None # Should never get here lol

62
main.py Normal file
View file

@ -0,0 +1,62 @@
import os, sys
from time import sleep
import yaml
import paho.mqtt.client as mqtt
import json, smbus2, si7201, tmp006, lis3dh, hci, gpiozero
# Global Sensor Data Variables
dailysteps = 0
fallen = False
def incrementStepCount(interrupt, sensor) -> None:
global dailysteps
dailysteps += 1
sensor.resetint2()
def setFallen(interrupt, sensor) -> None:
global fallen
fallen = True
sensor.resetint1()
# Setup
bus = smbus2.SMBus(1) # set up I2C bus 1
temphum = si7201.Si7201(bus, 0x40) # set up Si7201 sensor
temphum.reset() # reset Si7201
irtemp = tmp006.TMP006(bus, 0x41, tmp006.SAMPLERATE_4HZ) # set up TMP006 sensor
irtemp.active = 1 # turn on TMP006
accel = lis3dh.LIS3DH(bus, 10, 0x18) # set up LIS3DH sensor
fall = gpiozero.Button(18, pull_up = False) # GPIO17: Freefall Interrupt (INT1)
fall.when_activated = lambda: setFallen(fall, accel) # set fallen to True when Freefall Interrupt (INT1) is triggered
step = gpiozero.Button(17, pull_up = False) # GPIO18: Step Counter Interrupt (INT2)
step.when_activated = lambda: incrementStepCount(step, accel) # increment step count when Step Counter Interrupt (INT2) is triggered
with open(".secrets.yml", "r") as secrets:
try:
secrets = yaml.load(secrets, Loader = yaml.SafeLoader)
key = secrets["key"] # Get Base64 encoded device public key from secrets file
except ImportError as exc:
print(exc)
sleep(60) # 60s delay before restarting
os.execl(sys.executable, os.path.abspath(__file__), *sys.argv) # Restart propgram
btcast = hci.HCIBroadcaster(key) # set up HCI Broadcaster
client = mqtt.Client("RaspberryPi") # set up MQTT client
client.connect("add8.duckdns.org", 8883, 60) # connect to MQTT broker
client.loop_start() # Start a new thread to handle sending MQTT messages
# Main Loop
while True:
data = {
"devID": "testdoggo",
"air_temp": temphum.temperature,
"day_steps": dailysteps,
"hum_perc": temphum.humidity,
"pet_temp": irtemp.temperature
}
mqtt_data = json.dumps(data)
client.publish("/data", mqtt_data)
btcast.start_advertising() # Send out BT advertisement
sleep(5) # Sleep for 5 seconds to lower power consumption

8
requirements.txt Normal file
View file

@ -0,0 +1,8 @@
colorzero==2.0
gpiozero==1.6.2
paho-mqtt==1.6.1
python-dateutil==2.8.2
PyYAML==6.0
RPi.GPIO==0.7.1
six==1.16.0
smbus2==0.4.1

40
si7201.py Normal file
View file

@ -0,0 +1,40 @@
"""Library for interacting with Si7201 Temperature & Humidity Sensor."""
import smbus2
from time import sleep
class Si7201:
def __init__(self, i2cBus, i2cAddress=0x40):
self.i2c = i2cBus
self.addr = i2cAddress
i2cBus.pec = True # enable smbus2 Packet Error Checking
@property
def temperature(self, decimals=1):
"""Measured temperature in degrees Celsius, with configurable decimal places, default 1."""
measure_temp = smbus2.i2c_msg.write(self.addr, [0xF3])
read_temp = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(measure_temp)
sleep(0.1)
self.i2c.i2c_rdwr(read_temp)
temp_code = int.from_bytes(read_temp.buf[0] + read_temp.buf[1], "big")
temp = round(((175.72 * temp_code) / 65536 - 46.85), decimals)
return temp
@property
def humidity(self, decimals=1):
"""Measured relative humidity in percent, with configurable decimal places, default 1."""
measure_hum = smbus2.i2c_msg.write(self.addr, [0xF5])
read_rh = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(measure_hum)
sleep(0.1)
self.i2c.i2c_rdwr(read_rh)
rh_code = int.from_bytes(read_rh.buf[0] + read_rh.buf[1], "big")
hum = round((125 * rh_code) / 65536 - 6.0, decimals)
return hum
def reset(self):
"""Reset the sensor."""
resetcmd = smbus2.i2c_msg.write(self.addr, [0xFE])
self.i2c.i2c_rdwr(resetcmd)

123
tmp006.py Normal file
View file

@ -0,0 +1,123 @@
"""Library for interacting with TMP006 Thermopile (IR Temperature) Sensor."""
import smbus2
from time import sleep
# Pointer Register Locations
_REG_VOBJ = bytes([0x00])
_REG_TAMB = bytes([0x01])
_REG_CNFG = bytes([0x02])
_REG_M_ID = bytes([0xFE])
_REG_D_ID = bytes([0xFF])
# Configuration Flags
_MODE_ON = bytes([0x70])
SAMPLERATE_4HZ = bytes([0x00])
SAMPLERATE_2HZ = bytes([0x02])
SAMPLERATE_1HZ = bytes([0x04])
SAMPLERATE_0_5HZ = bytes([0x06])
SAMPLERATE_0_25HZ = bytes([0x08])
_DRDY_EN = bytes([0x01])
class TMP006:
def __init__(self, i2cBus, i2cAddress=0x40, samplerate=SAMPLERATE_1HZ):
self.i2c = i2cBus
self.addr = i2cAddress
self.samplerate = samplerate
i2cBus.pec = True # enable smbus2 Packet Error Checking
self.config = bytes([0x00, 0x00])
self.config = bytes(
[self.config[0] | samplerate[0] | _MODE_ON[0] | _DRDY_EN[0], self.config[1]]
)
ptrConfig = smbus2.i2c_msg.write(self.addr, _REG_CNFG)
writeConfig = smbus2.i2c_msg.write(self.addr, self.config)
self.i2c.i2c_rdwr(ptrConfig, writeConfig)
@property
def temperature(self) -> float:
"""Measured temperature in degrees Celsius, to 2 decimel places"""
Vobj = self.vObject()
Tdie = self.tAmbient()
# Values for Calculations
S0 = 6.4e-14 # Calibration Factor TODO: Calibrate
a1 = 1.75e-3
a2 = -1.678e-5
Tref = 298.15
b0 = -2.94e-5
b1 = -5.7e-7
b2 = 4.63e-9
c2 = 13.4
# Calculate Sensitivity of Thermopile
S = S0 * (1 + a1 * (Tdie - Tref) + a2 * ((Tdie - Tref) ** 2))
# Calculate Coltage offset due to package thermal resistance
Voffset = b0 + b1 * (Tdie - Tref) + b2 * ((Tdie - Tref) ** 2)
# Calculate Seebeck coefficients
fVobj = (Vobj - Voffset) + c2 * ((Vobj - Voffset) ** 2)
# Calculate object temperature in Kelvin
Tobj = (Tdie**4 + (fVobj / S)) ** 0.25
# Convert from Kelvin to Celsius
return round(Tobj - 273.15, 2)
@property
def active(self) -> bool:
"""Check if Sensor is powered on."""
ptrPower = smbus2.i2c_msg.write(self.addr, _REG_CNFG)
power = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrPower, power)
return power.buf[0][0] & _MODE_ON[0] != 0
@active.setter
def active(self, value: bool):
"""Set the sensor to active or inactive."""
if value:
ptrPower = smbus2.i2c_msg.write(self.addr, _REG_CNFG)
power = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrPower, power)
newPower = bytes([power.buf[0][0] | _MODE_ON[0], power.buf[1][0]])
updatePower = smbus2.i2c_msg.write(self.addr, newPower)
self.i2c.i2c_rdwr(ptrPower, updatePower)
else:
ptrPower = smbus2.i2c_msg.write(self.addr, _REG_CNFG)
power = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrPower, power)
newPower = bytes([power.buf[0][0] & ~_MODE_ON[0], power.buf[1][0]])
updatePower = smbus2.i2c_msg.write(self.addr, newPower)
self.i2c.i2c_rdwr(ptrPower, updatePower)
def vObject(self) -> float:
"""Reading from Sensor Voltage Register in Volts"""
ptrVobject = smbus2.i2c_msg.write(self.addr, _REG_VOBJ)
readVobject = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrVobject, readVobject)
scaledVoltage = int.from_bytes(
readVobject.buf[0] + readVobject.buf[1], byteorder="big", signed=True
)
return round(scaledVoltage * 156.25e-9, 1)
# convert to Volts (156.25nV per LSB * 1e-9 for scaling from nV to Volts)
def tAmbient(self) -> float:
"""Reading from Ambient Temperature Register in Degrees Celsius"""
ptrTambient = smbus2.i2c_msg.write(self.addr, _REG_TAMB)
readTambient = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrTambient, readTambient)
scaledTemp = int.from_bytes(
readTambient.buf[0] + readTambient.buf[1], byteorder="big", signed=True
)
return round(scaledTemp * 0.0078125, 1)
# convert to degrees Celsius (1/32 for scaling * 1/4 for 2 bit shift)
@property
def manID(self) -> bytes:
"""Sensor manufacturer ID"""
ptrManID = smbus2.i2c_msg.write(self.addr, _REG_M_ID)
readManID = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrManID, readManID)
return readManID.buf[0] + readManID.buf[1]
@property
def devID(self) -> bytes:
"""Sensor device ID"""
ptrDevID = smbus2.i2c_msg.write(self.addr, _REG_D_ID)
readDevID = smbus2.i2c_msg.read(self.addr, 2)
self.i2c.i2c_rdwr(ptrDevID, readDevID)
return readDevID.buf[0] + readDevID.buf[1]