Source code for opendaq.experiment

#!/usr/bin/env python

# Copyright 2015
# Ingen10 Ingenieria SL
#
# This file is part of opendaq.
#
# opendaq is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# opendaq is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with opendaq.  If not, see <http://www.gnu.org/licenses/>.

from enum import IntEnum
from collections import deque
from threading import Lock


[docs]class ExpMode(IntEnum): """Valid experiment modes.""" ANALOG_IN = 0 ANALOG_OUT = 1 DIGITAL_IN = 2 DIGITAL_OUT = 3 COUNTER_IN = 4 CAPTURE_IN = 5
[docs]class Trigger(IntEnum): """Valid trigger modes.""" SW = 0 DIN1 = 1 DIN2 = 2 DIN3 = 3 DIN4 = 4 DIN5 = 5 DIN6 = 6 ABIG = 10 ASML = 20
[docs]class DAQExperiment(object):
[docs] def analog_setup(self, pinput=1, ninput=0, gain=1, nsamples=20): """Configure a channel for a generic stream experiment. """ if not 0 <= nsamples < 255: raise ValueError("samples number out of range") self.pinput = pinput self.ninput = ninput self.gain = gain self.nsamples = nsamples self.signal_data = [] self.signal_offs = 0
[docs] def trigger_setup(self, mode=Trigger.SW, value=0): """Channge the trigger mode of datachannel. :param mode: Trigger mode (use :class:`.Trigger`). :param value: Value of the trigger mode. :raises: ValueError """ if not type(mode) is Trigger: raise ValueError("Invalid trigger mode") if 1 <= mode <= 6 and value not in [0, 1]: raise ValueError("Invalid value of digital trigger") self.trg_mode = mode self.trg_value = value
[docs] def get_params(self): """Return gain, pinput and ninput.""" return self.gain, self.pinput, self.ninput
[docs] def get_mode(self): """Return mode.""" return self.mode
[docs] def get_preload_data(self): """Return preload_data and preload_offset. """ return self.signal_data, self.signal_offs
[docs] def load_signal(self, data, offset=0): if not 1 <= len(data) <= 400: raise ValueError('Invalid data length') self.signal_data = data self.signal_offs = offset
[docs] def add_points(self, points): """Write a single point into the ring buffer.""" self.mutex_ring_buffer.acquire() self.ring_buffer.extend(points) self.mutex_ring_buffer.release()
[docs] def read(self): """Return all available points from the ring buffer.""" self.mutex_ring_buffer.acquire() ret = list(self.ring_buffer) self.ring_buffer.clear() self.mutex_ring_buffer.release() return ret
[docs]class DAQStream(DAQExperiment): """ Stream experiment. :param mode: Define data source or destination (use :class:`.ExpMode`). :param period: Period of the stream experiment (milliseconds) [1:65536] :param npoints: Total number of points for the experiment [0:65536] (0 indicates continuous acquisition). :param continuous: Indicates if experiment is continuous (True) or one-shot (False). :param buffersize: Buffer size. :raises: LengthError (too many experiments at the same time), ValueError (values out of range) """ def __init__(self, mode, number, period, npoints=10, continuous=False, buffersize=1000): if not 1 <= number <= 4: raise ValueError('Invalid number') if mode == 1 and number != 4: raise ValueError('Analog output must use DataChannel 4') if not 1 <= period <= 65535: raise ValueError('Invalid period') if type(mode) == int and not 0 <= mode <= 5: raise ValueError('Invalid mode') if not 0 <= npoints < 65536: raise ValueError('npoints out of range') if not 1 <= buffersize <= 20000: raise ValueError('Invalid buffer size') self.number = number self.period = period self.mode = mode self.npoints = npoints self.continuous = continuous self.ring_buffer = deque(maxlen=buffersize) self.mutex_ring_buffer = Lock() self.analog_setup() self.trigger_setup()
[docs]class DAQExternal(DAQExperiment): """External experiment. :param mode: Define data source or destination (use :class:`.ExpMode`). :param clock_input: Digital input used as external clock :param edge: New data on rising (1) or falling (0) edges [0:1] :param npoints: Total number of points for the experiment [0:65536] :param continuous: Indicates if the experiment is continuous (False: run once, True: continuous). :param buffersize: Buffer size :raises: LengthError (too many experiments at the same time, ValueError (values out of range) """ def __init__(self, mode, clock_input, edge=1, npoints=10, continuous=False, buffersize=1000): if not 1 <= clock_input <= 4: raise ValueError('Invalid clock_input') if edge not in [0, 1]: raise ValueError('Invalid edge') if type(mode) == int and not 0 <= mode <= 5: raise ValueError('Invalid mode') if not 0 <= npoints < 65536: raise ValueError('npoints out of range') if not 1 <= buffersize <= 20000: raise ValueError('Invalid buffer size') if mode == 1 and clock_input != 4: raise ValueError('Analog output must use DataChannel 4') self.number = clock_input self.edge = edge self.mode = mode self.npoints = npoints self.continuous = continuous self.ring_buffer = deque(maxlen=buffersize) self.mutex_ring_buffer = Lock() self.analog_setup() self.trigger_setup()
[docs]class DAQBurst(DAQExperiment): """Burst experiment. :param mode: Define data source or destination (use :class:`.ExpMode`). :param period: Period of the stream experiment (microseconds) [1:65536] :param npoints: Total number of points for the experiment [0:65536] :param continuous: Indicates if the experiment is continuous (False: run once, True: continuous). :param buffersize: Buffer size :raises: LengthError (too many experiments at the same time), ValueError (values out of range) """ def __init__(self, mode, period, npoints=10, continuous=False, buffersize=4000): if not 100 <= period <= 65535: raise ValueError('Invalid period') if not 0 <= npoints < 65536: raise ValueError('npoints out of range') if type(mode) == int and not 0 <= mode <= 1: raise ValueError('Invalid mode') self.number = 1 self.period = period self.npoints = npoints self.continuous = continuous self.mode = mode self.ring_buffer = deque(maxlen=buffersize) self.mutex_ring_buffer = Lock() self.analog_setup() self.trigger_setup()