From cbc88ac621cd7d416857660857c8884589268234 Mon Sep 17 00:00:00 2001 From: David Schleef Date: Wed, 4 Jun 2003 02:53:45 +0000 Subject: [PATCH] Add python examples from Luc Lefebvre --- demo/python/README | 27 ++++++ demo/python/cmd.py | 148 ++++++++++++++++++++++++++++++++ demo/python/info.py | 161 +++++++++++++++++++++++++++++++++++ demo/python/mmap.py | 199 ++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 535 insertions(+) create mode 100644 demo/python/README create mode 100755 demo/python/cmd.py create mode 100755 demo/python/info.py create mode 100755 demo/python/mmap.py diff --git a/demo/python/README b/demo/python/README new file mode 100644 index 0000000..6d45f98 --- /dev/null +++ b/demo/python/README @@ -0,0 +1,27 @@ +I wrote the python scripts using Bryan Cole's comedi wrappers. I +followed the instructions provided by him to generate the comedi +python module. + +I then produced the three following scripts: + +info.py: + + This script emulates the comedilib/demo/info program + +cmd.py: + + This script emulates the comedilib/demo/cmd program + +mmap.py: + + This script emulates the comedilib/demo/mmap program with the + added twist that the output is saved to a binary log file. + This file can be accessed using a variety of means including a + sample octave script which is provided. + +As usual, comments are always welcome. + +Luc Lefebvre +McGill University + +29 May, 2003 diff --git a/demo/python/cmd.py b/demo/python/cmd.py new file mode 100755 index 0000000..739311b --- /dev/null +++ b/demo/python/cmd.py @@ -0,0 +1,148 @@ +## It emulates the program "cmd" which is distributed with +## the comedilib software +## Copyright (C) May 2003 Luc Lefebvre +## luc.lefebvre@mcgill.ca +## This program is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License +## as published by the Free Software Foundation; either version 2 +## of the License, or (at your option) any later version. + +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + + +#set the paths so python can find the comedi module +import sys, os, string, struct, time +sys.path.append('./build/lib.linux-i586-2.1') + +import comedi as c + +#open a comedi device +dev=c.comedi_open('/dev/comedi0') +if not dev: raise "Error openning Comedi device" + +#get a file-descriptor for use later +fd = c.comedi_fileno(dev) +if fd<=0: raise "Error obtaining Comedi device file descriptor" + +BUFSZ = 10000 +freq=1000 # as defined in demo/common.c +subdevice=0 #as defined in demo/common.c +nscans=8000 #specify total number of scans + +#three lists containing the chans, gains and referencing +#the lists must all have the same length +chans=[0,1,2,3] +gains=[0,0,0,0] +aref =[c.AREF_GROUND, c.AREF_GROUND, c.AREF_GROUND, c.AREF_GROUND] + +cmdtest_messages = [ + "success", + "invalid source", + "source conflict", + "invalid argument", + "argument conflict", + "invalid chanlist"] + +nchans = len(chans) #number of channels + +#wrappers include a "chanlist" object (just an Unsigned Int array) for holding the chanlist information +mylist = c.chanlist(nchans) #create a chanlist of length nchans + +#now pack the channel, gain and reference information into the chanlist object +#N.B. the CR_PACK and other comedi macros are now python functions +for index in range(nchans): + mylist[index]=c.cr_pack(chans[index], gains[index], aref[index]) + +def dump_cmd(cmd): + print "---------------------------" + print "command structure contains:" + print "cmd.subdev : ", cmd.subdev + print "cmd.flags : ", cmd.flags + print "cmd.start :\t", cmd.start_src, "\t", cmd.start_arg + print "cmd.scan_beg :\t", cmd.scan_begin_src, "\t", cmd.scan_begin_arg + print "cmd.convert :\t", cmd.convert_src, "\t", cmd.convert_arg + print "cmd.scan_end :\t", cmd.scan_end_src, "\t", cmd.scan_end_arg + print "cmd.stop :\t", cmd.stop_src, "\t", cmd.stop_arg + print "cmd.chanlist : ", cmd.chanlist + print "cmd.chanlist_len : ", cmd.chanlist_len + print "cmd.data : ", cmd.data + print "cmd.data_len : ", cmd.data_len + print "---------------------------" + +## ret = c.comedi_get_buffer_size(dev, subdevice) +## if ret==-1: +## raise "Error fetching comedi buffer size" +## else: +## print "buffer size = ", ret +## ret = c.comedi_get_max_buffer_size(dev, subdevice) +## if ret==-1: +## raise "Error fetching comedi max buff size" +## else: +## print "max buff size = ", ret +#construct a comedi command +cmd = c.comedi_cmd_struct() + +ret = c.comedi_get_cmd_generic_timed(dev,subdevice,cmd,1.0e9/freq) +if ret: raise "Error comedi_get_cmd_generic failed" + +cmd.chanlist = mylist # adjust for our particular context +cmd.chanlist_len = nchans +cmd.scan_end_arg = nchans +if cmd.stop_src==c.TRIG_COUNT: cmd.stop_arg=nscans + +print "command before testing" +dump_cmd(cmd) + +#test our comedi command a few times. +ret = c.comedi_command_test(dev,cmd) +print "first cmd test returns ", ret, cmdtest_messages[ret] +if ret<0: raise "comedi_command_test failed" +dump_cmd(cmd) +ret = c.comedi_command_test(dev,cmd) +print "second test returns ", ret, cmdtest_messages[ret] +if ret<0: raise "comedi_command_test failed" +if ret !=0: + dump_cmd(cmd) + raise "Error preparing command" + +#execute the command! +## ret = c.comedi_command(dev,cmd) +## if ret !=0: raise "comedi_command failed..." + +datastr = () +t0 = time.time() +ret = c.comedi_command(dev,cmd) +if ret !=0: raise "comedi_command failed..." +while (1): + #ret = c.comedi_poll(dev,subdevice) + #print "poll ret = ", ret + data = os.read(fd,BUFSZ) + #print "len(data) = ", len(data) + if len(data)==0: + break + n = len(data)/2 # 2 bytes per 'H' + format = `n`+'H' + #print "format = ", format + #bytes = struct.calcsize(format) + #print "bytes = ", bytes + #nbytes = c.comedi_get_buffer_contents(dev,subdevice) + #print "n = ", n, " nbytes = ", nbytes + datastr = datastr + struct.unpack(format,data) + +t1 = time.time() +print "start time : ", t0 +print "end time : ", t1 +print "time : ", t1 - t0, " seconds" + +count = 0 +while count < len(datastr): + for i in range(4): + print "%d\t" % datastr[count+i], + print "\n" + count = count + 4 + +ret = c.comedi_close(dev) +if ret !=0: raise "comedi_close failed..." diff --git a/demo/python/info.py b/demo/python/info.py new file mode 100755 index 0000000..fa26c06 --- /dev/null +++ b/demo/python/info.py @@ -0,0 +1,161 @@ +#!/usr/bin/env python +## an adaptation of the info.c program bundled with the comedilib package +## the info.c file is found in the package demo directory +## Copyright (C) May 2003 Luc Lefebvre +## luc.lefebvre@mcgill.ca +## This program is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License +## as published by the Free Software Foundation; either version 2 +## of the License, or (at your option) any later version. + +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + +#set the paths so python can find the comedi module +import sys, os, string +sys.path.append('./build/lib.linux-i586-2.1') + +import comedi as c + +subdevice_types = {c.COMEDI_SUBD_UNUSED:"unused", + c.COMEDI_SUBD_AI:"analog input", + c.COMEDI_SUBD_AO:"analog output", + c.COMEDI_SUBD_DI:"digital input", + c.COMEDI_SUBD_DO:"digital output", + c.COMEDI_SUBD_DIO:"digital I/O", + c.COMEDI_SUBD_COUNTER:"counter", + c.COMEDI_SUBD_TIMER:"timer", + c.COMEDI_SUBD_MEMORY:"memory", + c.COMEDI_SUBD_CALIB:"calibration", + c.COMEDI_SUBD_PROC:"processor"} +#open a comedi device +dev=c.comedi_open('/dev/comedi0') +if not dev: raise "Error openning Comedi device" + +version_code = c.comedi_get_version_code(dev) +if not version_code: raise "Error reading version_code" +print "version code is: ", version_code + +driver_name = c.comedi_get_driver_name(dev) +if not driver_name: raise "Error reading driver_name" +print "driver name is: ", driver_name + +board_name = c.comedi_get_board_name(dev) +if not board_name: raise "Error reading board_name" +print "board name is: ", board_name + +n_subdevices = c.comedi_get_n_subdevices(dev) +if not n_subdevices: raise "Error reading n_subdevices" +print "number of subdevices is: ", n_subdevices + +def comedi_get_cmd_fast_1chan(dev,s,cmd): + ret = c.comedi_get_cmd_src_mask(dev,s,cmd) + if (ret<0): return ret; + cmd.chanlist_len = 1 + cmd.scan_end_src = c.TRIG_COUNT + cmd.scan_end_arg = 1 + if (cmd.convert_src & c.TRIG_TIMER): + if (cmd.scan_begin_src & c.TRIG_FOLLOW): + cmd.convert_src=c.TRIG_TIMER + cmd.scan_begin=c.TRIG_FOLLOW + else: + cmd.convert_src=c.TRIG_TIMER + cmd.scan_begin=c.TRIG_TIMER + else: + print "can't do timed!?!" + return -1 + if (cmd.stop_src & c.TRIG_COUNT): + cmd.stop_src=c.TRIG_COUNT + cmd.stop_arg=2 + elif (cmd.stop_src & c.TRIG_NONE): + cmd.stop_src=c.TRING_NONE + cmd.stop_arg=0 + else: + print "can't find a good stop_src" + return -1 + ret = c.comedi_command_test(dev,cmd) + if (ret==3): + ret = c.comedi_command_test(dev,cmd) + if ((ret==4) or (ret==0)): + return 0 + return -1 + +def probe_max_1chan(dev,s): + buf="" + cmd=c.comedi_cmd_struct() + print "\tcommand fast 1chan:" + if(c.comedi_get_cmd_generic_timed(dev,s,cmd,1)<0): + print "\t\tnot supported" + else: + print "\tstart: %s %d" % (cmd_src(cmd.start_src,buf),cmd.start_arg) + print "\tscan_begin: %s %d" % (cmd_src(cmd.scan_begin_src,buf),cmd.scan_begin_arg) + print "\tconvert begin: %s %d" % (cmd_src(cmd.convert_src,buf),cmd.convert_arg) + print "\tscan_end: %s %d" % (cmd_src(cmd.scan_end_src,buf),cmd.scan_end_arg) + print "\tstop: %s %d" % (cmd_src(cmd.stop_src,buf),cmd.stop_arg) + +def cmd_src(src,buf): + buf="" + if(src & c.TRIG_NONE): buf=buf+"none|" + if(src & c.TRIG_NOW): buf=buf+"now|" + if(src & c.TRIG_FOLLOW): buf=buf+"follow|" + if(src & c.TRIG_TIME): buf=buf+"time|" + if(src & c.TRIG_TIMER): buf=buf+"timer|" + if(src & c.TRIG_COUNT): buf=buf+"count|" + if(src & c.TRIG_EXT): buf=buf+"ext|" + if(src & c.TRIG_INT): buf=buf+"int|" + if len(buf)==0: + print "unknown" + else: + buf = buf[:-1] # trim trailing "|" + return buf + +def get_command_stuff(dev,s): + buf = "" + cmd = c.comedi_cmd_struct() + if (c.comedi_get_cmd_src_mask(dev,s,cmd)<0): + print "\tnot supported" + else: + print "\tstart: %s" % (cmd_src(cmd.start_src,buf)) + print "\tscan_begin: %s" % (cmd_src(cmd.scan_begin_src,buf)) + print "\tconvert begin: %s" % (cmd_src(cmd.convert_src,buf)) + print "\tscan_end: %s" % (cmd_src(cmd.scan_end_src,buf)) + print "\tstop: %s" % (cmd_src(cmd.stop_src,buf)) + probe_max_1chan(dev,s) + +print "-----subdevice characteristics-----" +for i in range(n_subdevices): + print "subdevice %d:" % (i) + type = c.comedi_get_subdevice_type(dev,i) + print "\ttype: %d (%s)" % (type,subdevice_types[type]) + if (type == c.COMEDI_SUBD_UNUSED): + continue + n_chans = c.comedi_get_n_channels(dev,i) + print "\tnumber of channels: %d" % ( n_chans) + if not(c.comedi_maxdata_is_chan_specific(dev,i)): + print "\tmax data value: %d" % (c.comedi_get_maxdata(dev,i,0)) + else: + print "max data value is channel specific" + for j in range(n_chans): + print "\tchan: %d: %d" % (j,c.comedi_get_maxdata(dev,i,j)) + print "\tranges: " + if not(c.comedi_range_is_chan_specific(dev,i)): + n_ranges = c.comedi_get_n_ranges(dev,i,0) + print "\t\tall chans:" + for j in range(n_ranges): + rng = c.comedi_get_range(dev,i,0,j) + print "\t\t[%g,%g]" % (rng.min, rng.max) + else: + for chan in range(n_chans): + n_ranges = c.comedi_get_n_ranges(dev,i,chan) + print "\t\tchan: %d" % (chan) + for j in range(n_ranges): + rng = c.comedi_get_range(dev,i,chan,j) + print "\t\t[%g,%g]" % (rng.min, rng.max) + print "\tcommand:" + get_command_stuff(dev,i) + + + + diff --git a/demo/python/mmap.py b/demo/python/mmap.py new file mode 100755 index 0000000..00c9623 --- /dev/null +++ b/demo/python/mmap.py @@ -0,0 +1,199 @@ +## A test-application to demonstrate using the Comedilib API +## and streaming acquisition in particular, from Python. + +## It emulates the program "mmap" which is distributed with +## the comedilib software +## Copyright (C) May 2003 Luc Lefebvre +## luc.lefebvre@mcgill.ca +## This program is free software; you can redistribute it and/or +## modify it under the terms of the GNU General Public License +## as published by the Free Software Foundation; either version 2 +## of the License, or (at your option) any later version. + +## This program is distributed in the hope that it will be useful, +## but WITHOUT ANY WARRANTY; without even the implied warranty of +## MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +## GNU General Public License for more details. + + +#set the paths so python can find the comedi module +import sys, os, string, struct, time, mmap, array +sys.path.append('./build/lib.linux-i586-2.1') + +import comedi as c + +#open a comedi device +dev=c.comedi_open('/dev/comedi0') +if not dev: raise "Error openning Comedi device" + +#get a file-descriptor for use later +fd = c.comedi_fileno(dev) +if fd<=0: raise "Error obtaining Comedi device file descriptor" + +#BUFSZ = 10000 +freq=1000 # as defined in demo/common.c +subdevice=0 #as defined in demo/common.c +nscans=8000 #specify total number of scans +secs = 10 # used to stop scan after "secs" seconds + +#three lists containing the chans, gains and referencing +#the lists must all have the same length +chans=[0,1,2,3] +gains=[0,0,0,0] +aref =[c.AREF_GROUND, c.AREF_GROUND, c.AREF_GROUND, c.AREF_GROUND] + +cmdtest_messages = [ + "success", + "invalid source", + "source conflict", + "invalid argument", + "argument conflict", + "invalid chanlist"] + +nchans = len(chans) #number of channels + +#wrappers include a "chanlist" object (just an Unsigned Int array) for holding the chanlist information +mylist = c.chanlist(nchans) #create a chanlist of length nchans + +#now pack the channel, gain and reference information into the chanlist object +#N.B. the CR_PACK and other comedi macros are now python functions +for index in range(nchans): + mylist[index]=c.cr_pack(chans[index], gains[index], aref[index]) + +size = c.comedi_get_buffer_size(dev, subdevice) +print "buffer size is ", size +map = mmap.mmap(fd, size, mmap.MAP_SHARED, mmap.PROT_READ) +print "map = ", map + +def dump_cmd(cmd): + print "---------------------------" + print "command structure contains:" + print "cmd.subdev : ", cmd.subdev + print "cmd.flags : ", cmd.flags + print "cmd.start :\t", cmd.start_src, "\t", cmd.start_arg + print "cmd.scan_beg :\t", cmd.scan_begin_src, "\t", cmd.scan_begin_arg + print "cmd.convert :\t", cmd.convert_src, "\t", cmd.convert_arg + print "cmd.scan_end :\t", cmd.scan_end_src, "\t", cmd.scan_end_arg + print "cmd.stop :\t", cmd.stop_src, "\t", cmd.stop_arg + print "cmd.chanlist : ", cmd.chanlist + print "cmd.chanlist_len : ", cmd.chanlist_len + print "cmd.data : ", cmd.data + print "cmd.data_len : ", cmd.data_len + print "---------------------------" + +def prepare_cmd(dev, subdev, C): + #global cmd + + C.subdev = subdev + C.flags = 0 + C.start_src = c.TRIG_NOW + C.start_arg = 0 + C.scan_begin_src = c.TRIG_TIMER + C.scan_begin_arg = 1e9/freq + C.convert_src = c.TRIG_TIMER + C.convert_arg = 1 + C.scan_end_src = c.TRIG_COUNT + C.scan_end_arg = nchans + C.stop_src = c.TRIG_NONE + #C.stop_src = c.TRIG_COUNT + C.stop_arg = 0 + #C.stop_arg = 1000 + C.chanlist = mylist + C.chanlist_len = nchans + +## ret = c.comedi_get_buffer_size(dev, subdevice) +## if ret==-1: +## raise "Error fetching comedi buffer size" +## else: +## print "buffer size = ", ret +## ret = c.comedi_get_max_buffer_size(dev, subdevice) +## if ret==-1: +## raise "Error fetching comedi max buff size" +## else: +## print "max buff size = ", ret +#construct a comedi command +cmd = c.comedi_cmd_struct() + + + +cmd.chanlist = mylist # adjust for our particular context +cmd.chanlist_len = nchans +cmd.scan_end_arg = nchans + +prepare_cmd(dev,subdevice,cmd) + +print "command before testing" +dump_cmd(cmd) + +#test our comedi command a few times. +ret = c.comedi_command_test(dev,cmd) +print "first cmd test returns ", ret, cmdtest_messages[ret] +if ret<0: + raise "comedi_command_test failed" +dump_cmd(cmd) + +ret = c.comedi_command_test(dev,cmd) +print "second test returns ", ret, cmdtest_messages[ret] +if ret<0: + raise "comedi_command_test failed" +if ret !=0: + dump_cmd(cmd) + raise "ERROR preparing command" +dump_cmd(cmd) + +ret = c.comedi_command(dev,cmd) +if ret<0: + raise "error executing comedi_command" + +front = 0 +back = 0 + +of = open("stream_log.bin","wb") + +format = "H" + +flag = 1 + +time_limit = nchans*freq*2*secs # stop scan after "secs" seconds +t0 = time.time() + +while flag: + front += c.comedi_get_buffer_contents(dev,subdevice) +## print "front = ", front + if front > time_limit: + flag = 0 + t1 = time.time() # reached "secs" seconds + if (front