Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,9 +85,6 @@ Camera App ──► [Video Frame] ──┬──► Display App
> This repository offers a [DevContainer](https://containers.dev/).
> For setting this up and enabling code completion read [eclipse-score/devcontainer/README.md#inside-the-container](https://github.com/eclipse-score/devcontainer/blob/main/README.md#inside-the-container).

>**Note**:
> If you are using Docker on Windows **without `WSL2`** in between, you have to select the alternative container `eclipse-s-core-docker-on-windows`.

### Building the Project

```bash
Expand Down Expand Up @@ -141,4 +138,4 @@ We welcome contributions! See our [Contributing Guide](CONTRIBUTING.md) for deta

---

**Note**: This is an open-source project under the Eclipse Foundation. It implements automotive-grade communication middleware suitable for safety-critical applications.
**Note**: This is an open-source project under the Eclipse Foundation. It implements automotive-grade communication middleware suitable for safety-critical applications.
10 changes: 10 additions & 0 deletions score/mw/com/example/ipc_bridge/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,13 @@ cc_library(
"//score/mw/com/impl/rust:bridge_macros",
],
)

# Rust code dumps core when run in parallel with unit tests
sh_test(
name = "integration_test_ipc_bridge",
srcs = ["test_ipc_bridge.sh"],
data = [
":ipc_bridge_cpp",
":ipc_bridge_rs",
],
)
69 changes: 33 additions & 36 deletions score/mw/com/example/ipc_bridge/ipc_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::thread::sleep;
use std::time::Duration;

use clap::{Parser, ValueEnum};
use futures::{Stream, StreamExt};
use futures::{future::Either, Stream, StreamExt};

#[derive(Copy, Clone, Debug, Eq, PartialEq, PartialOrd, Ord, ValueEnum)]
enum Mode {
Expand All @@ -32,9 +32,23 @@ enum Mode {

#[derive(Parser)]
struct Arguments {
/// Number of cycles that are executed before determining success or failure. 0 indicates no limit
#[arg(
short,
long,
default_value = "0",
)]
num_cycles: usize,
/// Set to either send/skeleton or recv/proxy to determine the role of the process
#[arg(value_enum, short, long)]
mode: Mode,
/// Cycle time in milliseconds for sending/polling
#[arg(
short,
long,
value_parser = |s: &str| s.parse::<u64>().map(Duration::from_millis),
)]
cycle_time: Duration,
#[arg(
short,
long,
Expand All @@ -43,8 +57,7 @@ struct Arguments {
service_instance_manifest: PathBuf,
}

const SERVICE_DISCOVERY_SLEEP_DURATION: Duration = Duration::from_secs(1);
const DATA_RECEPTION_COUNT: usize = 100;
const SERVICE_DISCOVERY_SLEEP_DURATION: Duration = Duration::from_millis(10);

/// Async function that takes `count` samples from the stream and prints the `x` field of each
/// sample that is received.
Expand All @@ -56,11 +69,16 @@ async fn get_samples<
count: usize,
) {
let map_api_lanes_stamped = pin!(map_api_lanes_stamped);
let mut limited_map_api_lanes_stamped = map_api_lanes_stamped.take(count);
while let Some(data) = limited_map_api_lanes_stamped.next().await {

let mut map_api_lanes_stamped_iterator = if count > 0 {
Either::Left(map_api_lanes_stamped.take(count))
} else {
Either::Right(map_api_lanes_stamped)
};
while let Some(data) = map_api_lanes_stamped_iterator.next().await {
println!("Received sample: {}", data.x);
}
println!("Stream ended");
println!("Unsubscribing...");
}

/// Deliberately add Send to ensure that this is a future that can also be run by multithreaded
Expand All @@ -69,14 +87,14 @@ fn run<F: std::future::Future<Output = ()> + Send>(future: F) {
futures::executor::block_on(future);
}

fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier) {
fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier, args: &Arguments) {
let handles = loop {
let handles = mw_com::proxy::find_service(instance_specifier.clone())
.expect("Instance specifier resolution failed");
if handles.len() > 0 {
break handles;
} else {
println!("No service found, retrying in 1 second");
println!("No service found, retrying in 0.5 seconds");
sleep(SERVICE_DISCOVERY_SLEEP_DURATION);
}
};
Expand All @@ -93,38 +111,17 @@ fn run_recv_mode(instance_specifier: mw_com::InstanceSpecifier) {
.expect("Failed to convert to stream");
run(get_samples(
map_api_lanes_stamped_stream,
DATA_RECEPTION_COUNT,
args.num_cycles,
));
}

fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier) {
fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier, args: &Arguments) {
let skeleton = ipc_bridge_gen_rs::IpcBridge::Skeleton::new(&instance_specifier)
.expect("BigDataSkeleton creation failed");

let skeleton = skeleton.offer_service().expect("Failed offering from rust");
let mut x: u32 = 0;
while x < 10 {
let mut sample: ipc_bridge_gen_rs::MapApiLanesStamped =
ipc_bridge_gen_rs::MapApiLanesStamped::default();
sample.x = x;
skeleton
.events
.map_api_lanes_stamped_
.send(sample)
.expect("Failed sending event");

println!("published {} sleeping", x);
x += 1;
sleep(Duration::from_millis(100));
}

println!("stopping offering and sleeping for 5sec");
let skeleton = skeleton.stop_offer_service();
sleep(Duration::from_secs(5));

let skeleton = skeleton.offer_service().expect("Reoffering failed");
x = 0;
while x < 10 {
while x < args.num_cycles as u32 || args.num_cycles == 0 {
let mut sample: ipc_bridge_gen_rs::MapApiLanesStamped =
ipc_bridge_gen_rs::MapApiLanesStamped::default();
sample.x = x;
Expand All @@ -134,9 +131,9 @@ fn run_send_mode(instance_specifier: mw_com::InstanceSpecifier) {
.send(sample)
.expect("Failed sending event");

println!("published {} sleeping", x);
println!("Sending sample: {}", x);
x += 1;
sleep(Duration::from_millis(100));
sleep(args.cycle_time);
}
}

Expand All @@ -158,11 +155,11 @@ fn main() {
match args.mode {
Mode::Send | Mode::Skeleton => {
println!("Running in Send/Skeleton mode");
run_send_mode(instance_specifier);
run_send_mode(instance_specifier, &args);
}
Mode::Recv | Mode::Proxy => {
println!("Running in Recv/Proxy mode");
run_recv_mode(instance_specifier);
run_recv_mode(instance_specifier, &args);
}
}
}
1 change: 1 addition & 0 deletions score/mw/com/example/ipc_bridge/main.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ int main(const int argc, const char** argv)
std::cerr << "Unknown mode " << mode << ", terminating." << std::endl;
return EXIT_FAILURE;
}
return EXIT_SUCCESS;
}
4 changes: 2 additions & 2 deletions score/mw/com/example/ipc_bridge/sample_sender_receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
#include "score/concurrency/notification.h"

#include "score/mw/com/impl/proxy_event.h"
#include <random>
#include <score/assert.hpp>
#include <score/hash.hpp>
#include <score/optional.hpp>

#include <cstring>
#include <fstream>
#include <iostream>
#include <sstream>
#include <string>
Expand Down Expand Up @@ -231,7 +231,7 @@ score::Result<impl::HandleType> GetHandleFromSpecifier(const InstanceSpecifier&
handles = std::move(handles_result).value();
if (handles.size() == 0)
{
std::this_thread::sleep_for(500ms);
std::this_thread::sleep_for(10ms);
}
} while (handles.size() == 0);

Expand Down
1 change: 0 additions & 1 deletion score/mw/com/example/ipc_bridge/sample_sender_receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#include <atomic>
#include <chrono>
#include <mutex>
#include <random>
#include <vector>

namespace score::mw::com
Expand Down
144 changes: 144 additions & 0 deletions score/mw/com/example/ipc_bridge/test_ipc_bridge.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
#!/bin/bash
set -euxo pipefail

MANIFEST_LOCATION="/tmp/mw_com_lola_service_manifest.json"

# this cannot work if both sender and receiver expect 20 samples
# the sender has to send infinite samples and the receiver has to terminate after having received some
CPP_EXAMPLE_CMD="$(find * -name ipc_bridge_cpp) -s $MANIFEST_LOCATION --cycle-time 10"
RUST_EXAMPLE_CMD="$(find * -name ipc_bridge_rs) -s $MANIFEST_LOCATION --cycle-time 10"

function cleanup_lola() {
# Ensure tests are run in a clean state

# Linux
rm -rf /dev/shm/lola-*6432*
rm -rf /tmp/mw_com_lola/*/*6432*
rm -rf /tmp/lola-*-*6432*_lock

# QNX
rm -rf /dev/shmem/lola-*6432*
rm -rf /tmp_discovery/mw_com_lola/*/*6432*
rm -rf /tmp_discovery/lola-*-*6432*_lock
}

function create_service_manifest() {
local file="$1"
local provider_user_id=$2
local consumer_user_id=$3

cat <<EOF > "$file"
{
"serviceTypes": [
{
"serviceTypeName": "/bmw/adp/MapApiLanesStamped",
"version": {
"major": 1,
"minor": 0
},
"bindings": [
{
"binding": "SHM",
"serviceId": 6432,
"events": [
{
"eventName": "map_api_lanes_stamped",
"eventId": 1
},
{
"eventName": "dummy_data_stamped",
"eventId": 2
}
]
}
]
}
],
"serviceInstances": [
{
"instanceSpecifier": "xpad/cp60/MapApiLanesStamped",
"serviceTypeName": "/bmw/adp/MapApiLanesStamped",
"version": {
"major": 1,
"minor": 0
},
"instances": [
{
"instanceId": 1,
"allowedConsumer": {
"QM": [
$consumer_user_id
]
},
"allowedProvider": {
"QM": [
$provider_user_id
]
},
"asil-level": "QM",
"binding": "SHM",
"events": [
{
"eventName": "map_api_lanes_stamped",
"numberOfSampleSlots": 10,
"maxSubscribers": 3
}
]
}
]
}
]
}
EOF
}

function setup() {
create_service_manifest "$MANIFEST_LOCATION" $(id -u) $(id -u)
trap "rm -f $MANIFEST_LOCATION" EXIT
}

function run_receiver_sender() {
EXAMPLE_CMD_RECV="$1"
EXAMPLE_CMD_SEND="$2"

tempdir=$(mktemp -d /tmp/ipc_bridge.XXXXXX)

# Ensure we start with a clean state
cleanup_lola

# Run examples
$EXAMPLE_CMD_SEND --num-cycles 0 --mode send > "$tempdir/send.log" 2>&1 &
sender_pid=$!
$EXAMPLE_CMD_RECV --num-cycles 2 --mode recv > "$tempdir/recv.log" 2>&1

# Check if the receiver received the message
grep -q "Subscrib" "$tempdir/recv.log"
grep -q "Received sample" "$tempdir/recv.log"
rm -rf "$tempdir"

# Kill sender and check its return code
kill $sender_pid

set +e
wait $sender_pid
sender_return_code=$?
set -e
[[ "143" == "$sender_return_code" ]]

# Cleanup due to SIGINT
cleanup_lola
}

setup

echo -e "\n\n\nRunning Rust receiver and Rust sender"
run_receiver_sender "$RUST_EXAMPLE_CMD" "$RUST_EXAMPLE_CMD"

echo -e "\n\n\nRunning C++ receiver and C++ sender"
run_receiver_sender "$CPP_EXAMPLE_CMD" "$CPP_EXAMPLE_CMD"

echo -e "\n\n\nRunning Rust receiver and C++ sender"
run_receiver_sender "$RUST_EXAMPLE_CMD" "$CPP_EXAMPLE_CMD"

echo -e "\n\n\nRunning C++ receiver and Rust sender"
run_receiver_sender "$CPP_EXAMPLE_CMD" "$RUST_EXAMPLE_CMD"