# Select Git revision
# This project manages its dependencies using Yarn.
# Learn more
# nbiot.be 12.64 KiB
import string
import gpio
# ------------------------------------------------------- #
# Requests #
# ------------------------------------------------------- #
# Numeric identifiers for the kinds of request that can be queued on the
# driver. Each value is stored in NBIoTRequest.request_type.
class NBIoTRequestType
  static DEBUG = 0   # raw command/response exchange (NBIoTDebugRequest)
  static MQTT = 1    # MQTT publish (NBIoTMQTTRequest)
  static COAP = 2    # CoAP request (NBIoTCOAPRequest)
  static NTP = 3     # time synchronisation (NBIoTNTPRequest)
  static PSM = 4     # power save mode change (NBIoTPSMRequest)
end
# Base class of every queued request: stores the request kind and an
# optional completion callback.
class NBIoTRequest
  var request_type   # one of the NBIoTRequestType constants
  var callback       # function invoked on completion, or nil

  # request_type: NBIoTRequestType constant identifying the request kind.
  # callback: function to call once the request has completed; may be nil.
  def init(request_type, callback)
    self.callback = callback
    self.request_type = request_type
  end
end
# Request to send an arbitrary command string and wait for an expected
# response substring. Debug requests carry no completion callback.
class NBIoTDebugRequest : NBIoTRequest
  var cmd       # command text (the line terminator is added by the procedure)
  var rsp       # substring expected in the response
  var retries   # retry budget copied into the command

  # cmd: command text to send.
  # rsp: substring that marks a successful response.
  # retries: retry budget for the command.
  def init(cmd, rsp, retries)
    # The callback slot of the base class is left empty (nil).
    super(self).init(NBIoTRequestType.DEBUG, nil)
    self.retries = retries
    self.rsp = rsp
    self.cmd = cmd
  end
end
# Request describing a single MQTT publish: broker coordinates,
# credentials, topic and payload.
class NBIoTMQTTRequest : NBIoTRequest
  var host
  var port
  var username
  var password
  var topic
  var payload

  # host, port: broker address.
  # username, password: broker credentials.
  # topic, payload: message to publish.
  # callback: function invoked on completion; may be nil.
  def init(host, port, username, password, topic, payload, callback)
    super(self).init(NBIoTRequestType.MQTT, callback)
    # Broker coordinates and credentials.
    self.host = host
    self.port = port
    self.username = username
    self.password = password
    # Message to publish.
    self.payload = payload
    self.topic = topic
  end
end
# Request describing a single CoAP exchange.
class NBIoTCOAPRequest : NBIoTRequest
  var host
  var port
  var path
  # 'method' was assigned in init() but never declared, which makes the
  # constructor fail on an undeclared attribute.
  var method
  var query
  # 'topic' is never assigned by this class; it is retained only so that
  # any external code reading the attribute keeps working.
  var topic
  var payload

  # host, port: server address.
  # path: resource path (the nbiot module passes a list).
  # method: request method string (the nbiot module passes 'GET' or 'POST').
  # query: query parameters (the nbiot module passes a list).
  # payload: request body, or nil.
  # callback: function invoked on completion; may be nil.
  def init(host, port, path, method, query, payload, callback)
    super(self).init(NBIoTRequestType.COAP, callback)
    self.host = host
    self.port = port
    self.path = path
    self.method = method
    self.query = query
    self.payload = payload
  end
end
# Request asking the driver to synchronise the system time.
class NBIoTNTPRequest : NBIoTRequest
  # callback: function invoked once the request has completed; may be nil.
  def init(callback)
    var kind = NBIoTRequestType.NTP
    super(self).init(kind, callback)
  end
end
# Request asking the driver to change the power save mode setting.
class NBIoTPSMRequest : NBIoTRequest
  var value   # requested power save mode value

  # value: requested power save mode value.
  # callback: function invoked once the request has completed; may be nil.
  def init(value, callback)
    var kind = NBIoTRequestType.PSM
    super(self).init(kind, callback)
    self.value = value
  end
end
# ------------------------------------------------------- #
# Connections #
# ------------------------------------------------------- #
# Value object describing an MQTT broker connection.
class NBIoTMQTTConnection
  var host
  var port
  var username
  var password

  # host, port: broker address.
  # username, password: broker credentials.
  def init(host, port, username, password)
    self.host = host
    self.port = port
    self.username = username
    self.password = password
  end

  # Returns true when 'other' has the same host, port, username and
  # password as this connection, false otherwise. Fields are compared in
  # that order and comparison stops at the first mismatch.
  def equals(other)
    if !(self.host == other.host) return false end
    if !(self.port == other.port) return false end
    if !(self.username == other.username) return false end
    if !(self.password == other.password) return false end
    return true
  end
end
# ------------------------------------------------------- #
# Procedures #
# ------------------------------------------------------- #
# One command to write to the modem together with the response substring
# that marks success and a retry budget.
class NBIoTCommand
  var cmd       # text written to the serial port
  var rsp       # substring expected in the response
  var retries   # remaining retries; never nil after init

  # cmd: command text.
  # rsp: expected response substring.
  # retries: retry budget; defaults to 0 when omitted or nil.
  def init(cmd, rsp, retries)
    self.cmd = cmd
    self.rsp = rsp
    self.retries = retries == nil ? 0 : retries
  end
end
# Base class for a multi-step exchange with the modem. Subclasses override
# execute(), which the driver calls repeatedly until is_done() or
# is_aborted() reports true.
class NBIoTProcedure
  var ser              # serial port object used for write()/read()
  var request          # request that triggered the procedure, or nil
  var debug            # when true, every raw response is print()ed
  var done             # set by subclasses once the procedure succeeded
  var aborted          # set by subclasses once the procedure gave up
  var rsp_awaiting     # true between sending a command and matching its response
  var cmd_in_process   # NBIoTCommand currently being handled, or nil

  # ser: serial port object.
  # request: originating request; nil when omitted.
  def init(ser, request)
    self.ser = ser
    self.request = request
    self.cmd_in_process = nil
    self.rsp_awaiting = false
    self.debug = false
    self.done = false
    self.aborted = false
  end

  # Returns the 'done' flag.
  def is_done()
    return self.done
  end

  # Returns the 'aborted' flag.
  def is_aborted()
    return self.aborted
  end

  # Writes the current command to the serial port, marks a response as
  # pending and logs the command without its line terminator.
  def send_cmd()
    var text = self.cmd_in_process.cmd
    self.ser.write(bytes().fromstring(text))
    self.rsp_awaiting = true
    tasmota.log(string.format('NBT: Sending command \'%s\'', string.replace(text, '\r\n', '')), 2)
  end

  # Reads the serial port if a response is pending and checks whether it
  # contains the expected substring of the current command.
  # return_rsp: when truthy the raw response is returned on a match and nil
  #             otherwise; when falsy the result is true/false.
  # A pending response that does not match consumes one retry (never going
  # below zero).
  def read_rsp_contains_expected_rsp(return_rsp)
    var no_match = return_rsp ? nil : false
    if !self.rsp_awaiting
      return no_match
    end
    var received = self.ser.read().asstring()
    if self.debug
      print(received)
    end
    var current = self.cmd_in_process
    if string.find(received, current.rsp) < 0
      if current.retries > 0
        current.retries -= 1
      end
      return no_match
    end
    self.rsp_awaiting = false
    tasmota.log(string.format('NBT: Received \'%s\'', current.rsp), 2)
    if return_rsp
      return received
    end
    return true
  end

  # Returns the raw response when it contains the expected substring;
  # otherwise (re)sends the current command and returns nil.
  def fetch_read_rsp_if_contains_expected_rsp_or_send()
    var received = self.read_rsp_contains_expected_rsp(true)
    if received == nil
      self.send_cmd()
    end
    return received
  end

  # Returns true when the expected response was received; otherwise
  # (re)sends the current command and returns false.
  def read_rsp_contains_expected_rsp_or_send()
    if self.read_rsp_contains_expected_rsp(false)
      return true
    end
    self.send_cmd()
    return false
  end

  # Returns true once the current command has no retries left.
  def retries_exceeded()
    return self.cmd_in_process.retries < 1
  end

  # Performs one step of the procedure; a no-op in the base class.
  def execute() end
end
# Procedure that sends the command of an NBIoTDebugRequest and waits for
# its expected response, printing every raw response (debug mode is on).
class NBIoTDebugProcedure : NBIoTProcedure
  var cmd_debug   # command built from the request

  # ser: serial port object.
  # request: NBIoTDebugRequest providing cmd, rsp and retries.
  def init(ser, request)
    # Forward the request to the base class (as NBIoTNTPProcedure does) so
    # that self.request is populated instead of being silently dropped.
    super(self).init(ser, request)
    self.debug = true
    self.cmd_debug = NBIoTCommand(request.cmd + '\r\n', request.rsp, request.retries)
  end

  # One step: send the command or check its response, then update the
  # done/aborted flags.
  # NOTE(review): when the request has retries == 0, 'aborted' becomes true
  # right after the first send, before any response is read - confirm this
  # is the intended meaning of "no retries".
  def execute()
    if self.cmd_in_process == nil
      self.cmd_in_process = self.cmd_debug
    end
    self.done = self.read_rsp_contains_expected_rsp_or_send()
    self.aborted = self.retries_exceeded()
  end
end
# Start-up procedure: sends the reset command and waits for 'RDY', then
# sends a plain 'AT' and waits for 'OK'.
class NBIoTResetProcedure : NBIoTProcedure
  var cmd_reset
  var cmd_at

  # ser: serial port object. No request is associated with this procedure.
  def init(ser)
    super(self).init(ser)
    self.cmd_at = NBIoTCommand('AT\r\n', 'OK')
    self.cmd_reset = NBIoTCommand('AT+QRST=1\r\n', 'RDY')
  end

  # One step of the sequence. The first call only selects the reset
  # command; nothing is sent until the following call.
  def execute()
    var current = self.cmd_in_process
    if current == nil
      self.cmd_in_process = self.cmd_reset
      return
    end
    if current == self.cmd_reset
      # Advance to the 'AT' probe once the reset has been acknowledged.
      if self.read_rsp_contains_expected_rsp_or_send()
        self.cmd_in_process = self.cmd_at
      end
      return
    end
    if current == self.cmd_at
      self.done = self.read_rsp_contains_expected_rsp_or_send()
    end
  end
end
# Start-up procedure issuing three commands in sequence, each of which
# must be answered with 'OK': AT+CEREG=1, AT+CPSMS=0 and AT+QSCLK=0.
class NBIoTDisablePSMProcedure : NBIoTProcedure
  var cmd_register
  var cmd_disable_psm
  var cmd_disable_sleep

  # ser: serial port object. No request is associated with this procedure.
  def init(ser)
    super(self).init(ser)
    var ok = 'OK'
    self.cmd_register = NBIoTCommand('AT+CEREG=1\r\n', ok)
    self.cmd_disable_psm = NBIoTCommand('AT+CPSMS=0\r\n', ok)
    self.cmd_disable_sleep = NBIoTCommand('AT+QSCLK=0\r\n', ok)
  end

  # One step of the sequence. The first call only selects the first
  # command; nothing is sent until the following call.
  def execute()
    var current = self.cmd_in_process
    if current == nil
      self.cmd_in_process = self.cmd_register
      return
    end
    if current == self.cmd_register
      if self.read_rsp_contains_expected_rsp_or_send()
        self.cmd_in_process = self.cmd_disable_psm
      end
      return
    end
    if current == self.cmd_disable_psm
      if self.read_rsp_contains_expected_rsp_or_send()
        self.cmd_in_process = self.cmd_disable_sleep
      end
      return
    end
    if current == self.cmd_disable_sleep
      # Last step: the procedure is done once this command is acknowledged.
      self.done = self.read_rsp_contains_expected_rsp_or_send()
    end
  end
end
# Procedure that asks the modem for network time (AT+QNTP) and applies the
# returned timestamp to the system clock.
class NBIoTNTPProcedure : NBIoTProcedure
  var cmd_ntp

  # ser: serial port object.
  # request: originating NBIoTNTPRequest (its callback is run by the driver).
  def init(ser, request)
    super(self).init(ser, request)
    self.cmd_ntp = NBIoTCommand('AT+QNTP=1,\"0.at.pool.ntp.org\"\r\n', '+QNTP: 0', 10)
  end

  # Extracts the first double-quote-delimited segment of 'rsp' that parses
  # as "%y/%m/%d,%H:%M:%S" and sets the system time from it.
  # Returns true when the time was set, false when no segment could be
  # parsed (previously this case crashed on indexing nil).
  def set_system_time(rsp)
    var timestamp = nil
    for rsp_arg : string.split(rsp, '\"')
      timestamp = tasmota.strptime(rsp_arg, "%y/%m/%d,%H:%M:%S")
      if timestamp != nil
        break
      end
    end
    if timestamp == nil
      tasmota.log('NBT: No timestamp found in NTP response', 2)
      return false
    end
    # NOTE(review): the offset is hard-coded to one hour - confirm whether
    # a configurable timezone is needed.
    tasmota.cmd('time ' + str(timestamp['epoch'] + 3600)) # UTC + 1
    return true
  end

  # One step: send the NTP command or evaluate its response. The procedure
  # is done once the time was set; it is aborted when the retries run out
  # or when a matching response carries no parsable timestamp.
  def execute()
    if self.cmd_in_process == nil
      self.cmd_in_process = self.cmd_ntp
    end
    # The helper takes no arguments; command and expected response come
    # from cmd_in_process.
    var rsp = self.fetch_read_rsp_if_contains_expected_rsp_or_send()
    var parse_failed = false
    if rsp != nil
      self.done = self.set_system_time(rsp)
      parse_failed = !self.done
    end
    self.aborted = parse_failed || self.retries_exceeded()
  end
end
# ------------------------------------------------------- #
# Driver State #
# ------------------------------------------------------- #
# States of the NBIoTDriver state machine, evaluated in every_second().
class NBIoTDriverState
  static INIT = 0          # initial state after construction
  static IDLE = 1          # every_second() returns immediately
  static RESET = 2         # running NBIoTResetProcedure
  static DISABLE_PSM = 3   # running NBIoTDisablePSMProcedure
  static READY = 4         # waiting for a queued request
  static BUSY = 5          # executing the procedure of a request
end
# ------------------------------------------------------- #
# Driver #
# ------------------------------------------------------- #
# Tasmota driver that owns the serial link to the modem and runs a small
# state machine once per second: INIT -> RESET -> DISABLE_PSM -> READY,
# then READY <-> BUSY for every queued request.
class NBIoTDriver
  var ser             # serial port to the modem
  var psm_eint        # GPIO number written by set_psm_pin()
  var state           # current NBIoTDriverState value
  var request_queue   # list of pending requests, oldest first
  var procedure       # procedure currently running, or nil

  # rx, tx: GPIO numbers of the serial port (opened at 115200 baud, 8N1).
  # psm_eint: GPIO number driven by set_psm_pin().
  def init(rx, tx, psm_eint)
    self.ser = serial(rx, tx, 115200, serial.SERIAL_8N1)
    self.psm_eint = psm_eint
    self.state = NBIoTDriverState.INIT
    self.request_queue = []
    self.procedure = nil
  end

  # Appends a request to the end of the queue.
  def queue_request(request)
    self.request_queue.push(request)
  end

  # Makes 'procedure' the currently running procedure.
  def init_procedure(procedure)
    self.procedure = procedure
  end

  # Clears the current procedure. When 'done' is truthy and the procedure
  # has a request with a callback, the callback is invoked first.
  def finish_procedure(done)
    var request = self.procedure.request
    if done && request != nil && request.callback != nil
      # A failing user callback must not leave the driver stuck in its
      # current state with a finished procedure, so errors are logged.
      try
        request.callback()
      except .. as e, m
        tasmota.log(string.format('NBT: Request callback failed: %s (%s)', str(e), str(m)), 2)
      end
    end
    self.procedure = nil
  end

  # Logs and performs a state transition.
  def next_state(state)
    tasmota.log(string.format('NBT: Transitioning from state %i to %i', self.state, state), 2)
    self.state = state
  end

  # Writes 'value' to the psm_eint GPIO.
  # NOTE(review): the pin mode is not configured here - presumably done
  # elsewhere; confirm.
  def set_psm_pin(value)
    gpio.digital_write(self.psm_eint, value)
  end

  # Advances the state machine by one step (called by Tasmota once per
  # second for registered drivers).
  def every_second()
    if self.state == NBIoTDriverState.IDLE
      return
    elif self.state == NBIoTDriverState.INIT
      self.set_psm_pin(1)
      self.next_state(NBIoTDriverState.RESET)
    elif self.state == NBIoTDriverState.RESET
      if self.procedure == nil
        self.init_procedure(NBIoTResetProcedure(self.ser))
      end
      self.procedure.execute()
      if self.procedure.is_done()
        self.finish_procedure()
        self.next_state(NBIoTDriverState.DISABLE_PSM)
      end
    elif self.state == NBIoTDriverState.DISABLE_PSM
      if self.procedure == nil
        self.init_procedure(NBIoTDisablePSMProcedure(self.ser))
      end
      self.procedure.execute()
      if self.procedure.is_done()
        self.finish_procedure()
        self.next_state(NBIoTDriverState.READY)
      end
    elif self.state == NBIoTDriverState.READY
      if self.request_queue.size() > 0
        # Take the oldest request off the queue.
        var request = self.request_queue[0]
        self.request_queue.remove(0)
        var procedure = nil
        if request.request_type == NBIoTRequestType.DEBUG
          procedure = NBIoTDebugProcedure(self.ser, request)
        elif request.request_type == NBIoTRequestType.NTP
          procedure = NBIoTNTPProcedure(self.ser, request)
        else
          # This branch was a bare 'elif', which made the log call its
          # condition; 'else' states the intent explicitly.
          tasmota.log(string.format('NBT: Request with type %i not supported, discarding request', request.request_type), 2)
        end
        if procedure != nil
          self.init_procedure(procedure)
          self.next_state(NBIoTDriverState.BUSY)
        end
      end
    elif self.state == NBIoTDriverState.BUSY
      if self.procedure.is_aborted() || self.procedure.is_done()
        if self.procedure.is_aborted()
          var cmd = string.replace(self.procedure.cmd_in_process.cmd, '\r\n', '')
          tasmota.log(string.format('NBT: Exceeded retries at command %s, discarding request', cmd), 2)
        end
        # The callback only runs when the procedure actually completed.
        self.finish_procedure(self.procedure.is_done())
        self.next_state(NBIoTDriverState.READY)
      else
        self.procedure.execute()
      end
    else
      tasmota.log('NBT: Invalid driver state, stopping driver', 2)
      tasmota.remove_driver(self)
    end
  end
end
# ------------------------------------------------------- #
# nbiot module #
# ------------------------------------------------------- #
# Module object; importing it runs 'init', which replaces the module with
# an instance of the inner class below.
var nbiot = module('nbiot')

nbiot.init = def (m)
  # Public API: creates the driver, registers it with Tasmota and turns API
  # calls into queued requests.
  # NOTE(review): the driver's every_second() only handles DEBUG and NTP
  # requests; PSM, MQTT and CoAP requests are logged as unsupported and
  # discarded, so their callbacks never run.
  class nbiot
    var _driver

    def init()
      # Arguments are (rx, tx, psm_eint) GPIO numbers, hard-coded here.
      self._driver = NBIoTDriver(18, 23, 16)
      tasmota.add_driver(self._driver)
    end

    # Queues a power save mode request.
    def set_power_save_mode(value, callback)
      var request = NBIoTPSMRequest(value, callback)
      self._driver.queue_request(request)
    end

    # Queues a time synchronisation request.
    def sync_time(callback)
      var request = NBIoTNTPRequest(callback)
      self._driver.queue_request(request)
    end

    # Queues an MQTT publish. Returns false without queuing when 'payload'
    # is not a string, true otherwise.
    def mqtt_publish(host, port, username, password, topic, payload, callback)
      if type(payload) == type('')
        var request = NBIoTMQTTRequest(host, port, username, password, topic, payload, callback)
        self._driver.queue_request(request)
        return true
      else
        return false
      end
    end

    # Queues a CoAP GET. Returns false without queuing unless both 'path'
    # and 'query' are lists, true otherwise.
    def coap_get(host, port, path, query, callback)
      # type([]) is 'instance', which every class instance matches, so the
      # list check uses isinstance() instead.
      if !isinstance(path, list) || !isinstance(query, list)
        return false
      else
        var request = NBIoTCOAPRequest(host, port, path, 'GET', query, nil, callback)
        self._driver.queue_request(request)
        return true
      end
    end

    # Queues a CoAP POST with an empty query. Returns false without
    # queuing unless 'path' is a list, true otherwise.
    def coap_post(host, port, path, payload, callback)
      if !isinstance(path, list)
        return false
      else
        var request = NBIoTCOAPRequest(host, port, path, 'POST', [], payload, callback)
        self._driver.queue_request(request)
        return true
      end
    end

    # Queues a raw command. 'rsp' is the substring expected in the
    # response; 'retries' defaults to 0 when omitted.
    def debug_send(cmd, rsp, retries)
      if retries == nil
        retries = 0
      end
      var request = NBIoTDebugRequest(cmd, rsp, retries)
      self._driver.queue_request(request)
    end
  end

  return nbiot()
end

return nbiot