|
| 1 | +import re |
| 2 | +import xml.etree.ElementTree as ET |
| 3 | + |
| 4 | +from processpiper import ProcessMap, EventType, ActivityType, GatewayType |
| 5 | +from util_test import get_test_file_path, get_solution_file_path, mock_uuid |
| 6 | + |
| 7 | +namespace = {'bpmn': 'http://www.omg.org/spec/BPMN/20100524/MODEL'} |
| 8 | + |
| 9 | + |
| 10 | +def test_case03(mock_uuid): |
| 11 | + with ProcessMap( |
| 12 | + "Sample Test Process", colour_theme="BLUEMOUNTAIN" |
| 13 | + ) as my_process_map: |
| 14 | + with my_process_map.add_lane("End User") as lane1: |
| 15 | + start = lane1.add_element("Start", EventType.START) |
| 16 | + enter_keyword = lane1.add_element("Enter Keyword", ActivityType.TASK) |
| 17 | + |
| 18 | + with my_process_map.add_pool("System Search") as pool1: |
| 19 | + with pool1.add_lane("Database System") as lane2: |
| 20 | + login = lane2.add_element("Login", ActivityType.TASK) |
| 21 | + search_records = lane2.add_element("Search Records", ActivityType.TASK) |
| 22 | + result_found = lane2.add_element("Result Found?", GatewayType.EXCLUSIVE) |
| 23 | + display_result = lane2.add_element("Display Result", ActivityType.TASK) |
| 24 | + logout = lane2.add_element("Logout", ActivityType.TASK) |
| 25 | + end = lane2.add_element("End", EventType.END) |
| 26 | + |
| 27 | + with pool1.add_lane("Log System") as lane3: |
| 28 | + log_error = lane3.add_element("Log Error", ActivityType.TASK) |
| 29 | + |
| 30 | + start.connect(login, "User \nAuthenticates").connect( |
| 31 | + enter_keyword, "Authenticated" |
| 32 | + ).connect(search_records, "Search Criteria") |
| 33 | + search_records.connect(result_found, "Result").connect(display_result, "Yes") |
| 34 | + display_result.connect(logout).connect(end) |
| 35 | + result_found.connect(log_error, "No").connect(display_result) |
| 36 | + |
| 37 | + my_process_map.set_footer("Generated by ProcessPiper") |
| 38 | + my_process_map.draw() |
| 39 | + |
| 40 | + # Define paths for the generated BPMN file and the solution BPMN file |
| 41 | + generated_bpmn_path = get_test_file_path("test_bpmn_01.bpmn") |
| 42 | + solution_bpmn_path = get_solution_file_path("test_bpmn_01.bpmn") |
| 43 | + |
| 44 | + # Save the BPMN file and the image |
| 45 | + my_process_map.save(get_test_file_path("test_bpmn_01.png")) |
| 46 | + my_process_map.export_to_bpmn(generated_bpmn_path) |
| 47 | + |
| 48 | + # read bpmn file as xml and check if it is valid |
| 49 | + with open(generated_bpmn_path, "r") as generated_bpmn: |
| 50 | + generated_xml = generated_bpmn.read() |
| 51 | + |
| 52 | + tree = ET.ElementTree(ET.fromstring(generated_xml)) |
| 53 | + root = tree.getroot() |
| 54 | + |
| 55 | + # Does every start event have at least one outgoing element? |
| 56 | + start_events = root.findall(".//bpmn:startEvent", namespace) |
| 57 | + for start_event in start_events: |
| 58 | + outgoing_count, incoming_count = _count_outgoing_incoming(start_event) |
| 59 | + assert outgoing_count >= 1, "Start event does not have outgoing element" |
| 60 | + assert incoming_count == 0, "Start event has incoming element" |
| 61 | + |
| 62 | + # Does every end event have at least one incoming element? |
| 63 | + end_events = root.findall(".//bpmn:endEvent", namespace) |
| 64 | + for end_event in end_events: |
| 65 | + outgoing_count, incoming_count = _count_outgoing_incoming(end_event) |
| 66 | + assert outgoing_count == 0, "End event has outgoing element" |
| 67 | + assert incoming_count >= 1, "End event does not have incoming element" |
| 68 | + |
| 69 | + # Does intermediate event have at least one outgoing and incoming element? |
| 70 | + intermediate_events = root.findall(".//bpmn:intermediateCatchEvent", namespace) |
| 71 | + for intermediate_event in intermediate_events: |
| 72 | + outgoing_count, incoming_count = _count_outgoing_incoming(intermediate_event) |
| 73 | + assert outgoing_count >= 1, "Intermediate event does not have outgoing element" |
| 74 | + assert incoming_count >= 1, "Intermediate event does not have incoming element" |
| 75 | + |
| 76 | + # Does every task have at least one incoming and outgoing element? |
| 77 | + tasks = root.findall(".//bpmn:task", namespace) |
| 78 | + for task in tasks: |
| 79 | + outgoing_count, incoming_count = _count_outgoing_incoming(task) |
| 80 | + assert outgoing_count >= 1, "Task does not have outgoing element" |
| 81 | + assert incoming_count >= 1, "Task does not have incoming element" |
| 82 | + |
| 83 | + # Does every sub-process have at least one incoming and outgoing element? |
| 84 | + sub_processes = root.findall(".//bpmn:subProcess", namespace) |
| 85 | + for sub_process in sub_processes: |
| 86 | + outgoing_count, incoming_count = _count_outgoing_incoming(sub_process) |
| 87 | + assert outgoing_count >= 1, "Sub-process does not have outgoing element" |
| 88 | + assert incoming_count >= 1, "Sub-process does not have incoming element" |
| 89 | + |
| 90 | + # Does every service task have at least one incoming and outgoing element? |
| 91 | + service_tasks = root.findall(".//bpmn:serviceTask", namespace) |
| 92 | + for service_task in service_tasks: |
| 93 | + outgoing_count, incoming_count = _count_outgoing_incoming(service_task) |
| 94 | + assert outgoing_count >= 1, "Service task does not have outgoing element" |
| 95 | + assert incoming_count >= 1, "Service task does not have incoming element" |
| 96 | + |
| 97 | + # Does every exclusive gateway have at least one incoming and outgoing element? |
| 98 | + exclusive_gateways = root.findall(".//bpmn:exclusiveGateway", namespace) |
| 99 | + for exclusive_gateway in exclusive_gateways: |
| 100 | + outgoing_count, incoming_count = _count_outgoing_incoming(exclusive_gateway) |
| 101 | + assert outgoing_count >= 1, "Exclusive gateway does not have outgoing element" |
| 102 | + assert incoming_count >= 1, "Exclusive gateway does not have incoming element" |
| 103 | + |
| 104 | + # Does every inclusive gateway have at least one incoming and outgoing element? |
| 105 | + inclusive_gateways = root.findall(".//bpmn:inclusiveGateway", namespace) |
| 106 | + for inclusive_gateway in inclusive_gateways: |
| 107 | + outgoing_count, incoming_count = _count_outgoing_incoming(inclusive_gateway) |
| 108 | + assert outgoing_count >= 1, "Inclusive gateway does not have outgoing element" |
| 109 | + assert incoming_count >= 1, "Inclusive gateway does not have incoming element" |
| 110 | + |
| 111 | + # Does every parallel gateway have at least one incoming and outgoing element? |
| 112 | + parallel_gateways = root.findall(".//bpmn:parallelGateway", namespace) |
| 113 | + for parallel_gateway in parallel_gateways: |
| 114 | + outgoing_count, incoming_count = _count_outgoing_incoming(parallel_gateway) |
| 115 | + assert outgoing_count >= 1, "Parallel gateway does not have outgoing element" |
| 116 | + assert incoming_count >= 1, "Parallel gateway does not have incoming element" |
| 117 | + |
| 118 | + # Remove following attributes as they are always different: |
| 119 | + # - sourceRef |
| 120 | + # - targetRef |
| 121 | + # - id |
| 122 | + generated_xml = re.sub(r'sourceRef=".*?"', '', generated_xml) |
| 123 | + generated_xml = re.sub(r'targetRef=".*?"', '', generated_xml) |
| 124 | + generated_xml = re.sub(r'id=".*?"', '', generated_xml) |
| 125 | + generated_bpmn.close() |
| 126 | + |
| 127 | + # Remove following attributes as they are always different: |
| 128 | + # - sourceRef |
| 129 | + # - targetRef |
| 130 | + # - id |
| 131 | + with open(solution_bpmn_path, "r") as solution_bpmn: |
| 132 | + solution_xml = solution_bpmn.read() |
| 133 | + solution_xml = re.sub(r'sourceRef=".*?"', '', solution_xml) |
| 134 | + solution_xml = re.sub(r'targetRef=".*?"', '', solution_xml) |
| 135 | + solution_xml = re.sub(r'id=".*?"', '', solution_xml) |
| 136 | + solution_bpmn.close() |
| 137 | + |
| 138 | + # Compare the generated BPMN file with the solution BPMN file |
| 139 | + assert generated_xml == solution_xml, "Generated BPMN file does not match the solution BPMN file" |
| 140 | + |
| 141 | + |
| 142 | +def _count_outgoing_incoming(event): |
| 143 | + outgoing_count = len(event.findall("bpmn:outgoing", namespace)) |
| 144 | + incoming_count = len(event.findall("bpmn:incoming", namespace)) |
| 145 | + return outgoing_count, incoming_count |
0 commit comments