Skip to content

pyav_receiver

plantimager.controller.camera.pyav_receiver Link

PyAVReceiver Link

PyAVReceiver()
Source code in plantimager/controller/camera/pyav_receiver.py
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
def __init__(self, ):
    super().__init__()
    self.uri = None
    self.format = None
    self._opened = False
    self._wait_times = [0]*30
    self._decode_times = [0]*30
    self._base_rate: int = None
    self._t0: float = None
    self._stream: av.VideoStream = None
    self._container: av.container.InputContainer | None = None
    def _finalizer():
        print("Closing stream")
        if self._container: self._container.close()
    finalize(self, _finalizer)

open Link

open(uri, format)

Open a new video stream from uri using the specified format. :param uri: :param format: :return bool:

Source code in plantimager/controller/camera/pyav_receiver.py
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
def open(self, uri: str, format: str) -> bool:
    """
    Open a new video stream from uri using the specified format.
    :param uri:
    :param format:
    :return bool:
    """
    self.uri = uri
    self.format = format
    if self._container:
        self._container.close()
        self._opened = False
    try:
        self._container = av.open(
            self.uri, "r", format=self.format, buffer_size=1024, timeout=60
        )
    except UnicodeDecodeError:
        return False
    except av.TimeoutError:
        return False
    except av.ConnectionRefusedError:
        return False
    except av.FFmpegError:
        return False
    self._container.flags = self._container.flags | 0x40 # NO_BUFFER
    self._stream = self._container.streams.video[0]
    self._base_rate = int(self._stream.base_rate)
    self._wait_times = [0] * int(self._base_rate)
    self._decode_times = [0] * int(self._base_rate)
    self._opened = True
    self._t0 = None
    return True