Source code for Communication.SerialCom

import itertools
import logging
import time
import winreg

from serial import Serial, SerialException

from CanReader.Communication.ComBase import ComBase


[docs]class SerialCom(ComBase): """ :Inherit: :class:`ComBase` :Description: Handle communication with device via serial port (COM). :param port: COM port on which to communicate. :type port: str :param bauds: Baud speed of serial communication :type bauds: int, optional :raises OSError: COM port does not exist. :raises TypeError: -- COM port is not a str.\n -- Baud rate is not a int. :raises ValueError: Baud rate is not in range 300 - 921600. """ def __init__(self, port, bauds=921600): ComBase.__init__(self) self.check_com_port(port) self.check_baud_rate(bauds) self.__port = port.upper() self.__baud_rate = bauds self.__serial = None def __repr__(self): if self.status == "Offline": return "Device is disconnected" else: return "Device is connected on COM port: {} with baud rate {}".format(self.__port, self.__baud_rate)
[docs] def close(self): """ :Description: Close communication and self destroy. """ if self.__serial is not None: self.__serial.close() ComBase.close(self)
[docs] def connect_to_device(self): """ :Description: Establish serial communication with via USB COM.\n Send information about connection status. :raises SerialException: Port cannot be opened. Usually because port is already opened. """ try: if self.first_connection: self.status = "Connecting" else: self.status = "Reconnecting" self.status_changed.emit(self.status) self.__serial = Serial(port=self.__port, baudrate=self.__baud_rate) self.__serial.timeout = self.TIMEOUT self.first_connection = False self.status = "Online" except SerialException: logging.warning("Could not open port {}. Port is probably already open!".format(self.__port), exc_info=True) self.running = False self.status = "Failed" finally: self.status_changed.emit(self.status)
[docs] def get_data(self): """ :Description: Receive data from device and send signal containing data as a byte array. :raises: If any error occurs try to solve problem by reconnecting to device. """ try: if not self.running: return data = self.__serial.read(self.MSG_SIZE) self.data_received.emit(bytearray(data)) except: logging.warning("Error occurred when receiving data", exc_info=True) time.sleep(self.TIMEOUT) self.connect_to_device()
@staticmethod def available_com_ports() -> list: import sys """ :Description: Lists serial port names :raises EnvironmentError: On unsupported or unknown platforms :raises WindowsError: When cannot access COM registers :returns: A list of the serial ports available on the system """ path = 'HARDWARE\\DEVICEMAP\\SERIALCOMM' ports = [] try: if not sys.platform.startswith('win'): raise EnvironmentError key = winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE, path) for i in itertools.count(): try: ports.append(winreg.EnumValue(key, i)[1]) except EnvironmentError: break except WindowsError: error_msg = "Could not access register on path {}".format(path) logging.exception(error_msg) raise WindowsError(error_msg) except EnvironmentError: error_msg = 'Unsupported platform' logging.exception(error_msg) raise EnvironmentError(error_msg) finally: return ports
[docs] @staticmethod def check_baud_rate(bauds): """ :Description: Check if baud is an int in range of 300 - 921600. :param bauds: Baud rate. :type bauds: int :raises ValueError: Baud rate is not in range 300 - 921600. :raises TypeError: Baud rate is not an integer. """ try: if type(bauds) != int: raise TypeError if bauds < 300 or bauds > 921600: raise ValueError except ValueError: error_msg = "Baud rate must be in range 300 - 921600!" logging.exception(error_msg) raise ValueError(error_msg) except TypeError: error_msg = "Baud rate must be an integer!" logging.exception(error_msg) raise TypeError(error_msg)
[docs] @staticmethod def check_com_port(port): """ :Description: Check if port is a str and can be found on the system. :param port: Com port name. :type port: str :raises TypeError: Port is not a str. :raises OSError: Port cannot be find on the system. """ try: if type(port) != str: raise TypeError if port.upper() not in SerialCom.available_com_ports(): raise OSError except OSError: error_msg = "Cannot find port {}!".format(port) logging.exception(error_msg) raise OSError(error_msg) except TypeError: error_msg = "COM port must be a string!" logging.exception(error_msg) raise TypeError(error_msg)
if __name__ == "__main__": """ ser = serial.Serial('COM4', 921600, timeout=None) #921600 buad is max speed of cp2102 startTime = time.time() i = 0 while i < 125000: i += len(ser.read(ser.inWaiting())) # 1.35 sec on 1Mbit print(time.time() - startTime) """ def print_data(data: bytearray): print(data) serCom = SerialCom('COM4', 921600) serCom.data_received.connect(print_data) serCom.connect_to_device() while 1: serCom.get_data()