gseps-image-acquisition/main.py

504 lines
18 KiB
Python
Raw Permalink Normal View History

2024-05-30 05:11:20 +00:00
#!/usr/bin/python3
import os
import sys
import json
import time
import datetime
from datetime import timedelta
import traceback
import logging.handlers
import cv2
import glob
import pandas as pd
import pika
import boto3
import serial
os.environ['GENICAM_GENTL64_PATH'] = '/opt/ids-peak_2.9.0.0-48_amd64/lib/ids/cti/'
from ids_peak import ids_peak as peak
from ids_peak_ipl import ids_peak_ipl
###############################################
#               Logger Setting                #
###############################################
# The root logger records INFO and above into a size-rotated file under ./logs.
logger = logging.getLogger()
logger.setLevel(logging.INFO)

formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

log_fileHandler = logging.handlers.RotatingFileHandler(
    "./logs/gseps.log",
    mode='a',
    maxBytes=1024000,
    backupCount=3,
)
log_fileHandler.setFormatter(formatter)
logger.addHandler(log_fileHandler)

###############################################
#                   Config                    #
###############################################
# All runtime settings are read once, at import time, from a JSON file in the
# current working directory.
with open('./acquisition_config.json', 'r') as f:
    info = json.load(f)

LASER_SAVE_PATH = info['laser_save_path']
LASER_DEVICE_PATH = info['laser_device_path']
S3_BUCKET = info['S3BucketName']
GSEPS_STAGE_BUCKET = info['gseps_stage_bucket']
S3_UPLOAD_KEY = info['BucketKey']

# `s3` is created without explicit credentials; `sdt_s3` uses the key pair and
# region given in the config file.
s3 = boto3.resource('s3')
sdt_s3 = boto3.resource(
    's3',
    aws_access_key_id=info['AccessKey'],
    aws_secret_access_key=info['SecretKey'],
    region_name=info['Boto3RegionName'],
)

###############################################
#               Camera Variable               #
###############################################
# Module-level handles for the two cameras; they stay None until
# open_camera() / prepare_acquisition() assign them.
m_device = None
m_dataStream = None
m_node_map_remote_device = None

m_device_2 = None
m_dataStream_2 = None
m_node_map_remote_device_2 = None
class Publisher:
    """Thin RabbitMQ client that opens a fresh blocking connection per call.

    Connection settings (host, port, vhost, credentials, queue names) are read
    from the module-level ``info`` config dict when the object is created.
    """

    def __init__(self):
        self.__url = info['amqp_url']
        self.__port = info['amqp_port']
        self.__vhost = info['amqp_vhost']
        self.__cred = pika.PlainCredentials(info['amqp_id'], info['amqp_pw'])
        self.__queue = info['amqp_queue']
        self.__ReadyQ = info['amqp_ReadyQ']

    def _connect(self):
        """Open and return a new blocking connection to the configured broker."""
        return pika.BlockingConnection(pika.ConnectionParameters(self.__url,
                                                                 self.__port,
                                                                 self.__vhost,
                                                                 self.__cred))

    @staticmethod
    def _close(conn):
        """Close ``conn`` unless the broker or an earlier error already did."""
        if conn.is_open:
            conn.close()

    def check_server_state(self):
        """Return True if the ready queue held at least one message.

        One message is fetched (auto-acknowledged) from the ready queue; when
        one was present the rest of that queue is purged as well. Errors from
        the broker propagate to the caller, but the connection is always
        released first.
        """
        conn = self._connect()
        try:
            chan = conn.channel()
            method, _properties, _body = chan.basic_get(queue=self.__ReadyQ,
                                                        auto_ack=True)
            if method:
                chan.queue_purge(queue=self.__ReadyQ)
                return True
            return False
        finally:
            # Previously a failure between connect and close leaked the socket.
            self._close(conn)

    def pub(self, body: dict):
        """Publish ``body`` as JSON to the configured queue (best effort).

        Any failure is logged with its traceback and swallowed, so the caller
        never sees an exception from this method.
        """
        try:
            conn = self._connect()
            try:
                chan = conn.channel()
                chan.basic_publish(exchange='',
                                   routing_key=self.__queue,
                                   body=json.dumps(body))
            finally:
                self._close(conn)
        except Exception:
            # add error alarm
            logger.error(traceback.format_exc())
def open_camera():
    """Open both cameras and store their handles in the module globals.

    The device at index 1 becomes camera 1 (``m_device``) and the device at
    index 0 becomes camera 2 (``m_device_2``); the remote-device node map of
    each is stored alongside it.

    Returns:
        True when both devices were opened, False in every other case
        (fewer than two devices, a device not openable, or any SDK error).
    """
    global m_device, m_node_map_remote_device, m_device_2, m_node_map_remote_device_2
    try:
        # Create instance of the device manager and refresh its device list
        device_manager = peak.DeviceManager.Instance()
        device_manager.Update()

        devices = device_manager.Devices()
        # Return if no device was found
        if devices.empty():
            return False
        # Both index 0 and index 1 are used below, so two devices are required.
        device_count = devices.size()
        if device_count < 2:
            logger.error("Two cameras are required but only %s was found.", device_count)
            return False
        if not (devices[0].IsOpenable() and devices[1].IsOpenable()):
            logger.error("At least one camera is not openable.")
            return False

        m_device = devices[1].OpenDevice(peak.DeviceAccessType_Control)
        # Get NodeMap of the RemoteDevice for all accesses to the GenICam NodeMap tree
        m_node_map_remote_device = m_device.RemoteDevice().NodeMaps()[0]

        m_device_2 = devices[0].OpenDevice(peak.DeviceAccessType_Control)
        m_node_map_remote_device_2 = m_device_2.RemoteDevice().NodeMaps()[0]
        return True
    except Exception as e:
        print(e)
        logger.error(traceback.format_exc())
    return False
def prepare_acquisition():
    """Open the first data stream of each camera into the module globals.

    Returns:
        True when both streams were opened; False when either camera exposes
        no data stream or when the SDK raises.
    """
    global m_dataStream, m_dataStream_2
    try:
        streams_cam1 = m_device.DataStreams()
        streams_cam2 = m_device_2.DataStreams()

        # Both cameras must offer at least one data stream.
        both_available = not (streams_cam1.empty() or streams_cam2.empty())
        if both_available:
            m_dataStream = m_device.DataStreams()[0].OpenDataStream()
            m_dataStream_2 = m_device_2.DataStreams()[0].OpenDataStream()
        return both_available
    except Exception as err:
        print(err)
        logger.error(traceback.format_exc())
        return False
def set_roi(width, height):
    """Centre a ``width`` x ``height`` region of interest on camera 1.

    The ROI nodes are first reset to their minimum values, then the requested
    region is validated against the reported limits and applied.

    NOTE(review): only ``m_node_map_remote_device`` (camera 1) is configured
    here; camera 2's node map is not touched -- confirm this is intended.

    Returns:
        True when the ROI was applied; False when it does not fit the
        reported limits or when the SDK raises.
    """
    try:
        find = m_node_map_remote_device.FindNode
        roi_nodes = ("OffsetX", "OffsetY", "Width", "Height")

        # 4512 is presumably the full sensor edge length in pixels -- TODO confirm.
        offset_x = int((4512 - width)/2)
        offset_y = int((4512 - height)/2)

        # Get the minimum ROI and set it. After that there are no size restrictions anymore
        minimums = [find(node_name).Minimum() for node_name in roi_nodes]
        for node_name, lowest in zip(roi_nodes, minimums):
            find(node_name).SetValue(lowest)
        x_min, y_min, w_min, h_min = minimums

        # Get the maximum ROI values
        x_max, y_max, w_max, h_max = [find(node_name).Maximum() for node_name in roi_nodes]

        offset_in_range = (x_min <= offset_x <= x_max) and (y_min <= offset_y <= y_max)
        if not offset_in_range:
            return False

        size_in_range = (
            width >= w_min
            and height >= h_min
            and (offset_x + width) <= w_max
            and (offset_y + height) <= h_max
        )
        if not size_in_range:
            return False

        # Now, set final AOI
        final_values = (offset_x, offset_y, width, height)
        for node_name, value in zip(roi_nodes, final_values):
            find(node_name).SetValue(value)
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False
def _announce_buffers(data_stream, node_map):
    """Replace all buffers of ``data_stream`` with freshly allocated, queued ones."""
    # Flush queue and prepare all buffers for revoking
    data_stream.Flush(peak.DataStreamFlushMode_DiscardAll)
    # Clear all old buffers
    for buffer in data_stream.AnnouncedBuffers():
        data_stream.RevokeBuffer(buffer)

    payload_size = node_map.FindNode("PayloadSize").Value()
    # Allocate exactly the minimum number of buffers the stream requires
    for _ in range(data_stream.NumBuffersAnnouncedMinRequired()):
        buffer = data_stream.AllocAndAnnounceBuffer(payload_size)
        data_stream.QueueBuffer(buffer)


def alloc_and_announce_buffers():
    """Allocate and queue acquisition buffers for both camera data streams.

    Camera 1 is prepared first, then camera 2, each sized from its own
    ``PayloadSize`` node.

    Returns:
        True when both streams were prepared; False when either stream has
        not been opened yet or when the SDK raises.
    """
    try:
        if not (m_dataStream and m_dataStream_2):
            # Without open streams there is nothing to announce buffers on;
            # report failure explicitly instead of falling through.
            logger.error("Data streams are not open; cannot allocate buffers.")
            return False
        _announce_buffers(m_dataStream, m_node_map_remote_device)
        _announce_buffers(m_dataStream_2, m_node_map_remote_device_2)
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False
def start_acquisition():
    """Start continuous acquisition on camera 1 and then on camera 2.

    For each camera the data stream is started first, then the transport
    layer parameters are locked and ``AcquisitionStart`` is executed on the
    remote device.

    Returns:
        True when both cameras were started, False when the SDK raises.
    """
    try:
        cameras = (
            (m_dataStream, m_node_map_remote_device),
            (m_dataStream_2, m_node_map_remote_device_2),
        )
        for stream, node_map in cameras:
            stream.StartAcquisition(peak.AcquisitionStartMode_Default, peak.DataStream.INFINITE_NUMBER)
            node_map.FindNode("TLParamsLocked").SetValue(1)
            node_map.FindNode("AcquisitionStart").Execute()
        return True
    except Exception:
        logger.error(traceback.format_exc())
        return False
# check alarm
def check_distance(retry_delay=0.5):
    """Query the laser sensor over serial and return the measured distance.

    A fixed 8-byte request is written to ``LASER_DEVICE_PATH`` (19200 baud,
    1 s timeout) and up to 6 bytes are read back. Byte 2 of the reply is taken
    as the payload length, the payload is decoded as a big-endian unsigned
    integer and divided by 10. The request looks like a Modbus-RTU
    "read input registers" frame -- confirm against the sensor manual.

    The call retries forever until a complete reply is received.

    Args:
        retry_delay: Seconds to wait before retrying after a failed attempt,
            so a missing or silent device does not cause a busy loop.

    Returns:
        The distance as an int (logged as millimetres).
    """
    request = b'\x01\x04\x00\x00\x00\x01\x31\xca'
    while True:
        try:
            # The context manager closes the port on every path, including
            # when opening fails (previously `ser` was unbound in that case
            # and the error handler itself crashed).
            with serial.Serial(LASER_DEVICE_PATH, baudrate=19200, timeout=1) as ser:
                ser.write(request)
                result = ser.read(6)

            if len(result) < 3:
                raise ValueError(f"incomplete laser response: {result!r}")
            data_length = result[2]
            hex_distance = result[3:3 + data_length]
            if len(hex_distance) != data_length:
                raise ValueError(f"truncated laser payload: {result!r}")

            distance = int.from_bytes(hex_distance, byteorder='big') // 10
            logger.info(f"laser_value : {distance}mm")
            return distance
        except KeyboardInterrupt:
            sys.exit()
        except Exception as e:
            print(e)
            logger.error(f"Error : {e}")
            time.sleep(retry_delay)
def upload_to_s3(company, image_list=None):
    """Upload captured images or the daily laser summary to S3.

    Args:
        company: ``'sdt'`` uploads every file in ``image_list`` to
            ``S3_BUCKET`` under a ``YYYY-MM-DD/`` folder derived from the
            file name and then deletes the local file. ``'gseps'`` merges
            yesterday's laser CSV files from ``LASER_SAVE_PATH`` into one CSV
            and uploads it to both the GSEPS stage bucket and the
            ``gseps-daily`` bucket. Any other value does nothing.
        image_list: Local image paths for the ``'sdt'`` case; file names are
            expected to start with ``YYYYMMDD-``. ``None`` is treated as an
            empty list.
    """
    if company == 'sdt':
        for path in image_list or ():
            image_name = os.path.basename(path)
            date = image_name.split('-')[0]
            folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'
            sdt_s3.Bucket(S3_BUCKET).upload_file(path, f'{folder}/{image_name}')
            os.remove(path)
    elif company == 'gseps':
        yesterday = datetime.date.today() - timedelta(days=1)
        file_name = f'{yesterday.year:04d}{yesterday.month:02d}{yesterday.day:02d}'
        laser_csv = sorted(glob.glob(f'{LASER_SAVE_PATH}/{file_name}*.csv'))

        hourly_frames = []
        for path in laser_csv:
            try:
                hourly_frames.append(pd.read_csv(path))
            except pd.errors.EmptyDataError as e:
                # An hour without readings produces an empty file; skip it.
                logger.error(f"Error: {path} pandas empty data error, {e}")
        if hourly_frames:
            total_laser_info = pd.concat(hourly_frames, ignore_index=True)
        else:
            total_laser_info = pd.DataFrame()

        laser_save_path = f'{LASER_SAVE_PATH}/{yesterday}-mv_laser_height.csv'
        total_laser_info.to_csv(laser_save_path, index=False)

        # The day is zero-padded like the month. The previous format spec
        # (' 02d') put a space into the object key, e.g. 'day= 5'.
        stage_key = (f'{S3_UPLOAD_KEY}/year={yesterday.year}/month={yesterday.month:02d}'
                     f'/day={yesterday.day:02d}/mv_laser_height.csv')
        s3.Bucket(GSEPS_STAGE_BUCKET).upload_file(laser_save_path, stage_key)
        sdt_s3.Bucket('gseps-daily').upload_file(laser_save_path, f'{yesterday}/mv_laser_height.csv')
def check_hour_for_laser(current_time):
    """Return True only when ``current_time`` is exactly on the hour (mm:ss == 00:00)."""
    # Both fields must be zero; a non-zero minute short-circuits the check.
    return not (current_time.minute or current_time.second)
def _initialize_cameras():
    """Run the camera start-up steps in order.

    Returns:
        0 when every step succeeded, otherwise the 1-based number of the
        step that failed (1 open, 2 prepare, 3 ROI, 4 buffers, 5 start).
    """
    if not open_camera():
        return 1
    logger.info(f'CAM Load.')
    if not prepare_acquisition():
        return 2
    if not set_roi(1208, 1024):
        return 3
    if not alloc_and_announce_buffers():
        return 4
    if not start_acquisition():
        return 5
    return 0


def _grab_frame(data_stream, timeout_ms=1000):
    """Fetch one finished buffer from ``data_stream`` and return it as a BGRa8 array.

    The buffer is handed back to the stream as soon as it has been converted.
    """
    # Get buffer from device's DataStream. The buffer stays locked until it is queued again.
    buffer = data_stream.WaitForFinishedBuffer(timeout_ms)
    image = ids_peak_ipl.Image.CreateFromSizeAndBuffer(
        buffer.PixelFormat(),
        buffer.BasePtr(),
        buffer.Size(),
        buffer.Width(),
        buffer.Height()
    )
    converted = image.ConvertTo(ids_peak_ipl.PixelFormatName_BGRa8, ids_peak_ipl.ConversionMode_Fast)
    # Queue buffer again
    data_stream.QueueBuffer(buffer)
    return converted.get_numpy()


def _capture_event(publisher, stamp, laser_value):
    """Grab one frame per camera, save and upload both, then notify the consumer."""
    logger.info(f"Capture Start at {laser_value}")
    img_numpy = _grab_frame(m_dataStream)
    img_numpy_2 = _grab_frame(m_dataStream_2)

    # save image
    cam1_image_name = f'{stamp}_cam1.jpg'
    cam2_image_name = f'{stamp}_cam2.jpg'
    image_save_path = info['image_save_path']
    os.makedirs(image_save_path, exist_ok=True)
    cam1_image_path = os.path.join(image_save_path, cam1_image_name)
    cam2_image_path = os.path.join(image_save_path, cam2_image_name)
    cv2.imwrite(cam1_image_path, img_numpy)
    cv2.imwrite(cam2_image_path, img_numpy_2)
    print('cam1 path: ', cam1_image_path)
    print('cam2 path: ', cam2_image_path)

    # Upload both images (the local copies are removed by upload_to_s3)
    upload_to_s3('sdt', [cam1_image_path, cam2_image_path])

    # publish only when the ready queue reports a waiting consumer
    image_info = {'to': {"cam1_image_name": cam1_image_name,
                         "cam2_image_name": cam2_image_name,
                         "laser": laser_value,
                         "bucket": "gseps-dataset"}}
    if publisher.check_server_state():
        publisher.pub(image_info)
        print(image_info)
    logger.info(f'Successfully Uploaded To S3!. cam1: {cam1_image_name}, cam2: {cam2_image_name}')


def _flush_laser_history(laser_history, current_time):
    """Write the previous hour's readings to CSV and upload them, once per hour.

    Returns:
        A fresh empty DataFrame when the file was written and uploaded, or
        ``laser_history`` unchanged when that hour's file already exists.
    """
    laser_save_hour = (current_time - timedelta(hours=1)).strftime('%Y%m%d-%H')
    csv_path = f'{LASER_SAVE_PATH}/{laser_save_hour}.csv'
    if os.path.exists(csv_path):
        return laser_history

    date = laser_save_hour.split('-')[0]
    folder = f'{date[:4]}-{date[4:6]}-{date[6:]}'
    laser_history.to_csv(csv_path, index=False)
    sdt_s3.Bucket('gseps-daily').upload_file(csv_path, f'{folder}/laser/{laser_save_hour}.csv')
    return pd.DataFrame()


def main():
    """Run the acquisition service until an unexpected error occurs.

    After camera start-up the loop polls the laser sensor; a reading in
    [50, 500) triggers a capture on both cameras and is appended to the
    in-memory laser history, which is flushed to CSV/S3 during minute 0 of
    every hour. The daily 'gseps' aggregate upload was commented out in the
    original loop and remains disabled.

    Start-up failures exit with status -1 .. -5 (matching the failed step).
    The IDS peak library is always closed before the process exits.
    """
    logger.info(f'Cam Initialize.')
    # initialize library
    peak.Library.Initialize()
    try:
        failed_step = _initialize_cameras()
        if failed_step:
            print(failed_step)
            logger.error("Camera start-up failed at step %d.", failed_step)
            sys.exit(-failed_step)

        publisher = Publisher()
        laser_history = pd.DataFrame()
        while True:
            try:
                now_datetime = datetime.datetime.now()
                now_unix = int(now_datetime.timestamp())
                # Timestamp with millisecond precision: YYYYMMDD-HHMMSSmmm
                now = now_datetime.strftime("%Y%m%d-%H%M%S%f")[:-3]
                laser_value = check_distance()

                if 50 <= laser_value < 500:
                    _capture_event(publisher, now, laser_value)
                    current_laser_info = pd.DataFrame([{"timestamp": now_unix,
                                                        "laser_value": laser_value,
                                                        "event": 1}])
                    laser_history = pd.concat([laser_history, current_laser_info], ignore_index=True)

                current_time = datetime.datetime.now()
                if current_time.minute == 0:
                    laser_history = _flush_laser_history(laser_history, current_time)

                time.sleep(0.3)
            except Exception:
                # Any unexpected error ends the service, as before; leaving the
                # loop (instead of exiting here) lets the cleanup below run.
                logger.error(traceback.format_exc())
                break
    finally:
        peak.Library.Close()
    # NOTE(review): exit status stays 0 after a runtime error to match the
    # previous behaviour -- consider a non-zero status for supervisors.
    sys.exit(0)
# Start the acquisition service only when this file is executed as a script.
if __name__ == '__main__':
    main()