add request

This commit is contained in:
remilia 2025-02-26 21:41:15 +08:00
parent e4d54c6176
commit f51d244466
9 changed files with 199 additions and 143 deletions

View File

@ -1,26 +1,20 @@
import logging
import struct
import time
from pathlib import Path
import cupy as cp
import cv2
import numpy as np
import zmq
from config import PLAYBACK_SOCKET, VIDEO_WIDTH, VIDEO_HEIGHT, LIVE_SOCKET, IMAGING_CONFIG
from config import VIDEO_WIDTH, VIDEO_HEIGHT
from nodes.Node import Node
from utils.Msg import BMMsg, ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \
RecordFrameMsg, ImagingConfigNameListMsg, Msg, RfFrameMsg
from utils.RfFile import RfFrame, RfSequenceMeta
from utils.Msg import BMMsg, ImageArgMsg, SetSeqMetaMsg, Msg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg
from utils.RfFile import RfSequenceMeta
from utils.RfMat import RfMat
from utils.RfMeta import RfFrameMeta
logger = logging.getLogger(__name__)
class Beamformer(Node):
topics = [RfFrameMsg]
topics = [BeamformerMsg]
def __init__(self, level=logging.INFO):
super(Beamformer, self).__init__(level=level)
@ -40,24 +34,28 @@ class Beamformer(Node):
self.send(BMMsg(0, d2.__bytes__()))
def loop(self):
req_socket = self.context.socket(zmq.REQ)
req_socket.connect('tcp://127.0.0.1:5559')
time.sleep(1)
while True:
req_socket.send(b'')
r = req_socket.recv()
if r.__len__() > 0:
self.send(RequestRfFrameMsg())
r = dict(self.c.poller.poll(1000))
if self.c.sub in r:
msg = self.recv()
if isinstance(msg, BeamformerMsg):
if msg.value == b'init':
time.sleep(1)
continue
if msg.value == b'nop':
continue
r = msg.value
id2 = r.index(Msg.magic(), 1)
id3 = r.index(Msg.magic(), id2 + 1)
print(id2, id3)
seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[:id2])
arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3])
b_msg: RfFrameMsg = Msg.decode_msg(r[id3:])
b_msg: RfFrameWithMetaMsg = Msg.decode_msg(r[id3:])
s = b_msg.data
fb1 = cp.frombuffer(s, dtype=cp.int16)
seq_meta = RfSequenceMeta.from_name(seq_msg.name)
mat = RfMat(fb1.reshape(seq_meta.shape), RfFrameMeta(
encoder=b_msg.encoder,
sequence_id=b_msg.sequence_id,
), seq_meta)
mat = RfMat(fb1.reshape(seq_meta.shape), b_msg.meta, seq_meta)
self.process(mat, arg_msg)
else:
logger.debug('package lose')

View File

@ -1,4 +1,5 @@
import logging
import struct
import subprocess
import time
@ -7,13 +8,15 @@ import zmq
from config import LIVE_REP_SOCKET, CONFIG, DEVICE_CONFIG
from nodes.Node import Node
from utils.Msg import ImageArgMsg, KillMsg, SetDeviceConnectedMsg, SetDeviceEnabledMsg, DeviceEnabledMsg, \
DeviceConnectedMsg, SetDeviceConfigMsg, DeviceOnlineMsg, DeviceConfigListMsg
DeviceConnectedMsg, SetDeviceConfigMsg, DeviceOnlineMsg, DeviceConfigListMsg, RequestRfFrameMsg, RfFrameMsg, \
RfFrameWithMetaMsg
from utils.RfMeta import RfFrameMeta
logger = logging.getLogger(__name__)
class Device(Node):
topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg]
topics = [SetDeviceConnectedMsg, SetDeviceEnabledMsg, SetDeviceConfigMsg, RequestRfFrameMsg]
def __init__(self, level=logging.INFO):
super(Device, self).__init__(level=level)
@ -137,7 +140,6 @@ class Device(Node):
logger.debug(f'device start loop')
while True:
msg = self.recv()
logger.debug(f'{msg}')
if isinstance(msg, KillMsg):
if msg.name == '':
return
@ -153,3 +155,12 @@ class Device(Node):
self.disconnect()
elif isinstance(msg, SetDeviceConfigMsg):
self.setfile(msg.value)
elif isinstance(msg, RequestRfFrameMsg):
braw = self.data()
if braw == b'':
continue
_, sequence_id, encoder = struct.unpack_from('=iqi', braw)
buffer = braw[4 + 8 + 4:]
self.send(RfFrameWithMetaMsg(0, RfFrameMeta(
encoder=encoder, sequence_id=sequence_id
), buffer))

View File

@ -27,31 +27,42 @@ class ImageFFMPEG(Node):
'-f', 'rawvideo',
'-pixel_format', 'rgb24',
'-video_size', '1080x1920',
'-framerate', '60',
'-framerate', '24',
'-hwaccel', 'nvdec',
'-i', '-',
'-vcodec', 'h264_nvenc',
'-preset', 'faster',
'-preset', 'fast',
'-gpu', '1',
# '-profile:v', 'high',
'-zerolatency', '1',
# '-tune', 'ull',
# '-level', '42',
'-pix_fmt', 'yuv420p',
# '-vcodec', 'libx264',
'-b:v', '40M',
# '-b:v', '40M',
'-f', 'flv',
'rtmp://localhost/live/livestream'
'rtmp://q1hyb.as/live/bscan'
# '-f', 'mpegts',
# 'srt://localhost:10080?streamid=#!::r=live/livestream,m=publish'
],
stdin=subprocess.PIPE,
)
lasttime = time.time()
while True:
# socks = dict(self.c.poller.poll(1 / 30))
events = self.c.poller.poll(1)
events = self.c.poller.poll(1000 / 24)
if events:
msg: BMMsg = Msg.decode_msg(events[0][0].recv())
b = np.frombuffer(msg.data, dtype=np.uint8)
b = np.reshape(b, (VIDEO_HEIGHT, VIDEO_WIDTH, 4))[:, :, :3]
self.buffer = b.tobytes()
p.stdin.write(self.buffer)
time.sleep(1 / 60)
# time.sleep(1 / 60)
currenttime = time.time()
logger.debug(f'{currenttime - lasttime}')
lasttime = currenttime
if __name__ == '__main__':

View File

@ -8,7 +8,7 @@ import zmq
from config import PLAYBACK_SOCKET_PORT, SOFTWARE_CONFIG
from nodes.Node import Node
from utils.Msg import MoveAxisMsg, KillMsg, SetSeqMetaMsg, SeqIdMinMax, SetBaseMsg, SeqListMsg, SeqIdList, \
SetSidMsg
SetSidMsg, RfFrameWithMetaMsg
from utils.RfFile import RfSequence
logger = logging.getLogger(__name__)
@ -33,10 +33,11 @@ class Loader(Node):
pass
elif isinstance(msg, SetSidMsg):
frame = rff.frames[msg.value]
buffer = io.BytesIO()
buffer.write(struct.pack('=iqi', 114514, frame.meta.sequence_id, frame.meta.encoder))
buffer.write(frame.bytes)
playback_socket.send(buffer.getvalue())
# buffer = io.BytesIO()
# buffer.write(struct.pack('=iqi', 114514, frame.meta.sequence_id, frame.meta.encoder))
# buffer.write(frame.bytes)
# playback_socket.send(buffer.getvalue())
self.send(RfFrameWithMetaMsg(1, frame.meta, frame.bytes))
elif isinstance(msg, SetSeqMetaMsg):
if base is None:
continue

View File

@ -1,26 +1,22 @@
import logging
import struct
import time
from pathlib import Path
import cupy as cp
import cv2
import numpy as np
import zmq
from config import PLAYBACK_SOCKET, VIDEO_WIDTH, VIDEO_HEIGHT, LIVE_SOCKET, IMAGING_CONFIG
from config import IMAGING_CONFIG
from nodes.Node import Node
from utils.Msg import BMMsg, ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \
RecordFrameMsg, ImagingConfigNameListMsg, RfFrameMsg
from utils.RfFile import RfFrame, RfSequenceMeta
from utils.RfMat import RfMat
from utils.RfMeta import RfFrameMeta
from utils.Msg import ImageArgMsg, KillMsg, SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, \
ImagingConfigNameListMsg, RfFrameWithMetaMsg, BeamformerMsg, RequestRfFrameMsg
from utils.RfFile import RfSequenceMeta
logger = logging.getLogger(__name__)
class Muxer(Node):
topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg]
topics = [SetSeqMetaMsg, SetPlayMode, SetDeviceConfigMsg, SetRecordMsg, RfFrameWithMetaMsg, RequestRfFrameMsg,
ImageArgMsg]
def __init__(self, level=logging.INFO):
super(Muxer, self).__init__(level=level)
@ -33,89 +29,65 @@ class Muxer(Node):
self.play_mode = None
@property
def current_seq_meta(self):
seq_meta = None
match self.play_mode:
case 'live':
seq_meta = self.seq_meta_live
case 'playback':
seq_meta = self.seq_meta_playback
return seq_meta
def loop(self):
device_socket = self.context.socket(zmq.PULL)
device_socket.setsockopt(zmq.CONFLATE, 1)
rep_socket = self.context.socket(zmq.REP)
rep_socket.bind('tcp://*:5559')
self.arg = ImageArgMsg('', t_start=0, t_end=1499)
self.c.poller.register(device_socket, zmq.POLLIN)
self.c.poller.register(rep_socket, zmq.POLLIN)
rfframemsg: RfFrameMsg | None = None
time.sleep(1)
self.send(ImagingConfigNameListMsg([path.stem for path in IMAGING_CONFIG.glob('*.json')]))
while True:
socks = dict(self.c.poller.poll())
if device_socket in socks and socks[device_socket] == zmq.POLLIN:
buffer = device_socket.recv()
device_socket.disconnect(f"tcp://{LIVE_SOCKET}")
device_socket.connect(f"tcp://{LIVE_SOCKET}")
logger.debug(f'device receive {buffer.__len__()}')
_, sequence_id, encoder = struct.unpack_from('=iqi', buffer)
buf = buffer[4 + 8 + 4:]
logger.debug(f'live meta {self.seq_meta_live}, playback meta {self.seq_meta_playback}')
def current_meta(self):
seq_meta: RfSequenceMeta | None = {
"live": self.seq_meta_live,
"playback": self.seq_meta_playback,
}.get(self.play_mode, None)
if seq_meta is not None:
if (buf.__len__() // 2) == np.prod(seq_meta.shape):
self.seq_meta = seq_meta
rfframemsg = RfFrameMsg(sequence_id, encoder, buf)
fm = RfFrameMeta(
encoder=encoder,
sequence_id=sequence_id,
)
if self.record_enable:
(self.record_path / fm.filename).write_bytes(buf)
self.send(RecordFrameMsg(buf.__len__(), sequence_id))
if rep_socket in socks and socks[rep_socket] == zmq.POLLIN:
rep_socket.recv()
if rfframemsg is None or self.seq_meta is None:
rep_socket.send(b'')
else:
rep_socket.send(SetSeqMetaMsg('any', self.seq_meta.name).encode_msg() +
self.arg.encode_msg() +
rfframemsg.encode_msg())
if self.c.sub in socks and socks[self.c.sub] == zmq.POLLIN:
return seq_meta
def loop(self):
device_socket = self.context.socket(zmq.PULL)
self.sf = None
self.arg = ImageArgMsg('', t_start=0, t_end=1499)
self.c.poller.register(device_socket, zmq.POLLIN)
time.sleep(1)
self.send(ImagingConfigNameListMsg([path.stem for path in IMAGING_CONFIG.glob('*.json')]))
while True:
socks = dict(self.c.poller.poll())
for k in socks:
if k == device_socket:
pass
if k == self.c.sub:
msg = self.recv()
if isinstance(msg, KillMsg):
if isinstance(msg, RfFrameWithMetaMsg):
if msg.sender == 0 and self.play_mode == 'live':
self.send(BeamformerMsg(
SetSeqMetaMsg('any', self.seq_meta_live.name).encode_msg() +
self.arg.encode_msg() +
msg.encode_msg()
))
elif msg.sender == 1:
self.sf = msg
elif isinstance(msg, RequestRfFrameMsg):
if self.play_mode is None:
self.send(BeamformerMsg(b'init'))
elif self.play_mode == 'playback':
if self.sf is None:
self.send(BeamformerMsg(b'nop'))
elif (self.sf.data.__len__() // 2) != np.prod(self.seq_meta_playback.shape):
self.send(BeamformerMsg(b'nop'))
else:
self.send(BeamformerMsg(
SetSeqMetaMsg('any', self.seq_meta_playback.name).encode_msg() +
self.arg.encode_msg() +
self.sf.encode_msg()
))
elif isinstance(msg, KillMsg):
if msg.name == '':
return
if isinstance(msg, ImageArgMsg):
elif isinstance(msg, ImageArgMsg):
self.arg = msg
if isinstance(msg, SetSeqMetaMsg):
elif isinstance(msg, SetSeqMetaMsg):
match msg.target:
case 'live':
self.seq_meta_live = RfSequenceMeta.from_name(msg.name)
case 'playback':
self.seq_meta_playback = RfSequenceMeta.from_name(msg.name)
if isinstance(msg, SetPlayMode):
elif isinstance(msg, SetPlayMode):
logger.debug(f'set playmode {msg}')
self.play_mode = msg.value
if msg.value == 'live':
try:
device_socket.disconnect(f"tcp://{PLAYBACK_SOCKET}")
except:
pass
device_socket.connect(f"tcp://{LIVE_SOCKET}")
elif msg.value == 'playback':
try:
device_socket.disconnect(f"tcp://{LIVE_SOCKET}")
except:
pass
device_socket.connect(f"tcp://{PLAYBACK_SOCKET}")
logger.debug('connect to playback')
if isinstance(msg, SetRecordMsg):
elif isinstance(msg, SetRecordMsg):
self.record_enable = msg.enable
if msg.enable:
seq_meta = self.current_seq_meta

View File

@ -4,6 +4,8 @@ import struct
from enum import auto, Enum
from pathlib import Path
from utils.RfMeta import RfFrameMeta
class BG(Enum):
Msg1 = auto()
@ -34,6 +36,10 @@ class BG(Enum):
SetSidMsg = auto()
ImagingConfigNameListMsg = auto()
RfFrameMsg = auto()
RfFrameWithMetaMsg = auto()
BytesMsg = auto()
BeamformerMsg = auto()
RequestRfFrameMsg = auto()
class Msg:
@ -249,19 +255,69 @@ class RfFrameMsg(Msg):
) + self.data
@classmethod
def decode(cls, data: bytes) -> 'Msg':
def decode(cls, data: bytes) -> 'RfFrameMsg':
return cls(
*struct.unpack('II', data[:8]),
data[8:]
)
class RfFrameWithMetaMsg(Msg):
SPLIT = struct.pack('I', 114514)
def __init__(self, sender: int, meta: RfFrameMeta, data: bytes):
self.sender = sender
self.meta = meta
self.data = data
def encode(self) -> bytes:
return struct.pack('I', self.sender) + self.meta.name.encode() + self.SPLIT + self.data
@classmethod
def decode(cls, data: bytes) -> 'RfFrameWithMetaMsg':
id2 = data.index(cls.SPLIT)
return cls(
sender=struct.unpack('I', data[:4])[0],
meta=RfFrameMeta.from_name(data[4:id2].decode()),
data=data[id2 + 4:],
)
@dataclasses.dataclass
class BytesMsg(Msg):
value: bytes
def encode(self) -> bytes:
return self.value
@classmethod
def decode(cls, data: bytes) -> 'Msg':
return cls(data)
def split(self):
pass
# id2 = r.index(Msg.magic(), 1)
# id3 = r.index(Msg.magic(), id2 + 1)
# print(id2, id3)
# seq_msg: SetSeqMetaMsg = Msg.decode_msg(r[0:id2])
# arg_msg: ImageArgMsg = Msg.decode_msg(r[id2:id3])
# b_msg: RfFrameMsg = Msg.decode_msg(r[id3:-1])
class BeamformerMsg(BytesMsg):
pass
@dataclasses.dataclass
class RobotRtsiMsg(Msg):
pos: tuple[float, float, float, float, float, float]
force: tuple[float, float, float, float, float, float]
class RequestRfFrameMsg(Msg):
pass
def test():
values = set(item.name for item in BG)
for k in globals().keys():

View File

@ -6,8 +6,9 @@ from nodes.Beamformer import Beamformer
from nodes.Broker import Broker
from nodes.Device import Device
from nodes.ImageCV import ImageCV
from nodes.ImageFFMPEG import ImageFFMPEG
from nodes.Loader import Loader
from nodes.Muxer import Receiver
from nodes.Muxer import Muxer
from nodes.Robot import Robot
from nodes.WebRTC import WebRTC
from qtonly import kde_pyqt6_mainui
@ -20,13 +21,14 @@ if __name__ == '__main__':
pps = []
ps = [
Broker(),
# WebRTC(),
WebRTC(),
kde_pyqt6_mainui,
Device(level=logging.DEBUG),
ImageFFMPEG(),
ImageCV(level=logging.DEBUG),
Beamformer(),
Beamformer(level=logging.DEBUG),
Loader(),
Receiver(),
Muxer(level=logging.DEBUG),
Robot(),
]
for p in ps:

View File

@ -1,2 +1,4 @@
if __name__ == '__main__':
print(tuple(str((1, 2, 3))))
print([1,2,3][-1:1])
print([1,2,3][1:])

View File

@ -2,9 +2,12 @@ from miio.miioprotocol import MiIOProtocol
from config import SWITCH1_IP, SWITCH1_TOKEN, SWITCH2_IP, SWITCH2_TOKEN
from miio import Device
from utils.mi import c1_disconnect, c1_connect
if __name__ == '__main__':
m = MiIOProtocol(
SWITCH1_IP, SWITCH1_TOKEN,
)
r = m.send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}])
print(r[0]['value'])
# m = MiIOProtocol(
# SWITCH1_IP, SWITCH1_TOKEN,
# )
# r = m.send('get_properties', [{'did': 'MYDID', 'siid': 2, 'piid': 1}])
# print(r[0]['value'])
c1_connect()