diff --git a/README.md b/README.md index b1a04543..6ad4d7ee 100644 --- a/README.md +++ b/README.md @@ -85,9 +85,6 @@ Camera App ──► [Video Frame] ──┬──► Display App > This repository offers a [DevContainer](https://containers.dev/). > For setting this up and enabling code completion read [eclipse-score/devcontainer/README.md#inside-the-container](https://github.com/eclipse-score/devcontainer/blob/main/README.md#inside-the-container). ->**Note**: -> If you are using Docker on Windows **without `WSL2`** in between, you have to select the alternative container `eclipse-s-core-docker-on-windows`. - ### Building the Project ```bash @@ -141,4 +138,4 @@ We welcome contributions! See our [Contributing Guide](CONTRIBUTING.md) for deta --- -**Note**: This is an open-source project under the Eclipse Foundation. It implements automotive-grade communication middleware suitable for safety-critical applications. \ No newline at end of file +**Note**: This is an open-source project under the Eclipse Foundation. It implements automotive-grade communication middleware suitable for safety-critical applications. diff --git a/score/mw/com/example/ipc_bridge/BUILD b/score/mw/com/example/ipc_bridge/BUILD index 402d261e..c2dbf033 100644 --- a/score/mw/com/example/ipc_bridge/BUILD +++ b/score/mw/com/example/ipc_bridge/BUILD @@ -95,3 +95,13 @@ cc_library( "//score/mw/com/impl/rust:bridge_macros", ], ) + +# Rust code dumps core when run in parallel with unit tests +sh_test( + name = "integration_test_ipc_bridge", + srcs = ["test_ipc_bridge.sh"], + data = [ + ":ipc_bridge_cpp", + ":ipc_bridge_rs", + ], +) diff --git a/score/mw/com/example/ipc_bridge/ipc_bridge.rs b/score/mw/com/example/ipc_bridge/ipc_bridge.rs index 38630689..c6dd8544 100644 --- a/score/mw/com/example/ipc_bridge/ipc_bridge.rs +++ b/score/mw/com/example/ipc_bridge/ipc_bridge.rs @@ -16,7 +16,7 @@ use std::thread::sleep; use std::time::Duration; use clap::{Parser, ValueEnum}; -use futures::{Stream, StreamExt}; +use futures::{future::Either, Stream, StreamExt}; #[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, ValueEnum)] enum Mode { @@ -32,9 +32,23 @@ enum Mode { #[derive(Parser)] struct Arguments { + /// Number of cycles that are executed before determining success or failure. 0 indicates no limit + #[arg( + short, + long, + default_value = "0", + )] + num_cycles: usize, /// Set to either send/skeleton or recv/proxy to determine the role of the process #[arg(value_enum, short, long)] mode: Mode, + /// Cycle time in milliseconds for sending/polling + #[arg( + short, + long, + value_parser = |s: &str| s.parse::().map(Duration::from_millis), + )] + cycle_time: Duration, #[arg( short, long, @@ -43,8 +57,7 @@ struct Arguments { service_instance_manifest: PathBuf, } -const SERVICE_DISCOVERY_SLEEP_DURATION: Duration = Duration::from_secs(1); -const DATA_RECEPTION_COUNT: usize = 100; +const SERVICE_DISCOVERY_SLEEP_DURATION: Duration = Duration::from_millis(10); /// Async function that takes `count` samples from the stream and prints the `x` field of each /// sample that is received. @@ -56,11 +69,16 @@ async fn get_samples< count: usize, ) { let map_api_lanes_stamped = pin!(map_api_lanes_stamped); - let mut limited_map_api_lanes_stamped = map_api_lanes_stamped.take(count); - while let Some(data) = limited_map_api_lanes_stamped.next().await { + + let mut map_api_lanes_stamped_iterator = if count > 0 { + Either::Left(map_api_lanes_stamped.take(count)) + } else { + Either::Right(map_api_lanes_stamped) + }; + while let Some(data) = map_api_lanes_stamped_iterator.next().await { println!("Received sample: {}", data.x); } - println!("Stream ended"); + println!("Unsubscribing..."); } /// Deliberately add Send to ensure that this is a future that can also be run by multithreaded @@ -69,14 +87,14 @@ fn run + Send>(future: F) { futures::executor::block_on(future); } -fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier) { +fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier, args: &Arguments) { let handles = loop { let handles = mw_com::proxy::find_service(instance_specifier.clone()) .expect("Instance specifier resolution failed"); if handles.len() > 0 { break handles; } else { - println!("No service found, retrying in 1 second"); + println!("No service found, retrying in 0.5 seconds"); sleep(SERVICE_DISCOVERY_SLEEP_DURATION); } }; @@ -93,38 +111,17 @@ fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier) { .expect("Failed to convert to stream"); run(get_samples( map_api_lanes_stamped_stream, - DATA_RECEPTION_COUNT, + args.num_cycles, )); } -fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier) { +fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier, args: &Arguments) { let skeleton = ipc_bridge_gen_rs::IpcBridge::Skeleton::new(&instance_specifier) .expect("BigDataSkeleton creation failed"); let skeleton = skeleton.offer_service().expect("Failed offering from rust"); let mut x: u32 = 0; - while x < 10 { - let mut sample: ipc_bridge_gen_rs::MapApiLanesStamped = - ipc_bridge_gen_rs::MapApiLanesStamped::default(); - sample.x = x; - skeleton - .events - .map_api_lanes_stamped_ - .send(sample) - .expect("Failed sending event"); - - println!("published {} sleeping", x); - x += 1; - sleep(Duration::from_millis(100)); - } - - println!("stopping offering and sleeping for 5sec"); - let skeleton = skeleton.stop_offer_service(); - sleep(Duration::from_secs(5)); - - let skeleton = skeleton.offer_service().expect("Reoffering failed"); - x = 0; - while x < 10 { + while x < args.num_cycles as u32 || args.num_cycles == 0 { let mut sample: ipc_bridge_gen_rs::MapApiLanesStamped = ipc_bridge_gen_rs::MapApiLanesStamped::default(); sample.x = x; @@ -134,9 +131,9 @@ fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier) { .send(sample) .expect("Failed sending event"); - println!("published {} sleeping", x); + println!("Sending sample: {}", x); x += 1; - sleep(Duration::from_millis(100)); + sleep(args.cycle_time); } } @@ -158,11 +155,11 @@ fn main() { match args.mode { Mode::Send | Mode::Skeleton => { println!("Running in Send/Skeleton mode"); - run_send_mode(instance_specifier); + run_send_mode(instance_specifier, &args); } Mode::Recv | Mode::Proxy => { println!("Running in Recv/Proxy mode"); - run_recv_mode(instance_specifier); + run_recv_mode(instance_specifier, &args); } } } diff --git a/score/mw/com/example/ipc_bridge/main.cpp b/score/mw/com/example/ipc_bridge/main.cpp index 314bd56c..61f7754c 100644 --- a/score/mw/com/example/ipc_bridge/main.cpp +++ b/score/mw/com/example/ipc_bridge/main.cpp @@ -127,4 +127,5 @@ int main(const int argc, const char** argv) std::cerr << "Unknown mode " << mode << ", terminating." << std::endl; return EXIT_FAILURE; } + return EXIT_SUCCESS; } diff --git a/score/mw/com/example/ipc_bridge/sample_sender_receiver.cpp b/score/mw/com/example/ipc_bridge/sample_sender_receiver.cpp index db24972d..3c6f7e6c 100644 --- a/score/mw/com/example/ipc_bridge/sample_sender_receiver.cpp +++ b/score/mw/com/example/ipc_bridge/sample_sender_receiver.cpp @@ -18,12 +18,12 @@ #include "score/concurrency/notification.h" #include "score/mw/com/impl/proxy_event.h" +#include #include #include #include #include -#include #include #include #include @@ -231,7 +231,7 @@ score::Result GetHandleFromSpecifier(const InstanceSpecifier& handles = std::move(handles_result).value(); if (handles.size() == 0) { - std::this_thread::sleep_for(500ms); + std::this_thread::sleep_for(10ms); } } while (handles.size() == 0); diff --git a/score/mw/com/example/ipc_bridge/sample_sender_receiver.h b/score/mw/com/example/ipc_bridge/sample_sender_receiver.h index 541ad6de..f07f8f0a 100644 --- a/score/mw/com/example/ipc_bridge/sample_sender_receiver.h +++ b/score/mw/com/example/ipc_bridge/sample_sender_receiver.h @@ -20,7 +20,6 @@ #include #include #include -#include #include namespace score::mw::com diff --git a/score/mw/com/example/ipc_bridge/test_ipc_bridge.sh b/score/mw/com/example/ipc_bridge/test_ipc_bridge.sh new file mode 100755 index 00000000..67087507 --- /dev/null +++ b/score/mw/com/example/ipc_bridge/test_ipc_bridge.sh @@ -0,0 +1,144 @@ +#!/bin/bash +set -euxo pipefail + +MANIFEST_LOCATION="/tmp/mw_com_lola_service_manifest.json" + +# this cannot work if both sender and receiver expect 20 samples +# the sender has to send infinite samples and the receiver has to terminate after having received some +CPP_EXAMPLE_CMD="$(find * -name ipc_bridge_cpp) -s $MANIFEST_LOCATION --cycle-time 10" +RUST_EXAMPLE_CMD="$(find * -name ipc_bridge_rs) -s $MANIFEST_LOCATION --cycle-time 10" + +function cleanup_lola() { + # Ensure tests are run in a clean state + + # Linux + rm -rf /dev/shm/lola-*6432* + rm -rf /tmp/mw_com_lola/*/*6432* + rm -rf /tmp/lola-*-*6432*_lock + + # QNX + rm -rf /dev/shmem/lola-*6432* + rm -rf /tmp_discovery/mw_com_lola/*/*6432* + rm -rf /tmp_discovery/lola-*-*6432*_lock +} + +function create_service_manifest() { + local file="$1" + local provider_user_id=$2 + local consumer_user_id=$3 + + cat < "$file" +{ + "serviceTypes": [ + { + "serviceTypeName": "/bmw/adp/MapApiLanesStamped", + "version": { + "major": 1, + "minor": 0 + }, + "bindings": [ + { + "binding": "SHM", + "serviceId": 6432, + "events": [ + { + "eventName": "map_api_lanes_stamped", + "eventId": 1 + }, + { + "eventName": "dummy_data_stamped", + "eventId": 2 + } + ] + } + ] + } + ], + "serviceInstances": [ + { + "instanceSpecifier": "xpad/cp60/MapApiLanesStamped", + "serviceTypeName": "/bmw/adp/MapApiLanesStamped", + "version": { + "major": 1, + "minor": 0 + }, + "instances": [ + { + "instanceId": 1, + "allowedConsumer": { + "QM": [ + $consumer_user_id + ] + }, + "allowedProvider": { + "QM": [ + $provider_user_id + ] + }, + "asil-level": "QM", + "binding": "SHM", + "events": [ + { + "eventName": "map_api_lanes_stamped", + "numberOfSampleSlots": 10, + "maxSubscribers": 3 + } + ] + } + ] + } + ] +} +EOF +} + +function setup() { + create_service_manifest "$MANIFEST_LOCATION" $(id -u) $(id -u) + trap "rm -f $MANIFEST_LOCATION" EXIT +} + +function run_receiver_sender() { + EXAMPLE_CMD_RECV="$1" + EXAMPLE_CMD_SEND="$2" + + tempdir=$(mktemp -d /tmp/ipc_bridge.XXXXXX) + + # Ensure we start with a clean state + cleanup_lola + + # Run examples + $EXAMPLE_CMD_SEND --num-cycles 0 --mode send > "$tempdir/send.log" 2>&1 & + sender_pid=$! + $EXAMPLE_CMD_RECV --num-cycles 2 --mode recv > "$tempdir/recv.log" 2>&1 + + # Check if the receiver received the message + grep -q "Subscrib" "$tempdir/recv.log" + grep -q "Received sample" "$tempdir/recv.log" + rm -rf "$tempdir" + + # Kill sender and check its return code + kill $sender_pid + + set +e + wait $sender_pid + sender_return_code=$? + set -e + [[ "143" == "$sender_return_code" ]] + + # Cleanup due to SIGINT + cleanup_lola +} + +setup + +echo -e "\n\n\nRunning Rust receiver and Rust sender" +run_receiver_sender "$RUST_EXAMPLE_CMD" "$RUST_EXAMPLE_CMD" + +echo -e "\n\n\nRunning C++ receiver and C++ sender" +run_receiver_sender "$CPP_EXAMPLE_CMD" "$CPP_EXAMPLE_CMD" + +echo -e "\n\n\nRunning Rust receiver and C++ sender" +run_receiver_sender "$RUST_EXAMPLE_CMD" "$CPP_EXAMPLE_CMD" + +echo -e "\n\n\nRunning C++ receiver and Rust sender" +run_receiver_sender "$CPP_EXAMPLE_CMD" "$RUST_EXAMPLE_CMD"