183 lines
5.0 KiB
Python
183 lines
5.0 KiB
Python
import fractions
|
|
import logging
|
|
from typing import cast
|
|
|
|
import av
|
|
import numpy as np
|
|
from av import VideoCodecContext, VideoFrame
|
|
|
|
# for e in av.codecs_available:
|
|
# print(e)
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
logging.getLogger('libav').setLevel(logging.DEBUG)
|
|
MAX_FRAME_RATE = 60
|
|
|
|
|
|
def f0():
|
|
# input_ = av.open('asd','w')
|
|
# in_stream = input_.streams.video[0]
|
|
# libx264
|
|
codec: VideoCodecContext = av.CodecContext.create('libx264', "w")
|
|
|
|
codec.width = 640
|
|
codec.height = 480
|
|
codec.pix_fmt = 'yuv420p' # 常用格式, 大部分解码器都支持
|
|
|
|
# codec.width = 100
|
|
# codec.height = 100
|
|
# codec.bit_rate = 100000
|
|
# codec.pix_fmt = "yuv420p"
|
|
# codec.framerate = fractions.Fraction(MAX_FRAME_RATE, 1)
|
|
# codec.time_base = fractions.Fraction(1, MAX_FRAME_RATE)
|
|
# codec.options = {
|
|
# "profile": "baseline",
|
|
# "level": "31",
|
|
# "tune": "zerolatency",
|
|
# }
|
|
codec.open()
|
|
pts = 0
|
|
while True:
|
|
f = VideoFrame.from_ndarray(np.zeros((640, 480, 3), dtype=np.uint8))
|
|
pts += 1
|
|
f.pts = pts
|
|
r = codec.encode(f)
|
|
if r.__len__() > 0:
|
|
print(r[0].__buffer__(0).__len__())
|
|
|
|
|
|
def f1():
|
|
import av
|
|
|
|
# 创建一个输出容器
|
|
output = av.open('output.mp4', 'w')
|
|
|
|
# 添加一个视频流
|
|
stream = output.add_stream('libx264', rate=24)
|
|
stream.width = 640
|
|
stream.height = 480
|
|
stream.pix_fmt = 'yuv420p'
|
|
|
|
# 编码循环
|
|
for i in range(100):
|
|
frame = av.VideoFrame(width=640, height=480, format='rgb24')
|
|
# ... 对帧进行处理 ...
|
|
for packet in stream.encode(frame):
|
|
output.mux(packet)
|
|
|
|
# 完成编码
|
|
for packet in stream.encode():
|
|
output.mux(packet)
|
|
|
|
# 关闭输出容器
|
|
output.close()
|
|
|
|
|
|
def f2():
|
|
import av
|
|
from av import VideoFrame
|
|
from fractions import Fraction
|
|
|
|
# 1. 创建输出容器
|
|
output_path = 'output.mp4'
|
|
output = av.open(output_path, 'w')
|
|
|
|
# 2. 创建视频流(使用 libx264 编码器)
|
|
stream = output.add_stream('libx264', rate=24) # 帧率
|
|
|
|
# 3. 设置编解码器上下文参数
|
|
stream.width = 640
|
|
stream.height = 480
|
|
stream.pix_fmt = 'yuv420p' # 常用格式, 大部分解码器都支持
|
|
stream.codec_context.time_base = Fraction(1, 24)
|
|
|
|
# 4. 手动打开编解码器上下文 (可选, add_stream 已经完成了这一步)
|
|
stream.codec_context.open()
|
|
|
|
# 5. 编码循环
|
|
for i in range(100): # 生成 100 帧
|
|
frame = av.VideoFrame(width=640, height=480, format='rgb24')
|
|
|
|
# 生成测试数据
|
|
# 在实际应用中,您需要从图像源获取数据
|
|
import numpy as np
|
|
import colorsys
|
|
cx = int(640 / 100 * i)
|
|
cy = int(480 / 100 * i)
|
|
rgb = (np.array(colorsys.hsv_to_rgb(i / 100.0, 1.0, 1.0)) * 255).astype(np.uint8)
|
|
|
|
data = np.zeros((frame.height, frame.width, 3), dtype=np.uint8)
|
|
data[cy - 10:cy + 10, cx - 10:cx + 10, :] = rgb
|
|
frame.planes[0].update(data)
|
|
|
|
for packet in stream.encode(frame):
|
|
output.mux(packet)
|
|
|
|
# 6. 刷新编码器
|
|
for packet in stream.encode():
|
|
output.mux(packet)
|
|
|
|
# 7. 关闭容器
|
|
output.close()
|
|
|
|
|
|
def f3():
|
|
import av
|
|
from av import VideoFrame
|
|
from fractions import Fraction
|
|
|
|
# 1. 创建输出容器
|
|
output_path = 'output.mp4'
|
|
output = av.open(output_path, 'w')
|
|
|
|
# 2. 获取编码器codec
|
|
codec = av.Codec('libx264', "w")
|
|
|
|
# 3. 手动创建并配置编解码器上下文
|
|
ctx = av.CodecContext.create(codec, mode="w")
|
|
ctx.width = 640
|
|
ctx.height = 480
|
|
ctx.pix_fmt = 'yuv420p' # 常用格式, 大部分解码器都支持
|
|
ctx.time_base = Fraction(1, 24)
|
|
|
|
# 4. 添加视频流 (使用创建好的编解码器上下文).
|
|
stream = output.add_stream('libx264', rate=24) # 使用编码器名称
|
|
stream.codec_context = ctx # 将之前创建好的 ctx 赋值给 stream
|
|
stream.width = ctx.width # 宽度
|
|
stream.height = ctx.height
|
|
stream.pix_fmt = ctx.pix_fmt
|
|
|
|
# 5. 手动打开编解码器上下文 (在添加到 stream 后会自动打开)
|
|
ctx.open()
|
|
|
|
# 6. 编码循环
|
|
for i in range(100): # 生成 100 帧
|
|
frame = av.VideoFrame(width=640, height=480, format='rgb24')
|
|
|
|
# 生成测试数据
|
|
# 在实际应用中,您需要从图像源获取数据
|
|
import numpy as np
|
|
import colorsys
|
|
cx = int(640 / 100 * i)
|
|
cy = int(480 / 100 * i)
|
|
rgb = (np.array(colorsys.hsv_to_rgb(i / 100.0, 1.0, 1.0)) * 255).astype(np.uint8)
|
|
|
|
data = np.zeros((frame.height, frame.width, 3), dtype=np.uint8)
|
|
data[cy - 10:cy + 10, cx - 10:cx + 10, :] = rgb
|
|
frame.planes[0].update(data)
|
|
|
|
for packet in stream.encode(frame):
|
|
output.mux(packet)
|
|
|
|
# 7. 刷新编码器
|
|
for packet in stream.encode(None):
|
|
output.mux(packet)
|
|
|
|
# 8. 关闭容器
|
|
output.close()
|
|
|
|
print(f"视频已保存到 {output_path}")
|
|
|
|
|
|
if __name__ == '__main__':
|
|
f0()
|