SkillAgentSearch skills...

Crayfish23

Benchmarking Machine Learning Model Inference in Data Streaming Solutions

Install / Use

/learn @soniahorchidan/Crayfish23
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

Crayfish: Navigating the Labyrinth of Machine Learning Inference in Stream Processing Systems

This repository includes Crayfish, an extensible benchmarking framework that facilitates designing and executing comprehensive evaluation studies of streaming inference pipelines. Crayfish is described in our EDBT'24 paper, which also includes the first systematic performance evaluation study of model serving integration tools in Stream Processing Frameworks.

You can cite the paper using the BibTeX below:

@inproceedings{horchidan2024crayfish,
  title={Crayfish: Navigating the Labyrinth of Machine Learning Inference in Stream Processing Systems.},
  author={Horchidan, Sonia and Chen, Po-Hao and Kritharakis, Emmanouil and Carbone, Paris and Kalavri, Vasiliki},
  booktitle={EDBT},
  pages={676--689},
  year={2024}
}

Project Structure

.
|-- core                   # Crayfish Java core components and abstractions.
|-- crayfish-java          # Crayfish adapters (i.e., FLink, Spark Structured Streaming, Kafka Streams).
|-- experiments-driver     # Experiments testbed and configurations.
|-- input-producer         # Input Producer Component. Contains a random input generator.      
|-- output-consumer        # Output Consumer Component. Writes latency measurements to persistent storage.   
|-- rayfish                # Crayfish Ray Adapter.   
|-- resources              # Pre-trained models and training scripts.  
`-- results-analysis       # Notebooks used to analyze the results.

Supported Tools

Pre-trained Models

<table> <caption></caption> <thead> <tr> <th colspan="2"></th> <th>FNN</th> <th>ResNet50</th> </tr> </thead> <tbody> <tr> <td colspan="2">Input Size</td> <td>28x28</td> <td>224x224x3</td> </tr> <tr> <td colspan="2">Output Size</td> <td>10x1</td> <td>1000x1</td> </tr> <tr> <td colspan="2">Parameters Number</td> <td>28K</td> <td>23M</td> </tr> <tr> <td rowspan="4">Model Size</td> <td>ONNX</td> <td>113 KB</td> <td>97 MB</td> </tr> <tr> <td>Torch</td> <td>115 KB</td> <td>98 MB</td> </tr> <tr> <td>H5</td> <td>133 KB</td> <td>98 MB</td> </tr> <tr> <td>SavedModel</td> <td>508 KB</td> <td>101 MB</td> </tr> </tbody> </table>

Stream Processors

  • Apache Flink 1.15.2
  • Apache Kafka Streams 3.2.3
  • Spark Structured Streaming 3.3.2
  • Ray 2.4

Embedded Serving Frameworks

  • ONNX 1.12.1
  • DeepLearning4j 1.0.0-M2.1
  • TensorFlow Java (SavedModel) 1.13.1

External Serving Frameworks

  • TorchServe 0.7.1
  • TensorFlow Serving 2.11.1

Quick Start

Prerequisites

Environment

  1. Unix-like environment
  2. Python 3
  3. Maven
  4. Java 8
  5. Docker installation

Configuration

Before running the experiments, the models must be located under resources/. To train the models and save them in the required formats, run resources/training/convert_ffnn.py for the FFNN model and resources/training/convert_resnet50.py for the ResNet50 model.

Crayfish provides two execution modes: local, to test the experiments on a single node, and cluster, to deploy the Crayfish components on a cluster of machines. For each deployment, the configuration files experiments-driver/configs/global-configs-local.properties and experiments-driver/configs/global-configs-cluster.properties must be updated respectively.

NOTE! Do not forget to update the property files above to point to the local Apache Flink and Spark Structured Streaming installations.

Quick start

Building and packaging

The first step in executing the experiments is to compile and package the project. To do so, run the following command in the Crayfish home directory.

mvn clean install

Preparing the Docker images

NOTE! Make sure Docker is running before.

Make sure to download the needed Docker images:

docker pull confluentinc/cp-zookeeper:latest
docker pull confluentinc/cp-kafka:latest
docker pull tensorflow/serving:latest

Experiments

The main entry-point for the experiments is the run.sh script which can be found inside the experiments-driver directory.

run.sh has the following options:

Arguments:
                                       [!] NOTE: Configs in 'experiments-driver/configs/exp-configs/[-ec]/[-e]' will be run.
-e     | --experiments                 Independent variable sets to run: a=all, i=input rate, b=batch size, s=scalability, 
                                       r=bursty rate.
-ec    | --experiments-control         Controlled variable sets to run: s=small, l=large, d=debug.
                                         - small: Run input rate 256 for the scalability experiment.
                                           [!] NOTE: ResNet50 is recommended for this option due to the large
                                                     model size and limited memory.
                                         - large: Run input rate 30000 for the scalability experiment.
                                         - debug: Run simple experiment configs in the debug folder.
-sp    | --stream-processors           Stream processor to test:
                                         a=all, f=Apache Flink, k=Kafka Streams, s=Spark Streaming, r=Ray.
-m     | --models                      Served models: a=all, f=ffnn, r=resnet50, v=vgg19.
-msm   | --embedded-model-servers      Embedded model serving alternative:
                                         x=none, a=all (w/o noop and nd4j), n=nd4j, d=dl4j, o=onnx, t=tf-savedmodel, k=noop.
                                         [!] NOTE: noop will execute input rate and batch size experiments.
-msx   | --external-model-servers      External model serving alternative: x=none, a=all, t=tf-serving, s=torchserve.
-pm    | --parallelism-model           Valid only for Flink. Parallelism model alternative d=data parallel, t=task parallel."
-em    | --execution-mode              Execution mode: l=local, c=cluster.
-d     | --default-configs             Print default configs.
-h     | --help                        Help.

Depending on the experiment required to be executed, the script can be used as follows:

./experiments-driver/run.sh -e i -ec l -sp fk -m f -msm od -msx x -em l

In this case, Apache Flink and Kafka Streams will be chosen as stream processor. The FFNN model will be served using Deep Learning 4j and ONNX. The test bed will execute the large experiments, with high input rates. No external server will be tested.

If executed in local mode (-em l), the run.sh script will start all the needed daemon processes for the following components: the input producer, the data processor, the data component, and the external server.

In cluster mode (-em c), the run.sh script assumes that Kafka is already running on a cluster remotely, and that the daemons have been started and have accessible endpoints. The script experiments-driver/scripts/start-daemon.sh is used to start the daemons. The following command start a daemon process that spawns the data processor component:

./experiments-driver/scripts/start-daemon.sh -p dp -em l

start-daemon.sh has the following options:

Arguments:
-p     | --process                     Process which will be handled by the daemon: dp=data processor, 
                                           kp=kafka producer, es-tor=external server torchserve, 
                                           es-tfs=external server tf-serving, es-rs=external ray serve.
-em    | --execution-mode              Execution mode: l=local, c=cluster.

After the set of experiments complete, the resulting files will be written under results/. The notebooks inside results-analysis can then be used to plot the throughput and latency measurements.

NOTE: A containerized version will soon be available to faciltate deployments on a cluster.

Kafka Overhead Experiments

The experiments measuring the overhead introduced by Kafka in the pipeline do not use the Crayfish pipeline, as they employ a standalone FLink implementation. To run these experiments, the following script can be used:

./experiments-driver/run-standalone.sh -em l -ec l

run-standalone.sh has the following options:

Arguments:
-ec    | --experiments-control         Controlled variable sets to run: s=small, l=large, d=debug.
                                         - small: Run input rate 256 for the scalability experiment.
                                           [!] NOTE: ResNet50 is recommended for this option due to the large
                                                     model size and limited memory.
                                         - large: Run input rate 30000 for the scalability experiment.
                                         - debug: Run simple experiment configs in the debug folder.
-em    | --execution-mode              Execution mode: l=local, c=cluster.

Development Notes

If the experiments are stopped before they complete, please be aware that multiple background threads might be still hanging (e.g., the daemon threads waiting for wakeup signals). In this case, please make sure to kill all of them manually. You can use the ps command to find their PIDs.

Extending Crayfish

Crayfish provides a set of interfaces that allow developers to extend the benchmarking frameworks with other stream processing systems, model serving tools, and pre-trained models. We showcase examples of how to do so for each type of system.

New Stream Processors

New stream processors can be added through abstractions similar to adapters. These adapters need to extend the Crayfish interface and provide functionalities for input reading, model scoring, and output writing along

Related Skills

View on GitHub
GitHub Stars10
CategoryEducation
Updated4mo ago
Forks3

Languages

Java

Security Score

87/100

Audited on Nov 28, 2025

No findings