Multimedia processing with FFmpeg and Python

Michał Rokita, EuroPython 2024

About me

Trimming a video file


                    ffmpeg -hide_banner -i video.mp4 \
                        -ss 00:00:30 -t 10 \
                        -y video_trim.mp4
                
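The same trim can be scripted from Python with the standard library's subprocess module. A minimal sketch, assuming ffmpeg is on PATH (the run call is commented out so the argument list can be inspected first):

```python
import subprocess

def trim_cmd(src: str, dst: str, start: str, duration: int) -> list[str]:
    # Mirrors the invocation above: -ss seeks to `start`,
    # -t keeps `duration` seconds, -y overwrites the output file.
    return [
        "ffmpeg", "-hide_banner", "-i", src,
        "-ss", start, "-t", str(duration),
        "-y", dst,
    ]

cmd = trim_cmd("video.mp4", "video_trim.mp4", "00:00:30", 10)
# subprocess.run(cmd, check=True)  # uncomment when ffmpeg is installed
print(" ".join(cmd))
```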

Edge detection

Original video

Edge detection using ffmpeg

ffprobe


                    ffprobe -hide_banner video2.mp4
                    

                        Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'video2.mp4':
                          Metadata:
                            major_brand     : isom
                            minor_version   : 512
                            compatible_brands: isomiso2avc1mp41
                            encoder         : Lavf58.76.100
                          Duration: 00:00:07.92, start: 0.000000, bitrate: 2839 kb/s
                          Stream #0:0(jpn):
                                Video: h264 (High 10) (avc1 / 0x31637661),
                                       yuv420p10le(tv, bt709),
                                       1920x1080 (SAR 1:1 DAR 16:9),
                                       2513 kb/s, 23.98 fps,
                                       23.98 tbr, 24k tbn, 47.95 tbc (default)
                                Metadata:
                                  handler_name    : Video
                                  vendor_id       : [0][0][0][0]
                          Stream #0:1(jpn):
                                Audio: aac (LC) (mp4a / 0x6134706D),
                                       48000 Hz, 5.1, fltp, 342 kb/s (default)
                                Metadata:
                                  handler_name    : Audio
                                  vendor_id       : [0][0][0][0]
                      
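ffprobe can also emit the same information as machine-readable JSON via -print_format json, which is much easier to consume from Python than the text report above. A sketch: the probe call assumes ffprobe is installed, and the summary helper is exercised on values taken from the sample output above.

```python
import json
import subprocess

def probe(path: str) -> dict:
    # -print_format json makes ffprobe emit JSON instead of the
    # human-readable report shown above.
    result = subprocess.run(
        ["ffprobe", "-hide_banner", "-print_format", "json",
         "-show_format", "-show_streams", path],
        capture_output=True, check=True, text=True,
    )
    return json.loads(result.stdout)

def video_summary(info: dict) -> tuple[int, int, str]:
    # Pick the first video stream and return (width, height, codec).
    stream = next(
        s for s in info["streams"] if s["codec_type"] == "video"
    )
    return stream["width"], stream["height"], stream["codec_name"]

# Values mirror the sample report above:
sample = {"streams": [
    {"codec_type": "video", "codec_name": "h264",
     "width": 1920, "height": 1080},
    {"codec_type": "audio", "codec_name": "aac"},
]}
print(video_summary(sample))  # (1920, 1080, 'h264')
```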

Complex filtering

  • Generate an RGB color histogram from the source video
  • Run edge detection on the source video
  • Place the results of the above on top of the source video as an overlay

Complex filtering


                    ffmpeg -hide_banner -i viofo_ph.mp4 \
                      -filter_complex \
                      "[0]format=gbrp,histogram=display_mode=stack[hist];\
                      [hist]scale=iw*2:ih[hist_scaled];\
                      [0]edgedetect,scale=iw/4:ih/4[edges];\
                      [0][hist_scaled]overlay[hist_over];\
                      [hist_over][edges]overlay=x=800:y=0" \
                      -y outputs/histogram.mp4
                
                    graph TD
                        video.mp4 --> 0
                        0 -->|format=gbrp, histogram=display_mode=stack| hist
                        hist -->|scale=iw*2:ih| hist_scaled
                        0 -->|edgedetect, scale=iw/4:ih/4| edges
                        0 -->|overlay - arg1| hist_over
                        hist_scaled -->|overlay - arg2| hist_over
                        hist_over --> out[outputs/histogram.mp4]
                        edges -->|overlay=x=800:y=0| out[outputs/histogram.mp4]

ffmpeg-python

ffmpeg-python

  • https://github.com/kkroening/ffmpeg-python
  • A convenient wrapper for the ffmpeg CLI.
  • Generates ffmpeg arguments from your Python code.
  • Provides helper functions for running ffmpeg in a subprocess.
  • Includes an ffmpeg.probe function which returns ffprobe results as a Python dict.

Complex filtering in Python


                    import ffmpeg

                    input_video = ffmpeg.input("viofo_ph.mp4")

                    hist = (
                        input_video
                        .filter("format", "gbrp")
                        .filter("histogram", display_mode="stack")
                        .filter("scale", "iw*2", "ih")
                    )

                    edges = (
                        input_video
                        .filter("edgedetect")
                        .filter("scale", "iw/4", "ih/4")
                    )

                    hist_overlay = input_video.overlay(hist)

                    out_stream = hist_overlay.overlay(edges, x=800, y=0)

                    out_stream.output(
                        "outputs/histogram_python.mp4"
                    ).overwrite_output().run()
                  

Generated ffmpeg call


                      ffmpeg -i viofo_ph.mp4 -filter_complex \
                                "[0]format=gbrp[s0];\
                                [s0]histogram=display_mode=stack[s1];\
                                [s1]scale=iw*2:ih[s2];\
                                [0][s2]overlay=eof_action=repeat[s3];\
                                [0]edgedetect[s4];[s4]scale=iw/4:ih/4[s5];\
                                [s3][s5]overlay=eof_action=repeat:x=800:y=0[s6]" \
                             -map [s6] outputs/histogram_python.mp4
                  

FFmpeg + Python + OpenCV

Original video

FFmpeg + Python + OpenCV

                    graph TD
                        subgraph ffmpeg process #1
                            a[outputs/registration_plates.mp4] --> b[decompressed frames]
                        end
                        subgraph Python
                            b -->|stdout, deserialisation| c[numpy array]
                            c -->|cv2 + cv2.CascadeClassifier| d[find registration plates, mark them]
                        end
                        subgraph ffmpeg process #2
                            d -->|stdin, serialisation| e[raw frames]
                            e --> f[outputs/registration_plates_marked.mp4]
                        end

FFmpeg + Python + OpenCV


                        from typing import Sequence

                        import ffmpeg
                        import numpy as np
                        import cv2

                        INPUT_FILE = "registration_plates.mp4"
                        OUTPUT_FILE = "outputs/viofo_marked.mp4"

                        def mark_plates(frame: cv2.typing.MatLike,
                                        found_plates: Sequence[cv2.typing.Rect]):
                            # video_width / video_height are module-level
                            # globals, probed from the input file below
                            y_offset = video_height // 20
                            x_offset = video_width // 20
                            for plate in found_plates:
                                x, y, width, height = plate
                                registration_plate = frame[
                                  y : y + height, x : x + width
                                ]
                                new_height = height * 20
                                new_width = width * 20
                                big_plate = cv2.resize(registration_plate,
                                                      (new_width, new_height))
                                frame[
                                    y_offset : y_offset + new_height,
                                    x_offset : x_offset + new_width
                                ] = big_plate
                                cv2.rectangle(
                                  frame, (x, y),
                                  (x + width, y + height), (0, 255, 0), 5
                                )
                                y_offset += new_height + 10


                        video_width, video_height, framerate = next(
                            (
                                 (
                                     s["width"],
                                     s["height"],
                                     int(s["r_frame_rate"].split("/")[0])
                                 ) for s in ffmpeg.probe(INPUT_FILE)["streams"]
                                 if s["codec_type"] == "video"
                            )
                        )

                        process1 = (
                            ffmpeg.input(INPUT_FILE)
                            .output("pipe:", format="rawvideo", pix_fmt="bgr24")
                            .run_async(pipe_stdout=True)
                        )

                        process2 = (
                            ffmpeg.input(
                                "pipe:",
                                format="rawvideo",
                                pix_fmt="bgr24",
                                s=f"{video_width}x{video_height}",
                                framerate=framerate // 4,
                            )
                            .output(OUTPUT_FILE, pix_fmt="yuv420p")
                            .overwrite_output()
                            .run_async(pipe_stdin=True)
                        )

                        classifier = cv2.CascadeClassifier(
                          "haarcascade_plate_number.xml"
                        )

                        while in_bytes := process1.stdout.read(
                          video_width * video_height * 3):
                            in_frame = (
                                np.frombuffer(in_bytes, np.uint8)
                                .reshape([video_height, video_width, 3])
                                .astype(np.uint8)
                            )
                            frame = in_frame.copy()
                            found_plates = classifier.detectMultiScale(
                                frame, minNeighbors=4, scaleFactor=1.05,
                                minSize=(50, 20), maxSize=(100, 40)
                            )
                            mark_plates(frame, found_plates)
                            process2.stdin.write(frame.astype(np.uint8).tobytes())

                        process2.stdin.close()
                        process1.wait()
                        process2.wait()

                  
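The read size in the loop above follows from the pixel format: bgr24 stores 3 bytes per pixel, so one frame is exactly width * height * 3 bytes. The deserialisation step, sketched in isolation:

```python
import numpy as np

def bytes_to_frame(buf: bytes, width: int, height: int) -> np.ndarray:
    # bgr24 = 3 bytes per pixel, so a full frame is exactly
    # width * height * 3 bytes of raw data.
    assert len(buf) == width * height * 3
    return np.frombuffer(buf, np.uint8).reshape(height, width, 3)

raw = bytes(8 * 4 * 3)            # one all-black 8x4 frame
frame = bytes_to_frame(raw, 8, 4)
print(frame.shape)  # (4, 8, 3): rows first, then columns, then channels
```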

Testing

  • For simple processing, just use ffmpeg.probe().
  • Use ffmpeg.compile() to retrieve the generated command.
  • The FFmpeg maintainers provide a diverse library of multimedia samples at fate-suite.ffmpeg.org.

Conclusions

  • FFmpeg is very powerful and complex - see ffmpeg -h full.
  • When using it for complex multimedia processing, it's better to use a good wrapper, like ffmpeg-python.
  • The CLI is great for basic operations, like trimming streams, transcoding subtitles, or compressing videos.
  • ffmpeg-python seems to be abandoned - the last commit was two years ago, with no response from the maintainer.

Thank you!

qr code
https://mrokita.github.io/europython-ffmpeg

Bonus - tracking progress


                    import socket
                    from threading import Thread
                    from contextlib import contextmanager
                    import tempfile
                    import ffmpeg
                    from pathlib import Path
                    from tqdm import tqdm


                    @contextmanager
                    def open_progress_socket():
                        with tempfile.TemporaryDirectory() as tempdir:
                            sock = socket.socket(
                                socket.AF_UNIX, socket.SOCK_STREAM)
                            socket_filename = Path(tempdir) / "progress.sock"
                            sock.bind(str(socket_filename))
                            sock.settimeout(15)
                            try:
                                sock.listen(1)
                                yield sock, socket_filename
                            finally:
                                sock.close()


                    def watch_progress(sock: socket.socket, total_duration: float):
                        conn, _ = sock.accept()
                        abs_progress = 0
                        with tqdm(total=total_duration) as pbar:
                            while rec := conn.recv(4096):
                                out_time_us = dict(
                                    map(
                                        lambda x: x.split("="),
                                        rec.decode("utf-8").split("\n")[:-1],
                                    )
                                )["out_time_us"]
                                processed_seconds = round(
                                    int(out_time_us) / 1000000
                                )
                                delta = processed_seconds - abs_progress
                                pbar.update(delta)
                                abs_progress = processed_seconds


                    with open_progress_socket() as (sock, socket_filename):
                        total_duration = float(
                            ffmpeg.probe(
                                "outputs/viofo_marked.mp4"
                            )["streams"][0]["duration"]
                        )
                        print(f"Total duration: {total_duration}")
                        t = Thread(target=watch_progress, args=(sock, total_duration))
                        t.start()
                        (
                            ffmpeg.input("outputs/viofo_marked.mp4")
                            .output(
                                "outputs/viofo_marked_h265.mp4",
                                vcodec="hevc"
                            )
                            .overwrite_output()
                            .global_args(
                                "-progress",
                                f"unix://{socket_filename}"
                            )
                        ).run(quiet=True)
                        t.join()

              
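The -progress output parsed in watch_progress above is just newline-separated key=value pairs; each block ends with a progress=continue (or progress=end) line. The parsing step as a standalone, testable function:

```python
def parse_progress(chunk: bytes) -> dict[str, str]:
    # Each -progress block is newline-separated key=value pairs.
    return dict(
        line.split("=", 1)
        for line in chunk.decode("utf-8").splitlines()
        if "=" in line
    )

sample = b"frame=240\nout_time_us=8000000\nprogress=continue\n"
fields = parse_progress(sample)
print(int(fields["out_time_us"]) // 1_000_000)  # 8 seconds processed
```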

Tracking progress