ffmpeg -hide_banner -i video.mp4 \
-ss 00:00:30 -t 10 \
-y video_trim.mp4
ffprobe -hide_banner video2.mp4
Input #0, mov,mp4,m4a,3gp,3g2,mj2, from 'video2.mp4':
Metadata:
major_brand : isom
minor_version : 512
compatible_brands: isomiso2avc1mp41
encoder : Lavf58.76.100
Duration: 00:00:07.92, start: 0.000000, bitrate: 2839 kb/s
Stream #0:0(jpn):
Video: h264 (High 10) (avc1 / 0x31637661),
yuv420p10le(tv, bt709),
1920x1080 (SAR 1:1 DAR 16:9),
2513 kb/s, 23.98 fps,
23.98 tbr, 24k tbn, 47.95 tbc (default)
Metadata:
handler_name : Video
vendor_id : [0][0][0][0]
Stream #0:1(jpn):
Audio: aac (LC) (mp4a / 0x6134706D),
48000 Hz, 5.1, fltp, 342 kb/s (default)
Metadata:
handler_name : Audio
vendor_id : [0][0][0][0]
ffmpeg -hide_banner -i viofo_ph.mp4 \
-filter_complex \
"[0]format=gbrp,histogram=display_mode=stack[hist];\
[hist]scale=iw*2:ih[hist_scaled];\
[0]edgedetect,scale=iw/4:ih/4[edges];\
[0][hist_scaled]overlay[hist_over];\
[hist_over][edges]overlay=x=800:y=0" \
-y outputs/histogram.mp4
ffmpeg.probefunction which returns
ffproberesults as a Python
dict
import ffmpeg
input_video = ffmpeg.input("viofo_ph.mp4")
hist = (
input_video
.filter("format", "gbrp")
.filter("histogram", display_mode="stack")
.filter("scale", "iw*2", "ih")
)
edges = (
input_video
.filter("edgedetect")
.filter("scale", "iw/4", "ih/4")
)
hist_overlay = input_video.overlay(hist)
out_stream = hist_overlay.overlay(edges, x=800, y=0)
out_stream.output(
f"outputs/histogram_python.mp4"
).overwrite_output().run()
ffmpeg -i viofo_ph.mp4 -filter_complex \
"[0]format=gbrp[s0];\
[s0]histogram=display_mode=stack[s1];\
[s1]scale=iw*2:ih[s2];\
[0][s2]overlay=eof_action=repeat[s3];\
[0]edgedetect[s4];[s4]scale=iw/4:ih/4[s5];\
[s3][s5]overlay=eof_action=repeat:x=800:y=0[s6]" \
-map [s6] outputs/histogram_python.mp4
from typing import Sequence
import ffmpeg
import numpy as np
import cv2
INPUT_FILE = "registration_plates.mp4"
OUTPUT_FILE = "outputs/viofo_marked.mp4"
def mark_plates(frame: cv2.typing.MatLike,
found_plates: Sequence[cv2.typing.Rect]):
y_offset = video_height // 20
x_offset = video_width // 20
for plate in found_plates:
x, y, width, height = plate
registration_plate = frame[
y : y + height, x : x + width
]
new_height = height * 20
new_width = width * 20
big_plate = cv2.resize(registration_plate,
(new_width, new_height))
frame[
y_offset : y_offset + new_height,
x_offset : x_offset + new_width
] = big_plate
cv2.rectangle(
frame, (x, y),
(x + width, y + height), (0, 255, 0), 5
)
y_offset += new_height + 10
video_width, video_height, framerate = next(
(
(
s["width"],
s["height"],
int(s["r_frame_rate"].split("/")[0])
) for s in ffmpeg.probe(INPUT_FILE)["streams"]
if s["codec_type"] == "video"
)
)
process1 = (
ffmpeg.input(INPUT_FILE)
.output("pipe:", format="rawvideo", pix_fmt="bgr24")
.run_async(pipe_stdout=True)
)
process2 = (
ffmpeg.input(
"pipe:",
format="rawvideo",
pix_fmt="bgr24",
s=f"{video_width}x{video_height}",
framerate=framerate // 4,
)
.output(OUTPUT_FILE, pix_fmt="yuv420p")
.overwrite_output()
.run_async(pipe_stdin=True)
)
classifier = cv2.CascadeClassifier(
"haarcascade_plate_number.xml"
)
while in_bytes := process1.stdout.read(
video_width * video_height * 3):
in_frame = (
np.frombuffer(in_bytes, np.uint8)
.reshape([video_height, video_width, 3])
.astype(np.uint8)
)
frame = in_frame.copy()
found_plates = classifier.detectMultiScale(
frame, minNeighbors=4, scaleFactor=1.05,
minSize=(50, 20), maxSize=(100, 40)
)
mark_plates(frame, found_plates)
process2.stdin.write(frame.astype(np.uint8).tobytes())
process2.stdin.close()
process1.wait()
process2.wait()
ffmpeg.probe()
ffmpeg.compile()to retrieve the generated command.
ffmpeg -h full
ffmpeg-python.
import socket
from threading import Thread
from contextlib import contextmanager
import tempfile
import ffmpeg
from pathlib import Path
from tqdm import tqdm
@contextmanager
def open_progress_socket():
with tempfile.TemporaryDirectory() as tempdir:
sock = socket.socket(
socket.AF_UNIX, socket.SOCK_STREAM)
socket_filename = Path(tempdir) / "progress.sock"
sock.bind(str(socket_filename))
sock.settimeout(15)
try:
sock.listen(1)
yield sock, socket_filename
finally:
sock.close()
def watch_progress(sock: socket.socket, total_duration: int):
conn, _ = sock.accept()
abs_progress = 0
with tqdm(total=total_duration) as pbar:
while rec := conn.recv(4096):
out_time_us = dict(
map(
lambda x: x.split("="),
rec.decode("utf-8").split("\n")[:-1],
)
)["out_time_us"]
processed_seconds = round(
int(out_time_us) / 1000000
)
delta = processed_seconds - abs_progress
pbar.update(delta)
abs_progress = processed_seconds
with open_progress_socket() as (sock, socket_filename):
total_duration = float(
ffmpeg.probe(
"outputs/viofo_marked.mp4"
)["streams"][0]["duration"]
)
print(f"Total duration: {total_duration}")
t = Thread(target=watch_progress, args=(sock, total_duration))
t.start()
(
ffmpeg.input("outputs/viofo_marked.mp4")
.output(
"outputs/viofo_marked_h265.mp4",
vcodec="hevc"
)
.overwrite_output()
.global_args(
"-progress",
f"unix://{socket_filename}"
)
).run(quiet=True)
t.join()