remove aiortc

remilia 2025-02-26 21:42:27 +08:00
parent f51d244466
commit d92a9778d4
4 changed files with 0 additions and 601 deletions

pyproject.toml
View File

@@ -6,7 +6,6 @@ readme = "README.md"
requires-python = ">=3.12,<3.13"
dependencies = [
    "aiohttp-cors>=0.7.0",
    "aiortc>=1.9.0",
    "cupy-cuda12x>=13.3.0",
    "opencv-python>=4.10.0.84",
    "pyqt6-fluent-widgets[full]>=1.7.4",
@@ -16,17 +15,5 @@ dependencies = [
    "tqdm>=4.67.1",
    "vtk>=9.4.0",
    "jupyter>=1.1.1",
    "av",
    "python-miio>=0.5.12",
]

[tool.uv]
override-dependencies = [
    "av>=14.0.0",
]

[tool.uv.sources]
av = [
    { path = "./av-14.1.0-cp312-cp312-win_amd64.whl", marker = "sys_platform == 'win32'" },
    { path = "./av-14.2.0rc1-cp312-cp312-linux_x86_64.whl", marker = "sys_platform == 'linux'" },
]
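# The entries above pin local PyAV wheels per platform via sys_platform markers;
# presumably these are custom builds, since H264NV.py asserts that h264_nvenc is
# available in the FFmpeg that PyAV bundles.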

H264NV.py
View File

@@ -1,374 +0,0 @@
import fractions
import logging
from itertools import tee
from struct import pack, unpack_from
from typing import Iterator, List, Optional, Sequence, Tuple, Type, TypeVar
import av
import math
from aiortc.codecs.base import Decoder, Encoder
from aiortc.jitterbuffer import JitterFrame
from aiortc.mediastreams import VIDEO_TIME_BASE, convert_timebase
from av.frame import Frame
from av.packet import Packet
logger = logging.getLogger(__name__)
gpu = '0'
DEFAULT_BITRATE = 1000000 # 1 Mbps
MIN_BITRATE = 500000 # 500 kbps
# MAX_BITRATE = 3000000 # 3 Mbps
MAX_BITRATE = 10000000  # 10 Mbps
MAX_FRAME_RATE = 120
PACKET_MAX = 1300
NAL_TYPE_FU_A = 28
NAL_TYPE_STAP_A = 24
NAL_HEADER_SIZE = 1
FU_A_HEADER_SIZE = 2
LENGTH_FIELD_SIZE = 2
STAP_A_HEADER_SIZE = NAL_HEADER_SIZE + LENGTH_FIELD_SIZE
DESCRIPTOR_T = TypeVar("DESCRIPTOR_T", bound="H264PayloadDescriptor")
T = TypeVar("T")
def pairwise(iterable: Sequence[T]) -> Iterator[Tuple[T, T]]:
    a, b = tee(iterable)
    next(b, None)
    return zip(a, b)
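# RTP H.264 depayloader: parses single NAL unit, FU-A and STAP-A payloads
# (RFC 6184) and re-emits Annex B NAL units prefixed with 0x00000001 start codes.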
class H264PayloadDescriptor:
    def __init__(self, first_fragment):
        self.first_fragment = first_fragment

    def __repr__(self):
        return f"H264PayloadDescriptor(FF={self.first_fragment})"

    @classmethod
    def parse(cls: Type[DESCRIPTOR_T], data: bytes) -> Tuple[DESCRIPTOR_T, bytes]:
        output = bytes()

        # NAL unit header
        if len(data) < 2:
            raise ValueError("NAL unit is too short")
        nal_type = data[0] & 0x1F
        f_nri = data[0] & (0x80 | 0x60)
        pos = NAL_HEADER_SIZE

        if nal_type in range(1, 24):
            # single NAL unit
            output = bytes([0, 0, 0, 1]) + data
            obj = cls(first_fragment=True)
        elif nal_type == NAL_TYPE_FU_A:
            # fragmentation unit
            original_nal_type = data[pos] & 0x1F
            first_fragment = bool(data[pos] & 0x80)
            pos += 1

            if first_fragment:
                original_nal_header = bytes([f_nri | original_nal_type])
                output += bytes([0, 0, 0, 1])
                output += original_nal_header
            output += data[pos:]
            obj = cls(first_fragment=first_fragment)
        elif nal_type == NAL_TYPE_STAP_A:
            # single time aggregation packet
            offsets = []
            while pos < len(data):
                if len(data) < pos + LENGTH_FIELD_SIZE:
                    raise ValueError("STAP-A length field is truncated")
                nalu_size = unpack_from("!H", data, pos)[0]
                pos += LENGTH_FIELD_SIZE
                offsets.append(pos)

                pos += nalu_size
                if len(data) < pos:
                    raise ValueError("STAP-A data is truncated")
            offsets.append(len(data) + LENGTH_FIELD_SIZE)

            for start, end in pairwise(offsets):
                end -= LENGTH_FIELD_SIZE
                output += bytes([0, 0, 0, 1])
                output += data[start:end]
            obj = cls(first_fragment=True)
        else:
            raise ValueError(f"NAL unit type {nal_type} is not supported")

        return obj, output
class H264Decoder(Decoder):
    def __init__(self) -> None:
        self.codec = av.CodecContext.create("h264", "r")

    def decode(self, encoded_frame: JitterFrame) -> List[Frame]:
        try:
            packet = av.Packet(encoded_frame.data)
            packet.pts = encoded_frame.timestamp
            packet.time_base = VIDEO_TIME_BASE
            frames = self.codec.decode(packet)
        except av.AVError as e:
            logger.warning(
                "H264Decoder() failed to decode, skipping package: " + str(e)
            )
            return []

        return frames
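# Fail fast at import time if this PyAV/FFmpeg build does not expose the
# NVIDIA NVENC H.264 encoder.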
assert "h264_nvenc" in av.codecs_available
def create_encoder_context(
    codec_name: str, width: int, height: int, bitrate: int
) -> Tuple[av.CodecContext, bool]:
    # codec = av.CodecContext.create('libx264', "w")
    codec = av.CodecContext.create('h264_nvenc', "w")
    codec.width = width
    codec.height = height
    codec.bit_rate = bitrate
    # codec.bit_rate = 6000000
    codec.pix_fmt = "yuv420p"
    codec.framerate = fractions.Fraction(MAX_FRAME_RATE, 1)
    codec.time_base = fractions.Fraction(1, MAX_FRAME_RATE)
    codec.options = dict(
        # delay='0',
        # crf='1',
        # profile="baseline",
        # level="31",
        # tune="zerolatency",  # does nothing using h264_omx
        gpu=gpu,
        preset='fast',
    )
    # codec.options = {
    #     "profile": "baseline",
    #     "level": "31",
    #     "tune": "zerolatency",  # does nothing using h264_omx
    # }
    codec.open()
    return codec, True
    # Unreachable: the hard-coded `return codec, True` above always wins.
    # return codec, codec_name == "h264_omx"
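# NVENC-backed replacement for aiortc's software H.264 encoder; it is swapped in
# by patching RTCRtpSender in WebRTC.py. The packetization helpers below appear
# to be adapted from aiortc.codecs.h264.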
class H264NVENCEncoder(Encoder):
    def __init__(self) -> None:
        self.buffer_data = b""
        self.buffer_pts: Optional[int] = None
        self.codec: Optional[av.CodecContext] = None
        self.codec_buffering = False
        self.__target_bitrate = MAX_BITRATE
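    # Fragment one NAL unit that exceeds PACKET_MAX into FU-A packets
    # (RFC 6184 sec. 5.8): the first fragment carries the S bit (0x80),
    # the last carries the E bit (0x40).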
    @staticmethod
    def _packetize_fu_a(data: bytes) -> List[bytes]:
        available_size = PACKET_MAX - FU_A_HEADER_SIZE
        payload_size = len(data) - NAL_HEADER_SIZE
        num_packets = math.ceil(payload_size / available_size)
        num_larger_packets = payload_size % num_packets
        package_size = payload_size // num_packets

        f_nri = data[0] & (0x80 | 0x60)  # fni of original header
        nal = data[0] & 0x1F

        fu_indicator = f_nri | NAL_TYPE_FU_A

        fu_header_end = bytes([fu_indicator, nal | 0x40])
        fu_header_middle = bytes([fu_indicator, nal])
        fu_header_start = bytes([fu_indicator, nal | 0x80])
        fu_header = fu_header_start

        packages = []
        offset = NAL_HEADER_SIZE
        while offset < len(data):
            if num_larger_packets > 0:
                num_larger_packets -= 1
                payload = data[offset: offset + package_size + 1]
                offset += package_size + 1
            else:
                payload = data[offset: offset + package_size]
                offset += package_size

            if offset == len(data):
                fu_header = fu_header_end

            packages.append(fu_header + payload)
            fu_header = fu_header_middle

        assert offset == len(data), "incorrect fragment data"

        return packages
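    # Aggregate consecutive small NAL units into one STAP-A packet
    # (RFC 6184 sec. 5.7), stopping when the MTU budget is spent or
    # after at most 9 units.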
    @staticmethod
    def _packetize_stap_a(
        data: bytes, packages_iterator: Iterator[bytes]
    ) -> Tuple[bytes, bytes]:
        counter = 0
        available_size = PACKET_MAX - STAP_A_HEADER_SIZE

        stap_header = NAL_TYPE_STAP_A | (data[0] & 0xE0)

        payload = bytes()
        try:
            nalu = data  # with header
            while len(nalu) <= available_size and counter < 9:
                stap_header |= nalu[0] & 0x80

                nri = nalu[0] & 0x60
                if stap_header & 0x60 < nri:
                    stap_header = stap_header & 0x9F | nri

                available_size -= LENGTH_FIELD_SIZE + len(nalu)
                counter += 1
                payload += pack("!H", len(nalu)) + nalu
                nalu = next(packages_iterator)

            if counter == 0:
                nalu = next(packages_iterator)
        except StopIteration:
            nalu = None

        if counter <= 1:
            return data, nalu
        else:
            return bytes([stap_header]) + payload, nalu
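    # Split an Annex B byte stream on 0x000001 / 0x00000001 start codes and
    # yield the raw NAL units.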
    @staticmethod
    def _split_bitstream(buf: bytes) -> Iterator[bytes]:
        # Translated from: https://github.com/aizvorski/h264bitstream/blob/master/h264_nal.c#L134
        i = 0
        while True:
            # Find the start of the NAL unit.
            #
            # NAL Units start with the 3-byte start code 0x000001 or
            # the 4-byte start code 0x00000001.
            i = buf.find(b"\x00\x00\x01", i)
            if i == -1:
                return

            # Jump past the start code
            i += 3
            nal_start = i

            # Find the end of the NAL unit (end of buffer OR next start code)
            i = buf.find(b"\x00\x00\x01", i)
            if i == -1:
                yield buf[nal_start: len(buf)]
                return
            elif buf[i - 1] == 0:
                # 4-byte start code case, jump back one byte
                yield buf[nal_start: i - 1]
            else:
                yield buf[nal_start:i]
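    # Route each NAL unit either to FU-A fragmentation (larger than PACKET_MAX)
    # or to STAP-A aggregation with its successors.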
    @classmethod
    def _packetize(cls, packages: Iterator[bytes]) -> List[bytes]:
        packetized_packages = []

        packages_iterator = iter(packages)
        package = next(packages_iterator, None)
        while package is not None:
            if len(package) > PACKET_MAX:
                packetized_packages.extend(cls._packetize_fu_a(package))
                package = next(packages_iterator, None)
            else:
                packetized, package = cls._packetize_stap_a(package, packages_iterator)
                packetized_packages.append(packetized)

        return packetized_packages
    def _encode_frame(
        self, frame: av.VideoFrame, force_keyframe: bool
    ) -> Iterator[bytes]:
        if self.codec and (
            frame.width != self.codec.width
            or frame.height != self.codec.height
            # we only adjust bitrate if it changes by over 10%
            or abs(self.target_bitrate - self.codec.bit_rate) / self.codec.bit_rate
            > 0.1
        ):
            self.buffer_data = b""
            self.buffer_pts = None
            self.codec = None

        if force_keyframe:
            # force a complete image
            frame.pict_type = av.video.frame.PictureType.I
        else:
            # reset the picture type, otherwise no B-frames are produced
            frame.pict_type = av.video.frame.PictureType.NONE

        # print(self.codec)
        if self.codec is None:
            self.codec, self.codec_buffering = create_encoder_context(
                "h264_nvenc",
                frame.width,
                frame.height,
                bitrate=self.target_bitrate,
            )
            # try:
            #     self.codec, self.codec_buffering = create_encoder_context(
            #         "h264_omx", frame.width, frame.height, bitrate=self.target_bitrate
            #     )
            # except Exception:
            #     self.codec, self.codec_buffering = create_encoder_context(
            #         "h264_nvenc",
            #         frame.width,
            #         frame.height,
            #         bitrate=self.target_bitrate,
            #     )

        data_to_send = b""
        for package in self.codec.encode(frame):
            package_bytes = bytes(package)
            if self.codec_buffering:
                # delay sending to ensure we accumulate all packages
                # for a given PTS
                if package.pts == self.buffer_pts:
                    self.buffer_data += package_bytes
                else:
                    data_to_send += self.buffer_data
                    self.buffer_data = package_bytes
                    self.buffer_pts = package.pts
            else:
                data_to_send += package_bytes

        if data_to_send:
            yield from self._split_bitstream(data_to_send)
    def encode(
        self, frame: Frame, force_keyframe: bool = False
    ) -> Tuple[List[bytes], int]:
        assert isinstance(frame, av.VideoFrame)
        packages = self._encode_frame(frame, force_keyframe)
        timestamp = convert_timebase(frame.pts, frame.time_base, VIDEO_TIME_BASE)
        return self._packetize(packages), timestamp

    def pack(self, packet: Packet) -> Tuple[List[bytes], int]:
        assert isinstance(packet, av.Packet)
        packages = self._split_bitstream(bytes(packet))
        timestamp = convert_timebase(packet.pts, packet.time_base, VIDEO_TIME_BASE)
        return self._packetize(packages), timestamp
    @property
    def target_bitrate(self) -> int:
        """
        Target bitrate in bits per second.
        """
        return self.__target_bitrate
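    # Note: the setter below returns immediately, ignoring the bitrate requested
    # by aiortc's bandwidth estimation, so the encoder stays pinned at MAX_BITRATE.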
    @target_bitrate.setter
    def target_bitrate(self, bitrate: int) -> None:
        # print(bitrate)
        return
        bitrate = max(MIN_BITRATE, min(bitrate, MAX_BITRATE))
        self.__target_bitrate = bitrate
def h264_depayload(payload: bytes) -> bytes:
    descriptor, data = H264PayloadDescriptor.parse(payload)
    return data

nodes/WebRTC.py
View File

@@ -1,212 +0,0 @@
import asyncio
import json
import logging
import os
import time
from fractions import Fraction
import aiohttp
import aiohttp_cors
import aiortc.rtcrtpsender
import numpy as np
import zmq
import zmq.asyncio
from aiohttp import web, WSMessage
from aiortc import MediaStreamTrack, RTCPeerConnection, RTCSessionDescription, RTCConfiguration, RTCRtpCodecCapability
from av import VideoFrame
import H264NV
from BusClient import BusClient
from config import VIDEO_WIDTH, VIDEO_HEIGHT
from utils.Msg import BMMsg, Msg, KillMsg, MoveAxisMsg, ImageArgMsg, SeqIdMinMax
ROOT = os.path.dirname(__file__)
from nodes.Node import Node
web.WebSocketResponse()
logger = logging.getLogger(__name__)
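# Flat grey 4-channel frame used as the video source until the first image
# arrives from the bus.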
def generate_placeholder():
    z = np.zeros((VIDEO_HEIGHT, VIDEO_WIDTH, 4), dtype=np.uint8)
    z[:, :, 3] = 0
    z[:, :, :3] = 128
    return z.tobytes()
class WebRTC(Node):
    # def __init__(self):
    #     super().__init__(False)

    def loop(self):
        self1 = self
        placeholder = generate_placeholder()
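        # Video track that polls 4-channel frames off the ZeroMQ bus at roughly
        # 60 fps, re-sending the last frame (initially the grey placeholder) when
        # no new BMMsg arrives in time.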
        class ZMQStreamTrack(MediaStreamTrack):
            def __init__(self):
                super().__init__()
                self.c = BusClient(BMMsg, pub=False, conflare=True, poller=True)
                self.kind = 'video'
                self.pts = 0
                self.buffer = placeholder

            async def recv(self):
                events = self.c.poller.poll(int(1000 / 60))
                if events:
                    msg: BMMsg = Msg.decode_msg(events[0][0].recv())
                    self.buffer = msg.data
                frame = VideoFrame.from_bytes(self.buffer, VIDEO_WIDTH, VIDEO_HEIGHT)
                frame.time_base = Fraction(1, 60)
                frame.pts = self.pts
                self.pts += 1
                return frame

        pcs = set()
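        # Signaling endpoint: accept the browser's SDP offer, attach the ZMQ track
        # as send-only with H.264 forced via codec preferences, and reply with the
        # SDP answer.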
        async def offer(request):
            params = await request.json()
            offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

            pc = RTCPeerConnection(RTCConfiguration([]))
            pcs.add(pc)

            rc = pc.addTransceiver(ZMQStreamTrack(), 'sendonly')
            rc.setCodecPreferences([RTCRtpCodecCapability(
                mimeType='video/H264',
                clockRate=90000,
                channels=None,
                parameters={
                    'level-asymmetry-allowed': '1',
                    'packetization-mode': '1',
                    'profile-level-id': '42e01f'
                })])

            await pc.setRemoteDescription(offer)
            answer = await pc.createAnswer()
            await pc.setLocalDescription(answer)

            return web.Response(
                content_type="application/json",
                text=json.dumps(
                    {"sdp": pc.localDescription.sdp, "type": pc.localDescription.type}
                ),
            )
        async def on_shutdown(app):
            # close peer connections
            coros = [pc.close() for pc in pcs]
            await asyncio.gather(*coros)
            pcs.clear()
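        # Push bus messages (image arguments, sequence-id range, axis moves) to the
        # browser over the WebSocket; exit when a KillMsg carrying this connection's
        # name arrives.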
        async def ws_send(ws, name):
            # s = BusClient.sub(zmq.asyncio.Context(), KillMsg, TickMsg)
            c = BusClient(KillMsg, ImageArgMsg, SeqIdMinMax, MoveAxisMsg, ctx=zmq.asyncio.Context(), pub=False)
            while True:
                msg = await c.recv_async()
                if isinstance(msg, KillMsg):
                    if msg.name == name:
                        break
                if not ws.closed:
                    if isinstance(msg, ImageArgMsg):
                        if name != msg.sender:
                            dc = msg.__dict__.copy()
                            del dc['sender']
                            await ws.send_str(json.dumps(dict(
                                type='Arg', value=dc
                            )))
                    if isinstance(msg, SeqIdMinMax):
                        await ws.send_str(json.dumps(dict(
                            type='SeqIdMinMax', value=dict(
                                min=msg.min,
                                max=msg.max,
                            )
                        )))
                    if isinstance(msg, MoveAxisMsg):
                        if name != msg.sender:
                            dc = msg.__dict__.copy()
                            del dc['sender']
                            await ws.send_str(json.dumps(dict(
                                type='MoveAxisMsg', value=dc
                            )))
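        # Relay JSON commands from the browser onto the bus, then publish a KillMsg
        # once the WebSocket closes so the paired ws_send task terminates.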
        async def ws_recv(ws, name):
            c = BusClient(ctx=zmq.asyncio.Context(), sub=False)
            async for msg in ws:
                msg: WSMessage = msg
                if msg.type == aiohttp.WSMsgType.TEXT:
                    ss = msg.data
                    # await c.send_async(StrMsg(ss))
                    j = json.loads(ss)
                    v = j["value"]
                    logger.debug(f'ws recv {j}')
                    match j["type"]:
                        case "MoveAxisMsg":
                            await c.send_async(MoveAxisMsg(sender=name, axis=v['axis'], value=v['value']))
                        case "Arg":
                            await c.send_async(ImageArgMsg(sender=name, **v))
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    print(f'ws connection closed with exception', ws.exception())
            await c.send_async(KillMsg(name))
        async def handle_post(request):
            print(request)
            # try:
            #     # Parse the JSON body from the request
            #     data = await request.json()
            #     print("Received data:", data)
            #
            #     # Process the data (this is just a placeholder)
            #     response_data = {'status': 'success', 'received': data}
            #
            #     # Return a JSON response
            #     return web.json_response(response_data, status=200)
            # except Exception as e:
            #     # Handle errors and return a bad request response
            #     return web.json_response({'error': str(e)}, status=400)
            return web.Response()
        async def websocket_handler(request):
            ws = web.WebSocketResponse()
            await ws.prepare(request)
            name = str(time.time())
            task1 = asyncio.create_task(ws_send(ws, name))
            task2 = asyncio.create_task(ws_recv(ws, name))
            await asyncio.gather(task1, task2)
            # await task1
            # await task2
            # for i in range(10):
            #     await ws.send_str('asdasd')
            #     await asyncio.sleep(0.1)
            return ws
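        # Monkey-patch aiortc's RTCRtpSender so every non-audio sender gets the
        # NVENC-backed encoder; _RTCRtpSender__encoder is the name-mangled private
        # attribute aiortc uses internally, so this relies on aiortc internals.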
        f0 = aiortc.RTCRtpSender.__init__

        def f1(*args, **kwargs):
            f0(*args, **kwargs)
            if not args[1] == 'audio':
                args[0]._RTCRtpSender__encoder = H264NV.H264NVENCEncoder()

        aiortc.RTCRtpSender.__init__ = f1

        app = web.Application()
        app.on_shutdown.append(on_shutdown)
        app.router.add_post("/offer", offer)
        app.router.add_post('/d', handle_post)  # Define the POST route
        app.add_routes([web.get('/ws', websocket_handler)])

        cors = aiohttp_cors.setup(app, defaults={
            "*": aiohttp_cors.ResourceOptions(
                allow_credentials=True,
                expose_headers="*",
                allow_headers="*"
            )
        })
        for route in list(app.router.routes()):
            cors.add(route)

        web.run_app(
            app, access_log=None, host='0.0.0.0', port=8081
        )

View File

@@ -10,7 +10,6 @@ from nodes.ImageFFMPEG import ImageFFMPEG
from nodes.Loader import Loader
from nodes.Muxer import Muxer
from nodes.Robot import Robot
from nodes.WebRTC import WebRTC
from qtonly import kde_pyqt6_mainui
from utils.Msg import KillMsg
@@ -21,7 +20,6 @@ if __name__ == '__main__':
    pps = []
    ps = [
        Broker(),
        WebRTC(),
        kde_pyqt6_mainui,
        Device(level=logging.DEBUG),
        ImageFFMPEG(),