My goal of this script is to start recording to a video file when motion is detected. When motion is no longer detected, the writer will be released and the script will create a new video the next time that motion is detected...and so on.
With the below script, I'm able to start writing to a video file once motion is detected, but I have to press the q button to release the writer and make the video playable. If I don't press q, the recording will stop, but the next time there's motion it will just be added to the existing video. I've tried writer.release() in a few places without success.
# import the necessary packages
from pyimagesearch.tempimage import TempImage
import argparse
import warnings
import datetime
import imutils
import json
import numpy as np
import time
import cv2
print("[INFO] Kicking off script - " +
datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S"))
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-c", "--conf", required=True,
help="path to the JSON configuration file")
args = vars(ap.parse_args())
# filter warnings, load the configuration and initialize the Dropbox
# client
warnings.filterwarnings("ignore")
conf = json.load(open(args["conf"]))
client = None
# initialize the camera and grab a reference to the raw camera capture
# if the video argument is None, then we are reading from webcam
if not conf["use_ip_cam"]:
camera = cv2.VideoCapture(0)
time.sleep(0.25)
# otherwise, we are reading from a video input
else:
camera = cv2.VideoCapture(conf["ip_cam_addr"])
# allow the camera to warmup, then initialize the average frame, last
# uploaded timestamp, and frame motion counter
print("[INFO] warming up...")
time.sleep(conf["camera_warmup_time"])
avg = None
lastUploaded = datetime.datetime.now()
motionCounter = 0
fourcc = 0x00000020 # a little hacky, but works for now
writer = None
(h, w) = (None, None)
zeros = None
output = None
# capture frames from the camera
# for f in camera.capture_continuous(rawCapture, format="bgr",
# use_video_port=True):
while True:
# grab the raw NumPy array representing the image and initialize
# the timestamp and occupied/unoccupied text
(grabbed, frame) = camera.read()
# frame = f.array
timestamp = datetime.datetime.now()
motion_detected = False
# if the frame could not be grabbed, then we have reached the end
# of the video
if not grabbed:
break
# resize the frame, convert it to grayscale, and blur it
frame = imutils.resize(frame, width=500)
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (21, 21), 0)
# if the average frame is None, initialize it
if avg is None:
print("[INFO] starting background model...")
avg = gray.copy().astype("float")
# frame.truncate(0)
continue
# accumulate the weighted average between the current frame and
# previous frames, then compute the difference between the current
# frame and running average
cv2.accumulateWeighted(gray, avg, 0.5)
frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))
# threshold the delta image, dilate the thresholded image to fill
# in holes, then find contours on thresholded image
thresh = cv2.threshold(frameDelta, conf["delta_thresh"], 255,
cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
(_, cnts, _) = cv2.findContours(thresh.copy(),
cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# loop over the contours
for c in cnts:
# if the contour is too small, ignore it
if cv2.contourArea(c) < conf["min_area"]:
continue
# compute the bounding box for the contour, draw it on the frame,
# and update the text
(x, y, w1, h1) = cv2.boundingRect(c)
cv2.rectangle(frame, (x, y), (x + w1, y + h1), (0, 255, 0), 2)
motion_detected = True
fps = camera.get(cv2.CAP_PROP_FPS)
ts = timestamp.strftime("%Y-%m-%d_%H_%M_%S")
time_and_fps = ts + " - fps: " + str(fps)
# draw the text and timestamp on the frame
cv2.putText(frame, "Motion Detected: {}".format(motion_detected), (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
cv2.putText(frame, time_and_fps, (10, frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.35, (0, 0, 255), 1)
# check to see if the room is occupied
if motion_detected:
motionCounter += 1
# check to see if the number of frames with consistent motion is
# high enough
if motionCounter >= conf["min_motion_frames"]:
# check if the writer is None
if writer is None:
print("hitting writer is none")
# store the image dimensions, initialzie the video
# writer, and construct the zeros array
(h2, w2) = frame.shape[:2]
writer = cv2.VideoWriter("/Users/user/Library/Mobile Documents/com~apple~CloudDocs/testMotionDetection/" +
datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S") + ".mp4",
fourcc, fps,
(w2, h2), True)
zeros = np.zeros((h2, w2), dtype="uint8")
# construct the final output frame, storing the
# original frame
output = np.zeros((h2, w2, 3), dtype="uint8")
output[0:h2, 0:w2] = frame
# write the output frame to file
writer.write(output)
# otherwise, there is no motion
else:
writer.release()
# Traceback (most recent call last):
# File "pi_surveillance.py", line 178, in <module>
# writer.release()
# AttributeError: 'NoneType' object has no attribute 'release'
motionCounter = 0
# check to see if the frames should be displayed to screen
if conf["show_video"]:
# display the security feed
cv2.imshow("Security Feed", frame)
key = cv2.waitKey(1) & 0xFF
# if the `q` key is pressed, break from the loop
if key == ord("q"):
break
# cleanup the camera and close any open windows
print("[INFO] cleaning up...")
camera.release()
cv2.destroyAllWindows()
# writer.release() - only releases writer when q is pressed
I was able to get this working correctly. This script creates a VideoWriter object for every frame with a temporary file. If motion is no detected within that frame, the writer is released and the file that was created with VideoWriter gets deleted.
If motion is detected, the file is kept and written to. Once motion is no longer detected, a countdown timer is started so it continues recording as long as you set. If motion is detected before the timer hits 0, then it continues to record and resets the timer and so on.
This is still being tuned, but works pretty well.
#!/usr/local/bin/python3
import argparse
import warnings
import datetime
import imutils
import json
import numpy as np
import os
import time
import cv2
print("[INFO] Kicking off script - " +
datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S"))
# construct the argument parser and parse the arguments
ap = argparse.ArgumentParser()
ap.add_argument("-c", "--conf", required=True,
help="path to the JSON configuration file")
args = vars(ap.parse_args())
# filter warnings, load the configuration
warnings.filterwarnings("ignore")
conf = json.load(open(args["conf"]))
# initialize the camera and grab a reference to the raw camera capture
# if the video argument is None, then we are reading from webcam
if not conf["use_ip_cam"]:
camera = cv2.VideoCapture(0)
time.sleep(0.25)
# otherwise, we are reading from a video input
else:
camera = cv2.VideoCapture(conf["ip_cam_addr"])
# allow the camera to warmup, then initialize the average frame, last
# uploaded timestamp, and frame motion counter
print("[INFO] warming up...")
time.sleep(conf["camera_warmup_time"])
avg = None
lastUploaded = datetime.datetime.now()
motion_counter = 0
non_motion_timer = conf["nonMotionTimer"]
fourcc = 0x00000020 # a little hacky, but works for now
writer = None
(h, w) = (None, None)
zeros = None
output = None
made_recording = False
# capture frames from the camera
while True:
# grab the raw NumPy array representing the image and initialize
# the timestamp and occupied/unoccupied text
(grabbed, frame) = camera.read()
timestamp = datetime.datetime.now()
motion_detected = False
# if the frame could not be grabbed, then we have reached the end
# of the video
if not grabbed:
print("[INFO] Frame couldn't be grabbed. Breaking - " +
datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S"))
break
# resize the frame, convert it to grayscale, and blur it
frame = imutils.resize(frame, width=conf["resizeWidth"])
gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
gray = cv2.GaussianBlur(gray, (21, 21), 0)
# if the average frame is None, initialize it
if avg is None:
print("[INFO] starting background model...")
avg = gray.copy().astype("float")
# frame.truncate(0)
continue
# accumulate the weighted average between the current frame and
# previous frames, then compute the difference between the current
# frame and running average
cv2.accumulateWeighted(gray, avg, 0.5)
frameDelta = cv2.absdiff(gray, cv2.convertScaleAbs(avg))
# threshold the delta image, dilate the thresholded image to fill
# in holes, then find contours on thresholded image
thresh = cv2.threshold(frameDelta, conf["delta_thresh"], 255,
cv2.THRESH_BINARY)[1]
thresh = cv2.dilate(thresh, None, iterations=2)
(_, cnts, _) = cv2.findContours(thresh.copy(),
cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
# loop over the contours
for c in cnts:
# if the contour is too small, ignore it
if cv2.contourArea(c) < conf["min_area"]:
continue
# compute the bounding box for the contour, draw it on the frame,
# and update the text
(x, y, w1, h1) = cv2.boundingRect(c)
cv2.rectangle(frame, (x, y), (x + w1, y + h1), (0, 255, 0), 2)
motion_detected = True
fps = int(round(camera.get(cv2.CAP_PROP_FPS)))
record_fps = 10
ts = timestamp.strftime("%Y-%m-%d_%H_%M_%S")
time_and_fps = ts + " - fps: " + str(fps)
# draw the text and timestamp on the frame
cv2.putText(frame, "Motion Detected: {}".format(motion_detected), (10, 20),
cv2.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 2)
cv2.putText(frame, time_and_fps, (10, frame.shape[0] - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.35, (0, 0, 255), 1)
# Check if writer is None TODO: make path configurable
if writer is None:
filename = datetime.datetime.now().strftime("%Y-%m-%d_%H_%M_%S")
file_path = (conf["userDir"] + "/Library/Mobile Documents/"
"com~apple~CloudDocs/testMotionDetection/testing/"
"{filename}.mp4")
file_path = file_path.format(filename=filename)
(h2, w2) = frame.shape[:2]
writer = cv2.VideoWriter(file_path, fourcc, record_fps, (w2, h2), True)
zeros = np.zeros((h2, w2), dtype="uint8")
def record_video():
# construct the final output frame, storing the original frame
output = np.zeros((h2, w2, 3), dtype="uint8")
output[0:h2, 0:w2] = frame
# write the output frame to file
writer.write(output)
# print("[DEBUG] Recording....")
if motion_detected:
# increment the motion counter
motion_counter += 1
# check to see if the number of frames with motion is high enough
if motion_counter >= conf["min_motion_frames"]:
if conf["create_image"]:
# create image TODO: make path configurable
image_path = (conf["userDir"] + "/Library/Mobile Documents/"
"com~apple~CloudDocs/testMotionDetection/testing"
"/{filename}.jpg").format(filename=filename)
cv2.imwrite(image_path, frame)
record_video()
made_recording = True
non_motion_timer = conf["nonMotionTimer"]
# If there is no motion, continue recording until timer reaches 0
# Else clean everything up
else: # TODO: implement a max recording time
# print("[DEBUG] no motion")
if made_recording is True and non_motion_timer > 0:
non_motion_timer -= 1
# print("[DEBUG] first else and timer: " + str(non_motion_timer))
record_video()
else:
# print("[DEBUG] hit else")
motion_counter = 0
if writer is not None:
# print("[DEBUG] hit if 1")
writer.release()
writer = None
if made_recording is False:
# print("[DEBUG] hit if 2")
os.remove(file_path)
made_recording = False
non_motion_timer = conf["nonMotionTimer"]
# check to see if the frames should be displayed to screen
if conf["show_video"]:
cv2.imshow("Security Feed", frame)
key = cv2.waitKey(1) & 0xFF
# if the `q` key is pressed, break from the loop
if key == ord("q"):
break
# cleanup the camera and close any open windows
print("[INFO] cleaning up...")
camera.release()
cv2.destroyAllWindows()
Example config file:
{
"show_video": true,
"use_ip_cam": false,
"ip_cam_addr": "rtsp://<ip>/live0.264",
"create_image": true,
"min_upload_seconds": 5,
"min_motion_frames": 12,
"camera_warmup_time": 2.5,
"delta_thresh": 5,
"resolution": [640, 480],
"fps": 16,
"min_area": 500,
"userDir": "/Path/to/user",
"resizeWidth": 500,
"nonMotionTimer": 36
}
User contributions licensed under CC BY-SA 3.0