Source code for chemios.utils
'''
/*
*
* Useful Utilities
*
*/
'''
import numpy as np
import json
import logging
import datetime
import time
import sys
import glob
import serial
import datetime
import arrow
[docs]def convert_to_lists(df):
'''Convert data frame to list of lists'''
output = []
for i in range(len(list(df))):
column = df.iloc[:,i]
output.append(column.values.tolist())
for i in output:
for j in i:
try:
x = output[i][j]
output[i][j] = x.item()
except Exception:
pass
return output
[docs]def import_module(name):
'''Import a module from a string'''
components = name.split('.')
mod = __import__(components[0])
for comp in components[1:]:
mod = getattr(mod, comp)
return mod
#Useful functions for serial
[docs]def serial_write(ser, cmd):
""" General Serial Writing Method
Args:
Ser (:object:): Serial object from pyserial
cmd (str): String being sent
handle(str): Function handle (needed for error reporting)
output(bool): If true, fucntion returns output/errors from serial defvece
"""
ser.flushOutput()
ser.write(cmd.encode())
logging.debug('Sent serial cmd ' + cmd)
[docs]def sio_write(sio, cmd,
output=False, exp = None, ctx = 'Device',
timeout = 2):
""" General Serial Writing Method with reading response
Args:
sio (:object:): io.Wrapper object from pyserial
cmd (str): String being sent
exp (str): Expected response (optional)
ctx (str): The device being communicated with. Used for debug messages (optional)
output(bool): If true, function returns output/errors from serial device. Defaults to false.
timeout (int): Timeout in seconds. Defaults to 2 seconds.
Returns:
Response from the serial buffer.
"""
#Add carriage return at the end of the string if it was forgotten
cmd = cmd + '\x0D'
try:
sio.write(cmd)
except TypeError:
try:
sio.write(cmd.encode())
except Exception:
logging.warning("sio_write cannot send message: {}".format(cmd))
return 1
logging.debug('Sent serial cmd ' + cmd)
if output:
start = arrow.utcnow()
timeout = datetime.timedelta(seconds = timeout)
response_buffer = ''
while True:
sio.flush()
time.sleep(0.1)
response = sio.readline()
response = response.strip('\n')
response = response.strip('\r')
if response != '':
response_buffer = str(response) + response_buffer
else:
if response != exp and exp:
logging.warning('Did not receive expected response of {} from command {}. '
'{} might not be connected.'
.format(exp, cmd, ctx))
return response_buffer
elapsed_time = arrow.utcnow() - start
if elapsed_time > timeout:
logging.debug('chemios.utils.sio_write timeout after {} seconds.'.format(str(elapsed_time.seconds)))
if response != exp and exp:
logging.warning('Did not receive expected response of {} from command {}. '
'{} might not be connected.'
.format(exp, cmd, ctx))
return response_buffer
[docs]class SerialTestClass(object):
"""A serial port test class using a mock port """
def __init__(self):
self._port = "loop://"
self._timeout = 0
self._baudrate = 9600
self.ser = \
serial.serial_for_url(url=self._port,
timeout=self._timeout,
baudrate=self._baudrate)
[docs]def discover_serial_ports():
""" Lists serial port names
:raises EnvironmentError:
On unsupported or unknown platforms
:returns:
A list of the serial ports available on the system
"""
#For windows
if sys.platform.startswith('win'):
ports = ['COM%s' % (i + 1) for i in range(256)]
#for linux/cygwin
elif sys.platform.startswith('linux') or sys.platform.startswith('cygwin'):
# this excludes your current terminal "/dev/tty"
ports = glob.glob('/dev/ttyUSB*') + glob.glob('/dev/ttyACM*')
#for mac
elif sys.platform.startswith('darwin'):
ports = glob.glob('/dev/tty.*')
ports = ports + glob.glob('/dev/cu.usb*')
print(ports)
else:
raise EnvironmentError('Unsupported platform')
#Remove any ports that we don't want
ports_to_remove = ['/dev/ttyS0', '/dev/ttyAMA0', '/dev/ttyprintk','/dev/tty.Bluetooth-Incoming-Port']
for port in ports_to_remove:
try:
ports.remove(port)
except Exception:
pass
result = []
for port in ports:
try:
s = serial.Serial(port)
s.close()
result.append(port)
except (OSError, serial.SerialException):
pass
return result
[docs]def write_i2c(string, bus, address):
"""Method for writing via i2c"""
converted = []
for b in string:
#Convert string to bytes
converted.append(ord(b))
#Write converted string to given address with 0 offset
bus.write_i2c_block_data(address,0, converted)
return -1