Module prontonet.device

Expand source code
import socket
import struct
import threading
import time
from typing import Callable

from prontonet.protocol import ProntonetProtocol
from prontonet.structures import ProntonetCommand, StatusLineStatusChanged, StatusAlarmsStatusChanged, \
    StatusDecoderAudioModeChanged, StatusEncoderAudioModeChanged
from prontonet.enums import Command, AlarmStatus, LineStatus, IPCallType, Codec


class ProntonetDevice:
    """
    A class used to represent Prontonet Device.

    ...

    Attributes
    ----------
    ip : str
        IP address of Prontonet device.
    port : int
        Port number of command protocol.
    status_port : int
        Port number of status protocol.

    Methods
    -------
    connect()
        Connects to status protocol socket and starts listening for messages.

    disconnect()
        Disconnects from status protocol socket.

    attach_on_connected(func, *args)
        Method to attach function, which will be executed after status socket connected.

    attach_on_status_socket_disconnected(func, *args)
        Method to attach function, which will be executed after status socket disconnected.

    attach_on_line_status_changed(func, *args)
        Method to attach function, which will be executed after line status changed.

    attach_on_alarm_status_changed(func, *args)
        Method to attach function, which will be executed after alarm status changed.

    attach_on_decoder_audio_mode_changed(func, *args)
        Method to attach function, which will be executed after decoder audio mode changed.

    attach_on_encoder_audio_mode_changed(func, *args)
        Method to attach function, which will be executed after encoder audio mode changed.

    send_command(command: ProntonetCommand)
        Send command to command protocol socket.

    """

    def __init__(self, ip, port=50031, status_port=50035):
        """
        Parameters
        ----------
        ip : str
            IP address of Prontonet device.
        port : int
            Port number of command protocol. (default is 50031)
        status_port : int
            Port number of status protocol. (default is 50035)
        """

        self.__command_socket: socket.socket or None = None
        self.__status_socket: socket.socket or None = None
        self.__status_th: threading.Thread or None = None
        self.ip: str = ip
        self.port: int = port
        self.status_port: int = status_port

        self.__on_connected: Callable or None = None
        self.__on_connected_args: tuple or None = None
        self.__on_status_socket_disconnected: Callable or None = None
        self.__on_status_socket_disconnected_args: tuple or None = None
        self.__on_status_socket_reconnected: Callable or None = None
        self.__on_status_socket_reconnected_args: tuple or None = None
        self.__on_line_status_changed: Callable or None = None
        self.__on_line_status_changed_args: tuple or None = None
        self.__on_alarm_status_changed: Callable or None = None
        self.__on_alarm_status_changed_args: tuple or None = None
        self.__on_decoder_audio_mode_changed: Callable or None = None
        self.__on_decoder_audio_mode_changed_args: tuple or None = None
        self.__on_encoder_audio_mode_changed: Callable or None = None
        self.__on_encoder_audio_mode_changed_args: tuple or None = None

    def connect(self) -> None:
        """
        Connects to status protocol socket and starts listening for messages.

        Returns
        -------
        None
        """

        self.__status_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.__status_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)

        self.__status_socket.connect((self.ip, self.status_port))

        if self.__on_connected is not None:
            self.__on_connected(self.__on_connected_args)

        self.__status_th = threading.Thread(target=self.__status_loop, daemon=True)
        self.__status_th.start()

    def __status_loop(self) -> None:
        """
        Listens for status change. When socket disconnects, attempts to reconnect.

        Returns
        -------
        None
        """

        connected = True
        while True:
            if not connected:
                try:
                    self.__status_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)
                    self.__status_socket.connect((self.ip, self.status_port))
                    connected = True
                    time.sleep(2)
                    if self.__on_status_socket_reconnected is not None:
                        self.__on_status_socket_reconnected(self.__on_status_socket_reconnected_args)
                except socket.error:
                    time.sleep(5)
            else:
                try:
                    data = self.__status_socket.recv(1024)
                    self.__process_status_message(data)
                except socket.error:
                    if self.__on_status_socket_disconnected is not None:
                        self.__on_status_socket_disconnected(self.__on_status_socket_disconnected_args)
                    time.sleep(1)
                    self.__status_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    connected = False

    def __process_status_message(self, message: bytes) -> None:
        """
        Processes status messages. If function is attached to event, this method executes it.

        Parameters
        ----------
        message : bytes
            Message received from status socket.
        Returns
        -------
        None
        """

        if message[0] == Command.STATUS_LINE_STATUS_CHANGED:
            if self.__on_line_status_changed is not None:
                res = struct.unpack("<ii32sii", message[8:])
                num = ''.join(list(s for s in bytes(res[2]).decode("utf-8") if s.isprintable()))
                status = StatusLineStatusChanged(res[0], LineStatus(res[1]), num, res[3], IPCallType(res[4]))
                self.__on_line_status_changed(status, self.__on_line_status_changed_args)
        elif message[0] == Command.STATUS_ALARMS_STATUS_CHANGED:
            if self.__on_alarm_status_changed is not None:
                status = struct.unpack("<i", message[8:])
                self.__on_alarm_status_changed(StatusAlarmsStatusChanged(AlarmStatus(status[0])),
                                               self.__on_alarm_status_changed_args)
        elif message[0] == Command.STATUS_DECODER_AUDIO_MODE_CHANGED:
            if self.__on_decoder_audio_mode_changed is not None:
                message_len = len(message) - 12
                status = struct.unpack("<i" + str(message_len) + "s", message[8:])
                codec_name = ''.join(list(s for s in bytes(status[1]).decode("utf-8") if s.isprintable()))[:-1]
                self.__on_decoder_audio_mode_changed(StatusDecoderAudioModeChanged(Codec(status[0]), codec_name),
                                                     self.__on_decoder_audio_mode_changed_args)
        elif message[0] == Command.STATUS_ENCODER_AUDIO_MODE_CHANGED:
            if self.__on_encoder_audio_mode_changed is not None:
                message_len = len(message) - 12
                status = struct.unpack("<i" + str(message_len) + "s", message[8:])
                codec_name = ''.join(list(s for s in bytes(status[1]).decode("utf-8") if s.isprintable()))[:-1]
                self.__on_encoder_audio_mode_changed(StatusEncoderAudioModeChanged(Codec(status[0]), codec_name),
                                                     self.__on_encoder_audio_mode_changed_args)
        else:
            print("[!] Unknown status message received.")

    def disconnect(self) -> None:
        """
        Stops listening for status messages and closes status socket.

        Returns
        -------
        None
        """

        self.__status_th.join()
        self.__status_socket.close()

    def attach_on_connected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket connected.

        Parameters
        ----------
        func
            Function to be executed when status socket is connected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_connected = func
        self.__on_connected_args = args

    def attach_on_status_socket_disconnected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket disconnected.

        Parameters
        ----------
        func
            Function to be executed when status socket is disconnected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_status_socket_disconnected = func
        self.__on_status_socket_disconnected_args = args

    def attach_on_status_socket_reconnected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket reconnected.

        Parameters
        ----------
        func
            Function to be executed when status socket is reconnected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_status_socket_reconnected = func
        self.__on_status_socket_reconnected_args = args

    def attach_on_line_status_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after line status changed.

        Parameters
        ----------
        func
            Function to be executed when line status changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_line_status_changed = func
        self.__on_line_status_changed_args = args

    def attach_on_alarm_status_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after alarm status changed.

        Parameters
        ----------
        func
            Function to be executed when alarm status changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_alarm_status_changed = func
        self.__on_alarm_status_changed_args = args

    def attach_on_decoder_audio_mode_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after decoder audio mode changed.

        Parameters
        ----------
        func
            Function to be executed when decoder audio mode changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_decoder_audio_mode_changed = func
        self.__on_decoder_audio_mode_changed_args = args

    def attach_on_encoder_audio_mode_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after encoder audio mode changed.

        Parameters
        ----------
        func
            Function to be executed when encoder audio mode changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_encoder_audio_mode_changed = func
        self.__on_encoder_audio_mode_changed_args = args

    def send_command(self, command: ProntonetCommand):
        """
        Sends command to command protocol socket.

        Parameters
        ----------
        command : ProntonetCommand
            Dataclass which contains command for Prontonet device (bytes),
            struct.unpack() pattern for response and command response type.

        Returns
        -------
        Structure passed to function as third argument of ProntonetCommand object.
        """

        self.__command_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__command_socket.connect((self.ip, self.port))
        self.__command_socket.send(ProntonetProtocol.connect())
        self.__command_socket.recv(1024)
        self.__command_socket.send(command.command)
        data = self.__command_socket.recv(2048)
        self.__command_socket.close()
        pattern = command.unpack_pattern
        if '#' in pattern:
            return command.response_type(data[8:-1])
        else:
            val = struct.unpack(pattern, data)
            return command.response_type(*val[2:])

Classes

class ProntonetDevice (ip, port=50031, status_port=50035)

A class used to represent Prontonet Device.

Attributes

ip : str
IP address of Prontonet device.
port : int
Port number of command protocol.
status_port : int
Port number of status protocol.

Methods

connect() Connects to status protocol socket and starts listening for messages.

disconnect() Disconnects from status protocol socket.

attach_on_connected(func, *args) Method to attach function, which will be executed after status socket connected.

attach_on_status_socket_disconnected(func, *args) Method to attach function, which will be executed after status socket disconnected.

attach_on_line_status_changed(func, *args) Method to attach function, which will be executed after line status changed.

attach_on_alarm_status_changed(func, *args) Method to attach function, which will be executed after alarm status changed.

attach_on_decoder_audio_mode_changed(func, *args) Method to attach function, which will be executed after decoder audio mode changed.

attach_on_encoder_audio_mode_changed(func, *args) Method to attach function, which will be executed after encoder audio mode changed.

send_command(command: ProntonetCommand) Send command to command protocol socket.

Parameters

ip : str
IP address of Prontonet device.
port : int
Port number of command protocol. (default is 50031)
status_port : int
Port number of status protocol. (default is 50035)
Expand source code
class ProntonetDevice:
    """
    A class used to represent Prontonet Device.

    ...

    Attributes
    ----------
    ip : str
        IP address of Prontonet device.
    port : int
        Port number of command protocol.
    status_port : int
        Port number of status protocol.

    Methods
    -------
    connect()
        Connects to status protocol socket and starts listening for messages.

    disconnect()
        Disconnects from status protocol socket.

    attach_on_connected(func, *args)
        Method to attach function, which will be executed after status socket connected.

    attach_on_status_socket_disconnected(func, *args)
        Method to attach function, which will be executed after status socket disconnected.

    attach_on_line_status_changed(func, *args)
        Method to attach function, which will be executed after line status changed.

    attach_on_alarm_status_changed(func, *args)
        Method to attach function, which will be executed after alarm status changed.

    attach_on_decoder_audio_mode_changed(func, *args)
        Method to attach function, which will be executed after decoder audio mode changed.

    attach_on_encoder_audio_mode_changed(func, *args)
        Method to attach function, which will be executed after encoder audio mode changed.

    send_command(command: ProntonetCommand)
        Send command to command protocol socket.

    """

    def __init__(self, ip, port=50031, status_port=50035):
        """
        Parameters
        ----------
        ip : str
            IP address of Prontonet device.
        port : int
            Port number of command protocol. (default is 50031)
        status_port : int
            Port number of status protocol. (default is 50035)
        """

        self.__command_socket: socket.socket or None = None
        self.__status_socket: socket.socket or None = None
        self.__status_th: threading.Thread or None = None
        self.ip: str = ip
        self.port: int = port
        self.status_port: int = status_port

        self.__on_connected: Callable or None = None
        self.__on_connected_args: tuple or None = None
        self.__on_status_socket_disconnected: Callable or None = None
        self.__on_status_socket_disconnected_args: tuple or None = None
        self.__on_status_socket_reconnected: Callable or None = None
        self.__on_status_socket_reconnected_args: tuple or None = None
        self.__on_line_status_changed: Callable or None = None
        self.__on_line_status_changed_args: tuple or None = None
        self.__on_alarm_status_changed: Callable or None = None
        self.__on_alarm_status_changed_args: tuple or None = None
        self.__on_decoder_audio_mode_changed: Callable or None = None
        self.__on_decoder_audio_mode_changed_args: tuple or None = None
        self.__on_encoder_audio_mode_changed: Callable or None = None
        self.__on_encoder_audio_mode_changed_args: tuple or None = None

    def connect(self) -> None:
        """
        Connects to status protocol socket and starts listening for messages.

        Returns
        -------
        None
        """

        self.__status_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

        self.__status_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1)
        self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)

        self.__status_socket.connect((self.ip, self.status_port))

        if self.__on_connected is not None:
            self.__on_connected(self.__on_connected_args)

        self.__status_th = threading.Thread(target=self.__status_loop, daemon=True)
        self.__status_th.start()

    def __status_loop(self) -> None:
        """
        Listens for status change. When socket disconnects, attempts to reconnect.

        Returns
        -------
        None
        """

        connected = True
        while True:
            if not connected:
                try:
                    self.__status_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1)
                    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)
                    self.__status_socket.connect((self.ip, self.status_port))
                    connected = True
                    time.sleep(2)
                    if self.__on_status_socket_reconnected is not None:
                        self.__on_status_socket_reconnected(self.__on_status_socket_reconnected_args)
                except socket.error:
                    time.sleep(5)
            else:
                try:
                    data = self.__status_socket.recv(1024)
                    self.__process_status_message(data)
                except socket.error:
                    if self.__on_status_socket_disconnected is not None:
                        self.__on_status_socket_disconnected(self.__on_status_socket_disconnected_args)
                    time.sleep(1)
                    self.__status_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    connected = False

    def __process_status_message(self, message: bytes) -> None:
        """
        Processes status messages. If function is attached to event, this method executes it.

        Parameters
        ----------
        message : bytes
            Message received from status socket.
        Returns
        -------
        None
        """

        if message[0] == Command.STATUS_LINE_STATUS_CHANGED:
            if self.__on_line_status_changed is not None:
                res = struct.unpack("<ii32sii", message[8:])
                num = ''.join(list(s for s in bytes(res[2]).decode("utf-8") if s.isprintable()))
                status = StatusLineStatusChanged(res[0], LineStatus(res[1]), num, res[3], IPCallType(res[4]))
                self.__on_line_status_changed(status, self.__on_line_status_changed_args)
        elif message[0] == Command.STATUS_ALARMS_STATUS_CHANGED:
            if self.__on_alarm_status_changed is not None:
                status = struct.unpack("<i", message[8:])
                self.__on_alarm_status_changed(StatusAlarmsStatusChanged(AlarmStatus(status[0])),
                                               self.__on_alarm_status_changed_args)
        elif message[0] == Command.STATUS_DECODER_AUDIO_MODE_CHANGED:
            if self.__on_decoder_audio_mode_changed is not None:
                message_len = len(message) - 12
                status = struct.unpack("<i" + str(message_len) + "s", message[8:])
                codec_name = ''.join(list(s for s in bytes(status[1]).decode("utf-8") if s.isprintable()))[:-1]
                self.__on_decoder_audio_mode_changed(StatusDecoderAudioModeChanged(Codec(status[0]), codec_name),
                                                     self.__on_decoder_audio_mode_changed_args)
        elif message[0] == Command.STATUS_ENCODER_AUDIO_MODE_CHANGED:
            if self.__on_encoder_audio_mode_changed is not None:
                message_len = len(message) - 12
                status = struct.unpack("<i" + str(message_len) + "s", message[8:])
                codec_name = ''.join(list(s for s in bytes(status[1]).decode("utf-8") if s.isprintable()))[:-1]
                self.__on_encoder_audio_mode_changed(StatusEncoderAudioModeChanged(Codec(status[0]), codec_name),
                                                     self.__on_encoder_audio_mode_changed_args)
        else:
            print("[!] Unknown status message received.")

    def disconnect(self) -> None:
        """
        Stops listening for status messages and closes status socket.

        Returns
        -------
        None
        """

        self.__status_th.join()
        self.__status_socket.close()

    def attach_on_connected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket connected.

        Parameters
        ----------
        func
            Function to be executed when status socket is connected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_connected = func
        self.__on_connected_args = args

    def attach_on_status_socket_disconnected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket disconnected.

        Parameters
        ----------
        func
            Function to be executed when status socket is disconnected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_status_socket_disconnected = func
        self.__on_status_socket_disconnected_args = args

    def attach_on_status_socket_reconnected(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after status socket reconnected.

        Parameters
        ----------
        func
            Function to be executed when status socket is reconnected.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_status_socket_reconnected = func
        self.__on_status_socket_reconnected_args = args

    def attach_on_line_status_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after line status changed.

        Parameters
        ----------
        func
            Function to be executed when line status changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_line_status_changed = func
        self.__on_line_status_changed_args = args

    def attach_on_alarm_status_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after alarm status changed.

        Parameters
        ----------
        func
            Function to be executed when alarm status changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_alarm_status_changed = func
        self.__on_alarm_status_changed_args = args

    def attach_on_decoder_audio_mode_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after decoder audio mode changed.

        Parameters
        ----------
        func
            Function to be executed when decoder audio mode changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_decoder_audio_mode_changed = func
        self.__on_decoder_audio_mode_changed_args = args

    def attach_on_encoder_audio_mode_changed(self, func, *args) -> None:
        """
        Method to attach function, which will be executed after encoder audio mode changed.

        Parameters
        ----------
        func
            Function to be executed when encoder audio mode changed.
        args
            Function arguments.

        Returns
        -------
        None
        """

        self.__on_encoder_audio_mode_changed = func
        self.__on_encoder_audio_mode_changed_args = args

    def send_command(self, command: ProntonetCommand):
        """
        Sends command to command protocol socket.

        Parameters
        ----------
        command : ProntonetCommand
            Dataclass which contains command for Prontonet device (bytes),
            struct.unpack() pattern for response and command response type.

        Returns
        -------
        Structure passed to function as third argument of ProntonetCommand object.
        """

        self.__command_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.__command_socket.connect((self.ip, self.port))
        self.__command_socket.send(ProntonetProtocol.connect())
        self.__command_socket.recv(1024)
        self.__command_socket.send(command.command)
        data = self.__command_socket.recv(2048)
        self.__command_socket.close()
        pattern = command.unpack_pattern
        if '#' in pattern:
            return command.response_type(data[8:-1])
        else:
            val = struct.unpack(pattern, data)
            return command.response_type(*val[2:])

Methods

def attach_on_alarm_status_changed(self, func, *args) ‑> None

Method to attach function, which will be executed after alarm status changed.

Parameters

func
Function to be executed when alarm status changed.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_alarm_status_changed(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after alarm status changed.

    Parameters
    ----------
    func
        Function to be executed when alarm status changed.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_alarm_status_changed = func
    self.__on_alarm_status_changed_args = args
def attach_on_connected(self, func, *args) ‑> None

Method to attach function, which will be executed after status socket connected.

Parameters

func
Function to be executed when status socket is connected.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_connected(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after status socket connected.

    Parameters
    ----------
    func
        Function to be executed when status socket is connected.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_connected = func
    self.__on_connected_args = args
def attach_on_decoder_audio_mode_changed(self, func, *args) ‑> None

Method to attach function, which will be executed after decoder audio mode changed.

Parameters

func
Function to be executed when decoder audio mode changed.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_decoder_audio_mode_changed(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after decoder audio mode changed.

    Parameters
    ----------
    func
        Function to be executed when decoder audio mode changed.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_decoder_audio_mode_changed = func
    self.__on_decoder_audio_mode_changed_args = args
def attach_on_encoder_audio_mode_changed(self, func, *args) ‑> None

Method to attach function, which will be executed after encoder audio mode changed.

Parameters

func
Function to be executed when encoder audio mode changed.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_encoder_audio_mode_changed(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after encoder audio mode changed.

    Parameters
    ----------
    func
        Function to be executed when encoder audio mode changed.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_encoder_audio_mode_changed = func
    self.__on_encoder_audio_mode_changed_args = args
def attach_on_line_status_changed(self, func, *args) ‑> None

Method to attach function, which will be executed after line status changed.

Parameters

func
Function to be executed when line status changed.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_line_status_changed(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after line status changed.

    Parameters
    ----------
    func
        Function to be executed when line status changed.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_line_status_changed = func
    self.__on_line_status_changed_args = args
def attach_on_status_socket_disconnected(self, func, *args) ‑> None

Method to attach function, which will be executed after status socket disconnected.

Parameters

func
Function to be executed when status socket is disconnected.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_status_socket_disconnected(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after status socket disconnected.

    Parameters
    ----------
    func
        Function to be executed when status socket is disconnected.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_status_socket_disconnected = func
    self.__on_status_socket_disconnected_args = args
def attach_on_status_socket_reconnected(self, func, *args) ‑> None

Method to attach function, which will be executed after status socket reconnected.

Parameters

func
Function to be executed when status socket is reconnected.
args
Function arguments.

Returns

None
 
Expand source code
def attach_on_status_socket_reconnected(self, func, *args) -> None:
    """
    Method to attach function, which will be executed after status socket reconnected.

    Parameters
    ----------
    func
        Function to be executed when status socket is reconnected.
    args
        Function arguments.

    Returns
    -------
    None
    """

    self.__on_status_socket_reconnected = func
    self.__on_status_socket_reconnected_args = args
def connect(self) ‑> None

Connects to status protocol socket and starts listening for messages.

Returns

None
 
Expand source code
def connect(self) -> None:
    """
    Connects to status protocol socket and starts listening for messages.

    Returns
    -------
    None
    """

    self.__status_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

    self.__status_socket.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPIDLE, 1)
    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPINTVL, 1)
    self.__status_socket.setsockopt(socket.SOL_TCP, socket.TCP_KEEPCNT, 5)

    self.__status_socket.connect((self.ip, self.status_port))

    if self.__on_connected is not None:
        self.__on_connected(self.__on_connected_args)

    self.__status_th = threading.Thread(target=self.__status_loop, daemon=True)
    self.__status_th.start()
def disconnect(self) ‑> None

Stops listening for status messages and closes status socket.

Returns

None
 
Expand source code
def disconnect(self) -> None:
    """
    Stops listening for status messages and closes status socket.

    Returns
    -------
    None
    """

    self.__status_th.join()
    self.__status_socket.close()
def send_command(self, command: ProntonetCommand)

Sends command to command protocol socket.

Parameters

command : ProntonetCommand
Dataclass which contains command for Prontonet device (bytes), struct.unpack() pattern for response and command response type.

Returns

Structure passed to function as third argument of ProntonetCommand object.

Expand source code
def send_command(self, command: ProntonetCommand):
    """
    Sends command to command protocol socket.

    Parameters
    ----------
    command : ProntonetCommand
        Dataclass which contains command for Prontonet device (bytes),
        struct.unpack() pattern for response and command response type.

    Returns
    -------
    Structure passed to function as third argument of ProntonetCommand object.
    """

    self.__command_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.__command_socket.connect((self.ip, self.port))
    self.__command_socket.send(ProntonetProtocol.connect())
    self.__command_socket.recv(1024)
    self.__command_socket.send(command.command)
    data = self.__command_socket.recv(2048)
    self.__command_socket.close()
    pattern = command.unpack_pattern
    if '#' in pattern:
        return command.response_type(data[8:-1])
    else:
        val = struct.unpack(pattern, data)
        return command.response_type(*val[2:])