udpack.core
import socket
import json

import numpy as np


class UDP:
    """
    UDP class for sending and receiving data from a UDP socket as a
    full-duplex communication channel.

    A single IPv4 datagram socket serves both directions: it is bound to
    `recv_addr` for receiving and sends to `send_addr`. Either address may
    be omitted to get a tx-only or rx-only endpoint.
    """

    def __init__(self, recv_addr=None, send_addr=None):
        """
        Initialize UDP Tx and Rx

        Args:
            recv_addr: address to listen on, None if not receiving (tx only)
            send_addr: address of target host, None if not sending (rx only)

        Raises:
            Exception: whatever `socket.bind()` raises for `recv_addr` is
                re-raised after the socket has been closed.
        """
        self.recv_addr = recv_addr
        self.send_addr = send_addr

        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        if self.recv_addr:
            try:
                self._sock.bind(self.recv_addr)
            except Exception:
                # The caller never gets a usable object when binding fails,
                # so release the descriptor here instead of leaking it.
                self._sock.close()
                raise
            print("UDP Rx is initialized:", self.recv_addr)

        if self.send_addr:
            print("UDP Tx is initialized:", self.send_addr)

    def stop(self) -> None:
        """
        Close the socket.
        """
        self._sock.close()

    def recv(self, bufsize=1024, timeout=None) -> bytes:
        """
        Receive data

        timeout == None: blocking forever
        timeout == 0: non-blocking (the actual delay is around 0.1s)
        timeout > 0: blocking for timeout seconds

        Args:
            bufsize: size of data buffer to receive
            timeout: timeout in seconds

        Returns:
            The received bytes, or None if the socket timed out or had no
            data available.

        Raises:
            ValueError: if no receive address was configured.
        """
        if not self.recv_addr:
            raise ValueError("Cannot receive data without a receive address")
        self._sock.settimeout(timeout)
        try:
            # The sender address is not used by this class.
            buffer, _ = self._sock.recvfrom(bufsize)
        except (socket.timeout, BlockingIOError):
            return None
        return buffer

    def send(self, buffer: bytes) -> None:
        """
        Send a byte buffer to the target device.

        Args:
            buffer: data to send

        Raises:
            ValueError: if no send address was configured.
        """
        if not self.send_addr:
            raise ValueError("Cannot send data without a send address")
        self._sock.sendto(buffer, self.send_addr)

    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
        """
        Receive data and deserialize it into a python dictionary.

        See `recv()` for more information on timeout.

        Args:
            bufsize: size of data buffer to receive
            timeout: timeout in seconds

        Returns:
            The JSON-decoded payload, or None if nothing (or an empty
            datagram) was received.
        """
        buffer = self.recv(bufsize=bufsize, timeout=timeout)
        if not buffer:
            return None
        serialized_data = buffer.decode("utf-8")
        data = json.loads(serialized_data)
        return data

    def send_dict(self, data: dict) -> None:
        """
        Serialize a python dictionary and send it.

        Args:
            data: data to send
        """
        buffer = json.dumps(data)
        buffer = buffer.encode()
        self.send(buffer)

    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
        """
        Receive data and deserialize it into a numpy array.

        See `recv()` for more information on timeout.

        Args:
            bufsize: size of data buffer to receive
            dtype: numpy data type
            timeout: timeout in seconds

        Returns:
            A 1-D array built with `np.frombuffer` over the received bytes,
            or None if nothing (or an empty datagram) was received.
        """
        buffer = self.recv(bufsize=bufsize, timeout=timeout)
        if not buffer:
            return None
        data = np.frombuffer(buffer, dtype=dtype)
        return data

    def send_numpy(self, data: np.ndarray) -> None:
        """
        Serialize a numpy array and send it.

        Only the raw bytes from `data.tobytes()` are sent; dtype and shape
        are not transmitted.

        Args:
            data: data to send
        """
        buffer = data.tobytes()
        self.send(buffer)
class
UDP:
class UDP:
    """
    UDP class for sending and receiving data from a UDP socket as a
    full-duplex communication channel.

    A single IPv4 datagram socket serves both directions: it is bound to
    `recv_addr` for receiving and sends to `send_addr`. Either address may
    be omitted to get a tx-only or rx-only endpoint.
    """

    def __init__(self, recv_addr=None, send_addr=None):
        """
        Initialize UDP Tx and Rx

        Args:
            recv_addr: address to listen on, None if not receiving (tx only)
            send_addr: address of target host, None if not sending (rx only)

        Raises:
            Exception: whatever `socket.bind()` raises for `recv_addr` is
                re-raised after the socket has been closed.
        """
        self.recv_addr = recv_addr
        self.send_addr = send_addr

        self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        if self.recv_addr:
            try:
                self._sock.bind(self.recv_addr)
            except Exception:
                # The caller never gets a usable object when binding fails,
                # so release the descriptor here instead of leaking it.
                self._sock.close()
                raise
            print("UDP Rx is initialized:", self.recv_addr)

        if self.send_addr:
            print("UDP Tx is initialized:", self.send_addr)

    def stop(self) -> None:
        """
        Close the socket.
        """
        self._sock.close()

    def recv(self, bufsize=1024, timeout=None) -> bytes:
        """
        Receive data

        timeout == None: blocking forever
        timeout == 0: non-blocking (the actual delay is around 0.1s)
        timeout > 0: blocking for timeout seconds

        Args:
            bufsize: size of data buffer to receive
            timeout: timeout in seconds

        Returns:
            The received bytes, or None if the socket timed out or had no
            data available.

        Raises:
            ValueError: if no receive address was configured.
        """
        if not self.recv_addr:
            raise ValueError("Cannot receive data without a receive address")
        self._sock.settimeout(timeout)
        try:
            # The sender address is not used by this class.
            buffer, _ = self._sock.recvfrom(bufsize)
        except (socket.timeout, BlockingIOError):
            return None
        return buffer

    def send(self, buffer: bytes) -> None:
        """
        Send a byte buffer to the target device.

        Args:
            buffer: data to send

        Raises:
            ValueError: if no send address was configured.
        """
        if not self.send_addr:
            raise ValueError("Cannot send data without a send address")
        self._sock.sendto(buffer, self.send_addr)

    def recv_dict(self, bufsize=1024, timeout=None) -> dict:
        """
        Receive data and deserialize it into a python dictionary.

        See `recv()` for more information on timeout.

        Args:
            bufsize: size of data buffer to receive
            timeout: timeout in seconds

        Returns:
            The JSON-decoded payload, or None if nothing (or an empty
            datagram) was received.
        """
        buffer = self.recv(bufsize=bufsize, timeout=timeout)
        if not buffer:
            return None
        serialized_data = buffer.decode("utf-8")
        data = json.loads(serialized_data)
        return data

    def send_dict(self, data: dict) -> None:
        """
        Serialize a python dictionary and send it.

        Args:
            data: data to send
        """
        buffer = json.dumps(data)
        buffer = buffer.encode()
        self.send(buffer)

    def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
        """
        Receive data and deserialize it into a numpy array.

        See `recv()` for more information on timeout.

        Args:
            bufsize: size of data buffer to receive
            dtype: numpy data type
            timeout: timeout in seconds

        Returns:
            A 1-D array built with `np.frombuffer` over the received bytes,
            or None if nothing (or an empty datagram) was received.
        """
        buffer = self.recv(bufsize=bufsize, timeout=timeout)
        if not buffer:
            return None
        data = np.frombuffer(buffer, dtype=dtype)
        return data

    def send_numpy(self, data: np.ndarray) -> None:
        """
        Serialize a numpy array and send it.

        Only the raw bytes from `data.tobytes()` are sent; dtype and shape
        are not transmitted.

        Args:
            data: data to send
        """
        buffer = data.tobytes()
        self.send(buffer)
UDP class for sending and receiving data from a UDP socket as a full-duplex communication channel.
UDP(recv_addr=None, send_addr=None)
def __init__(self, recv_addr=None, send_addr=None):
    """
    Initialize UDP Tx and Rx

    Creates one IPv4 datagram socket and, when `recv_addr` is given, binds
    it to that address.

    Args:
        recv_addr: address to listen on, None if not receiving (tx only)
        send_addr: address of target host, None if not sending (rx only)

    Raises:
        Exception: whatever `socket.bind()` raises for `recv_addr` is
            re-raised after the socket has been closed.
    """
    self.recv_addr = recv_addr
    self.send_addr = send_addr

    self._sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

    if self.recv_addr:
        try:
            self._sock.bind(self.recv_addr)
        except Exception:
            # The caller never gets a usable object when binding fails,
            # so release the descriptor here instead of leaking it.
            self._sock.close()
            raise
        print("UDP Rx is initialized:", self.recv_addr)

    if self.send_addr:
        print("UDP Tx is initialized:", self.send_addr)
Initialize UDP Tx and Rx
Arguments:
- recv_addr: address to listen on, None if not receiving (tx only)
- send_addr: address of target host, None if not sending (rx only)
def
recv(self, bufsize=1024, timeout=None) -> bytes:
def recv(self, bufsize=1024, timeout=None) -> bytes:
    """
    Receive data

    timeout == None: blocking forever
    timeout == 0: non-blocking (the actual delay is around 0.1s)
    timeout > 0: blocking for timeout seconds

    Args:
        bufsize: size of data buffer to receive
        timeout: timeout in seconds

    Returns:
        The received bytes, or None if the socket timed out or had no
        data available.

    Raises:
        ValueError: if no receive address was configured.
    """
    if self.recv_addr:
        self._sock.settimeout(timeout)
        try:
            # The sender address is discarded.
            payload, _sender = self._sock.recvfrom(bufsize)
        except (socket.timeout, BlockingIOError):
            payload = None
        return payload
    raise ValueError("Cannot receive data without a receive address")
Receive data
- timeout == None: blocking forever
- timeout == 0: non-blocking (the actual delay is around 0.1s)
- timeout > 0: blocking for timeout seconds
Arguments:
- bufsize: size of data buffer to receive
- timeout: timeout in seconds
def
send(self, buffer: bytes) -> None:
def send(self, buffer: bytes) -> None:
    """
    Send a byte buffer to the target device.

    Args:
        buffer: data to send

    Raises:
        ValueError: if no send address was configured.
    """
    if self.send_addr:
        self._sock.sendto(buffer, self.send_addr)
        return
    raise ValueError("Cannot send data without a send address")
Send a byte buffer to the target device.
Arguments:
- buffer: data to send
def
recv_dict(self, bufsize=1024, timeout=None) -> dict:
def recv_dict(self, bufsize=1024, timeout=None) -> dict:
    """
    Receive data and deserialize it into a python dictionary.

    See `recv()` for more information on timeout.

    Args:
        bufsize: size of data buffer to receive
        timeout: timeout in seconds

    Returns:
        The JSON-decoded payload, or None if nothing (or an empty
        datagram) was received.
    """
    raw = self.recv(bufsize=bufsize, timeout=timeout)
    # Both None and an empty datagram count as "no data".
    return json.loads(raw.decode("utf-8")) if raw else None
Receive data and deserialize it into a python dictionary.
See recv() for more information on timeout.
Arguments:
- bufsize: size of data buffer to receive
- timeout: timeout in seconds
def
send_dict(self, data: dict) -> None:
def send_dict(self, data: dict) -> None:
    """
    Serialize a python dictionary and send it.

    The dictionary is JSON-encoded, converted to bytes and passed to
    `send()`.

    Args:
        data: data to send
    """
    self.send(json.dumps(data).encode())
Serialize a python dictionary and send it.
Arguments:
- data: data to send
def
recv_numpy( self, bufsize=1024, dtype=<class 'numpy.float32'>, timeout=None) -> numpy.ndarray:
def recv_numpy(self, bufsize=1024, dtype=np.float32, timeout=None) -> np.ndarray:
    """
    Receive data and deserialize it into a numpy array.

    See `recv()` for more information on timeout.

    Args:
        bufsize: size of data buffer to receive
        dtype: numpy data type
        timeout: timeout in seconds

    Returns:
        A 1-D array built with `np.frombuffer` over the received bytes,
        or None if nothing (or an empty datagram) was received.
    """
    raw = self.recv(bufsize=bufsize, timeout=timeout)
    if raw:
        return np.frombuffer(raw, dtype=dtype)
    # Both None and an empty datagram count as "no data".
    return None
Receive data and deserialize it into a numpy array.
See recv() for more information on timeout.
Arguments:
- bufsize: size of data buffer to receive
- dtype: numpy data type
- timeout: timeout in seconds
def
send_numpy(self, data: numpy.ndarray) -> None:
def send_numpy(self, data: np.ndarray) -> None:
    """
    Serialize a numpy array and send it.

    Only the raw bytes from `data.tobytes()` are passed to `send()`;
    dtype and shape are not transmitted.

    Args:
        data: data to send
    """
    self.send(data.tobytes())
Serialize a numpy array and send it.
Arguments:
- data: data to send