Source code for acq400_upload

#!/usr/bin/env python

"""
capture upload test
acq1001_capplot UUT1 [UUT2 ..]
where UUT1 is the ip-address or host name of first uut
example test client runs captures in a loop on one or more uuts

pre-requisite: UUT's are configured and ready to make a transient
capture
eg clk is running. soft trg enabled
eg transient length set.

runs one capture, uploads the data and plots with matplotlib
tested with 2 x 8 channels UUT's (ACQ1014)
matplot will get very congested with more channels.
this is really meant as a demonstration of capture, load to numpy,
it's not really intended as a scope UI.

usage: acq400_upload.py [-h] [--soft_trigger SOFT_TRIGGER]
                    [--trace_upload TRACE_UPLOAD] [--save_data SAVE_DATA]
                    [--plot_data PLOT_DATA] [--capture CAPTURE]
                    [--remote_trigger REMOTE_TRIGGER]
                    [--channels CHANNELS]
                    uuts [uuts ...]

acq400 upload

positional arguments:
  uuts                  uut[s]

optional arguments:
  -h, --help            show this help message and exit
  --soft_trigger SOFT_TRIGGER
                        help use soft trigger on capture
  --trace_upload TRACE_UPLOAD
                        1: verbose upload
  --save_data SAVE_DATA
                        store data to specified directory
  --plot_data PLOT_DATA
                        1: plot data
  --capture CAPTURE     1: capture data, 0: wait for someone else to capture,
                        -1: just upload
  --remote_trigger REMOTE_TRIGGER
                        your function to fire trigger
  --channels CHANNELS   comma separated channel list
"""

import sys
import acq400_hapi
import numpy as np
try:
    import matplotlib.pyplot as plt
    plot_ok = 1
except RuntimeError as e:
    print("Sorry, plotting not available {}".format(e))
    plot_ok = 0

import os
import argparse
import re
import time

from subprocess import call

[docs]class ActionScript: def __init__(self, script_and_args): self.sas = script_and_args.split() print("ActionScript creates {}".format(self.sas)) def __call__(self): print("ActionScript: call()") call(self.sas)
[docs]def upload(args): uuts = [acq400_hapi.Acq400(u) for u in args.uuts] acq400_hapi.cleanup.init() shot_controller = acq400_hapi.ShotController(uuts) if args.remote_trigger: trigger_action = ActionScript(args.remote_trigger) st = None else: trigger_action = None st = SOFT_TRIGGER try: if args.capture > 0: shot_controller.run_shot(soft_trigger = st, remote_trigger = trigger_action) elif args.capture == 0: state = '99' while state != '0': state = uuts[0].s0.state.split()[0] print("state:{}".format(state)) if state == '1': if trigger_action: trigger_action() elif st: uut.s0.soft_trigger = '1' time.sleep(1) if args.save_data: for u in uuts: u.save_data = args.save_data if args.trace_upload: for u in uuts: u.trace = 1 chx, ncol, nchan, nsam = shot_controller.read_channels(eval(args.channels)) # plot ex: 2 x 8 ncol=2 nchan=8 # U1 U2 FIG # 11 21 1 2 # 12 22 3 4 # 13 23 # ... # 18 28 15 16 if plot_ok and args.plot_data: for col in range(ncol): for chn in range(0,nchan): fignum = 1 + col + chn*ncol plt.subplot(nchan, ncol, fignum) plt.plot(chx[col][chn]) plt.show() except acq400_hapi.cleanup.ExitCommand: print("ExitCommand raised and caught") finally: print("Finally, going down")
SOFT_TRIGGER=int(os.getenv("SOFT_TRIGGER", "1")) TRACE_UPLOAD=int(os.getenv("TRACE_UPLOAD", "0")) SAVEDATA=os.getenv("SAVEDATA", None) PLOTDATA=int(os.getenv("PLOTDATA", "0")) CAPTURE=int(os.getenv("CAPTURE", "0")) CHANNELS=os.getenv("CHANNELS", "()")
[docs]def uniq(inp): out = [] for x in inp: if x not in out: out.append(x) return out
[docs]def run_main(): parser = argparse.ArgumentParser(description='acq400 upload') parser.add_argument('--soft_trigger', default=SOFT_TRIGGER, type=int, help="help use soft trigger on capture") parser.add_argument('--trace_upload', default=TRACE_UPLOAD, type=int, help="1: verbose upload") parser.add_argument('--save_data', default=SAVEDATA, type=str, help="store data to specified directory") parser.add_argument('--plot_data', default=PLOTDATA, type=int, help="1: plot data") parser.add_argument('--capture', default=CAPTURE, type=int, help="1: capture data, 0: wait for someone else to capture, -1: just upload") parser.add_argument('--remote_trigger', default=None, type=str, help="your function to fire trigger") parser.add_argument('--channels', default=CHANNELS, type=str, help="comma separated channel list") parser.add_argument('uuts', nargs = '+', help="uut[s]") args = parser.parse_args() # deduplicate (yes, some non-optimal apps call with duplicated uuts, wastes time) args.uuts = uniq(args.uuts) # encourage single ints to become a list if re.search(r'^\d$', args.channels) is not None: args.channels += ',' upload(args)
# execution starts here if __name__ == '__main__': run_main()