Source code for pymepix.util.tcpsampler

##############################################################################
##
# This file is part of Pymepix
#
# https://arxiv.org/abs/1905.07999
#
#
# Pymepix is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Pymepix is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with Pymepix.  If not, see <http://www.gnu.org/licenses/>.
#
##############################################################################
import ctypes
import multiprocessing
import socket
import time
from multiprocessing.sharedctypes import Value

from pymepix.core.log import ProcessLogger

# from pymepix.processing.basepipeline import BasePipelineObject
from pymepix.processing.rawtodisk import Raw2Disk


class TcpSampler(multiprocessing.Process, ProcessLogger):
    """Receives TCP packets

    The same as UdpSampler, just with TCP.
    """

    def __init__(
        self,
        address,
        longtime,
        chunk_size=10_000,
        flush_timeout=0.3,
        input_queue=None,
        create_output=True,
        num_outputs=1,
        shared_output=None,
    ):
        name = "TcpSampler"
        ProcessLogger.__init__(self, name)
        multiprocessing.Process.__init__(self)

        self.init_param = {
            "address": address,
            "chunk_size": chunk_size,
            "flush_timeout": flush_timeout,
            "longtime": longtime,
        }

        self._record = Value(ctypes.c_bool, False)
        self._enable = Value(ctypes.c_bool, True)
        self._close_file = Value(ctypes.c_bool, False)

        self.loop_count = 0
        self._sock, self._conn, self._addr = None, None, None

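    # Illustrative usage sketch (mirrors main() at the bottom of this file;
    # the address and longtime values are placeholders, not a prescribed setup):
    #
    #     sampler = TcpSampler(("127.0.0.1", 50000), longtime=1)
    #     sampler.start()          # accepts one TCP client and starts sampling
    #     sampler.record = True    # forward received chunks to Raw2Disk
    #     ...
    #     sampler.close_file = True
    #     sampler.join()
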
    def init_new_process(self):
        try:
            self.createConnection(self.init_param["address"])
            self._chunk_size = self.init_param["chunk_size"] * 8192
            self._flush_timeout = self.init_param["flush_timeout"]
            self._packets_collected = 0
            # ring buffer to put received data in
            self._packet_buffer_list = [
                bytearray(2 * self._chunk_size) for i in range(5)
            ]
            self._buffer_list_idx = 0
            self._packet_buffer_view = memoryview(
                self._packet_buffer_list[self._buffer_list_idx]
            )
            self._recv_bytes = 0
            self._total_time = 0.0
            self._longtime = self.init_param["longtime"]
        except Exception as e:
            self.error("Exception occurred in init!")
            self.error(e, exc_info=True)
            raise

    def createConnection(self, address):
        """Opens a listening TCP socket on ``address``; the incoming
        connection is accepted in ``run``."""
        self.info("Establishing connection to {}".format(address))
        self._sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._sock.bind(address)
        self._sock.listen(1)

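    # Note: bind()/listen() above make this the server side; the client is
    # accepted in run(). For testing, a sender can connect the way
    # send_data() in main() does (the address is an illustrative placeholder):
    #
    #     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #     sock.connect(("127.0.0.1", 50000))
    #     sock.send(payload)
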
    def get_useful_packets(self, packet):
        # Get the header
        header = ((packet & 0xF000000000000000) >> 60) & 0xF
        subheader = ((packet & 0x0F00000000000000) >> 56) & 0xF
        pix_filter = (header == 0xA) | (header == 0xB)
        trig_filter = ((header == 0x4) | (header == 0x6)) & (subheader == 0xF)
        tpx_filter = pix_filter | trig_filter
        tpx_packets = packet[tpx_filter]
        return tpx_packets

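    # Worked example for the filter above (values chosen only to illustrate
    # the bit masks used in this method):
    #
    #     0xB000000000000001 -> header 0xB                -> pixel packet, kept
    #     0x6F00000000000001 -> header 0x6, subheader 0xF -> trigger packet, kept
    #     0x7100000000000001 -> header 0x7                -> dropped
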
    @property
    def enable(self):
        """Enables processing

        Determines whether the class will perform processing; disabling it
        signals the process to terminate. If there are objects ahead of it,
        they will stop receiving data. If an input queue is required, it is
        read from before the processing flag is checked. This is done to
        prevent the queue from growing while a process behind it is still
        working.

        Parameters
        ----------
        value : bool
            Enable value

        Returns
        -------
        bool:
            Whether the process is enabled or not
        """
        return bool(self._enable.value)

    @enable.setter
    def enable(self, value):
        self.debug("Setting enabled flag to {}".format(value))
        self._enable.value = bool(value)

    @property
    def record(self):
        """Enables saving data to disk

        Determines whether incoming raw data is forwarded to the Raw2Disk
        writer and saved to file.

        Parameters
        ----------
        value : bool
            Enable value

        Returns
        -------
        bool:
            Whether the process should record and write to disk or not
        """
        return bool(self._record.value)

    @record.setter
    def record(self, value):
        self.debug(f"Setting record flag to {value}")
        self._record.value = bool(value)

    @property
    def close_file(self):
        return bool(self._close_file.value)

    @close_file.setter
    def close_file(self, value):
        self.debug(f"Setting close_file to {value}")
        self._close_file.value = bool(value)

    @property
    def outfile_name(self):
        return self._outfile_name

    @outfile_name.setter
    def outfile_name(self, fileN):
        self.info(f"Setting file name flag to {fileN}")
        if self.write2disk.open_file(fileN):
            self.info(f"file {fileN} opened")
        else:
            self.error("Houston, we have a problem: the file cannot be created.")

    def pre_run(self):
        self.init_new_process()
        print("pre-run create raw2disk")
        self.write2disk = Raw2Disk()
        self._last_update = time.time()

    def post_run(self):
        if self._recv_bytes > 1:
            bytes_to_send = self._recv_bytes
            self._recv_bytes = 0
            curr_list_idx = self._buffer_list_idx
            self._buffer_list_idx = (self._buffer_list_idx + 1) % len(
                self._packet_buffer_list
            )
            self._packet_buffer_view = memoryview(
                self._packet_buffer_list[self._buffer_list_idx]
            )
            self.write2disk.my_sock.send(
                self._packet_buffer_list[curr_list_idx][:bytes_to_send], copy=False
            )
            if self.write2disk.writing:
                # we should get a response here; at this point it ends up in nirvana
                self.write2disk.my_sock.send(b"EOF")
                self.debug("post_run: closed file")
            # return MessageType.RawData, (
            #     self._packet_buffer_list[curr_list_idx][:bytes_to_send], self._longtime.value)
            return None, None
        else:
            if self.write2disk.writing:
                self.debug("post_run: close file")
                # we should get a response here, but the socket is elsewhere...
                self.write2disk.my_sock.send(b"EOF")
                self.debug("post_run: closed file")
            return None, None

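    # How the receive loop in run() below handles buffering: bytes are read
    # with recv_into() into the current bytearray of a five-element ring
    # buffer. Once more than chunk_size bytes have arrived, or flush_timeout
    # has elapsed, the filled slice is sent to Raw2Disk (when record is set)
    # and the loop rotates to the next buffer so receiving can continue while
    # the previous chunk is written out.
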
    def run(self):
        self.pre_run()
        enabled = True
        start = time.time()
        total_bytes_received = 0
        self._conn, self._addr = self._sock.accept()
        while True:
            # enabled = self.enable
            if self.loop_count > 1_000_000:
                enabled = False
            self.loop_count += 1
            if enabled:
                try:
                    self._recv_bytes += self._conn.recv_into(
                        self._packet_buffer_view[self._recv_bytes :]
                    )
                except socket.timeout:
                    # handle close_file here to cover the case where no data is
                    # coming and the file should be closed; mainly there for
                    # tests to succeed
                    if self.close_file:
                        self.close_file = 0
                        self.post_run()
                    else:
                        self.debug("Socket timeout")
                except socket.error:
                    self.debug("socket error")

                # self.debug('Read {}'.format(raw_packet))
                # self._packets_collected += 1
                end = time.time()
                self._total_time += end - start
                # if self._packets_collected % 1000 == 0:
                #     self.debug('Packets collected {}'.format(self._packets_collected))
                #     self.debug('Total time {} s'.format(self._total_time))

                flush_time = end - self._last_update

                if (self._recv_bytes > self._chunk_size) or (
                    flush_time > self._flush_timeout
                ):
                    # packet = np.frombuffer(self._packet_buffer_view[:self._recv_bytes], dtype=np.uint64)
                    # TODO: put this in packet processor
                    # print(packet)
                    # tpx_packets = self.get_useful_packets(packet)

                    if self.record:
                        self.write2disk.my_sock.send(
                            self._packet_buffer_list[self._buffer_list_idx][
                                : self._recv_bytes
                            ],
                            copy=False,
                        )
                    elif self.close_file:
                        self.close_file = 0
                        self.debug("received close file")
                        self.write2disk.my_sock.send(
                            self._packet_buffer_list[self._buffer_list_idx][
                                : self._recv_bytes
                            ],
                            copy=False,
                        )
                        self.write2disk.my_sock.send(b"EOF")

                    # bytes_to_send = self._recv_bytes
                    total_bytes_received += self._recv_bytes
                    self._recv_bytes = 0
                    curr_list_idx = self._buffer_list_idx
                    # print('curr idx', curr_list_idx)
                    self._buffer_list_idx = (self._buffer_list_idx + 1) % len(
                        self._packet_buffer_list
                    )
                    self._packet_buffer_view = memoryview(
                        self._packet_buffer_list[self._buffer_list_idx]
                    )
                    self._last_update = time.time()
                    # if len(packet) > 1:
                    #     if not curr_list_idx % 20:
                    #         return MessageType.RawData, (self._packet_buffer_list[curr_list_idx][:bytes_to_send], self._longtime.value)
                    #     else:
                    #         return None, None
                    # else:
                    #     return None, None
                # else:
                #     return None, None
            else:
                self.debug("I AM LEAVING")
                break

        stop = time.time()
        dt = stop - start
        print(f"loop count {self.loop_count}")
        print(
            f"time for 1M packets: {dt:.2f}s, "
            f"MByte/s {total_bytes_received * 1e-6 / dt}"
        )
        self.post_run()
        self._conn.close()

    def stopRaw2Disk(self):
        # TODO: this doesn't work for now. should probably go to post_run
        """
        self.debug('Stopping Raw2Disk')
        self.write2disk.close()
        self.write2disk.my_sock.send_string('SHUTDOWN')
        # print(write2disk.my_sock.recv())
        self.write2disk.write_thr.join()
        self.debug('Raw2Disk stopped')
        """


def main():
    # Create the logger
    import logging
    import time
    from multiprocessing import Process

    import numpy as np
    import zmq

    logging.basicConfig(
        level=logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    def send_data(packets, chunk_size, start=0, sleep=0.0001):
        ############
        # send data
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.connect(("127.0.0.1", 50000))

        # chunk size 135 -> max number = 1_012_500
        test_data = np.arange(start, start + packets * chunk_size, dtype=np.uint64)
        test_data_view = memoryview(test_data)

        # seems to be necessary if this function gets called as a Process
        time.sleep(1)

        # first packet 0...134, second packet 135...269 and so on
        start = time.time()
        for i in range(0, len(test_data_view), chunk_size):
            sock.send(test_data_view[i : i + chunk_size])
            # time.sleep(sleep)  # if there's no sleep, packets get lost
        stop = time.time()
        dt = stop - start
        print(
            f"time to send {dt:.2f}s",
            f"time to send 1M: {dt / packets * 1_000_002:.2f}s, "
            # f'bytes: {len(test_data_view.tobytes())}, '
            f"MBytes: {len(test_data_view.tobytes()) * 1e-6:.1f}, "
            f"{len(test_data_view.tobytes()) * 1e-6 / dt:.2f} MByte/s",
        )
        return test_data

    ctx = zmq.Context.instance()
    z_sock = ctx.socket(zmq.PAIR)
    z_sock.bind("tcp://127.0.0.1:40000")
    # z_sock.send_string('hallo')
    # print(z_sock.recv_string())

    sampler = TcpSampler(("127.0.0.1", 50000), 1)
    time.sleep(1)  # give the process time to start

    # send data
    packets = 2_500_000
    chunk_size = 135
    # test_data = np.arange(0, packets * chunk_size, dtype=np.uint64)
    # test_data = send_data(packets=10_000, chunk_size=135, start=15000, sleep=1e-4)
    p = Process(target=send_data, args=(packets, chunk_size, 0, 0))

    start = time.time()
    sampler.start()
    time.sleep(1)
    p.start()
    p.join()
    stop = time.time()

    z_sock.send_string("SHUTDOWN")
    z_sock.close()
    sampler.join()
    print(f"took {stop - start:.2f}s")


if __name__ == "__main__":
    main()