Files
rk_pve/mp4/decoder.py
2025-10-03 18:08:00 +02:00

64 lines
2.4 KiB
Python

# People's Video Editor: high quality, GPU accelerated mp4 editor
# Copyright (C) 2025 Roz K <roz@rozk.net>
#
# This file is part of People's Video Editor.
#
# People's Video Editor is free software: you can redistribute it and/or modify it under the terms of the
# GNU General Public License as published by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# Foobar is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty
# of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along with Foobar.
# If not, see <https://www.gnu.org/licenses/>.
from . import libav
from .packet import Packet
from .frame import Frame
class Decoder:
__slots__ = '_context'
def __init__(self, stream):
self._context = libav.codec_alloc_context(stream.codec)
if not self._context:
raise MemoryError
errcode = libav.codec_parameters_to_context(self._context, stream.parameters)
if errcode < 0:
libav.codec_free_context(self._context)
raise Exception("Failed to set context parameters")
errcode = libav.codec_open(self._context, stream.codec)
if errcode < 0:
libav.codec_free_context(self._context)
raise Exception("Failed to open codec context")
def __del__(self):
if self._context:
libav.codec_free_context(self._context)
def _receive(self):
frames = []
while True:
frame = Frame()
errcode = libav.codec_receive_frame(self._context, frame)
if errcode in (libav.AVERROR_EOF, libav.AVERROR_EAGAIN):
break
elif errcode < 0:
errstring = libav.strerror(errcode)
raise Exception(f"Failed to receive frame: {errstring}")
frames.append(frame)
return frames
def decode(self, packet):
if not self._context:
return None
errcode = libav.codec_send_packet(self._context, packet)
if errcode < 0:
errstring = libav.strerror(errcode)
raise Exception(f"Failed to send packet: {errstring}")
return self._receive()
def flush(self):
return self.decode(None)