udpack.core

  1import socket
  2import json
  3
  4import numpy as np
  5
  6
  7class UDP:
  8    """
  9    UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.
 10    """
 11
 12    def __init__(self, recv_addr=None, send_addr=None):
 13        """
 14        Initialize UDP Tx and Rx
 15
 16        Args:
 17            recv_addr: address to listen on, None if not receiving (tx only)
 18            send_addr: address of target host, None if not sending (rx only)
 19        """
 20        self.recv_addr = recv_addr
 21        self.send_addr = send_addr
 22
 23        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 24
 25        if self.recv_addr:
 26            self._sock.bind(self.recv_addr)
 27            print("UDP Rx is initialized:", self.recv_addr)
 28
 29        if self.send_addr:
 30            print("UDP Tx is initialized:", self.send_addr)
 31
 32    def stop(self) -> None:
 33        """
 34        Close the socket.
 35        """
 36        self._sock.close()
 37
 38    def recv(self, bufsize=1024, timeout=None) -> bytes:
 39        """
 40        Receive data
 41
 42        timeout == None: blocking forever
 43        timeout == 0: non-blocking (the actual delay is around 0.1s)
 44        timeout > 0: blocking for timeout seconds
 45
 46        Args:
 47            bufsize: size of data buffer to receive
 48            timeout: timeout in seconds
 49        """
 50        if not self.recv_addr:
 51            raise ValueError("Cannot receive data without a receive address")
 52        self._sock.settimeout(timeout)
 53        try:
 54            buffer, addr = self._sock.recvfrom(bufsize)
 55        except (socket.timeout, BlockingIOError):
 56            return None
 57        return buffer
 58
 59    def send(self, buffer: bytes) -> None:
 60        """
 61        Send a byte buffer to the target device.
 62
 63        Args:
 64            buffer: data to send
 65        """
 66        if not self.send_addr:
 67            raise ValueError("Cannot send data without a send address")
 68        self._sock.sendto(buffer, self.send_addr)
 69
 70    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
 71        """
 72        Receive data and deserialize it into a python dictionary.
 73
 74        See `recv()` for more information on timeout.
 75
 76        Args:
 77            bufsize: size of data buffer to receive
 78            timeout: timeout in seconds
 79        """
 80        buffer = self.recv(bufsize=bufsize, timeout=timeout)
 81        if not buffer:
 82            return None
 83        serialized_data = buffer.decode("utf-8")
 84        data = json.loads(serialized_data)
 85        return data
 86
 87    def send_dict(self, data: dict) -> None:
 88        """
 89        Serialize a python dictionary and send it.
 90
 91        Args:
 92            data: data to send
 93        """
 94        buffer = json.dumps(data)
 95        buffer = buffer.encode()
 96        self.send(buffer)
 97
 98    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
 99        """
100        Receive data and deserialize it into a numpy array.
101
102        See `recv()` for more information on timeout.
103
104        Args:
105            bufsize: size of data buffer to receive
106            dtype: numpy data type
107            timeout: timeout in seconds
108        """
109        buffer = self.recv(bufsize=bufsize, timeout=timeout)
110        if not buffer:
111            return None
112        data = np.frombuffer(buffer, dtype=dtype)
113        return data
114
115    def send_numpy(self, data: np.ndarray) -> None:
116        """
117        Serialize a numpy array and send it.
118
119        Args:
120            data: data to send
121        """
122        buffer = data.tobytes()
123        self.send(buffer)
class UDP:
  8class UDP:
  9    """
 10    UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.
 11    """
 12
 13    def __init__(self, recv_addr=None, send_addr=None):
 14        """
 15        Initialize UDP Tx and Rx
 16
 17        Args:
 18            recv_addr: address to listen on, None if not receiving (tx only)
 19            send_addr: address of target host, None if not sending (rx only)
 20        """
 21        self.recv_addr = recv_addr
 22        self.send_addr = send_addr
 23
 24        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
 25
 26        if self.recv_addr:
 27            self._sock.bind(self.recv_addr)
 28            print("UDP Rx is initialized:", self.recv_addr)
 29
 30        if self.send_addr:
 31            print("UDP Tx is initialized:", self.send_addr)
 32
 33    def stop(self) -> None:
 34        """
 35        Close the socket.
 36        """
 37        self._sock.close()
 38
 39    def recv(self, bufsize=1024, timeout=None) -> bytes:
 40        """
 41        Receive data
 42
 43        timeout == None: blocking forever
 44        timeout == 0: non-blocking (the actual delay is around 0.1s)
 45        timeout > 0: blocking for timeout seconds
 46
 47        Args:
 48            bufsize: size of data buffer to receive
 49            timeout: timeout in seconds
 50        """
 51        if not self.recv_addr:
 52            raise ValueError("Cannot receive data without a receive address")
 53        self._sock.settimeout(timeout)
 54        try:
 55            buffer, addr = self._sock.recvfrom(bufsize)
 56        except (socket.timeout, BlockingIOError):
 57            return None
 58        return buffer
 59
 60    def send(self, buffer: bytes) -> None:
 61        """
 62        Send a byte buffer to the target device.
 63
 64        Args:
 65            buffer: data to send
 66        """
 67        if not self.send_addr:
 68            raise ValueError("Cannot send data without a send address")
 69        self._sock.sendto(buffer, self.send_addr)
 70
 71    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
 72        """
 73        Receive data and deserialize it into a python dictionary.
 74
 75        See `recv()` for more information on timeout.
 76
 77        Args:
 78            bufsize: size of data buffer to receive
 79            timeout: timeout in seconds
 80        """
 81        buffer = self.recv(bufsize=bufsize, timeout=timeout)
 82        if not buffer:
 83            return None
 84        serialized_data = buffer.decode("utf-8")
 85        data = json.loads(serialized_data)
 86        return data
 87
 88    def send_dict(self, data: dict) -> None:
 89        """
 90        Serialize a python dictionary and send it.
 91
 92        Args:
 93            data: data to send
 94        """
 95        buffer = json.dumps(data)
 96        buffer = buffer.encode()
 97        self.send(buffer)
 98
 99    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
100        """
101        Receive data and deserialize it into a numpy array.
102
103        See `recv()` for more information on timeout.
104
105        Args:
106            bufsize: size of data buffer to receive
107            dtype: numpy data type
108            timeout: timeout in seconds
109        """
110        buffer = self.recv(bufsize=bufsize, timeout=timeout)
111        if not buffer:
112            return None
113        data = np.frombuffer(buffer, dtype=dtype)
114        return data
115
116    def send_numpy(self, data: np.ndarray) -> None:
117        """
118        Serialize a numpy array and send it.
119
120        Args:
121            data: data to send
122        """
123        buffer = data.tobytes()
124        self.send(buffer)

UDP class for sending and receiving data from a UDP socket as a full duplex communcation channel.

UDP(recv_addr=None, send_addr=None)
13    def __init__(self, recv_addr=None, send_addr=None):
14        """
15        Initialize UDP Tx and Rx
16
17        Args:
18            recv_addr: address to listen on, None if not receiving (tx only)
19            send_addr: address of target host, None if not sending (rx only)
20        """
21        self.recv_addr = recv_addr
22        self.send_addr = send_addr
23
24        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
25
26        if self.recv_addr:
27            self._sock.bind(self.recv_addr)
28            print("UDP Rx is initialized:", self.recv_addr)
29
30        if self.send_addr:
31            print("UDP Tx is initialized:", self.send_addr)

Initialize UDP Tx and Rx

Arguments:
  • recv_addr: address to listen on, None if not receiving (tx only)
  • send_addr: address of target host, None if not sending (rx only)
recv_addr
send_addr
def stop(self) -> None:
33    def stop(self) -> None:
34        """
35        Close the socket.
36        """
37        self._sock.close()

Close the socket.

def recv(self, bufsize=1024, timeout=None) -> bytes:
39    def recv(self, bufsize=1024, timeout=None) -> bytes:
40        """
41        Receive data
42
43        timeout == None: blocking forever
44        timeout == 0: non-blocking (the actual delay is around 0.1s)
45        timeout > 0: blocking for timeout seconds
46
47        Args:
48            bufsize: size of data buffer to receive
49            timeout: timeout in seconds
50        """
51        if not self.recv_addr:
52            raise ValueError("Cannot receive data without a receive address")
53        self._sock.settimeout(timeout)
54        try:
55            buffer, addr = self._sock.recvfrom(bufsize)
56        except (socket.timeout, BlockingIOError):
57            return None
58        return buffer

Receive data

timeout == None: blocking forever timeout == 0: non-blocking (the actual delay is around 0.1s) timeout > 0: blocking for timeout seconds

Arguments:
  • bufsize: size of data buffer to receive
  • timeout: timeout in seconds
def send(self, buffer: bytes) -> None:
60    def send(self, buffer: bytes) -> None:
61        """
62        Send a byte buffer to the target device.
63
64        Args:
65            buffer: data to send
66        """
67        if not self.send_addr:
68            raise ValueError("Cannot send data without a send address")
69        self._sock.sendto(buffer, self.send_addr)

Send a byte buffer to the target device.

Arguments:
  • buffer: data to send
def recv_dict(self, bufsize=1024, timeout=None) -> dict:
71    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
72        """
73        Receive data and deserialize it into a python dictionary.
74
75        See `recv()` for more information on timeout.
76
77        Args:
78            bufsize: size of data buffer to receive
79            timeout: timeout in seconds
80        """
81        buffer = self.recv(bufsize=bufsize, timeout=timeout)
82        if not buffer:
83            return None
84        serialized_data = buffer.decode("utf-8")
85        data = json.loads(serialized_data)
86        return data

Receive data and deserialize it into a python dictionary.

See recv() for more information on timeout.

Arguments:
  • bufsize: size of data buffer to receive
  • timeout: timeout in seconds
def send_dict(self, data: dict) -> None:
88    def send_dict(self, data: dict) -> None:
89        """
90        Serialize a python dictionary and send it.
91
92        Args:
93            data: data to send
94        """
95        buffer = json.dumps(data)
96        buffer = buffer.encode()
97        self.send(buffer)

Serialize a python dictionary and send it.

Arguments:
  • data: data to send
def recv_numpy( self, bufsize=1024, dtype=<class 'numpy.float32'>, timeout=None) -> numpy.ndarray:
 99    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
100        """
101        Receive data and deserialize it into a numpy array.
102
103        See `recv()` for more information on timeout.
104
105        Args:
106            bufsize: size of data buffer to receive
107            dtype: numpy data type
108            timeout: timeout in seconds
109        """
110        buffer = self.recv(bufsize=bufsize, timeout=timeout)
111        if not buffer:
112            return None
113        data = np.frombuffer(buffer, dtype=dtype)
114        return data

Receive data and deserialize it into a numpy array.

See recv() for more information on timeout.

Arguments:
  • bufsize: size of data buffer to receive
  • dtype: numpy data type
  • timeout: timeout in seconds
def send_numpy(self, data: numpy.ndarray) -> None:
116    def send_numpy(self, data: np.ndarray) -> None:
117        """
118        Serialize a numpy array and send it.
119
120        Args:
121            data: data to send
122        """
123        buffer = data.tobytes()
124        self.send(buffer)

Serialize a numpy array and send it.

Arguments:
  • data: data to send