# -*- coding: utf-8 -*-
'''
PyCampbellCR1000.client
-----------------------
Allows data query of Campbell CR1000-type devices.
:copyright: Copyright 2012 Salem Harrache and contributors, see AUTHORS.
:license: GNU GPL v3.
'''
from __future__ import division, unicode_literals
import time
from datetime import datetime, timedelta
from pylink import link_from_url
from .logger import LOGGER
from .pakbus import PakBus
from .exceptions import NoDeviceException
from .compat import xrange, is_py3
from .utils import cached_property, ListDict, Dict, nsec_to_time, time_to_nsec, bytes_to_hex
[docs]class CR1000(object):
'''Communicates with the datalogger by sending commands, reads the binary
data and parses it into usable scalar values.
:param link: A `PyLink` connection.
:param dest_addr: Destination physical address (12-bit int) (default dest)
:param dest: Destination node ID (12-bit int) (default 0x001)
:param src_addr: Source physical address (12-bit int) (default src)
:param src: Source node ID (12-bit int) (default 0x802)
:param security_code: 16-bit security code (default 0x0000)
'''
connected = False
def __init__(self, link, dest_addr=None, dest=0x001, src_addr=None,
src=0x802, security_code=0x0000):
link.open()
LOGGER.info("init client")
self.pakbus = PakBus(link, dest_addr, dest, src_addr, src, security_code)
self.pakbus.wait_packet()
# try ping the datalogger
for i in xrange(20):
LOGGER.info('%d'%(i))
try:
if self.ping_node():
self.connected = True
break
except NoDeviceException:
self.pakbus.link.close()
self.pakbus.link.open()
if not self.connected:
raise NoDeviceException()
[docs] @classmethod
def from_url(cls, url, timeout=10, dest_addr=None, dest=0x001,
src_addr=None, src=0x802, security_code=0x0000):
''' Get device from url.
:param url: A `PyLink` connection URL.
:param timeout: Set a read timeout value.
:param dest_addr: Destination physical address (12-bit int) (default dest)
:param dest: Destination node ID (12-bit int) (default 0x001)
:param src_addr: Source physical address (12-bit int) (default src)
:param src: Source node ID (12-bit int) (default 0x802)
:param security_code: 16-bit security code (default 0x0000)
'''
link = link_from_url(url)
link.settimeout(timeout)
return cls(link, dest_addr, dest, src_addr, src, security_code) #EGC Add security code to the constructor call
[docs] def send_wait(self, cmd):
'''Send command and wait for response packet.'''
packet, transac_id = cmd
begin = time.time()
self.pakbus.write(packet)
# wait response packet
response = self.pakbus.wait_packet(transac_id)
end = time.time()
send_time = timedelta(seconds=int((end - begin) / 2))
return response[0], response[1], send_time
[docs] def ping_node(self):
'''Check if remote host is available.'''
# send hello command and wait for response packet
hdr, msg, send_time = self.send_wait(self.pakbus.get_hello_cmd())
if not (hdr and msg):
raise NoDeviceException()
return True
[docs] def gettime(self):
'''Return the current datetime.'''
self.ping_node()
LOGGER.info('Try gettime')
# send clock command and wait for response packet
hdr, msg, send_time = self.send_wait(self.pakbus.get_clock_cmd())
# remove transmission time
return nsec_to_time(msg['Time']) - send_time
[docs] def settime(self, dtime):
'''Sets the given `dtime` and returns the new current datetime'''
LOGGER.info('Try settime')
current_time = self.gettime()
self.ping_node()
diff = dtime - current_time
diff = diff.total_seconds()
# settime (OldTime in response)
hdr, msg, sdt1 = self.send_wait(self.pakbus.get_clock_cmd((diff, 0)))
# gettime (NewTime in response)
hdr, msg, sdt2 = self.send_wait(self.pakbus.get_clock_cmd())
# remove transmission time
return nsec_to_time(msg['Time']) - (sdt1 + sdt2)
@cached_property
def settings(self):
'''Get device settings as ListDict'''
LOGGER.info('Try get settings')
self.ping_node()
# send getsettings command and wait for response packet
hdr, msg, send_time = self.send_wait(self.pakbus.get_getsettings_cmd())
# remove transmission time
settings = ListDict()
for item in msg["Settings"]:
settings.append(Dict(dict(item)))
return settings
[docs] def getfile(self, filename):
'''Get the file content from the datalogger.'''
LOGGER.info('Try get file')
self.ping_node()
data = []
# Send file upload command packets until no more data is returned
offset = 0x00000000
transac_id = None
while True:
# Upload chunk from file starting at offset
cmd = self.pakbus.get_fileupload_cmd(filename,
offset=offset,
closeflag=0x00,
transac_id=transac_id)
transac_id = cmd[1]
hdr, msg, send_time = self.send_wait(cmd)
try:
if msg['RespCode'] == 1:
raise ValueError("Permission denied")
# End loop if no more data is returned
if not msg['FileData']:
break
# Append file data
data.append(msg['FileData'])
offset += len(msg['FileData'])
except KeyError:
break
return b"".join(data)
def sendfile(self, data, filename):
'''Upload a file to the datalogger.'''
LOGGER.info('Try send file')
raise NotImplementedError('Filedownload transaction is not implemented'
' yet')
def list_files(self):
'''List the files available in the datalogger.'''
data = self.getfile('.DIR')
# List files in directory
filedir = self.pakbus.parse_filedir(data)
return [item['FileName'] for item in filedir['files']]
@cached_property
def table_def(self):
'''Return table definition.'''
data = self.getfile('.TDF')
# List tables
tabledef = self.pakbus.parse_tabledef(data)
return tabledef
[docs] def list_tables(self):
'''List the tables available in the datalogger.'''
return [item['Header']['TableName'] for item in self.table_def]
def _collect_data(self, tablename, start_date=None, stop_date=None):
'''Collect fragment data from `tablename` from `start_date` to
`stop_date` as ListDict.'''
LOGGER.info('Send collect_data cmd')
if start_date is not None:
mode = 0x07 # collect from p1 to p2 (nsec)
p1 = time_to_nsec(start_date)
p2 = time_to_nsec(stop_date or datetime.now())
else:
mode = 0x03 # collect all
p1 = 0
p2 = 0
tabledef = self.table_def
# Get table number
tablenbr = None
if is_py3:
tablename = bytes(tablename, encoding="utf-8")
for i, item in enumerate(tabledef):
if item['Header']['TableName'] == tablename:
tablenbr = i + 1
break
if tablenbr is None:
raise StandardError('table %s not found' % tablename)
# Get table definition signature
tabledefsig = tabledef[tablenbr - 1]['Signature']
# Send collect data request
cmd = self.pakbus.get_collectdata_cmd(tablenbr, tabledefsig, mode,
p1, p2)
hdr, msg, send_time = self.send_wait(cmd)
more = True
data, more = self.pakbus.parse_collectdata(msg['RecData'], tabledef)
# Return parsed record data and flag if more records exist
return data, more
[docs] def get_data(self, tablename, start_date=None, stop_date=None):
'''Get all data from `tablename` from `start_date` to `stop_date` as
ListDict. By default the entire contents of the data will be
downloaded.
:param tablename: Table name that contains the data.
:param start_date: The beginning datetime record.
:param stop_date: The stopping datetime record.'''
records = ListDict()
for items in self.get_data_generator(tablename, start_date, stop_date):
records.extend(items)
return records
[docs] def get_data_generator(self, tablename, start_date=None, stop_date=None):
'''Get all data from `tablename` from `start_date` to `stop_date` as
generator. The data can be fragmented into multiple packets, this
generator can return parsed data from each packet before receiving
the next one.
:param tablename: Table name that contains the data.
:param start_date: The beginning datetime record.
:param stop_date: The stopping datetime record.
'''
self.ping_node()
start_date = start_date or datetime(1990, 1, 1, 0, 0, 1)
stop_date = stop_date or datetime.now()
more = True
while more:
records = ListDict()
data, more = self._collect_data(tablename, start_date, stop_date)
for i, rec in enumerate(data):
if not rec["NbrOfRecs"]:
more = False
break
for j, item in enumerate(rec['RecFrag']):
if start_date <= item['TimeOfRec'] <= stop_date:
start_date = item['TimeOfRec']
# for no duplicate record
if more and ((j == (len(rec['RecFrag']) - 1))
and (i == (len(data) - 1))):
break
new_rec = Dict()
new_rec["Datetime"] = item['TimeOfRec']
new_rec["RecNbr"] = item['RecNbr']
for key in item['Fields']:
new_rec["%s" % key] = item['Fields'][key]
records.append(new_rec)
if records:
records = records.sorted_by('Datetime')
yield records.sorted_by('Datetime')
else:
more = False
def get_raw_packets(self, tablename):
'''Get all raw packets from table `tablename`.
:param tablename: Table name that contains the data.
'''
self.ping_node()
more = True
records = ListDict()
while more:
packets, more = self._collect_data(tablename)
for rec in packets:
records.append(rec)
return records
[docs] def getprogstat(self):
'''Get programming statistics as dict.'''
LOGGER.info('Try get programming statistics')
self.ping_node()
hdr, msg, send_time = self.send_wait(self.pakbus.get_getprogstat_cmd())
# remove transmission time
data = Dict(dict(msg['Stats']))
if data:
data['CompTime'] = nsec_to_time(data['CompTime'])
return data
[docs] def bye(self):
'''Send a bye command.'''
LOGGER.info("Send bye command")
if self.connected:
packet, transac_id = self.pakbus.get_bye_cmd()
self.pakbus.write(packet)
self.connected = False
def __del__(self):
'''Send bye cmd when object is deleted.'''
self.bye()