"""The VidCap class."""
import inspect
from io import BytesIO
from os import environ
from sys import exit
from traceback import format_exc
from typing import Dict, Generator, List, Tuple
import cv2
import numpy as np
import numpy.typing as npt
import requests
from PIL import Image, ImageFile
from requests.auth import HTTPDigestAuth
from face01lib.Calc import Cal
from face01lib.logger import Logger
ImageFile.LOAD_TRUNCATED_IMAGES = True
"""TODO #18 opencvの環境変数変更 要調査"""
# environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;udp"
environ["OPENCV_FFMPEG_CAPTURE_OPTIONS"] = "rtsp_transport;tcp"
class VidCap:
"""VidCap class.
contains methods that initially process the input video data
"""
def __init__(self, log_level: str = 'info') -> None:
"""init.
Args:
log_level (str, optional): Receive log level value. Defaults to 'info'.
"""
# Setup logger: common way
self.log_level: str = log_level
import os.path
name: str = __name__
dir: str = os.path.dirname(__file__)
parent_dir, _ = os.path.split(dir)
self.logger = Logger(self.log_level).logger(name, parent_dir)
    # imshow() for debugging
def frame_imshow_for_debug(
self,
frame: npt.NDArray[np.uint8]
) -> None:
"""Used for debugging.
Display the given frame data in a GUI window for 3 seconds.
Args:
frame (npt.NDArray[np.uint8]): Image data called 'frame'
        Returns:
            None
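
        Example:
            A minimal usage sketch (the video path is an illustrative assumption;
            frame must be a BGR image such as one returned by vcap.read())::

                vc = VidCap()
                vcap = cv2.VideoCapture("assets/some_video.mp4")
                ret, frame = vcap.read()
                if ret:
                    vc.frame_imshow_for_debug(frame)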
"""
self.frame_maybe: npt.NDArray[np.uint8] = frame
if isinstance(self.frame_maybe, np.ndarray):
cv2.imshow("DEBUG", self.frame_maybe)
self.logger.debug(inspect.currentframe().f_back.f_code.co_filename)
self.logger.debug(inspect.currentframe().f_back.f_lineno)
            cv2.moveWindow("DEBUG", 0, 0)
cv2.waitKey(3000)
cv2.destroyAllWindows()
else:
for fra in self.frame_maybe:
fr: npt.NDArray[np.uint8] = fra["img"]
cv2.imshow("DEBUG", fr)
                cv2.moveWindow("DEBUG", 0, 0)
self.logger.debug(inspect.currentframe().f_back.f_code.co_filename)
self.logger.debug(inspect.currentframe().f_back.f_lineno)
cv2.waitKey(3000)
cv2.destroyAllWindows()
def resize_frame(
self,
set_width: int,
set_height: int,
frame: npt.NDArray[np.uint8]
) -> npt.NDArray[np.uint8]:
"""Return resized frame data.
Args:
set_width (int): Width described in config.ini
set_height (int): Height described in config.ini
frame (npt.NDArray[np.uint8]): Image data
Returns:
npt.NDArray[np.uint8]: small_frame
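
        Example:
            A minimal sketch (the target size is illustrative; frame is assumed
            to have been read beforehand, e.g. with vcap.read())::

                vc = VidCap()
                small_frame = vc.resize_frame(500, 281, frame)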
"""
self.set_width: int = set_width
self.set_height: int = set_height
self.frame: npt.NDArray[np.uint8] = frame
small_frame: npt.NDArray[np.uint8] = \
cv2.resize(self.frame, (self.set_width, self.set_height)) # cv2.Mat
return small_frame
def return_movie_property(
self,
set_width: int,
vcap
) -> Tuple[int, ...]:
"""Return input movie file's property.
Args:
            set_width (int): Width set in config.ini
vcap (cv2.VideoCapture): Handle of input movie processing
Returns:
Tuple[int,...]: self.set_width, fps, height, width, set_height
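
        Example:
            A minimal sketch (the movie path is an illustrative assumption; note
            that this method releases vcap before returning)::

                vc = VidCap()
                vcap = vc.return_vcap("assets/some_video.mp4")
                set_width, fps, height, width, set_height = vc.return_movie_property(500, vcap)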
"""
self.set_width: int = set_width
self.vcap = vcap
        # # debug
        # if vcap.isOpened():
        #     print("The VideoCapture object is opened correctly.")
        # else:
        #     print("The VideoCapture object is not opened.")
        fps: int = int(self.vcap.get(cv2.CAP_PROP_FPS))
        height: int = int(self.vcap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        width: int = int(self.vcap.get(cv2.CAP_PROP_FRAME_WIDTH))
if width <= 0:
self.logger.warning("Can't receive input data")
self.logger.warning("-" * 20)
self.logger.warning(format_exc(limit=None, chain=True))
self.logger.warning("-" * 20)
self.logger.warning("exit")
exit(0)
        # BUGFIX: release vcap here
self.finalize(self.vcap)
        # # debug
        # if vcap.isOpened():
        #     print("The VideoCapture object is opened correctly.")
        # else:
        #     print("The VideoCapture object is not opened.")
set_height: int = int((self.set_width * height) / width)
        return self.set_width, fps, height, width, set_height
    # cdef / Python version
    def _cal_angle_coordinate(self, height: int, width: int) -> Tuple[Tuple[int, int, int, int], ...]:
        """Pre-calculate the angle-of-view coordinates.
        Args:
            height (int): Frame height
            width (int): Frame width
        Returns:
            Tuple[Tuple[int, int, int, int], ...]: TOP_LEFT, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_RIGHT, CENTER
        """
        self.height: int = height
        self.width: int = width
# order: (top, bottom, left, right)
        TOP_LEFT: Tuple[int, int, int, int] = (0, int(self.height / 2), 0, int(self.width / 2))
        TOP_RIGHT: Tuple[int, int, int, int] = (0, int(self.height / 2), int(self.width / 2), self.width)
        BOTTOM_LEFT: Tuple[int, int, int, int] = (int(self.height / 2), self.height, 0, int(self.width / 2))
        BOTTOM_RIGHT: Tuple[int, int, int, int] = (int(self.height / 2), self.height, int(self.width / 2), self.width)
        CENTER: Tuple[int, int, int, int] = (int(self.height / 4), int(self.height / 4) * 3, int(self.width / 4), int(self.width / 4) * 3)
        return TOP_LEFT, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_RIGHT, CENTER
def _angle_of_view_specification(
self,
set_area: str,
frame: npt.NDArray[np.uint8],
TOP_LEFT: Tuple[int, int, int, int],
TOP_RIGHT: Tuple[int, int, int, int],
BOTTOM_LEFT: Tuple[int, int, int, int],
BOTTOM_RIGHT: Tuple[int, int, int, int],
CENTER: Tuple[int, int, int, int]
) -> npt.NDArray[np.uint8]:
"""Return ndarray data which area specification coordinates for frame.
Args:
set_area (str): Described in config.ini
frame (npt.NDArray[np.uint8]): Image data which described ndarray
TOP_LEFT (Tuple[int,int,int,int]): Top-left coordinate
TOP_RIGHT (Tuple[int,int,int,int]): Top-right coordinate
BOTTOM_LEFT (Tuple[int,int,int,int]): Bottom-left coordinate
BOTTOM_RIGHT (Tuple[int,int,int,int]): Bottom-right coordinate
CENTER (Tuple[int,int,int,int]): Coordinates to keep the angle of view in the center of the screen
Returns:
npt.NDArray[np.uint8]: self.frame
Note:
Face_location order: top, right, bottom, left
TOP_LEFT order: (top, bottom, left, right)
How to slice: img[top: bottom, left: right]
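
        Example:
            A slicing sketch for a hypothetical 1080x1920 frame, with TOP_LEFT
            as produced by _cal_angle_coordinate::

                TOP_LEFT = (0, 540, 0, 960)  # (top, bottom, left, right)
                cropped = frame[TOP_LEFT[0]:TOP_LEFT[1], TOP_LEFT[2]:TOP_LEFT[3]]
                # cropped.shape -> (540, 960, 3)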
"""
self.set_area: str = set_area
self.frame: npt.NDArray[np.uint8] = frame
self.TOP_LEFT: Tuple[int, int, int, int] = TOP_LEFT
self.TOP_RIGHT: Tuple[int, int, int, int] = TOP_RIGHT
self.BOTTOM_LEFT: Tuple[int, int, int, int] = BOTTOM_LEFT
self.BOTTOM_RIGHT: Tuple[int, int, int, int] = BOTTOM_RIGHT
self.CENTER: Tuple[int, int, int, int] = CENTER
# VidCap().frame_imshow_for_debug(self.frame)
if self.set_area=='NONE':
return self.frame
elif self.set_area=='TOP_LEFT':
self.frame = self.frame[self.TOP_LEFT[0]:self.TOP_LEFT[1],self.TOP_LEFT[2]:self.TOP_LEFT[3]]
elif self.set_area=='TOP_RIGHT':
self.frame = self.frame[self.TOP_RIGHT[0]:self.TOP_RIGHT[1],self.TOP_RIGHT[2]:self.TOP_RIGHT[3]]
elif self.set_area=='BOTTOM_LEFT':
self.frame = self.frame[self.BOTTOM_LEFT[0]:self.BOTTOM_LEFT[1],self.BOTTOM_LEFT[2]:self.BOTTOM_LEFT[3]]
elif self.set_area=='BOTTOM_RIGHT':
self.frame = self.frame[self.BOTTOM_RIGHT[0]:self.BOTTOM_RIGHT[1],self.BOTTOM_RIGHT[2]:self.BOTTOM_RIGHT[3]]
elif self.set_area=='CENTER':
self.frame = self.frame[self.CENTER[0]:self.CENTER[1],self.CENTER[2]:self.CENTER[3]]
# VidCap().frame_imshow_for_debug(self.frame)
return self.frame
def return_vcap(self, movie: str) -> cv2.VideoCapture:
"""Return vcap object.
Args:
            movie (str): Path or URL of the input movie, or 'usb' / 'USB' to probe connected USB cameras
        Returns:
            cv2.VideoCapture: vcap
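
        Example:
            A minimal sketch (the file path is an illustrative assumption; pass
            'usb' or 'USB' to probe connected USB cameras instead)::

                vc = VidCap()
                vcap = vc.return_vcap("assets/some_video.mp4")
                if vcap.isOpened():
                    ret, frame = vcap.read()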
"""
self.movie: str = movie
# movie=movie
        if self.movie == 'usb' or self.movie == 'USB':  # Used when reading from a USB camera
live_camera_number: int = 0
for camera_number in range(0, 5):
vcap = cv2.VideoCapture(camera_number)
# BUG: cv2.CAP_FFMPEG
# vcap = cv2.VideoCapture(camera_number, cv2.CAP_FFMPEG)
# vcap = cv2.VideoCapture(camera_number, cv2.CAP_DSHOW)
ret: bool
frame: npt.NDArray[np.uint8]
ret, frame = vcap.read()
                if ret:
                    """DEBUG
                    self.frame_imshow_for_debug(frame)
                    """
                    live_camera_number = camera_number
                    self.finalize(vcap)
                    break
                if camera_number == 4 and ret is False:
                    self.logger.warning("Cannot receive input from the USB camera")
                    self.logger.warning("Please report the following error to the system administrator")
                    self.logger.warning("-" * 20)
                    self.logger.warning(format_exc(limit=None, chain=True))
                    self.logger.exception("Communication with the USB camera failed")
                    self.logger.warning("-" * 20)
                    self.logger.warning("Exiting")
                    self.finalize(vcap)
                    exit(0)
# print(f'live_camera_number: {live_camera_number}')
vcap = cv2.VideoCapture(live_camera_number)
return vcap
else:
vcap = cv2.VideoCapture(self.movie)
            # # debug
            # if vcap.isOpened():
            #     print("The VideoCapture object is opened correctly.")
            # else:
            #     print("The VideoCapture object is not opened.")
            # import os
            # current_directory = os.getcwd()
            # print("Current working directory:", current_directory)
return vcap
def finalize(self, vcap) -> None:
"""Release vcap and Destroy window.
Args:
vcap (cv2.VideoCapture): vcap which is handle of input video process
"""
self.vcap = vcap
self.vcap.release()
cv2.destroyAllWindows()
# @lru_cache(maxsize=None)
def frame_generator(
self,
CONFIG: Dict
) -> Generator:
"""Generator: Return resized frame data.
Args:
CONFIG (Dict): CONFIG
        Raises:
            StopIteration: When `ret` is False the generator returns, so the caller's iteration receives `StopIteration`
Yields:
Generator: Resized frame data (npt.NDArray[np.uint8])
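
        Example:
            A minimal sketch. The CONFIG keys below are the ones this generator
            reads; the concrete values and the movie path are illustrative
            assumptions, not the project's actual defaults::

                config = {
                    "movie": "assets/some_video.mp4",
                    "set_width": 500,
                    "set_height": 281,
                    "width": 1920,
                    "height": 1080,
                    "set_area": "NONE",
                    "frame_skip": 5,
                    "user": "",
                    "passwd": "",
                }
                for resized_frame in VidCap().frame_generator(config):
                    ...  # process each resized frame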
"""
self.CONFIG: Dict = CONFIG
"""Initial values"""
frame_skip_counter: int
set_width: int = self.CONFIG["set_width"]
set_height: int = self.CONFIG["set_height"]
movie: str = self.CONFIG["movie"]
set_area: str = self.CONFIG["set_area"]
ret: bool
# vcap
frame: npt.NDArray[np.uint8]
# RootDir: str = self.CONFIG["RootDir"]
# chdir(RootDir) # Not use
# Calculate coordinates of corners: Tuple[int,int,int,int] for 'angle of view'.
TOP_LEFT: Tuple[int, int, int, int]
TOP_RIGHT: Tuple[int, int, int, int]
BOTTOM_LEFT: Tuple[int, int, int, int]
BOTTOM_RIGHT: Tuple[int, int, int, int]
CENTER: Tuple[int, int, int, int]
TOP_LEFT, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_RIGHT, CENTER = \
self._cal_angle_coordinate(
self.CONFIG["height"],
self.CONFIG["width"]
)
        if (movie == 'usb' or movie == 'USB'):  # Used when reading from a USB camera
camera_number: int = 0
live_camera_number: int = 0
for camera_number in range(-1, 5):
vcap = cv2.VideoCapture(camera_number)
ret, frame = vcap.read()
if ret:
live_camera_number = camera_number
break
            # TODO: #39 The behavior of the camera_number variable is odd. Is -1 really correct, and why are five log lines emitted?
self.logger.info(f'CAMERA DEVICE NUMBER: {camera_number}')
while vcap.isOpened():
                # Skip frames until the frame_skip count is reached (only the last read frame is used)
                for frame_skip_counter in range(1, self.CONFIG["frame_skip"]):
                    ret, frame = vcap.read()
                if not ret:
                    self.logger.warning("ERROR OCCURRED\nREPORTED BY FACE01")
                    self.logger.warning("-" * 20)
                    self.logger.warning(format_exc(limit=None, chain=True))
                    self.logger.warning("-" * 20)
                    self.finalize(vcap)
                    break
                else:
                    # Crop each frame based on the angle-of-view values
                    # Python version
                    frame = self._angle_of_view_specification(set_area, frame, TOP_LEFT, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_RIGHT, CENTER)  # type: ignore
                    # Resize each frame
                    resized_frame = self.resize_frame(set_width, set_height, frame)
"""DEBUG
self.frame_imshow_for_debug(resized_frame)
"""
yield resized_frame
        elif 'http' in movie:  # When fetching frames over HTTP
            """See below:
            [Capturing images from a Panasonic network camera and running face detection](https://qiita.com/mix_dvd/items/a0bdbe0ba628d5282639)
            [How to use Python Requests](https://note.nkmk.me/python-requests-usage/)
            """
url = movie
            # Fetch the image
try:
                # Branch on the response content
                while True:
                    # Skip requests until the frame_skip count is reached (only the last response is used)
                    for frame_skip_counter in range(1, self.CONFIG["frame_skip"]):
                        response = requests.get(url, auth=HTTPDigestAuth(self.CONFIG["user"], self.CONFIG["passwd"]))
                        # print(f'response: {response}')
# {'Status': '200', 'Connection': 'Close', 'Set-Cookie': 'Session=0', 'Accept-Ranges': 'bytes',
# 'Cache-Control': 'no-cache', 'Content-length': '40140', 'Content-type': 'image/jpeg'}
# if response.headers['Status'] == '200' and response.headers['Content-type'] == 'image/jpeg':
if response.headers['Content-type'] == 'image/jpeg':
                        # Convert the fetched image data into a format OpenCV can handle
img_bin = BytesIO(response.content)
img_pil = Image.open(img_bin)
img_np = np.asarray(img_pil)
frame = cv2.cvtColor(img_np, cv2.COLOR_RGBA2BGR)
"""DEBUG
cv2.imshow("video_capture_DEBUG", frame)
cv2.moveWindow("video_capture_DEBUG", 0,0)
cv2.waitKey(5000)
cv2.destroyAllWindows()
exit(0)
"""
                        # Crop each frame based on the angle-of-view values
                        # Python version
                        frame = self._angle_of_view_specification(set_area, frame, TOP_LEFT, TOP_RIGHT, BOTTOM_LEFT, BOTTOM_RIGHT, CENTER)
                        # Resize each frame
resized_frame = self.resize_frame(set_width, set_height, frame)
"""DEBUG
cv2.imshow("video_capture_DEBUG", frame)
cv2.moveWindow("video_capture_DEBUG", 0,0)
cv2.waitKey(500)
cv2.destroyAllWindows()
"""
                        try:
                            yield resized_frame
                        except TypeError as e:
                            self.logger.warning(e)
                        except Exception as e:
                            self.logger.warning(e)
                    else:
                        self.logger.warning("Please report the following error to the system administrator")
                        self.logger.warning(f"Status code: {response.headers['Status']}")
                        self.logger.warning(f"Content type: {response.headers['Content-type']}")
                        self.logger.warning("-" * 20)
                        self.logger.warning(format_exc(limit=None, chain=True))
                        self.logger.warning("-" * 20)
                        exit(0)
            except Exception:
                self.logger.warning("Please report the following error to the system administrator")
                self.logger.warning("-" * 20)
                self.logger.warning(format_exc(limit=None, chain=True))
                self.logger.exception("A communication error occurred")
                self.logger.warning("-" * 20)
                self.logger.warning("Exiting")
                exit(0)
        else:  # RTSP is handled the same way as an ordinary test movie
vcap = cv2.VideoCapture(movie)
# vcap = cv2.VideoCapture(movie, cv2.CAP_FFMPEG)
cnt: int = 1
debug_num:int = 1
while vcap.isOpened():
ret, frame = vcap.read()
"""DEBUG"""
# self.frame_imshow_for_debug(frame)
                # Skip processing until the frame_skip count is reached
if cnt < self.CONFIG["frame_skip"]:
cnt += 1
continue
cnt = 0
                if not ret:
                    self.logger.warning("ERROR OCCURRED")
                    self.logger.warning("DATA RECEPTION HAS ENDED")
                    self.logger.warning("-" * 20)
                    self.logger.warning(format_exc(limit=None, chain=True))
                    self.logger.warning("-" * 20)
                    self.finalize(vcap)
                    # PEP 479: raising StopIteration inside a generator becomes a
                    # RuntimeError; returning ends the generator cleanly instead.
                    return
                # else:
                if ret:
                    """C++ implementation experiment
                    # frame = frame.astype(dtype='float64')
                    size(frame.shape, frame.strides, set_area, frame, TOP_LEFT)
                    """
                    # Crop each frame based on the angle-of-view values
                    # Python version
angle_frame = self._angle_of_view_specification(
set_area,
frame,
TOP_LEFT,
TOP_RIGHT,
BOTTOM_LEFT,
BOTTOM_RIGHT,
CENTER
)
                    # Resize each frame
resized_frame = self.resize_frame(set_width, set_height, angle_frame)
"""DEBUG"""
# self.frame_imshow_for_debug(resized_frame)
yield resized_frame
elif ret == False:
self.finalize(vcap)
"""Reference
- [imreadで返される配列について](https://qiita.com/Castiel/items/53ecbee3c06b9d92759e)
cv2.imreadで読み込んだframe変数について。
> 3次元配列が出力されていることがわかる.この配列をA×B×3次元配列とする.
> Aは画素の行数であり,Bは画素の列数である
> (読み込む画像のサイズによって,行列のサイズは変わるため変数A,Bとした).3は,RGBの輝度である.
> 上の画像において,輝度が縦に大量に並んでいるが,これは
> [[[0行0列目の輝度]~[0行B列目の輝度]]~[[A行0列目の輝度]~[A行B列目の輝度]]]の順に並んでいる.
> (画像において0行0列目は,左上)
> よって,imreadで返される配列とは,画素の輝度を行列の順に格納したものである
> imreadで返された配列の顔画像の部分(顔画像の左上の行列から,右下の行列までの**区分行列**(ブロック行列)の輝度)
> だけを取り出すことで,切り取ることができた.
- [NumPyの軸(axis)と次元数(ndim)とは何を意味するのか](https://deepage.net/features/numpy-axis.html)
"""