1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
|
from machine import Pin, ADC, Timer
import network
import time
import ujson
from uhashlib import sha256
import binascii
import math
import urequests
######################################################## CONFIG ########################################################
# Wifi settings:
# WIFI_ESSID = 'Your wifi ESSID (name)'
# WIFI_PSK = 'Your wifi key'
#
# Notification settings:
# NOTIFICATION_URL = 'https://automation.jaseg.de/notify/klingel'
# NOTIFICATION_SECRET = b'Your notification proxy secret for this endpoint'
#
# NOTIFICATION_COOLDOWN = 60 # how long to wait after sending a notification before sending the next, in seconds
#
# Detection settings
# MEAN_LEN = 8 # Number of sample windows averaged to determine the DC offset (one window is 1024 samples, i.e. 1024ms, so this is roughly in seconds)
# RMS_THRESHOLD = 1000 # RMS detection threshold over one sample window (about 1s), in ADC counts
######################################################### END ##########################################################
def wifi_connect():
    """Make sure the station interface is associated with the configured wifi.

    Returns immediately when the interface already reports a connection.
    Otherwise it activates the interface, starts a connection attempt with
    WIFI_ESSID / WIFI_PSK and polls the link state up to 20 times, sleeping
    0.5 s after each unsuccessful poll. The outcome is only printed; nothing
    is returned or raised by this function itself on timeout.
    """
    sta = network.WLAN(network.STA_IF)
    if sta.isconnected():
        return

    print('Connecting to wifi... ')
    sta.active(True)
    sta.connect(WIFI_ESSID, WIFI_PSK)

    for _ in range(20):
        if sta.isconnected():
            print('Wifi connected. IP config: ', sta.ifconfig())
            return
        time.sleep(0.5)

    # All polls failed: give up and let the caller carry on without wifi.
    print("Couldn't connect to wifi.")
# Timer that drives the periodic ADC sampling; it is configured in
# start_sampling().
sample_tim = Timer(-1)

# Sample window filled in place by the timer callback, one ADC reading per slot.
buf = [0] * 1024
# Copy of the most recently completed window; None until a window has finished.
capture = None

# DC offset estimate and RMS of the latest completed window. Both are
# overwritten by the timer callback whenever a window completes.
mean = 0
rms = 0
def start_sampling():
    """Start periodic ADC sampling into the module-level window buffer.

    Sets up the ADC on pin 34 with ADC.ATTN_11DB attenuation and arms
    sample_tim to invoke a callback with a period of 1 ms. The callback
    stores one reading per invocation in buf. Each time buf has been filled
    completely it updates the module-level results:

    * mean    -- average of the means of the most recent windows (at most
                 MEAN_LEN of them), used as DC offset estimate
    * rms     -- RMS of the just-completed window around that mean
    * capture -- a copy of the just-completed window
    """
    write_idx = 0       # next slot of buf to be written
    window_total = 0    # sum of the readings stored in the current window
    recent_means = []   # means of the latest windows, newest first
    adc = ADC(Pin(34))
    adc.atten(ADC.ATTN_11DB)

    def sample_cb(tim):
        global mean, rms, capture
        nonlocal write_idx, window_total, recent_means

        reading = adc.read()
        buf[write_idx] = reading
        window_total += reading
        write_idx += 1
        if write_idx != len(buf):
            return  # window not complete yet

        # Window complete: wrap around and refresh the published statistics.
        window_len = len(buf)
        write_idx = 0
        # Prepend the new window mean and keep at most MEAN_LEN entries.
        recent_means = [window_total/window_len] + recent_means[:MEAN_LEN-1]
        mean = sum(recent_means)/len(recent_means)
        window_total = 0
        squared_dev = sum( (x-mean)**2 for x in buf )
        rms = math.sqrt(squared_dev/window_len)
        capture = list(buf)  # copy, since buf is overwritten in place

    sample_tim.init(period=1, mode=Timer.PERIODIC, callback=sample_cb)  # period in ms
def uhmac(key, data):
    """Compute the HMAC-SHA256 of data under key, following RFC 2104.

    key  -- bytes-like secret of any length. Keys longer than the 64-byte
            SHA-256 block are replaced by their SHA-256 digest first, as the
            RFC requires; previously such keys made the padding length
            negative and the call failed.
    data -- bytes-like message to authenticate.

    Returns the raw 32-byte digest.
    """
    blocksize = 64  # SHA-256 block size in bytes
    if len(key) > blocksize:
        key = sha256(key).digest()
    # Zero-pad the key up to exactly one block.
    key = key + bytes(blocksize - len(key))

    inner = sha256(bytes(b ^ 0x36 for b in key))  # key XOR ipad
    inner.update(data)
    outer = sha256(bytes(b ^ 0x5C for b in key))  # key XOR opad
    outer.update(inner.digest())
    return outer.digest()
def usign(secret, payload=None, seq=None):
    """Wrap payload in a timestamped envelope and sign it with uhmac.

    secret  -- key passed to uhmac
    payload -- value stored under 'd' in the envelope
    seq     -- optional sequence number; stored under 'seq' only when it is
               not None

    Returns a JSON string with two members: 'payload', the serialised
    envelope, and 'auth', the hex-encoded HMAC of that serialised envelope.
    """
    envelope = {'time': int(time.time()), 'd': payload}
    if seq is not None:
        envelope['seq'] = seq

    body = ujson.dumps(envelope).encode()
    signature = binascii.hexlify(uhmac(secret, body))
    # NOTE(review): both values handed to ujson.dumps here are bytes objects;
    # this relies on ujson accepting bytes -- confirm on the target firmware.
    return ujson.dumps({'payload': body, 'auth': signature})
def notify(**kwargs):
    """POST a signed notification to NOTIFICATION_URL.

    The keyword arguments become the notification payload, which is signed
    with NOTIFICATION_SECRET via usign() and sent as a JSON request body.
    Exceptions raised by the request propagate to the caller.
    """
    data = usign(NOTIFICATION_SECRET, kwargs)
    print(time.time(), 'Notifying', NOTIFICATION_URL)
    response = urequests.post(NOTIFICATION_URL, data=data, headers={'Content-Type': 'application/json'})
    # The response keeps its socket open until it is closed explicitly; not
    # closing it leaks one socket per notification.
    response.close()
def klingel_notify(rms, capture):
    """Send a doorbell notification carrying the RMS level and the capture.

    rms     -- value sent as the 'rms' field
    capture -- value sent as the 'capture' field
    """
    fields = {'rms': rms, 'capture': capture}
    notify(**fields)
def loop():
    """Poll the RMS level forever and send a notification when it trips.

    Whenever the module-level rms exceeds RMS_THRESHOLD, this makes sure
    wifi is up, remembers the current capture, waits until the sampling
    callback has published the next window, and sends both captures along
    with the RMS value that tripped the threshold. It then sleeps for
    NOTIFICATION_COOLDOWN seconds. This function does not return.
    """
    global rms, capture
    while True:
        # Read rms once so the value compared and the value reported are the
        # same, even if the sampling callback updates it in between.
        level = rms
        if level > RMS_THRESHOLD:
            wifi_connect()
            old_capture = capture
            # Clear rms and wait for the sampling callback to publish the
            # next window (it writes a fresh rms when a window completes).
            rms = 0
            while rms == 0:
                time.sleep(0.1)
            rms = 0
            # Report the level that tripped the threshold. Passing the
            # module-level rms here would always send the 0 written above.
            try:
                klingel_notify(level, [old_capture, capture])
            except Exception as exc:
                # A failed notification (e.g. no network) must not stop
                # detection; log it and carry on after the cooldown.
                print('Notification failed:', exc)
            time.sleep(NOTIFICATION_COOLDOWN)
        time.sleep(0.1)
# Script entry: bring up wifi, start the periodic ADC sampling, then enter the
# detection loop (loop() runs a `while True` and does not return).
wifi_connect()
start_sampling()
loop()
|