#!/usr/bin/python3
import os
import sys
import json
import time
import datetime
from datetime import timedelta
import traceback
import glob
import logging
import logging.handlers

import cv2
import pandas as pd
import pika
import boto3
import serial

# The GenTL producer path must be set before the ids_peak imports below.
os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.7.0.0-16268_amd64/lib/ids/cti/'

from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl
###############################################
#               Logger Setting                #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

log_fileHandler = logging.handlers.RotatingFileHandler(
    filename="./logs/gseps.log",
    maxBytes=1024000,
    backupCount=3,
    mode='a')

log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)
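# Note: RotatingFileHandler does not create missing directories, so ./logs
# (and ./laser_value, which is written to further below) must exist before
# the script starts.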
###############################################
#                   Config                    #
###############################################
with open('./acquisition_config.json', 'r') as f:
    info = json.load(f)

# CAM2_DEVICE_PATH = info['cam2_device_path']
LASER_DEVICE_PATH = info['laser_device_path']
S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']
S3_UPLOAD_KEY = info['BucketKey']
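# A sketch of the expected ./acquisition_config.json, reconstructed from the
# keys this script reads (including the commented-out cam2/AMQP code); every
# value below is a placeholder, not a real device, bucket, or credential:
#
# {
#     "laser_device_path": "/dev/ttyUSB0",
#     "cam2_device_path": "/dev/video0",
#     "S3BucketName": "example-bucket",
#     "gseps_stage_bucket": "example-stage-bucket",
#     "BucketKey": "example/prefix",
#     "image_save_path": "./images",
#     "AccessKey": "<aws-access-key-id>",
#     "SecretKey": "<aws-secret-access-key>",
#     "Boto3RegionName": "ap-northeast-2",
#     "amqp_url": "amqp.example.com",
#     "amqp_port": 5672,
#     "amqp_vhost": "/",
#     "amqp_id": "<amqp-user>",
#     "amqp_pw": "<amqp-password>",
#     "amqp_queue": "example-queue",
#     "amqp_ReadyQ": "example-ready-queue"
# }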
###############################################
#               Camera Variable               #
###############################################
m_device = None
m_dataStream = None
m_node_map_remote_device = None

# Load Camera2
# cam2 = cv2.VideoCapture(CAM2_DEVICE_PATH)
# _ = cam2.read()
# logger.info(f'CAM2 Load.')
# class Publisher:
#     def __init__(self):
#         self.__url = info['amqp_url']
#         self.__port = info['amqp_port']
#         self.__vhost = info['amqp_vhost']
#         self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
#         self.__queue = info['amqp_queue']
#         self.__ReadyQ = info['amqp_ReadyQ']
#
#     def check_server_state(self):
#         conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
#                                                                  self.__port,
#                                                                  self.__vhost,
#                                                                  self.__cred))
#         chan = conn.channel()
#         method, properties, body = chan.basic_get(queue=self.__ReadyQ,
#                                                   auto_ack=True)
#
#         if method:
#             chan.queue_purge(queue=self.__ReadyQ)
#             conn.close()
#             return True
#
#         conn.close()
#         return False
#
#     def pub(self, body: dict):
#         try:
#             conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
#                                                                      self.__port,
#                                                                      self.__vhost,
#                                                                      self.__cred))
#             chan = conn.channel()
#             chan.basic_publish(exchange='',
#                                routing_key=self.__queue,
#                                body=json.dumps(body))
#             conn.close()
#             return
#         except Exception as e:
#             # add error alarm
#             logger.error(traceback.format_exc())
def open_camera():
    """Open the first openable IDS camera and cache its remote-device node map."""
    global m_device, m_node_map_remote_device

    try:
        # Create instance of the device manager
        device_manager = peak.DeviceManager.Instance()

        # Update the device manager
        device_manager.Update()

        # Return if no device was found
        if device_manager.Devices().empty():
            return False

        # Open the first openable device in the device manager's device list
        device_count = device_manager.Devices().size()
        for i in range(device_count):
            if device_manager.Devices()[i].IsOpenable():
                m_device = device_manager.Devices()[i].OpenDevice(peak.DeviceAccessType_Control)

                # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
                m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]

                return True

        return False

    except Exception:
        logger.error(traceback.format_exc())

        return False
def prepare_acquisition():
    """Open the device's first data stream."""
    global m_dataStream
    try:
        data_streams = m_device.DataStreams()
        if data_streams.empty():
            # no data streams available
            return False

        m_dataStream = m_device.DataStreams()[0].OpenDataStream()

        return True

    except Exception:
        logger.error(traceback.format_exc())

        return False
def set_roi(x, y, width, height):
    """Validate the requested ROI against the sensor limits, then set the final AOI."""
    try:
        # Get the minimum ROI and set it. After that there are no size restrictions anymore
        x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
        y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
        w_min = m_node_map_remote_device.FindNode("Width").Minimum()
        h_min = m_node_map_remote_device.FindNode("Height").Minimum()

        m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
        m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
        m_node_map_remote_device.FindNode("Width").SetValue(w_min)
        m_node_map_remote_device.FindNode("Height").SetValue(h_min)

        # Get the maximum ROI values
        x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
        y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
        w_max = m_node_map_remote_device.FindNode("Width").Maximum()
        h_max = m_node_map_remote_device.FindNode("Height").Maximum()

        if (x < x_min) or (y < y_min) or (x > x_max) or (y > y_max):
            return False
        elif (width < w_min) or (height < h_min) or ((x + width) > w_max) or ((y + height) > h_max):
            return False
        else:
            # Now, set final AOI. Note that this deliberately opens the full
            # sensor area rather than the requested (x, y, width, height) window;
            # the arguments are only used for the validity checks above.
            m_node_map_remote_device.FindNode("OffsetX").SetValue(0)
            m_node_map_remote_device.FindNode("OffsetY").SetValue(0)
            m_node_map_remote_device.FindNode("Width").SetValue(w_max)
            m_node_map_remote_device.FindNode("Height").SetValue(h_max)

        return True
    except Exception:
        logger.error(traceback.format_exc())

        return False
def alloc_and_announce_buffers():
    """Revoke stale buffers, then allocate and queue the minimum required set."""
    try:
        if not m_dataStream:
            return False

        # Flush queue and prepare all buffers for revoking
        m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)

        # Clear all old buffers
        for buffer in m_dataStream.AnnouncedBuffers():
            m_dataStream.RevokeBuffer(buffer)

        payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()

        # Get number of minimum required buffers
        num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()

        # Alloc buffers
        for count in range(num_buffers_min_required):
            buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
            m_dataStream.QueueBuffer(buffer)

        return True
    except Exception:
        logger.error(traceback.format_exc())

        return False
def start_acquisition():
    """Start the data stream and trigger AcquisitionStart on the device."""
    try:
        m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
        m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
        m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
        # m_node_map_remote_device.FindNode("DeviceRegistersStreamingStart").Execute()

        return True
    except Exception:
        logger.error(traceback.format_exc())

        return False
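# Note: nothing in this script ever reverses start_acquisition(). A teardown
# sketch, assuming the usual ids_peak counterparts of the calls above (modeled
# on IDS sample code, not taken from this file):
#
#     m_node_map_remote_device.FindNode("AcquisitionStop").Execute()
#     m_dataStream.StopAcquisition(peak.AcquisitionStopMode_Default)
#     m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)
#     m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(0)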
# check alarm
def check_distance():
    """Poll the laser range finder over serial until a reading succeeds."""
    while True:
        try:
            with serial.Serial(LASER_DEVICE_PATH, baudrate=9600, timeout=1) as ser:
                ser.write(b'O')

                # Drop the first byte of the 6-byte reply; the remaining byte
                # values are concatenated digit-by-digit into an integer
                # (this assumes each payload byte holds a single decimal digit).
                result = ser.read(6)[1:]
                distance_bytes = [str(n) for n in result]
                distance = int(''.join(distance_bytes))
                logger.info(f"laser_value : {distance}")

                return distance

        except KeyboardInterrupt:
            sys.exit()

        except Exception:
            logger.error(traceback.format_exc())
            continue
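# A minimal usage sketch for check_distance(): main() currently has the laser
# gate commented out, and wiring it back in might look like the following,
# where CAPTURE_THRESHOLD is a hypothetical threshold, not a value from this file:
#
#     laser_value = check_distance()
#     if laser_value < CAPTURE_THRESHOLD:
#         ...  # capture a frame as in the main() loop below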
def image_capture_from_cam2(save_path):
    """Grab a single frame from cam2 and write it to save_path."""
    # Note: cam2 is only created in the commented-out "Load Camera2" block
    # above, so this function is unusable until that block is restored.
    global cam2

    ret, frame = cam2.read()
    if ret:
        cv2.imwrite(save_path, frame)
        return True

    return False
def upload_to_s3(company, image_list=None):
    """Upload captured images ('sdt') or the merged laser CSV ('gseps') to S3."""
    today = datetime.date.today()

    if company == 'sdt':
        s3 = boto3.resource('s3',
                            aws_access_key_id=info['AccessKey'],
                            aws_secret_access_key=info['SecretKey'],
                            region_name=info['Boto3RegionName'])

        for path in image_list:
            image_name = path.split('/')[-1]
            s3.Bucket(S3_BUCKET).upload_file(path, f'{today}/{image_name}')

    elif company == 'gseps':
        yesterday = today - timedelta(days=1)

        s3 = boto3.resource('s3')

        # Merge every hourly CSV under ./laser_value into one file for yesterday
        laser_csv = glob.glob('./laser_value/*')
        total_laser_info = pd.DataFrame()
        for path in laser_csv:
            csv = pd.read_csv(path)
            total_laser_info = pd.concat([total_laser_info, csv], ignore_index=True)

        laser_save_path = f'./laser_value/{yesterday}-mv_laser_height.csv'
        total_laser_info.to_csv(laser_save_path, index=False)
        s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(
            laser_save_path,
            f'{S3_UPLOAD_KEY}/year={yesterday.year}/month={yesterday.month:02d}/day={yesterday.day:02d}/mv_laser_height.csv')
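# For example, a 'gseps' run on 2024-06-02 (illustrative date; the real prefix
# comes from BucketKey) would upload yesterday's merged CSV to:
#
#     <BucketKey>/year=2024/month=06/day=01/mv_laser_height.csv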
def check_hour_for_laser(current_time):
    """Return True exactly on the hour (minute == 0 and second == 0)."""
    if current_time.minute == 0 and current_time.second == 0:
        return True

    return False
def main():
    # logger.info(f'Cam Initialize.')

    # initialize library
    peak.Library.Initialize()

    if not open_camera():
        logger.error('No openable IDS camera found.')
        sys.exit(-1)

    logger.info('CAM1 Load.')

    if not prepare_acquisition():
        logger.error('Could not open a data stream.')
        sys.exit(-2)

    if not set_roi(16, 16, 256, 128):
        logger.error('Requested ROI is outside the sensor limits.')
        sys.exit(-3)

    if not alloc_and_announce_buffers():
        logger.error('Buffer allocation failed.')
        sys.exit(-4)

    if not start_acquisition():
        logger.error('Could not start acquisition.')
        sys.exit(-5)

    image_info = {}
    # publisher = Publisher()

    laser_history = pd.DataFrame()
    laser_count = 0
    while True:
        try:
            now_datetime = datetime.datetime.now()
            now_unix = int(now_datetime.timestamp())
            now = now_datetime.strftime("%Y%m%d-%H%M%S%f")

            # laser_value = check_distance()
            event_flag = 0

            # The laser-distance gate is currently disabled (see the commented
            # check_distance() call above), so every iteration captures a frame.
            if True:
                laser_count = 0
                event_flag = 1
                # logger.info(f"Capture Start at {laser_value}")

                # Get buffer from device's DataStream. Wait up to 1000 ms.
                # The buffer is automatically locked until it is queued again.
                ################# CHANGE ####################
                buffer = m_dataStream.WaitForFinishedBuffer(1000)

                image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
                    buffer.PixelFormat(),
                    buffer.BasePtr(),
                    buffer.Size(),
                    buffer.Width(),
                    buffer.Height()
                )

                image_processed = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8, ids_peak_ipl.ConversionMode_Fast)

                # Queue buffer again
                m_dataStream.QueueBuffer(buffer)
                ###############################################

                img_numpy = image_processed.get_numpy()

                # save image
                cam1_image_name = f'{now}_cam1.jpg'
                cam2_image_name = f'{now}_cam2.jpg'
                image_save_path = info['image_save_path']

                if not os.path.exists(image_save_path):
                    os.makedirs(image_save_path)

                cam1_image_path = os.path.join(image_save_path, cam1_image_name)
                cam2_image_path = os.path.join(image_save_path, cam2_image_name)
                cv2.imwrite(cam1_image_path, img_numpy)
                print(cam1_image_path)

                # Capture cam2
                # if not image_capture_from_cam2(cam2_image_path):
                #     logger.error(f"Cannot Load CAM2!")
                #     cam2_image_path = None

                # Upload image to MinIO(inference server)
                # image_list = [cam1_image_path, cam2_image_path]
                # upload_to_s3('sdt', image_list)

                # publish
                # image_info = {'to': {"cam1_image_name": cam1_image_name,
                #                      "cam2_image_name": cam2_image_name,
                #                      "laser": laser_value,
                #                      "bucket": "gseps-dataset"}}

                # if publisher.check_server_state():
                #     publisher.pub(image_info)

                ############## CHANGE #######################
                #   Logger Contents Change // MinIO => S3   #
                #############################################
                # logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
                # print(f'Successfully Uploaded To Minio!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')

            # current_laser_info = pd.DataFrame([{"timestamp": now_unix,
            #                                     "laser_value": laser_value,
            #                                     "event": event_flag}])
            # laser_history = pd.concat([laser_history, current_laser_info], ignore_index=True)
            #
            # current_time = datetime.datetime.now()
            # if check_hour_for_laser(current_time):
            #     laser_save_hour = current_time.strftime('%m%d-%H')
            #     hour = current_time.hour
            #
            #     if not os.path.exists(f'./laser_value/{laser_save_hour}.csv'):
            #         laser_history.to_csv(f'./laser_value/{laser_save_hour}.csv', index=False)
            #         laser_history = pd.DataFrame()
            #
            #     if hour == 0:
            #         upload_to_s3('gseps')

            # laser_count += 1
            # time.sleep(3)

        except Exception:
            logger.error(traceback.format_exc())
            sys.exit(1)

    # Only reached if the loop above is ever changed to break instead of exit.
    peak.Library.Close()
    sys.exit(0)
if __name__ == '__main__':
    main()