Commit b277145104 by flandre, 2025-01-06 11:21:04 +08:00
29 changed files with 5396 additions and 0 deletions

.gitignore (vendored, new file, 4 lines)
@@ -0,0 +1,4 @@
.idea
.venv
bak
__pycache__

README.md (new file, empty)

pyproject.toml (new file, 24 lines)
@@ -0,0 +1,24 @@
[project]
name = "flandre"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "aiohttp-cors>=0.7.0",
    "aiortc>=1.9.0",
    "cupy-cuda12x>=13.3.0",
    "opencv-python>=4.10.0.84",
    "pyqt6-fluent-widgets[full]>=1.7.4",
    "pyqt6>=6.8.0",
    "pyzmq>=26.2.0",
    "scipy>=1.14.1",
    "tqdm>=4.67.1",
    "vtk>=9.4.0",
]

[tool.uv]
no-binary-package = ["av"]
override-dependencies = [
    "av>=14.0.0",
]
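
Because `[tool.uv]` forces `av` to be built from source, the local FFmpeg must provide NVENC for src/H264NV.py to import. A quick sanity check after `uv sync`, using only names the project itself relies on:

```python
# Sanity check: the source-built PyAV must expose h264_nvenc (H264NV.py asserts this too).
import av

print(av.__version__)
assert "h264_nvenc" in av.codecs_available, "FFmpeg was built without NVENC support"
```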

src/BusClient.py (new file, 49 lines)
@@ -0,0 +1,49 @@
import zmq
from zmq import Context

from Msg import Msg


class BusClient:
    fp = 5001
    bp = 5002

    def __init__(self,
                 *msgs: type(Msg),
                 ctx=None,
                 pub=True,
                 sub=True,
                 conflate=False,
                 poller=False
                 ):
        if ctx is None:
            self.ctx: Context = zmq.Context()
        else:
            self.ctx = ctx
        if sub:
            self.sub = self.ctx.socket(zmq.SUB)
            for msg in msgs:
                self.sub.setsockopt(zmq.SUBSCRIBE, msg.eid())
            if conflate:
                self.sub.setsockopt(zmq.CONFLATE, 1)
            self.sub.connect(f'tcp://127.0.0.1:{self.bp}')
            if poller:
                self.poller = zmq.Poller()
                self.poller.register(self.sub, zmq.POLLIN)
        if pub:
            self.pub = self.ctx.socket(zmq.PUB)
            self.pub.connect(f'tcp://127.0.0.1:{self.fp}')

    def recv(self):
        b = self.sub.recv()
        return Msg.decode_msg(b)

    def send(self, msg: Msg):
        return self.pub.send(msg.encode_msg())

    async def recv_async(self):
        b = await self.sub.recv()
        return Msg.decode_msg(b)

    async def send_async(self, msg: Msg):
        return self.pub.send(msg.encode_msg())
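
A minimal smoke test of the client (assumes the Broker from src/nodes/Broker.py is already running, so ports 5001/5002 are bound; the sleep gives the PUB/SUB sockets and the subscription time to propagate, as test/kill.py also does):

```python
# Hedged smoke test for BusClient; start nodes/Broker.py first.
import time

from BusClient import BusClient
from Msg import StrMsg

c = BusClient(StrMsg)      # subscribe to StrMsg; publishing is enabled by default
time.sleep(1)              # allow sockets to connect and the subscription to reach the broker
c.send(StrMsg('hello'))    # goes out via the broker frontend ...
print(c.recv())            # ... and comes back on the backend subscription
```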

src/H264NV.py (new file, 375 lines)
@@ -0,0 +1,375 @@
import fractions
import logging
import math
from itertools import tee
from struct import pack, unpack_from
from typing import Iterator, List, Optional, Sequence, Tuple, Type, TypeVar

import av
from aiortc.jitterbuffer import JitterFrame
from aiortc.mediastreams import VIDEO_TIME_BASE, convert_timebase
from aiortc.codecs.base import Decoder, Encoder
from av.frame import Frame
from av.packet import Packet

logger = logging.getLogger(__name__)

gpu = '0'

DEFAULT_BITRATE = 1000000  # 1 Mbps
MIN_BITRATE = 500000  # 500 kbps
# MAX_BITRATE = 3000000  # 3 Mbps
MAX_BITRATE = 10000000  # 10 Mbps
MAX_FRAME_RATE = 120
PACKET_MAX = 1300

NAL_TYPE_FU_A = 28
NAL_TYPE_STAP_A = 24

NAL_HEADER_SIZE = 1
FU_A_HEADER_SIZE = 2
LENGTH_FIELD_SIZE = 2
STAP_A_HEADER_SIZE = NAL_HEADER_SIZE + LENGTH_FIELD_SIZE

DESCRIPTOR_T = TypeVar("DESCRIPTOR_T", bound="H264PayloadDescriptor")
T = TypeVar("T")


def pairwise(iterable: Sequence[T]) -> Iterator[Tuple[T, T]]:
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)


class H264PayloadDescriptor:
    def __init__(self, first_fragment):
        self.first_fragment = first_fragment

    def __repr__(self):
        return f"H264PayloadDescriptor(FF={self.first_fragment})"

    @classmethod
    def parse(cls: Type[DESCRIPTOR_T], data: bytes) -> Tuple[DESCRIPTOR_T, bytes]:
        output = bytes()

        # NAL unit header
        if len(data) < 2:
            raise ValueError("NAL unit is too short")
        nal_type = data[0] & 0x1F
        f_nri = data[0] & (0x80 | 0x60)
        pos = NAL_HEADER_SIZE

        if nal_type in range(1, 24):
            # single NAL unit
            output = bytes([0, 0, 0, 1]) + data
            obj = cls(first_fragment=True)
        elif nal_type == NAL_TYPE_FU_A:
            # fragmentation unit
            original_nal_type = data[pos] & 0x1F
            first_fragment = bool(data[pos] & 0x80)
            pos += 1

            if first_fragment:
                original_nal_header = bytes([f_nri | original_nal_type])
                output += bytes([0, 0, 0, 1])
                output += original_nal_header
            output += data[pos:]
            obj = cls(first_fragment=first_fragment)
        elif nal_type == NAL_TYPE_STAP_A:
            # single time aggregation packet
            offsets = []
            while pos < len(data):
                if len(data) < pos + LENGTH_FIELD_SIZE:
                    raise ValueError("STAP-A length field is truncated")
                nalu_size = unpack_from("!H", data, pos)[0]
                pos += LENGTH_FIELD_SIZE
                offsets.append(pos)

                pos += nalu_size
                if len(data) < pos:
                    raise ValueError("STAP-A data is truncated")

            offsets.append(len(data) + LENGTH_FIELD_SIZE)
            for start, end in pairwise(offsets):
                end -= LENGTH_FIELD_SIZE
                output += bytes([0, 0, 0, 1])
                output += data[start:end]

            obj = cls(first_fragment=True)
        else:
            raise ValueError(f"NAL unit type {nal_type} is not supported")

        return obj, output


class H264Decoder(Decoder):
    def __init__(self) -> None:
        self.codec = av.CodecContext.create("h264", "r")

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        try:
            packet = av.Packet(encoded_frame.data)
            packet.pts = encoded_frame.timestamp
            packet.time_base = VIDEO_TIME_BASE
            frames = self.codec.decode(packet)
        except av.AVError as e:
            logger.warning(
                "H264Decoder() failed to decode, skipping package: " + str(e)
            )
            return []

        return frames


assert "h264_nvenc" in av.codecs_available


def create_encoder_context(
    codec_name: str, width: int, height: int, bitrate: int
) -> Tuple[av.CodecContext, bool]:
    # codec = av.CodecContext.create('libx264', "w")
    codec = av.CodecContext.create('h264_nvenc', "w")
    codec.width = width
    codec.height = height
    codec.bit_rate = bitrate
    # codec.bit_rate = 6000000
    codec.pix_fmt = "yuv420p"
    codec.framerate = fractions.Fraction(MAX_FRAME_RATE, 1)
    codec.time_base = fractions.Fraction(1, MAX_FRAME_RATE)
    codec.options = dict(
        # delay='0',
        # crf='1',
        # profile="baseline",
        # level="31",
        # tune="zerolatency",  # does nothing using h264_omx
        gpu=gpu,
        preset='fast',
    )
    # codec.options = {
    #     "profile": "baseline",
    #     "level": "31",
    #     "tune": "zerolatency",  # does nothing using h264_omx
    # }
    codec.open()
    return codec, True
    # unreachable: leftover from the upstream aiortc encoder selection
    return codec, codec_name == "h264_omx"


class H264NVENCEncoder(Encoder):
    def __init__(self) -> None:
        self.buffer_data = b""
        self.buffer_pts: Optional[int] = None
        self.codec: Optional[av.CodecContext] = None
        self.codec_buffering = False
        self.__target_bitrate = MAX_BITRATE

    @staticmethod
    def _packetize_fu_a(data: bytes) -> List[bytes]:
        available_size = PACKET_MAX - FU_A_HEADER_SIZE
        payload_size = len(data) - NAL_HEADER_SIZE
        num_packets = math.ceil(payload_size / available_size)
        num_larger_packets = payload_size % num_packets
        package_size = payload_size // num_packets

        f_nri = data[0] & (0x80 | 0x60)  # fni of original header
        nal = data[0] & 0x1F

        fu_indicator = f_nri | NAL_TYPE_FU_A

        fu_header_end = bytes([fu_indicator, nal | 0x40])
        fu_header_middle = bytes([fu_indicator, nal])
        fu_header_start = bytes([fu_indicator, nal | 0x80])
        fu_header = fu_header_start

        packages = []
        offset = NAL_HEADER_SIZE
        while offset < len(data):
            if num_larger_packets > 0:
                num_larger_packets -= 1
                payload = data[offset: offset + package_size + 1]
                offset += package_size + 1
            else:
                payload = data[offset: offset + package_size]
                offset += package_size

            if offset == len(data):
                fu_header = fu_header_end

            packages.append(fu_header + payload)

            fu_header = fu_header_middle

        assert offset == len(data), "incorrect fragment data"

        return packages

    @staticmethod
    def _packetize_stap_a(
        data: bytes, packages_iterator: Iterator[bytes]
    ) -> Tuple[bytes, bytes]:
        counter = 0
        available_size = PACKET_MAX - STAP_A_HEADER_SIZE

        stap_header = NAL_TYPE_STAP_A | (data[0] & 0xE0)

        payload = bytes()
        try:
            nalu = data  # with header
            while len(nalu) <= available_size and counter < 9:
                stap_header |= nalu[0] & 0x80

                nri = nalu[0] & 0x60
                if stap_header & 0x60 < nri:
                    stap_header = stap_header & 0x9F | nri

                available_size -= LENGTH_FIELD_SIZE + len(nalu)
                counter += 1
                payload += pack("!H", len(nalu)) + nalu
                nalu = next(packages_iterator)

            if counter == 0:
                nalu = next(packages_iterator)
        except StopIteration:
            nalu = None

        if counter <= 1:
            return data, nalu
        else:
            return bytes([stap_header]) + payload, nalu

    @staticmethod
    def _split_bitstream(buf: bytes) -> Iterator[bytes]:
        # Translated from: https://github.com/aizvorski/h264bitstream/blob/master/h264_nal.c#L134
        i = 0
        while True:
            # Find the start of the NAL unit.
            #
            # NAL Units start with the 3-byte start code 0x000001 or
            # the 4-byte start code 0x00000001.
            i = buf.find(b"\x00\x00\x01", i)
            if i == -1:
                return

            # Jump past the start code
            i += 3
            nal_start = i

            # Find the end of the NAL unit (end of buffer OR next start code)
            i = buf.find(b"\x00\x00\x01", i)
            if i == -1:
                yield buf[nal_start: len(buf)]
                return
            elif buf[i - 1] == 0:
                # 4-byte start code case, jump back one byte
                yield buf[nal_start: i - 1]
            else:
                yield buf[nal_start:i]

    @classmethod
    def _packetize(cls, packages: Iterator[bytes]) -> List[bytes]:
        packetized_packages = []

        packages_iterator = iter(packages)
        package = next(packages_iterator, None)
        while package is not None:
            if len(package) > PACKET_MAX:
                packetized_packages.extend(cls._packetize_fu_a(package))
                package = next(packages_iterator, None)
            else:
                packetized, package = cls._packetize_stap_a(package, packages_iterator)
                packetized_packages.append(packetized)

        return packetized_packages

    def _encode_frame(
        self, frame: av.VideoFrame, force_keyframe: bool
    ) -> Iterator[bytes]:
        if self.codec and (
            frame.width != self.codec.width
            or frame.height != self.codec.height
            # we only adjust bitrate if it changes by over 10%
            or abs(self.target_bitrate - self.codec.bit_rate) / self.codec.bit_rate
            > 0.1
        ):
            self.buffer_data = b""
            self.buffer_pts = None
            self.codec = None

        if force_keyframe:
            # force a complete image
            frame.pict_type = av.video.frame.PictureType.I
        else:
            # reset the picture type, otherwise no B-frames are produced
            frame.pict_type = av.video.frame.PictureType.NONE

        # print(self.codec)
        if self.codec is None:
            self.codec, self.codec_buffering = create_encoder_context(
                "h264_nvenc",
                frame.width,
                frame.height,
                bitrate=self.target_bitrate,
            )
            # try:
            #     self.codec, self.codec_buffering = create_encoder_context(
            #         "h264_omx", frame.width, frame.height, bitrate=self.target_bitrate
            #     )
            # except Exception:
            #     self.codec, self.codec_buffering = create_encoder_context(
            #         "h264_nvenc",
            #         frame.width,
            #         frame.height,
            #         bitrate=self.target_bitrate,
            #     )

        data_to_send = b""
        for package in self.codec.encode(frame):
            package_bytes = bytes(package)
            if self.codec_buffering:
                # delay sending to ensure we accumulate all packages
                # for a given PTS
                if package.pts == self.buffer_pts:
                    self.buffer_data += package_bytes
                else:
                    data_to_send += self.buffer_data
                    self.buffer_data = package_bytes
                    self.buffer_pts = package.pts
            else:
                data_to_send += package_bytes

        if data_to_send:
            yield from self._split_bitstream(data_to_send)

    def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        assert isinstance(frame, av.VideoFrame)
        packages = self._encode_frame(frame, force_keyframe)
        timestamp = convert_timebase(frame.pts, frame.time_base, VIDEO_TIME_BASE)
        return self._packetize(packages), timestamp

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        assert isinstance(packet, av.Packet)
        packages = self._split_bitstream(bytes(packet))
        timestamp = convert_timebase(packet.pts, packet.time_base, VIDEO_TIME_BASE)
        return self._packetize(packages), timestamp

    @property
    def target_bitrate(self) -> int:
        """
        Target bitrate in bits per second.
        """
        return self.__target_bitrate

    @target_bitrate.setter
    def target_bitrate(self, bitrate: int) -> None:
        # print(bitrate)
        # bandwidth-estimation driven bitrate updates are intentionally ignored;
        # the encoder always runs at MAX_BITRATE
        return
        bitrate = max(MIN_BITRATE, min(bitrate, MAX_BITRATE))
        self.__target_bitrate = bitrate


def h264_depayload(payload: bytes) -> bytes:
    descriptor, data = H264PayloadDescriptor.parse(payload)
    return data
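
For reference, the packetization helpers can be exercised on their own with synthetic NAL units (illustrative only, not real H.264 data; importing H264NV requires the NVENC-enabled PyAV build from pyproject.toml):

```python
# Illustrative only: exercise the Annex-B splitter and the RTP depayloader.
from H264NV import H264NVENCEncoder, h264_depayload

buf = (b"\x00\x00\x00\x01" + b"\x67" + b"S" * 10    # fake SPS, fits in one packet
       + b"\x00\x00\x01" + b"\x65" + b"I" * 2000)   # fake IDR slice, larger than PACKET_MAX
nalus = list(H264NVENCEncoder._split_bitstream(buf))
print([len(n) for n in nalus])                      # [11, 2001]

packets = H264NVENCEncoder._packetize(nalus)        # single-NAL / STAP-A / FU-A payloads
print(len(packets), [len(p) for p in packets])

# h264_depayload turns an RTP payload back into Annex-B prefixed data
print(h264_depayload(packets[0])[:4])               # b'\x00\x00\x00\x01'
```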

src/Msg.py (new file, 101 lines)
@@ -0,0 +1,101 @@
import dataclasses
import json
import struct
import time
from enum import auto, Enum


class BG(Enum):
    Msg1 = auto()
    Msg2 = auto()
    BMMsg = auto()
    TickMsg = auto()
    KillMsg = auto()
    StrMsg = auto()
    MoveAxisMsg = auto()
    ImageArgMsg = auto()


class Msg:
    @classmethod
    def decode_base(clz, data: bytes) -> 'Msg':
        # c = clz()
        # c.__dict__.update(json.loads(data.decode()))
        # return c
        return clz(**json.loads(data.decode()))

    def encode(self) -> bytes:
        return json.dumps(self.__dict__).encode()

    @classmethod
    def decode(clz, data: bytes) -> 'Msg':
        return clz.decode_base(data)

    @staticmethod
    def decode_msg(msg):
        eid = struct.unpack('I', msg[:4])[0]
        class_: 'Msg' = globals()[BG(eid).name]
        return class_.decode(msg[4:])

    @classmethod
    def eid(cls):
        return struct.pack('I', BG[cls.__name__].value)

    def encode_msg(self):
        return self.eid() + self.encode()


@dataclasses.dataclass
class Msg1(Msg):
    a: int = 0
    b: int = 1


@dataclasses.dataclass
class KillMsg(Msg):
    name: str = ''


@dataclasses.dataclass
class Msg2(Msg):
    a: int = 2
    b: int = 2


@dataclasses.dataclass
class TickMsg(Msg):
    time: float = 0


@dataclasses.dataclass
class StrMsg(Msg):
    value: str = ''


@dataclasses.dataclass
class MoveAxisMsg(Msg):
    axis: str
    value: int


@dataclasses.dataclass
class ImageArgMsg(Msg):
    sender: str
    v1: int


class BMMsg(Msg):
    def __init__(self, t: int, data: bytes):
        self.data = data
        self.t = t

    def encode(self) -> bytes:
        return struct.pack('I', self.t) + self.data

    @classmethod
    def decode(clz, data: bytes) -> 'Msg':
        return clz(
            struct.unpack('I', data[:4])[0],
            data[4:]
        )
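
The wire format is a 4-byte type id (the class's index in the BG enum) followed by a JSON body, except for BMMsg, which carries a raw binary payload. A quick roundtrip check using only types defined in this file:

```python
# Roundtrip check of the framing: 4-byte type id + per-class body.
from Msg import Msg, MoveAxisMsg, BMMsg

wire = MoveAxisMsg(axis='S', value=925).encode_msg()
print(wire)                    # e.g. b'\x07\x00\x00\x00{"axis": "S", "value": 925}'
print(Msg.decode_msg(wire))    # MoveAxisMsg(axis='S', value=925)

# BMMsg carries raw bytes instead of JSON
frame = BMMsg(0, b'\x00' * 16)
restored = Msg.decode_msg(frame.encode_msg())
print(restored.t, len(restored.data))   # 0 16
```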

src/RfFile.py (new file, 69 lines)
@@ -0,0 +1,69 @@
from pathlib import Path

from attr import dataclass


@dataclass
class RfFolder:
    parent: Path
    D: str
    L: int = 30
    C: str = 'PAR'

    @staticmethod
    def from_path(p: Path | str) -> 'RfFolder':
        p = Path(p)
        D, L, C = p.name.split(',')
        L = int(L.replace('L=', ''))
        C = C.replace('C=', '')
        return RfFolder(p.parent, D, L, C)

    @property
    def name(self):
        return f'{self.D},L={self.L},C={self.C}'

    @property
    def path(self):
        return self.parent / self.name

    @property
    def all(self):
        return [RfFile.from_path(f) for f in self.path.glob('*.bin')]


@dataclass
class RfFile:
    folder: RfFolder
    X: int = None
    Y: int = None
    Z: int = None
    S: int = None
    E: int = None

    @staticmethod
    def from_path(p: Path | str) -> 'RfFile':
        p = Path(p)
        folder = RfFolder.from_path(p.parent)
        return RfFile(folder, **{(c := pair.split('='))[0]: int(c[1]) for pair in p.stem.split(',')})

    @property
    def path(self):
        arr = []
        d = self.__dict__.copy()
        # print(d)
        d.__delitem__('folder')
        for k in d:
            if d[k] is not None:
                arr.append(f'{k}={d[k]}')
        filename = ','.join(arr) + '.bin'
        # print(filename)
        return self.folder.path / filename


def test2():
    r = RfFile.from_path('/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/TEST1,L=30,C=PAR/S=925,E=4.bin')
    print(r)


if __name__ == '__main__':
    test2()
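
These two classes just encode the capture directory convention: '<D>,L=<n>,C=<mode>' folders holding '<axis>=<value>,....bin' files. An illustration with made-up paths (nothing is read from disk):

```python
# Path convention only; the /tmp/cap directory is a placeholder.
from pathlib import Path

from RfFile import RfFolder, RfFile

folder = RfFolder(Path('/tmp/cap'), 'R1', L=30, C='PAR')
f = RfFile(folder, S=925, E=4)
print(f.path)               # /tmp/cap/R1,L=30,C=PAR/S=925,E=4.bin

parsed = RfFile.from_path('/tmp/cap/R1,L=30,C=PAR/S=925,E=4.bin')
print(parsed.S, parsed.E)   # 925 4
```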

src/config.py (new file, 3 lines)
@@ -0,0 +1,3 @@
socket1 = '127.0.0.1:5003'  # Loader PUSH -> Beamformer PULL endpoint
video_height = 1920  # frames on the bus are portrait: 1080 px wide, 1920 px tall
video_width = 1080

src/nodes/Beamformer.py (new file, 50 lines)
@@ -0,0 +1,50 @@
import cv2
import numpy as np
import zmq
import cupy as cp

from Msg import BMMsg, ImageArgMsg, KillMsg
from config import socket1, video_width, video_height
from nodes.Node import Node


class Beamformer(Node):
    topics = [ImageArgMsg]

    def __init__(self):
        super(Beamformer, self).__init__()
        self.arg = ImageArgMsg('', 1400)

    def process(self, data: bytes):
        if data is None:
            return
        b = np.frombuffer(data, dtype=np.int16)
        b = b.reshape((256, 1502)).astype(np.float32)
        b = b[:, :self.arg.v1]
        b -= b.min()
        b /= b.max()
        b *= 255
        b = b.astype(np.uint8)
        b = cv2.rotate(b, cv2.ROTATE_90_CLOCKWISE)
        b = cv2.resize(b, (video_width, video_height))
        b = cv2.cvtColor(b, cv2.COLOR_GRAY2RGBA, b)
        self.send(BMMsg(0, b.tobytes()))

    def loop(self):
        s2 = self.context.socket(zmq.PULL)
        s2.setsockopt(zmq.CONFLATE, 1)
        s2.connect(f"tcp://{socket1}")
        self.c.poller.register(s2, zmq.POLLIN)
        buffer = None
        while True:
            socks = dict(self.c.poller.poll())
            if s2 in socks and socks[s2] == zmq.POLLIN:
                buffer = s2.recv()
            if self.c.sub in socks and socks[self.c.sub] == zmq.POLLIN:
                r = self.recv()
                if isinstance(r, KillMsg):
                    if r.name == '':
                        return
                if isinstance(r, ImageArgMsg):
                    self.arg = r
            self.process(buffer)
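
process() is a plain per-buffer normalisation of the 256x1502 int16 RF data into an 8-bit, portrait-oriented image. The same steps on synthetic data, without any ZMQ plumbing:

```python
# Synthetic stand-in for one RF buffer, to show the normalisation only.
import cv2
import numpy as np

rf = (np.random.randn(256, 1502) * 1000).astype(np.int16)

img = rf.astype(np.float32)[:, :1400]            # crop to ImageArgMsg.v1 samples
img -= img.min()
img /= img.max()
img = (img * 255).astype(np.uint8)
img = cv2.rotate(img, cv2.ROTATE_90_CLOCKWISE)   # 256x1400 -> 1400x256
img = cv2.resize(img, (1080, 1920))              # (video_width, video_height)
print(img.shape)                                 # (1920, 1080): portrait frame
```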

src/nodes/Broker.py (new file, 26 lines)
@@ -0,0 +1,26 @@
import threading

import zmq
from zmq import ContextTerminated

from nodes.Node import Node
from Msg import KillMsg


class Broker(Node):
    def loop(self):
        def t():
            while True:
                r: KillMsg = self.recv()
                if r.name == '':
                    self.context.term()
                    break

        threading.Thread(target=t, daemon=True).start()

        frontend = self.context.socket(zmq.XSUB)
        backend = self.context.socket(zmq.XPUB)
        frontend.bind(f"tcp://*:{self.fp}")
        backend.bind(f"tcp://*:{self.bp}")
        try:
            zmq.proxy(frontend, backend)
        except ContextTerminated:
            return
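
The broker is the only component that binds the bus ports: XSUB on 5001 (where every node's PUB connects) and XPUB on 5002 (where every SUB connects). A hypothetical standalone launcher, mirroring the pattern in test/zmqmyclasstest.py:

```python
# Hypothetical launcher: run the broker in a child process so that BusClient
# and Node instances (ports 5001/5002) have something to connect to.
import multiprocessing

from nodes.Broker import Broker

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    broker = multiprocessing.Process(target=Broker())
    broker.start()
    # ... start other nodes here; publishing KillMsg('') on the bus stops the broker
    broker.join()
```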

New file, 63 lines (file name not shown in this view)
@@ -0,0 +1,63 @@
import logging
import threading
from abc import abstractmethod

import zmq
from zmq import Context, Socket

from clients.Msg import Msg, KillMsg


class BusClient:
    fp = 5001
    bp = 5002
    topics = [KillMsg.eid()]

    def __init__(self, enable_init=True):
        self.context: Context = None
        self.pub_socket: Socket = None
        self.sub_socket: Socket = None
        self.enable_init = enable_init
        self.isalive = True

    # def recv(self):
    #     return self.sub_socket.recv()
    #
    # def send(self, msg):
    #     return self.pub_socket.send(msg)

    def recv(self):
        b = self.sub_socket.recv()
        return Msg.decode_msg(b)

    def send(self, msg: Msg):
        return self.pub_socket.send(msg.encode_msg())

    @abstractmethod
    def loop(self):
        pass

    def __call__(self, *args, **kwargs):
        logging.basicConfig(level=logging.INFO)
        self.context = zmq.Context()
        if self.enable_init:
            self.pub_socket = self.context.socket(zmq.PUB)
            self.pub_socket.connect(f"tcp://127.0.0.1:{self.fp}")
            self.sub_socket = self.context.socket(zmq.SUB)
            self.sub_socket.connect(f"tcp://127.0.0.1:{self.bp}")
            if not self.topics:
                self.sub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            else:
                for topic in self.topics:
                    print(f"{self.__class__.__name__} Subscribing to topic {topic}")
                    self.sub_socket.setsockopt(zmq.SUBSCRIBE, topic)
        self.loop()
        print(self.__class__.__name__, 'exiting')

    @classmethod
    def sub(cls, ctx, *msgs: type(Msg)):
        s = ctx.socket(zmq.SUB)
        for msg in msgs:
            s.setsockopt(zmq.SUBSCRIBE, msg.eid())
        s.connect(f'tcp://127.0.0.1:{cls.bp}')
        return s

src/nodes/ImageCV.py (new file, 34 lines)
@@ -0,0 +1,34 @@
import sys

import cv2
import numpy as np
import zmq
from PyQt6.QtWidgets import QMainWindow, QApplication

from Image import Ui_MainWindow
from RfFile import RfFolder
from Msg import StrMsg, MoveAxisMsg, KillMsg, Msg, ImageArgMsg, BMMsg
from config import video_height, video_width
from nodes.Node import Node


class ImageCV(Node):
    topics = [BMMsg]

    def __init__(self):
        super().__init__()
        self.buffer = np.zeros((video_height, video_width, 3), dtype=np.uint8) + 128

    def loop(self):
        cv2.namedWindow('image', cv2.WINDOW_NORMAL)
        while True:
            socks = dict(self.c.poller.poll(0.001))
            if self.c.sub in socks and socks[self.c.sub] == zmq.POLLIN:
                r = self.recv()
                if isinstance(r, BMMsg):
                    b = np.frombuffer(r.data, dtype=np.uint8)
                    b = np.reshape(b, (video_height, video_width, 4))
                    self.buffer = b
            cv2.imshow('image', self.buffer)
            cv2.waitKey(1)

src/nodes/ImageQt.py (new file, 58 lines)
@@ -0,0 +1,58 @@
import sys

import numpy as np
import zmq
from PyQt6 import QtCore
from PyQt6.QtCore import QByteArray
from PyQt6.QtGui import QImage, QPixmap
from PyQt6.QtWidgets import QMainWindow, QApplication

from Image import Ui_MainWindow
from RfFile import RfFolder
from Msg import StrMsg, MoveAxisMsg, KillMsg, Msg, ImageArgMsg, BMMsg
from ZMQReceiver import ZMQReceiver
from config import video_height, video_width
from nodes.Node import Node


class Adv(QMainWindow, Ui_MainWindow):
    def __init__(self, p: Node, parent=None):
        super(Adv, self).__init__(parent)
        self.p = p
        self.setupUi(self)
        zmq_receiver = ZMQReceiver(self)
        zmq_receiver.zmq_event.connect(self.on_zmq_event)
        zmq_receiver.start()

    def on_zmq_event(self, msg: QByteArray):
        msg = Msg.decode_msg(msg.data())
        if isinstance(msg, KillMsg):
            if msg.name == '':
                self.close()
        elif isinstance(msg, BMMsg):
            # cvImg = np.frombuffer(msg.data, dtype=np.uint8).reshape((video_height, video_width, 4))[:, :, :3].copy()
            # height, width, channel = cvImg.shape
            # bytesPerLine = 3 * width
            # qImg = QImage(cvImg.data, width, height, bytesPerLine, QImage.Format.Format_RGB888).rgbSwapped()
            qImg = QImage(msg.data, video_width, video_height, 4 * video_width, QImage.Format.Format_RGB888).rgbSwapped()
            self.label.setPixmap(QPixmap(qImg))

    @QtCore.pyqtSlot(int)
    def vc(self, v):
        if self.horizontalSlider.sender() is None:
            self.p.send(ImageArgMsg('ui', v))


class ImageQt(Node):
    topics = [BMMsg]

    def __init__(self):
        super().__init__()

    def loop(self):
        app = QApplication(sys.argv)
        MainWindow = Adv(self)
        # MainWindow.move(int(px), int(py))
        # MainWindow.resize(int(sx), int(sy))
        MainWindow.show()
        app.exec()

src/nodes/Loader.py (new file, 27 lines)
@@ -0,0 +1,27 @@
import numpy as np
import zmq

from RfFile import RfFolder
from Msg import StrMsg, MoveAxisMsg, KillMsg
from nodes.Node import Node


class Loader(Node):
    topics = [MoveAxisMsg]

    def loop(self):
        s2 = self.context.socket(zmq.PUSH)
        s2.bind("tcp://*:5003")
        rff = RfFolder.from_path('/run/media/lambda/b86dccdc-f134-464b-a310-6575ee9ae85c/cap4/trim/R1,L=30,C=PAR')
        all_files = rff.all
        while True:
            r = self.recv()
            if isinstance(r, MoveAxisMsg):
                for f in all_files:
                    if f.S == r.value:
                        # s2.send(np.zeros((256, 1500), dtype=np.uint16).tobytes())
                        s2.send(f.path.read_bytes())
            elif isinstance(r, KillMsg):
                if r.name == '':
                    break

src/nodes/MainUI.py (new file, 52 lines)
@@ -0,0 +1,52 @@
import sys

import numpy as np
import zmq
from PyQt6 import QtCore
from PyQt6.QtCore import QByteArray
from PyQt6.QtWidgets import QMainWindow, QApplication

from Main import Ui_MainWindow
from RfFile import RfFolder
from Msg import StrMsg, MoveAxisMsg, KillMsg, Msg, ImageArgMsg
from ZMQReceiver import ZMQReceiver
from nodes.Node import Node


class Adv(QMainWindow, Ui_MainWindow):
    def __init__(self, p: Node, parent=None):
        super(Adv, self).__init__(parent)
        self.p = p
        self.setupUi(self)
        zmq_receiver = ZMQReceiver(self)
        zmq_receiver.zmq_event.connect(self.on_zmq_event)
        zmq_receiver.start()
        self.horizontalSlider.valueChanged.connect(self.vc)

    def on_zmq_event(self, msg: QByteArray):
        msg = Msg.decode_msg(msg.data())
        if isinstance(msg, KillMsg):
            if msg.name == '':
                self.close()
        elif isinstance(msg, ImageArgMsg):
            self.horizontalSlider.setValue(msg.v1)

    @QtCore.pyqtSlot(int)
    def vc(self, v):
        if self.horizontalSlider.sender() is None:
            self.p.send(ImageArgMsg('ui', v))


class MainUI(Node):
    topics = [ImageArgMsg]

    def __init__(self):
        super().__init__()

    def loop(self):
        app = QApplication(sys.argv)
        MainWindow = Adv(self)
        # MainWindow.move(int(px), int(py))
        # MainWindow.resize(int(sx), int(sy))
        MainWindow.show()
        app.exec()

src/nodes/Node.py (new file, 35 lines)
@@ -0,0 +1,35 @@
import logging
from abc import abstractmethod

import zmq

from BusClient import BusClient
from Msg import Msg, KillMsg


class Node:
    fp = BusClient.fp
    bp = BusClient.bp
    topics = []

    def __init__(self, enable_init=True):
        self.enable_init = enable_init
        self.isalive = True

    def recv(self):
        return self.c.recv()

    def send(self, msg: Msg):
        return self.c.send(msg)

    @abstractmethod
    def loop(self):
        pass

    def __call__(self, *args, **kwargs):
        logging.basicConfig(level=logging.INFO)
        self.context = zmq.Context()
        if self.enable_init:
            self.c = BusClient(*([KillMsg] + self.topics), poller=True)
        self.loop()
        print(self.__class__.__name__, 'exiting')
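
A node is just a callable object handed to multiprocessing.Process (see test/zmqmyclasstest.py). A minimal custom node sketch with a hypothetical Printer class, following the same pattern as MLISTEN:

```python
# Hypothetical example node: print every StrMsg until an empty KillMsg arrives.
import multiprocessing

from Msg import StrMsg, KillMsg
from nodes.Node import Node


class Printer(Node):
    topics = [StrMsg]   # KillMsg is always added by Node.__call__

    def loop(self):
        while True:
            msg = self.recv()
            if isinstance(msg, KillMsg) and msg.name == '':
                break
            print('got', msg)


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')
    multiprocessing.Process(target=Printer()).start()
```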

src/nodes/WebRTC.py (new file, 198 lines)
@@ -0,0 +1,198 @@
import asyncio
import json
import os
import zmq.asyncio
import time
from fractions import Fraction

import aiohttp
import aiohttp_cors
import aiortc.rtcrtpsender
import cv2
import numpy as np
import zmq
from aiohttp import web, WSMessage
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription, RTCConfiguration, RTCRtpCodecCapability
from av import VideoFrame

import H264NV
from BusClient import BusClient
from Msg import BMMsg, Msg, TickMsg, KillMsg, StrMsg, MoveAxisMsg, ImageArgMsg
from config import video_width, video_height

ROOT = os.path.dirname(__file__)

from nodes.Node import Node

web.WebSocketResponse()  # stray leftover call; the object is discarded


def generate_placeholder():
    z = np.zeros((video_height, video_width, 4), dtype=np.uint8)
    z[:, :, 3] = 0
    z[:, :, :3] = 128
    return z.tobytes()


class WebRTC(Node):
    # def __init__(self):
    #     super().__init__(False)

    def loop(self):
        self1 = self
        placeholder = generate_placeholder()

        class ZMQStreamTrack(MediaStreamTrack):
            def __init__(self):
                super().__init__()
                self.c = BusClient(BMMsg, pub=False, conflate=True, poller=True)
                self.kind = 'video'
                self.pts = 0
                self.buffer = placeholder

            async def recv(self):
                events = self.c.poller.poll(int(1000 / 60))
                if events:
                    msg: BMMsg = Msg.decode_msg(events[0][0].recv())
                    self.buffer = msg.data
                frame = VideoFrame.from_bytes(self.buffer, video_width, video_height)
                frame.time_base = Fraction(1, 60)
                frame.pts = self.pts
                self.pts += 1
                return frame

        pcs = set()

        async def offer(request):
            params = await request.json()
            offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

            pc = RTCPeerConnection(RTCConfiguration([]))
            pcs.add(pc)

            rc = pc.addTransceiver(ZMQStreamTrack(), 'sendonly')
            rc.setCodecPreferences([RTCRtpCodecCapability(mimeType='video/H264',
                                                          clockRate=90000,
                                                          channels=None,
                                                          parameters={
                                                              'level-asymmetry-allowed': '1',
                                                              'packetization-mode': '1',
                                                              'profile-level-id': '42e01f'
                                                          })])

            await pc.setRemoteDescription(offer)
            answer = await pc.createAnswer()
            await pc.setLocalDescription(answer)

            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
                ),
            )

        async def on_shutdown(app):
            # close peer connections
            coros = [pc.close() for pc in pcs]
            await asyncio.gather(*coros)
            pcs.clear()

        async def t3(ws, name):
            # s = BusClient.sub(zmq.asyncio.Context(), KillMsg, TickMsg)
            c = BusClient(KillMsg, ImageArgMsg, ctx=zmq.asyncio.Context(), pub=False)
            while True:
                msg = await c.recv_async()
                if isinstance(msg, KillMsg):
                    if msg.name == name:
                        break
                if not ws.closed:
                    if isinstance(msg, ImageArgMsg):
                        if name != msg.sender:
                            dc = msg.__dict__.copy()
                            del dc['sender']
                            await ws.send_str(json.dumps(dict(
                                type='Arg', value=dc
                            )))

        async def t4(ws, name):
            c = BusClient(ctx=zmq.asyncio.Context(), sub=False)
            async for msg in ws:
                msg: WSMessage = msg
                if msg.type == aiohttp.WSMsgType.TEXT:
                    ss = msg.data
                    # await c.send_async(StrMsg(ss))
                    j = json.loads(ss)
                    v = j["value"]
                    match j["type"]:
                        case "Move":
                            await c.send_async(MoveAxisMsg('S', v['s']))
                        case "Arg":
                            await c.send_async(ImageArgMsg(sender=name, **v))
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f'ws connection closed with exception', ws.exception())
            await c.send_async(KillMsg(name))

        async def handle_post(request):
            print(request)
            # try:
            #     # Parse the JSON body from the request
            #     data = await request.json()
            #     print("Received data:", data)
            #
            #     # Process the data (this is just a placeholder)
            #     response_data = {'status': 'success', 'received': data}
            #
            #     # Return a JSON response
            #     return web.json_response(response_data, status=200)
            # except Exception as e:
            #     # Handle errors and return a bad request response
            #     return web.json_response({'error': str(e)}, status=400)
            return web.Response()

        async def websocket_handler(request):
            ws = web.WebSocketResponse()
            await ws.prepare(request)
            name = str(time.time())
            task1 = asyncio.create_task(t3(ws, name))
            task2 = asyncio.create_task(t4(ws, name))
            await asyncio.gather(task1, task2)
            # await task1
            # await task2
            # for i in range(10):
            #     await ws.send_str('asdasd')
            #     await asyncio.sleep(0.1)
            return ws

        # Monkey-patch aiortc so every video RTCRtpSender uses the NVENC encoder
        # from H264NV instead of the default software encoder.
        f0 = aiortc.RTCRtpSender.__init__

        def f1(*args, **kwargs):
            f0(*args, **kwargs)
            if not args[1] == 'audio':
                args[0]._RTCRtpSender__encoder = H264NV.H264NVENCEncoder()

        aiortc.RTCRtpSender.__init__ = f1

        app = web.Application()
        app.on_shutdown.append(on_shutdown)
        app.router.add_post("/offer", offer)
        app.router.add_post('/d', handle_post)  # Define the POST route
        app.add_routes([web.get('/ws', websocket_handler)])
        cors = aiohttp_cors.setup(app, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*"
            )
        })
        for route in list(app.router.routes()):
            cors.add(route)
        web.run_app(
            app, access_log=None, host='0.0.0.0', port=8081
        )
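
The /offer endpoint implements standard WebRTC signalling. A hedged sketch of a Python-side viewer using aiortc and aiohttp (a browser client would do the same SDP exchange in JavaScript; assumes this node is running on localhost:8081):

```python
# Hedged sketch: post an SDP offer to /offer and receive the video track for a while.
import asyncio

import aiohttp
from aiortc import RTCPeerConnection, RTCSessionDescription


async def view(url='http://127.0.0.1:8081/offer'):
    pc = RTCPeerConnection()
    pc.addTransceiver('video', direction='recvonly')

    @pc.on('track')
    def on_track(track):
        print('receiving', track.kind)

    await pc.setLocalDescription(await pc.createOffer())
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json={'sdp': pc.localDescription.sdp,
                                           'type': pc.localDescription.type}) as resp:
            answer = await resp.json()
    await pc.setRemoteDescription(RTCSessionDescription(**answer))
    await asyncio.sleep(10)   # let media flow for a bit
    await pc.close()


if __name__ == '__main__':
    asyncio.run(view())
```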

src/rtcserver.py (new file, 103 lines)
@@ -0,0 +1,103 @@
import asyncio
import json
import logging
import os
from fractions import Fraction

import aiohttp_cors
import aiortc.rtcrtpsender
import zmq
from aiohttp import web
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription, RTCConfiguration, RTCRtpCodecCapability
from av import VideoFrame

import H264NV

ROOT = os.path.dirname(__file__)


class PlayerStreamTrackx(MediaStreamTrack):
    def __init__(self):
        super().__init__()
        context = zmq.Context()
        self.socket = context.socket(zmq.PULL)
        self.socket.bind("tcp://*:5555")
        self.kind = 'video'
        self.pts = 0

    async def recv(self):
        frame = VideoFrame.from_bytes(self.socket.recv(), 1920, 1080)
        frame.time_base = Fraction(1, 120)
        frame.pts = self.pts
        self.pts += 1
        return frame


pcs = set()
px = PlayerStreamTrackx()


async def offer(request):
    params = await request.json()
    offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

    pc = RTCPeerConnection(RTCConfiguration([]))
    # print(request.remote)
    pcs.add(pc)

    rc = pc.addTransceiver(px, 'sendonly')
    rc.setCodecPreferences([RTCRtpCodecCapability(mimeType='video/H264',
                                                  clockRate=90000,
                                                  channels=None,
                                                  parameters={
                                                      'level-asymmetry-allowed': '1',
                                                      'packetization-mode': '1',
                                                      'profile-level-id': '42e01f'
                                                  })])

    await pc.setRemoteDescription(offer)
    answer = await pc.createAnswer()
    await pc.setLocalDescription(answer)

    return web.Response(
        content_type="application/json",
        text=json.dumps(
            {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
        ),
    )


async def on_shutdown(app):
    # close peer connections
    coros = [pc.close() for pc in pcs]
    await asyncio.gather(*coros)
    pcs.clear()


if __name__ == "__main__":
    f0 = aiortc.RTCRtpSender.__init__

    def f1(*args, **kwargs):
        f0(*args, **kwargs)
        if not args[1] == 'audio':
            args[0]._RTCRtpSender__encoder = H264NV.H264NVENCEncoder()

    aiortc.RTCRtpSender.__init__ = f1

    logging.basicConfig(level=logging.INFO)
    app = web.Application()
    app.on_shutdown.append(on_shutdown)
    app.router.add_post("/offer", offer)
    cors = aiohttp_cors.setup(app, defaults={
        "*": aiohttp_cors.ResourceOptions(
            allow_credentials=True,
            expose_headers="*",
            allow_headers="*"
        )
    })
    for route in list(app.router.routes()):
        cors.add(route)
    web.run_app(
        app, access_log=None, host='0.0.0.0', port=8081
    )
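
The track pulls raw RGBA 1920x1080 frames from port 5555 (test/main.py below pushes a PNG sequence there). A minimal synthetic producer for quick testing:

```python
# Minimal synthetic producer: push a solid gray 1920x1080 RGBA frame at ~60 fps
# to the PULL socket that rtcserver.py binds on port 5555.
import time

import numpy as np
import zmq

frame = np.full((1080, 1920, 4), 128, dtype=np.uint8).tobytes()

ctx = zmq.Context()
push = ctx.socket(zmq.PUSH)
push.connect('tcp://127.0.0.1:5555')

while True:
    push.send(frame)
    time.sleep(1 / 60)
```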

src/ui/Image.py (new file, 38 lines)
@@ -0,0 +1,38 @@
# Form implementation generated from reading ui file 'Image.ui'
#
# Created by: PyQt6 UI code generator 6.8.0
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.


from PyQt6 import QtCore, QtGui, QtWidgets


class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(800, 600)
        self.centralwidget = QtWidgets.QWidget(parent=MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.label = QtWidgets.QLabel(parent=self.centralwidget)
        self.label.setObjectName("label")
        self.gridLayout.addWidget(self.label, 0, 0, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(parent=MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 800, 22))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(parent=MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))
        self.label.setText(_translate("MainWindow", "TextLabel"))

src/ui/Image.ui (new file, 41 lines)
@@ -0,0 +1,41 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
 <class>MainWindow</class>
 <widget class="QMainWindow" name="MainWindow">
  <property name="geometry">
   <rect>
    <x>0</x>
    <y>0</y>
    <width>800</width>
    <height>600</height>
   </rect>
  </property>
  <property name="windowTitle">
   <string>MainWindow</string>
  </property>
  <widget class="QWidget" name="centralwidget">
   <layout class="QGridLayout" name="gridLayout">
    <item row="0" column="0">
     <widget class="QLabel" name="label">
      <property name="text">
       <string>TextLabel</string>
      </property>
     </widget>
    </item>
   </layout>
  </widget>
  <widget class="QMenuBar" name="menubar">
   <property name="geometry">
    <rect>
     <x>0</x>
     <y>0</y>
     <width>800</width>
     <height>22</height>
    </rect>
   </property>
  </widget>
  <widget class="QStatusBar" name="statusbar"/>
 </widget>
 <resources/>
 <connections/>
</ui>

src/ui/Main.py (new file, 40 lines)
@@ -0,0 +1,40 @@
# Form implementation generated from reading ui file 'Main.ui'
#
# Created by: PyQt6 UI code generator 6.8.0
#
# WARNING: Any manual changes made to this file will be lost when pyuic6 is
# run again. Do not edit this file unless you know what you are doing.


from PyQt6 import QtCore, QtGui, QtWidgets


class Ui_MainWindow(object):
    def setupUi(self, MainWindow):
        MainWindow.setObjectName("MainWindow")
        MainWindow.resize(457, 205)
        self.centralwidget = QtWidgets.QWidget(parent=MainWindow)
        self.centralwidget.setObjectName("centralwidget")
        self.gridLayout = QtWidgets.QGridLayout(self.centralwidget)
        self.gridLayout.setObjectName("gridLayout")
        self.horizontalSlider = QtWidgets.QSlider(parent=self.centralwidget)
        self.horizontalSlider.setMinimum(1)
        self.horizontalSlider.setMaximum(1500)
        self.horizontalSlider.setOrientation(QtCore.Qt.Orientation.Horizontal)
        self.horizontalSlider.setObjectName("horizontalSlider")
        self.gridLayout.addWidget(self.horizontalSlider, 0, 0, 1, 1)
        MainWindow.setCentralWidget(self.centralwidget)
        self.menubar = QtWidgets.QMenuBar(parent=MainWindow)
        self.menubar.setGeometry(QtCore.QRect(0, 0, 457, 22))
        self.menubar.setObjectName("menubar")
        MainWindow.setMenuBar(self.menubar)
        self.statusbar = QtWidgets.QStatusBar(parent=MainWindow)
        self.statusbar.setObjectName("statusbar")
        MainWindow.setStatusBar(self.statusbar)

        self.retranslateUi(MainWindow)
        QtCore.QMetaObject.connectSlotsByName(MainWindow)

    def retranslateUi(self, MainWindow):
        _translate = QtCore.QCoreApplication.translate
        MainWindow.setWindowTitle(_translate("MainWindow", "MainWindow"))

src/ui/Main.ui (new file, 47 lines)
@@ -0,0 +1,47 @@
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
 <class>MainWindow</class>
 <widget class="QMainWindow" name="MainWindow">
  <property name="geometry">
   <rect>
    <x>0</x>
    <y>0</y>
    <width>457</width>
    <height>205</height>
   </rect>
  </property>
  <property name="windowTitle">
   <string>MainWindow</string>
  </property>
  <widget class="QWidget" name="centralwidget">
   <layout class="QGridLayout" name="gridLayout">
    <item row="0" column="0">
     <widget class="QSlider" name="horizontalSlider">
      <property name="minimum">
       <number>1</number>
      </property>
      <property name="maximum">
       <number>1500</number>
      </property>
      <property name="orientation">
       <enum>Qt::Horizontal</enum>
      </property>
     </widget>
    </item>
   </layout>
  </widget>
  <widget class="QMenuBar" name="menubar">
   <property name="geometry">
    <rect>
     <x>0</x>
     <y>0</y>
     <width>457</width>
     <height>22</height>
    </rect>
   </property>
  </widget>
  <widget class="QStatusBar" name="statusbar"/>
 </widget>
 <resources/>
 <connections/>
</ui>

src/ui/ZMQReceiver.py (new file, 18 lines)
@@ -0,0 +1,18 @@
import threading

from PyQt6 import QtCore

from nodes.Node import Node


class ZMQReceiver(QtCore.QObject):
    zmq_event = QtCore.pyqtSignal('QByteArray')

    def start(self):
        threading.Thread(target=self._execute, daemon=True).start()

    def _execute(self):
        node: Node = self.parent().p
        while True:
            msg = node.recv()
            self.zmq_event.emit(msg.encode_msg())

test/kill.py (new file, 11 lines)
@@ -0,0 +1,11 @@
import time

from BusClient import BusClient
from Msg import KillMsg

if __name__ == '__main__':
    c = BusClient()
    time.sleep(1)
    for i in range(100):
        c.send(KillMsg())

test/main.py (new file, 75 lines)
@@ -0,0 +1,75 @@
import base64
import threading
import time
from pathlib import Path

import cv2
import numpy as np
from tqdm import tqdm
import zmq

if __name__ == '__main__':
    # r1 = base64.b64encode(Path('/home/lambda/Pictures/drawing.png').read_bytes()).decode()
    # r1 = base64.b64encode(Path('/home/lambda/Pictures/remilia3.png').read_bytes()).decode()
    # requests.post('http://localhost:12345/sendpic/test', json=dict(value='data:image/png;base64, ' + r1))

    # for p in tqdm(Path('/home/lambda/Videos/pngs/New Folder/').glob('*.png')):
    #     r1 = base64.b64encode(p.read_bytes()).decode()
    #     requests.post('http://localhost:12345/sendpic/test', json=dict(value='data:image/png;base64, ' + r1))

    # arr = [cv2.imread(str(img)).tobytes() for img in Path('/home/lambda/Videos/pngs/New Folder/').glob('*.png')]
    arr = []
    for img in tqdm(list(Path('/home/lambda/Videos/pngs/New Folder/').glob('*.png'))):
        img = cv2.imread(str(img))
        # img = cv2.resize(img, (1920 // 2, 1080 // 2))
        img = img.reshape(1080, 1920, 3)
        z = np.zeros((1080, 1920, 4), dtype=np.uint8)
        z[:, :, :3] = img
        img = z
        img = cv2.cvtColor(img, cv2.COLOR_BGRA2RGBA)
        arr.append(img.tobytes())
        # img = cv2.resize(img, (1920 // 4, 1080 // 4,))

    # while True:
    #     for p in Path('/home/lambda/Videos/pngs/New Folder/').glob('*.png'):
    #         img = cv2.imread(str(p))
    #         # print(img.shape)
    #         input()
    #         socket.send(img.tobytes())

    # while True:
    #     for b in arr:
    #         input()
    #         socket.send(b)

    lii = [0, True]

    def t(li):
        context = zmq.Context()
        socket = context.socket(zmq.PUSH)
        socket.connect("tcp://localhost:5555")
        while True:
            for i, b in enumerate(arr):
                while True:
                    socket.send(arr[li[0]])
                    time.sleep(1 / 120)
                    if li[1]:
                        li[0] = i
                        break

    threading.Thread(target=t, args=(lii,)).start()

    # while True:
    #     for i, b in enumerate(arr):
    #         # input()
    #         lii[0] = i
    #         time.sleep(1 / 60)

    while True:
        input()
        lii[1] = not lii[1]

test/test_process_cupy.py (new file, 31 lines)
@@ -0,0 +1,31 @@
import multiprocessing
from multiprocessing import Process
from threading import Thread

import cupy as cp


class TestProcessCupy:
    def __init__(self):
        pass

    def t1(self):
        print(cp.zeros((10, 10, 10)))

    def __call__(self, *args, **kwargs):
        print(cp.zeros((10, 10, 10)))


def p1():
    # import cupy as cp
    print(cp.zeros((10, 10, 10)))
    # print(cp.asarray(z))


if __name__ == '__main__':
    def ff():
        print(cp.zeros((10, 10, 10)))

    tpc = TestProcessCupy()
    p2 = p1
    z = cp.zeros((10, 10, 10))
    multiprocessing.set_start_method('spawn')
    # p = Process(target=p2)
    p = Process(target=tpc)
    p.start()
    p.join()

test/zmqlargetest.py (new file, 34 lines)
@@ -0,0 +1,34 @@
import multiprocessing
import threading
import time

import zmq

from nodes.Broker import Broker


def thread2():
    c = zmq.Context()
    pull = c.socket(zmq.PULL)
    pull.setsockopt(zmq.CONFLATE, 1)
    pull.connect('tcp://127.0.0.1:5555')
    while True:
        print(pull.recv())
        time.sleep(1)


if __name__ == '__main__':
    c = zmq.Context()
    push = c.socket(zmq.PUSH)
    push.bind('tcp://*:5555')
    cnt = 0
    t = threading.Thread(target=thread2)
    t.start()
    for i in range(30):
        cnt += 1
        push.send(str(cnt).encode())
        time.sleep(0.4)

test/zmqmyclasstest.py (new file, 137 lines)
@@ -0,0 +1,137 @@
import logging
import multiprocessing
import time
from pathlib import Path

import cv2
import numpy as np
from tqdm import tqdm

from nodes.Beamformer import Beamformer
from nodes.Broker import Broker
from nodes.ImageCV import ImageCV
from nodes.ImageQt import ImageQt
from nodes.Loader import Loader
from nodes.MainUI import MainUI
from nodes.Node import Node
from BusClient import BusClient
from Msg import Msg1, Msg2, BMMsg, TickMsg, StrMsg, KillMsg
from nodes.WebRTC import WebRTC


class M1(Node):
    def loop(self):
        cnt = 0
        while True:
            cnt += 1
            self.send(str(cnt).encode())
            time.sleep(1)


class M2(Node):
    def loop(self):
        while True:
            print(self.recv())


class M3(Node):
    topics = [StrMsg]

    def loop(self):
        arr = []
        for img in tqdm(list(Path('/home/lambda/Videos/pngs/New Folder/').glob('*.png'))):
            img = cv2.imread(str(img))
            # img = cv2.resize(img, (1920 // 2, 1080 // 2))
            img = img.reshape(1080, 1920, 3)
            z = np.zeros((1080, 1920, 4), dtype=np.uint8)
            z[:, :, :3] = img
            img = z
            img = cv2.cvtColor(img, cv2.COLOR_BGRA2RGBA)
            arr.append(img.tobytes())

        # while self.isalive:
        #     for b in arr:
        #         self.send(BMMsg(0, b))
        #         # self.pub_socket.send(b)
        #         # self.send(b)
        #         r = self.c.poller.poll(int(1000 / 60))
        #         # print(r)
        #         if r and Msg.decode_msg(r[0][0].recv()).name == '':
        #             self.isalive = False
        #             break
        #         # time.sleep(1 / 60)

        while self.isalive:
            for b in arr:
                msg = self.recv()
                if isinstance(msg, KillMsg):
                    if msg.name == '':
                        self.isalive = False
                        break
                self.send(BMMsg(0, b))
                # if r and Msg.decode_msg(r[0][0].recv()).name == '':


class M4(Node):
    def loop(self):
        while True:
            self.send(Msg1())
            time.sleep(1)


class MTIME(Node):
    def loop(self):
        while True:
            t = time.time()
            self.send(TickMsg(t))
            time.sleep(10)
            # print(t)


class MLISTEN(Node):
    topics = [StrMsg]

    def loop(self):
        while self.isalive:
            r = self.recv()
            print(r)
            if isinstance(r, KillMsg) and r.name == '':
                self.isalive = False
                break
            self.send(TickMsg(time.time()))


class M6(Node):
    topics = [Msg2.eid()]

    def loop(self):
        while True:
            print(self.recv())


if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    multiprocessing.set_start_method('spawn')
    pps = []
    ps = [
        Broker(),
        WebRTC(),
        # M3(),
        MainUI(),
        ImageCV(),
        MLISTEN(),
        Beamformer(),
        Loader(),
    ]
    for p in ps:
        pps.append(multiprocessing.Process(target=p))
    for p in pps:
        p.start()

    c = BusClient(KillMsg)
    while True:
        x: KillMsg = c.recv()
        if x.name == '':
            break
    for p in pps:
        p.kill()

uv.lock (new file, 3653 lines): file diff suppressed because it is too large.