Source code for obci_readmanager.signal_processing.buffers.ring_buffer_base

# -*- coding: utf-8 -*-
# Copyright (c) 2016-2018 Braintech Sp. z o.o. [Ltd.] <http://www.braintech.pl>
# All rights reserved.

"""Module provides single class representing base buffer."""
import copy


class RingBufferBase(object):
    """Base class for fixed-size ring (circular) buffers.

    This class keeps the bookkeeping (write position, "full" flag, optional
    copy-on-return) and delegates the actual storage to four hooks that a
    subclass must override:

    * ``_init_buffer()`` -- allocate / reset the underlying storage,
    * ``_add(s)`` -- store sample ``s`` (called *before* ``index`` advances),
    * ``_get_normal(start, end)`` -- return a contiguous range of storage,
    * ``_get_concat(start, end)`` -- return a range that wraps past the end
      of the storage.

    Attributes set by this class:

    * ``size`` -- capacity of the buffer, in samples,
    * ``number_of_channels`` -- stored for use by subclasses,
    * ``copy_on_ret`` -- when true, ``get`` returns a deep copy,
    * ``is_full`` -- becomes true once ``size`` samples have been added,
    * ``index`` -- position of the next write, in ``[0, size)``.
    """

    def __init__(self, size, number_of_channels, copy_on_ret):
        """Initialize buffer.

        :param size: capacity of the buffer (converted with ``int``).
        :param number_of_channels: channel count (converted with ``int``).
        :param copy_on_ret: if true, ``get`` deep-copies the data it returns
            (converted with ``bool``).
        """
        self.size = int(size)
        self.number_of_channels = int(number_of_channels)
        self.copy_on_ret = bool(copy_on_ret)
        self.clear()

    def clear(self):
        """Clear buffer: reset the write position and re-create the storage."""
        self.is_full = False
        self.index = 0
        self._init_buffer()

    def add(self, s):
        """Add sample to buffer.

        The sample is handed to ``_add`` first; only then is the write
        position advanced (wrapping around at ``size``).

        :param s: sample to store; its type is defined by the subclass.
        """
        self._add(s)
        # The buffer becomes full when the last free slot has just been used.
        if not self.is_full and self.index == self.size - 1:
            self.is_full = True
        self.index = (self.index + 1) % self.size

    def get(self, start, length):
        """Get samples from buffer (start, start + length).

        While the buffer is not yet full, ``start`` is passed straight to
        ``_get_normal``. Once it is full, ``start`` is taken relative to
        ``index`` (the next write position, i.e. the oldest sample, assuming
        ``_add`` writes at ``index``) and the range may wrap around the end
        of the storage.

        :param start: offset of the first requested sample.
        :param length: number of requested samples.
        :return: whatever the subclass hooks return; a deep copy of it when
            ``copy_on_ret`` is true.
        """
        if not self.is_full:
            data = self._get_normal(start, start + length)
        else:
            first = self.index + start  # start position before wrapping
            if first + length <= self.size:
                # Whole range lies before the end of the storage.
                data = self._get_normal(first, first + length)
            elif first >= self.size:
                # Whole range lies past the end: wrap the start position.
                wrapped = first % self.size
                data = self._get_normal(wrapped, wrapped + length)
            else:
                # Range straddles the end: the second argument is the number
                # of samples left over after the tail [first, size) is used.
                data = self._get_concat(first, length - (self.size - first))

        if self.copy_on_ret:
            return copy.deepcopy(data)
        return data

    def _get_normal(self, start, end):
        """Return the contiguous range (start, end); subclasses must override."""
        raise NotImplementedError("To be implemented!")

    def _get_concat(self, start, end):
        """Return a range wrapping past the storage end; subclasses must override.

        ``get`` calls this with ``start`` as the position of the first sample
        and ``end`` as the count of samples that wrap to the beginning.
        """
        raise NotImplementedError("To be implemented!")

    def _add(self, s):
        """Store sample ``s``; subclasses must override."""
        raise NotImplementedError("To be implemented!")

    def _init_buffer(self):
        """Allocate or reset the underlying storage; subclasses must override."""
        raise NotImplementedError("To be implemented!")