Source code for mgtdramshot
#!/usr/bin/env python
""" mgtdramshot.py Capture to MGTDRAM
- optional capture to mgtdram
- manage upload
- optional validation
assumes that clocking has been pre-assigned.
example usage::
./mgtdramshot.py --loop=100 --simulate=1 --validate=validate-6x480 \
--captureblocks=2000 --offloadblocks=0-1999 acq2106_007
usage::
mgtdramshot.py [-h] [--pre PRE] [--post POST] [--clk CLK] [--trg TRG]
[--sim SIM] [--trace TRACE] [--loop LOOP]
[--captureblocks CAPTUREBLOCKS]
[--offloadblocks OFFLOADBLOCKS] [--validate VALIDATE]
[--wait_user WAIT_USER]
uut
acq2106 mgtdram test
positional arguments:
uut uut
optional arguments:
-h, --help show this help message and exit
--pre PRE pre-trigger samples
--post POST post-trigger samples
--clk CLK int|ext|zclk|xclk,fpclk,SR,[FIN]
--trg TRG int|ext,rising|falling
--sim SIM s1[,s2,s3..] list of sites to run in simulate mode
--trace TRACE 1 : enable command tracing
--loop LOOP loop count
--captureblocks CAPTUREBLOCKS
number of 4MB blocks to capture
--offloadblocks OFFLOADBLOCKS
block list to upload nnn-nnn
--validate VALIDATE program to validate data
--wait_user WAIT_USER
1: force user input each shot
"""
import sys
import datetime
import acq400_hapi
from acq400_hapi import awg_data
import argparse
from subprocess import call
import re
from future import builtins
from builtins import input
import os
LOG = None
[docs]def write_console(message):
# explicit flush needed to avoid lockup on Windows.
sys.stdout.write(message)
sys.stdout.flush()
[docs]class UploadFilter:
def __init__(self):
self.okregex = re.compile(r"axi0 start OK ([0-9]{4}) OK")
self.line = 0
def __call__ (self, st):
st = st.rstrip()
LOG.write("{}\n".format(st))
if self.okregex.search(st) != None:
if self.line%10 != 0:
write_console('.')
else:
write_console("{}".format(self.line/10))
self.line += 1
if self.line > 100:
write_console('\n')
self.line = 0
else:
if self.line != 0:
write_console('\n')
write_console(">{}\n".format(st))
self.line = 0
[docs]def run_shot(uut, args):
# always capture over. The offload is zero based anyway, so add another one
if args.captureblocks:
uut.s14.mgt_run_shot = str(int(args.captureblocks) + 2)
uut.run_mgt()
uut.s14.mgt_offload = args.offloadblocks if args.offloadblocks != 'capture' \
else '0-{}'.format(args.captureblocks)
t1 = datetime.datetime.now()
uut.run_mgt(UploadFilter())
ttime = datetime.datetime.now()-t1
mb = args.captureblocks*4
print("upload {} MB done in {} seconds, {} MB/s\n".\
format(mb, ttime, mb/ttime.seconds))
if args.validate != 'no':
cmd = "{} {}".format(args.validate, uut.uut)
print("run \"{}\"".format(cmd))
rc = call(cmd, shell=True, stdin=0, stdout=1, stderr=2)
if rc != 0:
print("ERROR called process {} returned {}".format(args.validate, rc))
exit(1)
[docs]def run_shots(args):
global LOG
LOG = open("mgtdramshot-{}.log".format(args.uut[0]), "w")
uut = acq400_hapi.Acq2106(args.uut[0], has_mgtdram=True)
acq400_hapi.Acq400UI.exec_args(uut, args)
uut.s14.mgt_taskset = '1'
try:
for ii in range(0, args.loop):
t1 = datetime.datetime.now()
print("shot: {} {}".format(ii, t1.strftime("%Y%m%d %H:%M:%S")))
run_shot(uut, args)
t2 = datetime.datetime.now()
print("shot: {} done in {} seconds\n\n".format(ii, (t2-t1).seconds))
if args.wait_user:
input("hit return to continue")
except KeyboardInterrupt:
print("Keyboard Interrupt, take it all down NOW")
os._exit(1)
os._exit(0)
[docs]def run_main():
parser = argparse.ArgumentParser(description='acq2106 mgtdram test')
acq400_hapi.Acq400UI.add_args(parser)
parser.add_argument('--loop', type=int, default=1, help="loop count")
parser.add_argument('--captureblocks', type=int, default="2000", help='number of 4MB blocks to capture')
parser.add_argument('--offloadblocks', type=str, default="capture", help='block list to upload nnn-nnn')
parser.add_argument('--validate', type=str, default='no', help='program to validate data')
parser.add_argument('--wait_user', type=int, default=0, help='1: force user input each shot')
parser.add_argument('uut', nargs=1, help="uut ")
run_shots(parser.parse_args())
# execution starts here
if __name__ == '__main__':
run_main()