#!/usr/bin/python3
import os
import sys
import json
import time
import datetime
from datetime import timedelta
import traceback
import logging.handlers

import cv2
import glob
import pandas as pd
import pika
import boto3
import serial

os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.9.0.0-48_amd64/lib/ids/cti/'

from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl

###############################################
#                Logger Setting               #
###############################################
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

log_fileHandler = logging.handlers.RotatingFileHandler(
    filename="./logs/gseps.log",
    maxBytes=1024000,
    backupCount=3,
    mode='a')

log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)

###############################################
#                    Config                   #
###############################################
with open('./acquisition_config.json', 'r') as f:
    info = json.load(f)
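# Expected shape of acquisition_config.json, inferred from the keys read below.
# The values here are placeholders, not real endpoints or credentials:
#
# {
#     "laser_save_path": "/data/laser",
#     "laser_device_path": "/dev/ttyUSB0",
#     "image_save_path": "/data/images",
#     "S3BucketName": "<sdt-image-bucket>",
#     "gseps_stage_bucket": "<gseps-stage-bucket>",
#     "BucketKey": "<s3-prefix>",
#     "AccessKey": "<aws-access-key>",
#     "SecretKey": "<aws-secret-key>",
#     "Boto3RegionName": "<aws-region>",
#     "amqp_url": "<rabbitmq-host>",
#     "amqp_port": 5672,
#     "amqp_vhost": "/",
#     "amqp_id": "<rabbitmq-user>",
#     "amqp_pw": "<rabbitmq-password>",
#     "amqp_queue": "<publish-queue>",
#     "amqp_ReadyQ": "<ready-queue>"
# }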

LASER_SAVE_PATH = info['laser_save_path']
LASER_DEVICE_PATH = info['laser_device_path']

S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']

S3_UPLOAD_KEY = info['BucketKey']

s3 = boto3.resource('s3')
sdt_s3 = boto3.resource('s3',
                        aws_access_key_id=info['AccessKey'],
                        aws_secret_access_key=info['SecretKey'],
                        region_name=info['Boto3RegionName'])

###############################################
#               Camera Variable               #
###############################################
m_device = None
m_dataStream = None
m_node_map_remote_device = None

m_device_2 = None
m_dataStream_2 = None
m_node_map_remote_device_2 = None


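# Publisher wraps the AMQP (RabbitMQ) connection: check_server_state() peeks the
# "ready" queue to see whether the downstream consumer has signalled readiness
# (and purges it), and pub() sends the capture metadata as JSON to the work queue.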
class Publisher:
    def __init__(self):
        self.__url = info['amqp_url']
        self.__port = info['amqp_port']
        self.__vhost = info['amqp_vhost']
        self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
        self.__queue = info['amqp_queue']
        self.__ReadyQ = info['amqp_ReadyQ']

    def check_server_state(self):
        conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
                                                                 self.__port,
                                                                 self.__vhost,
                                                                 self.__cred))
        chan = conn.channel()
        method, properties, body = chan.basic_get(queue=self.__ReadyQ,
                                                  auto_ack=True)

        if method:
            chan.queue_purge(queue=self.__ReadyQ)
            conn.close()
            return True

        conn.close()
        return False

    def pub(self, body: dict):
        try:
            conn = pika.BlockingConnection(pika.ConnectionParameters(self.__url,
                                                                     self.__port,
                                                                     self.__vhost,
                                                                     self.__cred))
            chan = conn.channel()
            chan.basic_publish(exchange='',
                               routing_key=self.__queue,
                               body=json.dumps(body))
            conn.close()
            return
        except Exception as e:
            # add error alarm
            logger.error(traceback.format_exc())


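# Camera bring-up follows the sequence used by the IDS peak samples:
# open_camera() -> prepare_acquisition() -> set_roi() ->
# alloc_and_announce_buffers() -> start_acquisition(); main() calls these in
# that order before entering the capture loop.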
def open_camera():
    global m_device, m_node_map_remote_device, m_device_2, m_node_map_remote_device_2

    try:
        # Create instance of the device manager
        device_manager = peak.DeviceManager.Instance()

        # Update the device manager
        device_manager.Update()

        # Return if no device was found
        if device_manager.Devices().empty():
            return False

        # open the first two devices in the device manager's device list, if both are openable
        device_count = device_manager.Devices().size()
        if device_manager.Devices()[0].IsOpenable() and device_manager.Devices()[1].IsOpenable():
            m_device = device_manager.Devices()[1].OpenDevice(peak.DeviceAccessType_Control)
            # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
            m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]

            m_device_2 = device_manager.Devices()[0].OpenDevice(peak.DeviceAccessType_Control)
            m_node_map_remote_device_2 = m_device_2.RemoteDevice().NodeMaps()[0]
            return True

    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())

    return False


def prepare_acquisition():
    global m_dataStream, m_dataStream_2
    try:
        data_streams = m_device.DataStreams()
        data_streams_2 = m_device_2.DataStreams()
        if data_streams.empty() or data_streams_2.empty():
            # no data streams available
            return False
        m_dataStream = m_device.DataStreams()[0].OpenDataStream()
        m_dataStream_2 = m_device_2.DataStreams()[0].OpenDataStream()
        return True

    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())

    return False


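# Note: 4512 below is assumed to be the full sensor width/height of the camera in
# use; the computed offsets centre the requested ROI. Only camera 1's node map is
# configured here; camera 2 keeps its default ROI.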
def set_roi(width, height):
    try:
        offset_x = int((4512 - width)/2)
        offset_y = int((4512 - height)/2)
        # Get the minimum ROI and set it. After that there are no size restrictions anymore
        x_min = m_node_map_remote_device.FindNode("OffsetX").Minimum()
        y_min = m_node_map_remote_device.FindNode("OffsetY").Minimum()
        w_min = m_node_map_remote_device.FindNode("Width").Minimum()
        h_min = m_node_map_remote_device.FindNode("Height").Minimum()

        m_node_map_remote_device.FindNode("OffsetX").SetValue(x_min)
        m_node_map_remote_device.FindNode("OffsetY").SetValue(y_min)
        m_node_map_remote_device.FindNode("Width").SetValue(w_min)
        m_node_map_remote_device.FindNode("Height").SetValue(h_min)

        # Get the maximum ROI values
        x_max = m_node_map_remote_device.FindNode("OffsetX").Maximum()
        y_max = m_node_map_remote_device.FindNode("OffsetY").Maximum()
        w_max = m_node_map_remote_device.FindNode("Width").Maximum()
        h_max = m_node_map_remote_device.FindNode("Height").Maximum()

        if (offset_x < x_min) or (offset_y < y_min) or (offset_x > x_max) or (offset_y > y_max):
            return False
        elif (width < w_min) or (height < h_min) or ((offset_x + width) > w_max) or ((offset_y + height) > h_max):
            return False
        else:
            # Now, set final AOI
            m_node_map_remote_device.FindNode("OffsetX").SetValue(offset_x)
            m_node_map_remote_device.FindNode("OffsetY").SetValue(offset_y)
            m_node_map_remote_device.FindNode("Width").SetValue(width)
            m_node_map_remote_device.FindNode("Height").SetValue(height)

            return True
    except Exception as e:
        logger.error(traceback.format_exc())

    return False


def alloc_and_announce_buffers():
    try:
        if m_dataStream and m_dataStream_2:
            # cam1
            # Flush queue and prepare all buffers for revoking
            m_dataStream.Flush(peak.DataStreamFlushMode_DiscardAll)

            # Clear all old buffers
            for buffer in m_dataStream.AnnouncedBuffers():
                m_dataStream.RevokeBuffer(buffer)

            payload_size = m_node_map_remote_device.FindNode("PayloadSize").Value()

            # Get number of minimum required buffers
            num_buffers_min_required = m_dataStream.NumBuffersAnnouncedMinRequired()

            # Alloc buffers
            for count in range(num_buffers_min_required):
                buffer = m_dataStream.AllocAndAnnounceBuffer(payload_size)
                m_dataStream.QueueBuffer(buffer)

            # cam2
            m_dataStream_2.Flush(peak.DataStreamFlushMode_DiscardAll)

            for buffer in m_dataStream_2.AnnouncedBuffers():
                m_dataStream_2.RevokeBuffer(buffer)

            payload_size_2 = m_node_map_remote_device_2.FindNode("PayloadSize").Value()
            num_buffers_min_required_2 = m_dataStream_2.NumBuffersAnnouncedMinRequired()

            for count in range(num_buffers_min_required_2):
                buffer = m_dataStream_2.AllocAndAnnounceBuffer(payload_size_2)
                m_dataStream_2.QueueBuffer(buffer)
            return True
    except Exception as e:
        logger.error(traceback.format_exc())

    return False


def start_acquisition():
    try:
        m_dataStream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
        m_node_map_remote_device.FindNode("TLParamsLocked").SetValue(1)
        m_node_map_remote_device.FindNode("AcquisitionStart").Execute()
        # m_node_map_remote_device.FindNode("DeviceRegistersStreamingStart").Execute()

        m_dataStream_2.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
        m_node_map_remote_device_2.FindNode("TLParamsLocked").SetValue(1)
        m_node_map_remote_device_2.FindNode("AcquisitionStart").Execute()
        # m_node_map_remote_device.FindNode("DeviceRegistersStreamingStart").Execute()

        return True
    except Exception as e:
        logger.error(traceback.format_exc())

    return False


# check alarm
def check_distance():
    while True:
        ser = None
        try:
            ser = serial.Serial(LASER_DEVICE_PATH, baudrate=19200, timeout=1)
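            # The request below appears to be a Modbus RTU "Read Input Registers"
            # frame (slave 1, function 0x04, start register 0, count 1, CRC16);
            # the reply's data bytes carry the distance, logged here in mm.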
            req = b'\x01\x04\x00\x00\x00\x01\x31\xca'
            ser.write(req)

            result = ser.read(6)
            data_length = result[2]
            hex_distance = result[3:3+data_length]
            distance = int((int.from_bytes(hex_distance, byteorder='big'))/10)
            logger.info(f"laser_value : {distance}mm")

            ser.close()

            return distance

        except KeyboardInterrupt:
            if ser:
                ser.close()
            exit()

        except Exception as e:
            if ser:
                ser.close()
            # raise
            logger.error(f"Error : {e}")
            continue

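# upload_to_s3() has two modes: 'sdt' pushes the freshly captured images to the
# SDT bucket (and removes the local files), while 'gseps' merges yesterday's
# hourly laser CSVs into a single file and uploads it to the GSEPS stage bucket.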
def upload_to_s3(company, image_list=None):
    today = datetime.date.today()

    if company == 'sdt':
        for path in image_list:
            image_name = path.split('/')[-1]

            date = image_name.split('-')[0]
            folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'

            sdt_s3.Bucket(S3_BUCKET).upload_file(path, f'{folder}/{image_name}')

            os.remove(path)

    elif company == 'gseps':
        yesterday = today - timedelta(days=1)
        file_name = f'{yesterday.year:04d}{yesterday.month:02d}{yesterday.day:02d}'

        laser_csv = sorted(glob.glob(f'{LASER_SAVE_PATH}/{file_name}*.csv'))
        total_laser_info = pd.DataFrame()
        for path in laser_csv:
            try:
                csv = pd.read_csv(path)
                total_laser_info = pd.concat([total_laser_info, csv], ignore_index=True)
                #os.remove(path)

            except pd.errors.EmptyDataError as e:
                logger.error(f"Error: {path} pandas empty data error, {e}")

        laser_save_path = f'{LASER_SAVE_PATH}/{yesterday}-mv_laser_height.csv'
        total_laser_info.to_csv(laser_save_path, index=False)
        s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(laser_save_path, f'{S3_UPLOAD_KEY}/year={yesterday.year}/month={yesterday.month:02d}/day={yesterday.day:02d}/mv_laser_height.csv')
        sdt_s3.Bucket('gseps-daily').upload_file(laser_save_path, f'{yesterday}/mv_laser_height.csv')

        #os.remove(laser_save_path)


def check_hour_for_laser(current_time):
    if current_time.minute == 0 and current_time.second == 0:
        return True

    return False


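# Main acquisition loop: poll the laser range finder, capture a frame from each
# camera when the target sits in the 230-240 mm trigger window, write JPEGs,
# upload them to S3, publish the metadata over AMQP, and append every reading in
# the 50-500 mm range to an hourly CSV that is flushed to S3 each hour, with a
# daily roll-up at midnight.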
def main():
    logger.info(f'Cam Initialize.')

    # initialize library
    peak.Library.Initialize()

    # add logger error
    if not open_camera():
        # error
        print(1)
        sys.exit(-1)

    logger.info(f'CAM Load.')

    if not prepare_acquisition():
        # error
        print(2)
        sys.exit(-2)

    if not set_roi(1208, 1024):
        # error
        print(3)
        sys.exit(-3)

    if not alloc_and_announce_buffers():
        # error
        print(4)
        sys.exit(-4)

    if not start_acquisition():
        # error
        print(5)
        sys.exit(-5)

    image_info = {}
    publisher = Publisher()

    laser_history = pd.DataFrame()
    laser_count = 0
    while True:
        try:
            now_datetime = datetime.datetime.now()
            now_unix = int(now_datetime.timestamp())
            now = now_datetime.strftime("%Y%m%d-%H%M%S%f")[:-3]

            laser_value = check_distance()
            event_flag = 0
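            # Capture only while the measured distance is inside the trigger
            # window (230-240 mm); event_flag marks these rows in the laser CSV.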
if 230 <= laser_value < 240:
|
|
laser_count = 0
|
|
event_flag = 1
|
|
logger.info(f"Capture Start at {laser_value}")
|
|
|
|
# Get buffer from device's DataStream. Wait 5000 ms. The buffer is automatically locked until it is queued again.
|
|
################# CHANGE ####################
|
|
|
|
#cam1
|
|
buffer = m_dataStream.WaitForFinishedBuffer(1000)
|
|
image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
|
|
buffer.PixelFormat(),
|
|
buffer.BasePtr(),
|
|
buffer.Size(),
|
|
buffer.Width(),
|
|
buffer.Height()
|
|
)
|
|
image_processed = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8, ids_peak_ipl.ConversionMode_Fast)
|
|
# Queue buffer again
|
|
m_dataStream.QueueBuffer(buffer)
|
|
###############################################
|
|
img_numpy = image_processed.get_numpy()
|
|
|
|
# cam2
|
|
buffer = m_dataStream_2.WaitForFinishedBuffer(1000)
|
|
image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
|
|
buffer.PixelFormat(),
|
|
buffer.BasePtr(),
|
|
buffer.Size(),
|
|
buffer.Width(),
|
|
buffer.Height()
|
|
)
|
|
image_processed_2 = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8, ids_peak_ipl.ConversionMode_Fast)
|
|
# Queue buffer again
|
|
m_dataStream_2.QueueBuffer(buffer)
|
|
###############################################
|
|
img_numpy_2 = image_processed_2.get_numpy()
|
|
|
|
# save image
|
|
cam1_image_name = f'{now}_cam1.jpg'
|
|
cam2_image_name = f'{now}_cam2.jpg'
|
|
image_save_path = info['image_save_path']
|
|
|
|
if not os.path.exists(image_save_path):
|
|
os.makedirs(image_save_path)
|
|
|
|
cam1_image_path = os.path.join(image_save_path, cam1_image_name)
|
|
cam2_image_path = os.path.join(image_save_path, cam2_image_name)
|
|
cv2.imwrite(cam1_image_path, img_numpy)
|
|
cv2.imwrite(cam2_image_path, img_numpy_2)
|
|
#print('cam1 path: ',cam1_image_path)
|
|
#print('cam2 path: ',cam2_image_path)
|
|
|
|
# Upload image to MinIO(inference server)
|
|
image_list = [cam1_image_path, cam2_image_path]
|
|
upload_to_s3('sdt', image_list)
|
|
|
|
# publish
|
|
image_info = {'to': {"cam1_image_name": cam1_image_name,
|
|
"cam2_image_name": cam2_image_name,
|
|
"laser": laser_value,
|
|
"bucket": "gseps-dataset"}}
|
|
|
|
if publisher.check_server_state():
|
|
publisher.pub(image_info)
|
|
#print(image_info)
|
|
|
|
############## CHANGE #######################
|
|
# Logger Contents Change // MinIO => S3 #
|
|
#############################################
|
|
logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
|
|
# print(f'Successfully Uploaded To Minio!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')
|
|
|
|
|
|
if 50 <= laser_value < 500:
|
|
current_laser_info = pd.DataFrame([{"timestamp": now_unix,
|
|
"laser_value": laser_value,
|
|
"event": event_flag}])
|
|
|
|
laser_history = pd.concat([laser_history, current_laser_info], ignore_index=True)
|
|
|
|
|
|
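            # On the first pass of each new hour, flush the accumulated laser
            # readings to an hourly CSV and upload it; at hour 0 also trigger the
            # previous day's roll-up via upload_to_s3('gseps').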
            current_time = datetime.datetime.now()
            # if check_hour_for_laser(current_time):
            if current_time.minute == 0:
                today = datetime.date.today()
                laser_save_hour = (current_time - timedelta(hours=1)).strftime('%Y%m%d-%H')
                hour = current_time.hour

                date = laser_save_hour.split('-')[0]
                folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'

                if not os.path.exists(f'{LASER_SAVE_PATH}/{laser_save_hour}.csv'):
                    laser_history.to_csv(f'{LASER_SAVE_PATH}/{laser_save_hour}.csv', index=False)
                    sdt_s3.Bucket('gseps-daily').upload_file(f'{LASER_SAVE_PATH}/{laser_save_hour}.csv', f'{folder}/laser/{laser_save_hour}.csv')
                    laser_history = pd.DataFrame()

                    if hour == 0:
                        upload_to_s3('gseps')

            laser_count += 1
            time.sleep(0.3)

        except Exception as e:
            logger.error(traceback.format_exc())
            exit()

    peak.Library.Close()
    sys.exit(0)


if __name__ == '__main__':
    main()