Logging
As with any other device used to record and control an experiment, data streams from Harp devices can be easily logged in real time. In this case, the decision of what to log is straightforward. Since all communication between the peripheral and the host is made via a sequence of HarpMessage objects, this is the only piece of information we need to reconstruct the state of the experiment (at least as seen by the Harp device) at any given point in time.
Moreover, since all Harp messages follow a simple binary protocol, they can be logged to disk efficiently (in both time and space) as a simple flat binary file. The next sections cover how to do this and discuss current recommendations for logging data streams from Harp devices.
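To make that binary layout concrete, the minimal sketch below decodes the header fields of the first message in a raw log file. It assumes a timestamped message (the common case for device messages) and uses the same field offsets as the Python reader at the end of this section; peek_first_message is just an illustrative name, not part of any library.
import struct

def peek_first_message(path):
    """Minimal sketch: decode the header of the first message in a raw Harp log.

    Assumes the message is timestamped (bit 0x10 set in the payload type byte).
    """
    with open(path, "rb") as f:
        header = f.read(11)  # fixed-size part of a timestamped message
    message_type, length, address, port, payload_type = header[:5]
    seconds, ticks = struct.unpack_from("<IH", header, 5)  # uint32 seconds, uint16 ticks
    return {
        "message_type": message_type,  # 1 = Read, 2 = Write, 3 = Event
        "length": length,              # number of bytes after the length field
        "address": address,            # register address
        "port": port,
        "payload_type": payload_type,  # element type code; 0x10 flags the timestamp
        "seconds": seconds,
        "ticks": ticks,                # each tick is 32 microseconds
    }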
MessageWriter
Since Harp is a binary protocol, any HarpMessage can be logged by simply saving its raw binary representation. The binary representation (as a byte[]) can be accessed via the MessageBytes property. This means we could record the entire raw binary stream by feeding the sequence of message bytes to a binary logger. The Bonsai.Harp package provides a dedicated MessageWriter operator that encapsulates this functionality.
Since all logging takes place on top of any HarpMessage stream, the writers can also be used to log multiple devices in parallel, log filtered streams (e.g. after applying FilterRegister), or even save host-generated commands (e.g. messages generated by a CreateMessage operator).
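At the byte level, all of these cases amount to the same operation: appending each message's MessageBytes to a flat file. A minimal sketch of that idea outside of Bonsai (the function name and the iterable of raw byte strings are hypothetical stand-ins):
def log_raw_messages(messages, path):
    """Append the raw binary representation of each message to a flat log file.

    `messages` stands in for any iterable of bytes objects, each holding the
    MessageBytes of one HarpMessage.
    """
    with open(path, "ab") as log:
        for raw in messages:
            log.write(raw)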
GroupByRegister
While logging all Harp messages to a single binary file is very easy, it is not always the most convenient way to log data. For instance, if one is interested in logging only a subset of messages (e.g. only the ADC messages), the previous approach would require a post-processing step to extract the messages of interest.
Furthermore, each register address potentially has a different data format (e.g. U8 vs U16), or even a different length if array registers are involved. This can make it very tedious to parse and analyze a binary file offline, since we would have to examine the header of each and every message in the file to determine how to extract its contents.
This analysis could be eliminated entirely if we knew that all messages in the binary file had the same format. For any Harp device, the payload stored in a specific register has a fixed type and length. This means that to guarantee our simplifying assumption it is enough to save the messages from each register into a separate file.
The GroupByRegister operator is designed to automatically split the single Harp message stream into independent sub-streams, one for each device register address. There are many applications of this advanced operator, but the most common one is to demultiplex register messages before applying the MessageWriter operator. If MessageWriter receives a grouped message sequence, it will automatically generate one independent file for each register.
If we do this, we can ensure that all messages in each individual file have the same format and length, and can thus be read and parsed in a single bulk operation. A simple implementation of this pattern is a GroupByRegister operator feeding directly into a MessageWriter.
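For recordings that were nevertheless saved as a single combined file, the same demultiplexing can be done offline. The sketch below walks the raw log message by message and appends each message to a per-register file; the function name and the Register_{address}.bin naming scheme are only illustrative:
import os

def demux_by_register(path, output_dir):
    """Split a combined raw Harp log into one flat binary file per register address.

    Walks the log message by message: byte 1 of each message holds its length
    (the number of bytes after the length field) and byte 2 the register address.
    """
    os.makedirs(output_dir, exist_ok=True)
    with open(path, "rb") as log:
        data = log.read()

    outputs = {}
    index = 0
    while index + 2 < len(data):
        stride = data[index + 1] + 2  # total size of this message in bytes
        address = data[index + 2]
        if address not in outputs:
            # example naming scheme: one output file per register address
            name = os.path.join(output_dir, f"Register_{address}.bin")
            outputs[address] = open(name, "wb")
        outputs[address].write(data[index:index + stride])
        index += stride

    for file in outputs.values():
        file.close()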
The single-register log files can then be loaded using the following Python routine:
import numpy as np
import pandas as pd

# Each Harp timestamp tick corresponds to 32 microseconds.
_SECONDS_PER_TICK = 32e-6

# Payload type codes (with the timestamp flag masked out) mapped to numpy dtypes.
_payloadtypes = {
    1: np.dtype(np.uint8),     # U8
    2: np.dtype(np.uint16),    # U16
    4: np.dtype(np.uint32),    # U32
    8: np.dtype(np.uint64),    # U64
    129: np.dtype(np.int8),    # S8
    130: np.dtype(np.int16),   # S16
    132: np.dtype(np.int32),   # S32
    136: np.dtype(np.int64),   # S64
    68: np.dtype(np.float32)   # Float
}

def read_harp_bin(file):
    """Read a single-register Harp log file into a DataFrame indexed by time in seconds."""
    data = np.fromfile(file, dtype=np.uint8)
    if len(data) == 0:
        return None

    # All messages in a single-register file share the same size and layout,
    # so the header of the first message describes the entire file.
    stride = int(data[1]) + 2     # total message size in bytes
    length = len(data) // stride  # number of messages in the file
    payloadsize = stride - 12     # message size minus header, timestamp and checksum
    payloadtype = _payloadtypes[int(data[4]) & ~0x10]  # mask out the timestamp flag bit
    elementsize = payloadtype.itemsize
    payloadshape = (length, payloadsize // elementsize)

    # Build strided views over the raw bytes: one element per message.
    seconds = np.ndarray(length, dtype=np.uint32, buffer=data, offset=5, strides=stride)
    ticks = np.ndarray(length, dtype=np.uint16, buffer=data, offset=9, strides=stride)
    seconds = ticks * _SECONDS_PER_TICK + seconds

    payload = np.ndarray(
        payloadshape,
        dtype=payloadtype,
        buffer=data, offset=11,
        strides=(stride, elementsize))

    if payload.shape[1] == 1:
        ret_pd = pd.DataFrame(payload, index=seconds, columns=["Value"])
    else:
        ret_pd = pd.DataFrame(payload, index=seconds)
    ret_pd.index.names = ['Seconds']
    return ret_pd
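As a usage sketch (the filename below is purely illustrative), loading one of the per-register files then looks like this. Single-element registers come back with a single Value column, while array registers get one column per payload element:
adc = read_harp_bin("Register_44.bin")  # hypothetical single-register log file
print(adc.head())                       # rows indexed by message timestamp in seconds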