Source code for Communication.SocketClient

import logging
import socket

from CanReader.Communication.ComBase import ComBase


[docs]class SocketClient(ComBase): """ :Inherit: :class:`ComBase` :Description: Handle direct communication with car via Wi-Fi. :param address: address of socket server :type address: str, optional :param port: port for socket server :type port: int, optional :raises OSError: IP address is invalid. :raises TypeError: Port is not a int. :raises ValueError: Port is not i range 1 - 65536. """ def __init__(self, address='192.168.1.100', port=80): ComBase.__init__(self) self.check_address(address) self.check_port(port) self.__address = address self.__port = port def __repr__(self): if self.status == "Offline": return "You are disconnected" else: return "Socket is connected to IP: {} on port {}".format(self.__address, self.__port)
[docs] def close(self): """ :Description: Close communication and self destroy. """ self.__sock.close() ComBase.close(self)
[docs] def connect_to_device(self): """ :Description: Establish serial communication with server via Wi-Fi.\n Send information about connection status. :raises WindowsError: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond """ try: if self.first_connection: self.status = "Connecting" else: self.status = "Reconnecting" self.status_changed.emit(self.status) self.__sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.__sock.connect((self.__address, self.__port)) self.__sock.settimeout(self.TIMEOUT) self.first_connection = False self.status = "Online" except WindowsError: logging.warning("A connection attempt failed because the connected party did not properly respond after " "a period of time, or established connection failed because " "connected host has failed to respond", exc_info=True) self.running = False self.status = "Failed" finally: self.status_changed.emit(self.status)
[docs] def get_data(self): """ :Description: Receive data from device and send signal containing data as a byte array.\n Whenever one of the following errors raise, method will try to resolve problem by reconnecting. :raises WindowsError: Unable to reach network.\n :raises AttributeError: SocketClient was not set properly. """ while True: try: if not self.running: break data = self.__sock.recv(self.MSG_SIZE) if len(data) == 0: break self.data_received.emit(bytearray(data)) except WindowsError: # TODO Printovat do GUI do statusbaru logging.info("Unable to reach network!") self.connect_to_device() except AttributeError: logging.warning("SocketClient was not set properly", exc_info=True) self.connect_to_device() self.__sock.close()
[docs] @staticmethod def check_address(address): """ :Description: Check if IP address is in a valid format. :param address: IP address. :type address: str :raises OSError: IP address is not in a valid format. """ try: socket.inet_aton(address) except OSError: error_msg = "IP address is not valid." logging.exception(error_msg) raise OSError(error_msg)
[docs] @staticmethod def check_port(port): """ :Description: Check if port is int in range 1 - 65535. :param port: Communication port. :type port: int :raises ValueError: Port is not in range of 1 - 65535. :raises TypeError: Port is not an int. """ try: if type(port) != int: raise TypeError if port <= 0 or port >= 65536: raise ValueError except ValueError: error_msg = "Port number must be in range 1 - 65535." logging.exception(error_msg) raise ValueError(error_msg) except TypeError: error_msg = "Port must be integer." logging.exception(error_msg) raise TypeError(error_msg)
if __name__ == "__main__": from datetime import datetime def process_data(data): id = int.from_bytes(data[:4], "big") msg = data[4:] now = datetime.now() current_time = now.strftime("%H:%M:%S") print("Current Time = ", current_time, " ") print("ID: {}".format(id)) for i, m in enumerate(msg): print("{}\t{}".format(i, m)) addr = '192.168.4.1' port = 23 com = SocketClient(addr, port) com.data_received.connect(process_data) com.connect_to_device() print(com) while True: pass