Skip to content
Snippets Groups Projects
Select Git revision
  • 6f28bece9ca6b0c4548ffe5d9cc9316a2a61d720
  • main default protected
  • renovate/lock-file-maintenance
  • demo protected
  • person-select-custom
  • dbp-translation-component
  • icon-set-mapping
  • port-i18next-parser
  • remove-sentry
  • favorites-and-recent-files
  • revert-6c632dc6
  • lit2
  • advertisement
  • wc-part
  • automagic
  • publish
  • wip-cleanup
  • demo-file-handling
18 results

yarn.lock

Blame
  • This project manages its dependencies using Yarn. Learn more
    nbiot.be 12.64 KiB
    import string
    import gpio
    
    # ------------------------------------------------------- #
    #                         Requests                        #
    # ------------------------------------------------------- #
    # Constants identifying the kind of a queued request. Every request class
    # below passes one of these values to the NBIoTRequest initializer, where
    # it is stored in `request_type`.
    class NBIoTRequestType
      static var DEBUG = 0  # used by NBIoTDebugRequest
      static var MQTT = 1   # used by NBIoTMQTTRequest
      static var COAP = 2   # used by NBIoTCOAPRequest
      static var NTP = 3    # used by NBIoTNTPRequest
      static var PSM = 4    # used by NBIoTPSMRequest
    end
    
    # Base class of all requests: stores the request kind and an optional
    # completion callback.
    class NBIoTRequest
      var request_type  # one of the NBIoTRequestType constants
      var callback      # function invoked without arguments on completion, or nil

      # request_type: NBIoTRequestType constant describing the request.
      # callback:     function to call when the request completed; may be nil.
      def init(request_type, callback)
        self.request_type = request_type
        self.callback = callback
      end
    end
    
    # Request to send a raw command and wait for an expected response.
    # No callback is handed to the base initializer, so `callback` stays nil.
    class NBIoTDebugRequest : NBIoTRequest
      var cmd      # command text to send
      var rsp      # text that must appear in the response
      var retries  # retry budget for the command

      # cmd:     command text to send.
      # rsp:     text expected in the response.
      # retries: retry budget for the command.
      def init(cmd, rsp, retries)
        super(self).init(NBIoTRequestType.DEBUG)
        self.cmd = cmd
        self.rsp = rsp
        self.retries = retries
      end
    end
    
    # Request describing an MQTT publish: broker address, credentials, topic
    # and payload, plus the completion callback kept by the base class.
    class NBIoTMQTTRequest : NBIoTRequest
      var host
      var port
      var username
      var password
      var topic
      var payload

      # host, port:         broker address.
      # username, password: broker credentials.
      # topic, payload:     what to publish and where.
      # callback:           function to call when the request completed; may be nil.
      def init(host, port, username, password, topic, payload, callback)
        super(self).init(NBIoTRequestType.MQTT, callback)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.topic = topic
        self.payload = payload
      end
    end
    
    # Request describing a CoAP call: server address, path, method, query and
    # payload, plus the completion callback kept by the base class.
    class NBIoTCOAPRequest : NBIoTRequest
      var host
      var port
      var path
      var method   # must be declared: init assigns it, and Berry rejects
                   # assignment to an undeclared instance member
      var query
      var topic    # never assigned by init (stays nil); kept so existing
                   # readers of `.topic` keep working
      var payload

      # host, port: server address.
      # path:       request path.
      # method:     request method (callers in this file pass 'GET' or 'POST').
      # query:      request query.
      # payload:    request payload; may be nil.
      # callback:   function to call when the request completed; may be nil.
      def init(host, port, path, method, query, payload, callback)
        super(self).init(NBIoTRequestType.COAP, callback)
        self.host = host
        self.port = port
        self.path = path
        self.method = method
        self.query = query
        self.payload = payload
      end
    end
    
    # Request for a time synchronisation; carries only the completion callback.
    class NBIoTNTPRequest : NBIoTRequest
      # callback: function to call when the request completed; may be nil.
      def init(callback)
        super(self).init(NBIoTRequestType.NTP, callback)
      end
    end
    
    # Request to change the power save mode setting.
    class NBIoTPSMRequest : NBIoTRequest
      var value  # requested power save mode value, stored as given

      # value:    requested power save mode value.
      # callback: function to call when the request completed; may be nil.
      def init(value, callback)
        super(self).init(NBIoTRequestType.PSM, callback)
        self.value = value
      end
    end
    
    # ------------------------------------------------------- #
    #                       Connections                       #
    # ------------------------------------------------------- #
    
    # Parameters of an MQTT broker connection: address and credentials.
    class NBIoTMQTTConnection
      var host
      var port
      var username
      var password

      # Stores the four connection parameters unchanged.
      def init(host, port, username, password)
        self.host = host
        self.port = port
        self.username = username
        self.password = password
      end

      # Returns true when `other` has the same host, port, username and
      # password as this connection, false otherwise.
      def equals(other)
        return self.host == other.host &&
               self.port == other.port &&
               self.username == other.username &&
               self.password == other.password
      end
    end
    
    # ------------------------------------------------------- #
    #                        Procedures                       #
    # ------------------------------------------------------- #
    
    # One command to send together with the response text that marks
    # success and a retry budget.
    class NBIoTCommand
      var cmd
      var rsp
      var retries

      # cmd:     command text to send.
      # rsp:     text expected in the response.
      # retries: retry budget; defaults to 0 when omitted or nil.
      def init(cmd, rsp, retries)
        self.cmd = cmd
        self.rsp = rsp

        if retries == nil
          self.retries = 0
        else
          self.retries = retries
        end
      end
    end
    
    # Base class for procedures that talk to the modem over a serial port.
    # A procedure sends the command held in `cmd_in_process` and checks what
    # it reads back for that command's expected response text. Subclasses
    # implement `execute()` to advance through their commands.
    class NBIoTProcedure
      var ser             # serial port used for writing and reading
      var request         # request that triggered the procedure, or nil
      var debug           # when true, every response read is printed
      var done            # set by subclasses once the procedure succeeded
      var aborted         # set by subclasses once the procedure gave up
      var rsp_awaiting    # true between sending a command and seeing its response
      var cmd_in_process  # NBIoTCommand currently being handled, or nil

      # ser:     serial port to use.
      # request: originating request; may be nil.
      def init(ser, request)
        self.ser = ser
        self.request = request
        self.cmd_in_process = nil
        self.rsp_awaiting = false
        self.debug = false
        self.done = false
        self.aborted = false
      end

      # Returns the `done` flag.
      def is_done()
        return self.done
      end

      # Returns the `aborted` flag.
      def is_aborted()
        return self.aborted
      end

      # Writes the current command to the serial port, marks a response as
      # pending and logs the command with its line ending removed.
      def send_cmd()
        var cmd = self.cmd_in_process.cmd

        self.ser.write(bytes().fromstring(cmd))
        self.rsp_awaiting = true

        var printable = string.replace(cmd, '\r\n', '')
        tasmota.log(string.format('NBT: Sending command \'%s\'', printable), 2)
      end

      # When a response is pending, reads the serial port and looks for the
      # expected response text of the current command.
      #
      # return_rsp: when truthy the read text is returned on a match and nil
      #             otherwise; when falsy the result is true / false.
      #
      # A pending read without a match decrements the command's retry budget
      # (never below 0).
      def read_rsp_contains_expected_rsp(return_rsp)
        # Result reported whenever no matching response is available.
        var miss = return_rsp ? nil : false

        if !self.rsp_awaiting
          return miss
        end

        var rsp = self.ser.read().asstring()

        if self.debug
          print(rsp)
        end

        if string.find(rsp, self.cmd_in_process.rsp) < 0
          if self.cmd_in_process.retries > 0
            self.cmd_in_process.retries -= 1
          end

          return miss
        end

        self.rsp_awaiting = false

        tasmota.log(string.format('NBT: Received \'%s\'', self.cmd_in_process.rsp), 2)

        if return_rsp
          return rsp
        end

        return true
      end

      # Returns the read text when it contains the expected response;
      # otherwise (re)sends the current command and returns nil.
      def fetch_read_rsp_if_contains_expected_rsp_or_send()
        var rsp = self.read_rsp_contains_expected_rsp(true)

        if rsp == nil
          self.send_cmd()
        end

        return rsp
      end

      # Returns true when the expected response was read; otherwise
      # (re)sends the current command and returns false.
      def read_rsp_contains_expected_rsp_or_send()
        var matched = self.read_rsp_contains_expected_rsp(false)

        if !matched
          self.send_cmd()
        end

        return matched
      end

      # Returns true once the current command's retry budget is below 1.
      def retries_exceeded()
        return self.cmd_in_process.retries < 1
      end

      # Advances the procedure by one step; overridden by subclasses.
      def execute() end
    end
    
    # Procedure that sends the raw command of a debug request and waits for
    # its expected response, printing every response read (debug mode).
    class NBIoTDebugProcedure : NBIoTProcedure
      var cmd_debug  # command built from the request, with '\r\n' appended

      # ser:     serial port to use.
      # request: request providing `cmd`, `rsp` and `retries`.
      def init(ser, request)
        # Hand the request to the base class (as NBIoTNTPProcedure does) so
        # `self.request` is available to whoever finishes the procedure.
        super(self).init(ser, request)

        self.debug = true
        self.cmd_debug = NBIoTCommand(request.cmd + '\r\n', request.rsp, request.retries)
      end

      # Performs one step: checks for the expected response, (re)sending the
      # command when it has not been seen, then updates `done` and `aborted`.
      #
      # NOTE(review): with a retry budget of 0 `retries_exceeded()` is already
      # true after the first call, so the procedure is flagged aborted right
      # after the command was sent and before any response is read - confirm
      # this is intended.
      def execute()
        if self.cmd_in_process == nil
          self.cmd_in_process = self.cmd_debug
        end

        self.done = self.read_rsp_contains_expected_rsp_or_send()
        self.aborted = self.retries_exceeded()
      end
    end
    
    # Procedure that sends 'AT+QRST=1' and waits for 'RDY', then sends 'AT'
    # and waits for 'OK'.
    class NBIoTResetProcedure : NBIoTProcedure
      var cmd_reset
      var cmd_at

      # ser: serial port to use.
      def init(ser)
        super(self).init(ser)

        self.cmd_reset = NBIoTCommand('AT+QRST=1\r\n', 'RDY')
        self.cmd_at = NBIoTCommand('AT\r\n', 'OK')
      end

      # Performs one step. The first call only selects the reset command;
      # later calls send the current command until its response is seen and
      # then move on. `done` is set once the 'AT' command was answered.
      def execute()
        var current = self.cmd_in_process

        if current == nil
          self.cmd_in_process = self.cmd_reset
          return
        end

        if current == self.cmd_reset
          if self.read_rsp_contains_expected_rsp_or_send()
            self.cmd_in_process = self.cmd_at
          end
          return
        end

        if current == self.cmd_at
          self.done = self.read_rsp_contains_expected_rsp_or_send()
        end
      end
    end
    
    # Procedure that sends 'AT+CEREG=1', 'AT+CPSMS=0' and 'AT+QSCLK=0' in
    # that order, waiting for 'OK' after each one.
    class NBIoTDisablePSMProcedure : NBIoTProcedure
      var cmd_register
      var cmd_disable_psm
      var cmd_disable_sleep

      # ser: serial port to use.
      def init(ser)
        super(self).init(ser)

        self.cmd_register = NBIoTCommand('AT+CEREG=1\r\n', 'OK')
        self.cmd_disable_psm = NBIoTCommand('AT+CPSMS=0\r\n', 'OK')
        self.cmd_disable_sleep = NBIoTCommand('AT+QSCLK=0\r\n', 'OK')
      end

      # Performs one step. The first call only selects the first command;
      # later calls send the current command until 'OK' is seen and then move
      # to the next one. `done` is set once the last command was answered.
      def execute()
        var current = self.cmd_in_process

        if current == nil
          self.cmd_in_process = self.cmd_register
          return
        end

        if current == self.cmd_register
          if self.read_rsp_contains_expected_rsp_or_send()
            self.cmd_in_process = self.cmd_disable_psm
          end
          return
        end

        if current == self.cmd_disable_psm
          if self.read_rsp_contains_expected_rsp_or_send()
            self.cmd_in_process = self.cmd_disable_sleep
          end
          return
        end

        if current == self.cmd_disable_sleep
          self.done = self.read_rsp_contains_expected_rsp_or_send()
        end
      end
    end
    
    # Procedure that sends 'AT+QNTP=1,"0.at.pool.ntp.org"', waits for a
    # response containing '+QNTP: 0' and sets the Tasmota system time from
    # the timestamp found in that response.
    class NBIoTNTPProcedure : NBIoTProcedure
      var cmd_ntp

      # ser:     serial port to use.
      # request: originating request (provides the completion callback).
      def init(ser, request)
        super(self).init(ser, request)

        self.cmd_ntp = NBIoTCommand('AT+QNTP=1,\"0.at.pool.ntp.org\"\r\n', '+QNTP: 0', 10)
      end

      # Looks for a '%y/%m/%d,%H:%M:%S' timestamp among the double-quote
      # separated parts of `rsp` and, when one is found, issues the Tasmota
      # 'time' command with its epoch plus 3600 seconds.
      #
      # Returns true when the time was set, false when no part of the
      # response could be parsed (nothing is changed in that case).
      def set_system_time(rsp)
        var timestamp = nil

        for rsp_arg : string.split(rsp, '\"')
          timestamp = tasmota.strptime(rsp_arg, "%y/%m/%d,%H:%M:%S")

          if timestamp != nil
            break
          end
        end

        # Without this guard a response lacking a parsable timestamp would
        # raise on the nil index below.
        if timestamp == nil
          tasmota.log('NBT: No timestamp found in NTP response, system time not set', 2)

          return false
        end

        tasmota.cmd('time ' + str(timestamp['epoch'] + 3600)) # UTC + 1

        return true
      end

      # Performs one step: checks for the expected response, (re)sending the
      # command when it has not been seen. On a match the system time is set
      # and the procedure is done; if the matched response holds no usable
      # timestamp the procedure is aborted instead. Otherwise `aborted`
      # reflects whether the retry budget is used up.
      def execute()
        if self.cmd_in_process == nil
          self.cmd_in_process = self.cmd_ntp
        end

        # The helper takes no arguments; command and expected response come
        # from `cmd_in_process`.
        var rsp = self.fetch_read_rsp_if_contains_expected_rsp_or_send()

        if rsp != nil
          if self.set_system_time(rsp)
            self.done = true
          else
            self.aborted = true

            return
          end
        end

        self.aborted = self.retries_exceeded()
      end
    end
    
    # ------------------------------------------------------- #
    #                       Driver State                      #
    # ------------------------------------------------------- #
    
    # States of the NBIoTDriver state machine, evaluated in `every_second`.
    class NBIoTDriverState
      static var INIT = 0         # initial state set by the driver's init
      static var IDLE = 1         # driver does nothing on a tick
      static var RESET = 2        # NBIoTResetProcedure is running
      static var DISABLE_PSM = 3  # NBIoTDisablePSMProcedure is running
      static var READY = 4        # waiting for a queued request
      static var BUSY = 5         # a request's procedure is running
    end
    
    # ------------------------------------------------------- #
    #                          Driver                         #
    # ------------------------------------------------------- #
    
    # Tasmota driver that runs the modem state machine once per second:
    # it resets the modem, disables its power saving, and then processes
    # queued requests one at a time through the matching procedure.
    class NBIoTDriver
      var ser            # serial port to the modem
      var psm_eint       # GPIO pin written by set_psm_pin
      var state          # current NBIoTDriverState value
      var request_queue  # list of pending requests, oldest first
      var procedure      # procedure currently running, or nil

      # rx, tx:   serial pins; the port is opened at 115200 baud, 8N1.
      # psm_eint: GPIO pin written by set_psm_pin.
      def init(rx, tx, psm_eint)
        self.ser = serial(rx, tx, 115200, serial.SERIAL_8N1)
        self.psm_eint = psm_eint

        self.state = NBIoTDriverState.INIT
        self.request_queue = []
        self.procedure = nil
      end

      # Appends a request to the end of the queue.
      def queue_request(request)
        self.request_queue.push(request)
      end

      # Makes `procedure` the running procedure.
      def init_procedure(procedure)
        self.procedure = procedure
      end

      # Clears the running procedure. When `done` is truthy and the
      # procedure's request has a callback, the callback is invoked first.
      def finish_procedure(done)
        if done && self.procedure.request != nil && self.procedure.request.callback != nil
          self.procedure.request.callback()
        end

        self.procedure = nil
      end

      # Logs the transition and switches to `state`.
      def next_state(state)
        tasmota.log(string.format('NBT: Transitioning from state %i to %i', self.state, state), 2)

        self.state = state
      end

      # Writes `value` to the psm_eint pin.
      def set_psm_pin(value)
        gpio.digital_write(self.psm_eint, value)
      end

      # State machine tick, called by Tasmota once per second.
      def every_second()
        if self.state == NBIoTDriverState.IDLE
          return
        elif self.state == NBIoTDriverState.INIT
          self.set_psm_pin(1)
          self.next_state(NBIoTDriverState.RESET)
        elif self.state == NBIoTDriverState.RESET
          if self.procedure == nil
            self.init_procedure(NBIoTResetProcedure(self.ser))
          end

          self.procedure.execute()

          if self.procedure.is_done()
            # No argument: setup procedures have no request to call back.
            self.finish_procedure()
            self.next_state(NBIoTDriverState.DISABLE_PSM)
          end
        elif self.state == NBIoTDriverState.DISABLE_PSM
          if self.procedure == nil
            self.init_procedure(NBIoTDisablePSMProcedure(self.ser))
          end

          self.procedure.execute()

          if self.procedure.is_done()
            self.finish_procedure()
            self.next_state(NBIoTDriverState.READY)
          end
        elif self.state == NBIoTDriverState.READY
          if self.request_queue.size() > 0
            # Take the oldest request off the queue.
            var request = self.request_queue[0]
            self.request_queue.remove(0)

            var procedure = nil

            if request.request_type == NBIoTRequestType.DEBUG
              procedure = NBIoTDebugProcedure(self.ser, request)
            elif request.request_type == NBIoTRequestType.NTP
              procedure = NBIoTNTPProcedure(self.ser, request)
            else
              # Was a bare `elif`, which made the log call the branch
              # condition; `else` states the intent explicitly.
              tasmota.log(string.format('NBT: Request with type %i not supported, discarding request', request.request_type), 2)
            end

            if procedure != nil
              self.init_procedure(procedure)
              self.next_state(NBIoTDriverState.BUSY)
            end
          end
        elif self.state == NBIoTDriverState.BUSY
          if self.procedure.is_aborted() || self.procedure.is_done()
            if self.procedure.is_aborted()
              var cmd = string.replace(self.procedure.cmd_in_process.cmd, '\r\n', '')

              tasmota.log(string.format('NBT: Exceeded retries at command %s, discarding request', cmd), 2)
            end

            # The callback only fires when the procedure reports done.
            self.finish_procedure(self.procedure.is_done())
            self.next_state(NBIoTDriverState.READY)
          else
            self.procedure.execute()
          end
        else
          tasmota.log('NBT: Invalid driver state, stopping driver', 2)
          tasmota.remove_driver(self)
        end
      end
    end
    
    # ------------------------------------------------------- #
    #                       nbiot module                      #
    # ------------------------------------------------------- #
    
    var nbiot = module('nbiot')

    # Module initializer: creates the driver, registers it with Tasmota and
    # returns the object whose methods form the public API of the module.
    nbiot.init = def (m)
      class nbiot
        var _driver

        # Creates the driver with rx = 18, tx = 23, psm_eint = 16 and
        # registers it with Tasmota.
        def init()
          self._driver = NBIoTDriver(18, 23, 16)

          tasmota.add_driver(self._driver)
        end

        # Queues a power save mode request.
        # value:    requested power save mode value.
        # callback: function to call when the request completed; may be nil.
        def set_power_save_mode(value, callback)
          var request = NBIoTPSMRequest(value, callback)

          self._driver.queue_request(request)
        end

        # Queues a time synchronisation request.
        # callback: function to call when the request completed; may be nil.
        def sync_time(callback)
          var request = NBIoTNTPRequest(callback)

          self._driver.queue_request(request)
        end

        # Queues an MQTT publish request.
        # Returns true when the request was queued, false when `payload` is
        # not a string.
        def mqtt_publish(host, port, username, password, topic, payload, callback)
          if type(payload) == type('')
            var request = NBIoTMQTTRequest(host, port, username, password, topic, payload, callback)

            self._driver.queue_request(request)

            return true
          else
            return false
          end
        end

        # Queues a CoAP GET request.
        # Returns true when the request was queued, false when `path` or
        # `query` is not a list.
        def coap_get(host, port, path, query, callback)
          # type([]) is 'instance' for every object, so it cannot tell a list
          # from any other instance; isinstance checks the class itself.
          if !isinstance(path, list) || !isinstance(query, list)
            return false
          else
            var request = NBIoTCOAPRequest(host, port, path, 'GET', query, nil, callback)

            self._driver.queue_request(request)

            return true
          end
        end

        # Queues a CoAP POST request with an empty query.
        # Returns true when the request was queued, false when `path` is not
        # a list.
        def coap_post(host, port, path, payload, callback)
          if !isinstance(path, list)
            return false
          else
            var request = NBIoTCOAPRequest(host, port, path, 'POST', [], payload, callback)

            self._driver.queue_request(request)

            return true
          end
        end

        # Queues a debug request that sends `cmd` and waits for `rsp`.
        # retries: retry budget; defaults to 0 when omitted or nil.
        def debug_send(cmd, rsp, retries)
          if retries == nil
            retries = 0
          end

          var request = NBIoTDebugRequest(cmd, rsp, retries)

          self._driver.queue_request(request)
        end
      end

      return nbiot()
    end

    return nbiot