From f3bda6d0e892b2fd7b0466b1a1e630674fd58357 Mon Sep 17 00:00:00 2001 From: "junie-eap[bot]" Date: Mon, 28 Apr 2025 11:42:46 +0000 Subject: [PATCH] chore(junie): Junie Implement Actor Model pattern changes from the task: #3 --- actor-model/README.md | 105 +++++++++++++++++- .../java/com/iluwatar/actormodel/Actor.java | 43 ++++++- .../com/iluwatar/actormodel/ActorSystem.java | 71 +++++++++++- .../java/com/iluwatar/actormodel/App.java | 55 +++++++-- .../com/iluwatar/actormodel/ExampleActor.java | 25 ++++- .../iluwatar/actormodel/ExampleActor2.java | 14 ++- .../{actor => actormodel}/ActorModelTest.java | 61 ++++++++-- 7 files changed, 343 insertions(+), 31 deletions(-) rename actor-model/src/test/java/com/iluwatar/{actor => actormodel}/ActorModelTest.java (53%) diff --git a/actor-model/README.md b/actor-model/README.md index be8065ffefef..4440d0fe77ed 100644 --- a/actor-model/README.md +++ b/actor-model/README.md @@ -65,21 +65,61 @@ public abstract class Actor implements Runnable { @Setter @Getter private String actorId; private final BlockingQueue mailbox = new LinkedBlockingQueue<>(); private volatile boolean active = true; + @Setter @Getter private ActorSystem actorSystem; - + /** + * Sends a message to this actor. + */ public void send(Message message) { mailbox.add(message); } + /** + * Stops this actor. + */ public void stop() { active = false; } + /** + * Creates a new child actor supervised by this actor. + * This implements the capability for actors to create new actors. + */ + protected String createChildActor(Actor childActor) { + if (actorSystem == null) { + throw new IllegalStateException("Actor system not set"); + } + childActor.setActorSystem(actorSystem); + return actorSystem.startChildActor(childActor, actorId); + } + + /** + * Handles errors that occur during message processing. + * By default, it restarts the actor, but child classes can override this. + */ + protected void handleError(Throwable error) { + if (actorSystem != null) { + actorSystem.restartActor(actorId, error); + } + } + @Override public void run() { - + while (active) { + try { + Message message = mailbox.take(); // Wait for a message + try { + onReceive(message); // Process it + } catch (Exception e) { + handleError(e); // Handle any errors + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } + // Child classes must define what to do with a message protected abstract void onReceive(Message message); } @@ -102,16 +142,74 @@ public class Message { ```java public class ActorSystem { - public void startActor(Actor actor) { + private final ExecutorService executor = Executors.newCachedThreadPool(); + private final ConcurrentHashMap actorRegister = new ConcurrentHashMap<>(); + private final ConcurrentHashMap actorHierarchy = new ConcurrentHashMap<>(); + private final AtomicInteger idCounter = new AtomicInteger(0); + + /** + * Starts an actor without a parent (top-level actor). + */ + public String startActor(Actor actor) { String actorId = "actor-" + idCounter.incrementAndGet(); // Generate a new and unique ID actor.setActorId(actorId); // assign the actor it's ID actorRegister.put(actorId, actor); // Register and save the actor with it's ID executor.submit(actor); // Run the actor in a thread + return actorId; + } + + /** + * Starts an actor with a parent actor (child actor). + * The parent actor will supervise this child actor. + */ + public String startChildActor(Actor actor, String parentId) { + String actorId = startActor(actor); + actorHierarchy.put(actorId, parentId); // Record parent-child relationship + return actorId; } + + /** + * Gets an actor by its ID. + */ public Actor getActorById(String actorId) { return actorRegister.get(actorId); // Find by Id } + /** + * Gets the parent actor of an actor. + */ + public Actor getParentActor(String childId) { + String parentId = actorHierarchy.get(childId); + return parentId != null ? actorRegister.get(parentId) : null; + } + + /** + * Restarts an actor that has failed. + * If the actor has a parent, the parent will be notified. + */ + public void restartActor(String actorId, Throwable error) { + Actor actor = actorRegister.get(actorId); + if (actor != null) { + // Stop the current actor + actor.stop(); + + // Notify parent if exists + String parentId = actorHierarchy.get(actorId); + if (parentId != null) { + Actor parent = actorRegister.get(parentId); + if (parent != null) { + parent.send(new Message("Child actor " + actorId + " failed: " + error.getMessage(), "system")); + } + } + + // Restart the actor + executor.submit(actor); + } + } + + /** + * Shuts down the actor system, stopping all actors. + */ public void shutdown() { executor.shutdownNow(); // Stop all threads } @@ -198,4 +296,3 @@ public class App { - *Reactive Design Patterns*, Roland Kuhn - *The Actor Model in 10 Minutes*, [InfoQ Article](https://www.infoq.com/articles/actor-model/) - [Akka Documentation](https://doc.akka.io/docs/akka/current/index.html) - diff --git a/actor-model/src/main/java/com/iluwatar/actormodel/Actor.java b/actor-model/src/main/java/com/iluwatar/actormodel/Actor.java index 6e2aaccd1937..99db5b343fd4 100644 --- a/actor-model/src/main/java/com/iluwatar/actormodel/Actor.java +++ b/actor-model/src/main/java/com/iluwatar/actormodel/Actor.java @@ -38,20 +38,61 @@ public abstract class Actor implements Runnable { // rather than being cached in a thread's local memory. To make it consistent to all Actors + @Setter @Getter private ActorSystem actorSystem; + + /** + * Sends a message to this actor. + * + * @param message The message to send + */ public void send(Message message) { mailbox.add(message); // Add message to queue } + /** + * Stops this actor. + */ public void stop() { active = false; // Stop the actor loop } + /** + * Creates a new child actor supervised by this actor. + * This implements the capability for actors to create new actors. + * + * @param childActor The actor to create as a child + * @return The ID of the created child actor + */ + protected String createChildActor(Actor childActor) { + if (actorSystem == null) { + throw new IllegalStateException("Actor system not set"); + } + childActor.setActorSystem(actorSystem); + return actorSystem.startChildActor(childActor, actorId); + } + + /** + * Handles errors that occur during message processing. + * By default, it restarts the actor, but child classes can override this. + * + * @param error The error that occurred + */ + protected void handleError(Throwable error) { + if (actorSystem != null) { + actorSystem.restartActor(actorId, error); + } + } + @Override public void run() { while (active) { try { Message message = mailbox.take(); // Wait for a message - onReceive(message); // Process it + try { + onReceive(message); // Process it + } catch (Exception e) { + handleError(e); // Handle any errors + } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } diff --git a/actor-model/src/main/java/com/iluwatar/actormodel/ActorSystem.java b/actor-model/src/main/java/com/iluwatar/actormodel/ActorSystem.java index db7c21cb6088..0f1d8eacd985 100644 --- a/actor-model/src/main/java/com/iluwatar/actormodel/ActorSystem.java +++ b/actor-model/src/main/java/com/iluwatar/actormodel/ActorSystem.java @@ -32,19 +32,88 @@ public class ActorSystem { private final ExecutorService executor = Executors.newCachedThreadPool(); private final ConcurrentHashMap actorRegister = new ConcurrentHashMap<>(); + private final ConcurrentHashMap actorHierarchy = new ConcurrentHashMap<>(); private final AtomicInteger idCounter = new AtomicInteger(0); - public void startActor(Actor actor) { + /** + * Starts an actor without a parent (top-level actor). + * + * @param actor The actor to start + * @return The ID of the started actor + */ + public String startActor(Actor actor) { String actorId = "actor-" + idCounter.incrementAndGet(); // Generate a new and unique ID actor.setActorId(actorId); // assign the actor it's ID actorRegister.put(actorId, actor); // Register and save the actor with it's ID executor.submit(actor); // Run the actor in a thread + return actorId; } + /** + * Starts an actor with a parent actor (child actor). + * The parent actor will supervise this child actor. + * + * @param actor The actor to start + * @param parentId The ID of the parent actor + * @return The ID of the started actor + */ + public String startChildActor(Actor actor, String parentId) { + String actorId = startActor(actor); + actorHierarchy.put(actorId, parentId); // Record parent-child relationship + return actorId; + } + + /** + * Gets an actor by its ID. + * + * @param actorId The ID of the actor to get + * @return The actor with the given ID, or null if not found + */ public Actor getActorById(String actorId) { return actorRegister.get(actorId); // Find by Id } + /** + * Gets the parent actor of an actor. + * + * @param childId The ID of the child actor + * @return The parent actor, or null if the actor has no parent + */ + public Actor getParentActor(String childId) { + String parentId = actorHierarchy.get(childId); + return parentId != null ? actorRegister.get(parentId) : null; + } + + /** + * Restarts an actor that has failed. + * If the actor has a parent, the parent will be notified. + * + * @param actorId The ID of the actor to restart + * @param error The error that caused the actor to fail + */ + public void restartActor(String actorId, Throwable error) { + Actor actor = actorRegister.get(actorId); + if (actor != null) { + // Stop the current actor + actor.stop(); + + // Notify parent if exists + String parentId = actorHierarchy.get(actorId); + if (parentId != null) { + Actor parent = actorRegister.get(parentId); + if (parent != null) { + parent.send(new Message("Child actor " + actorId + " failed: " + error.getMessage(), "system")); + } + } + + // Restart the actor + executor.submit(actor); + } + } + + /** + * Shuts down the actor system, stopping all actors. + */ public void shutdown() { executor.shutdownNow(); // Stop all threads } diff --git a/actor-model/src/main/java/com/iluwatar/actormodel/App.java b/actor-model/src/main/java/com/iluwatar/actormodel/App.java index 79fe79e48a6f..aadcfc3dd040 100644 --- a/actor-model/src/main/java/com/iluwatar/actormodel/App.java +++ b/actor-model/src/main/java/com/iluwatar/actormodel/App.java @@ -44,21 +44,56 @@ */ package com.iluwatar.actormodel; +import lombok.extern.slf4j.Slf4j; + +@Slf4j public class App { + /** + * Main method to demonstrate the Actor Model pattern. + * + * @param args command line arguments (not used) + * @throws InterruptedException if thread is interrupted + */ public static void main(String[] args) throws InterruptedException { + // Create the actor system ActorSystem system = new ActorSystem(); - Actor srijan = new ExampleActor(system); - Actor ansh = new ExampleActor2(system); - system.startActor(srijan); - system.startActor(ansh); - ansh.send(new Message("Hello ansh", srijan.getActorId())); - srijan.send(new Message("Hello srijan!", ansh.getActorId())); + // Create and start parent actors + ExampleActor parent = new ExampleActor(system); + String parentId = system.startActor(parent); + LOGGER.info("Started parent actor with ID: {}", parentId); + + // Parent creates a child actor + String childId = parent.createChildExampleActor(); + LOGGER.info("Parent created child actor with ID: {}", childId); + + // Get the child actor + ExampleActor2 child = (ExampleActor2) system.getActorById(childId); + + // Basic message passing + child.send(new Message("Hello from parent", parent.getActorId())); + parent.send(new Message("Hello from child", child.getActorId())); + + Thread.sleep(500); // Give time for messages to process + + // Demonstrate supervision - send a message that will cause an error + LOGGER.info("Sending message that will cause an error to demonstrate supervision"); + child.send(new Message("This will cause an error", parent.getActorId())); + + Thread.sleep(1000); // Give time for error handling and supervision + + // Create another actor directly through the system + Actor anotherActor = new ExampleActor2(system); + system.startActor(anotherActor); + anotherActor.send(new Message("Hello from the system", "system")); - Thread.sleep(1000); // Give time for messages to process + Thread.sleep(500); // Give time for messages to process - srijan.stop(); // Stop the actor gracefully - ansh.stop(); - system.shutdown(); // Stop the actor system + // Graceful shutdown + LOGGER.info("Shutting down actors and system"); + parent.stop(); + child.stop(); + anotherActor.stop(); + system.shutdown(); } } diff --git a/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor.java b/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor.java index fd49325f44bd..1d716e21ec59 100644 --- a/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor.java +++ b/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor.java @@ -31,23 +31,38 @@ @Slf4j public class ExampleActor extends Actor { - private final ActorSystem actorSystem; @Getter private final List receivedMessages = new ArrayList<>(); public ExampleActor(ActorSystem actorSystem) { - this.actorSystem = actorSystem; + setActorSystem(actorSystem); } - // Logger log = Logger.getLogger(getClass().getName()); - @Override protected void onReceive(Message message) { LOGGER.info( "[{}]Received : {} from : [{}]", getActorId(), message.getContent(), message.getSenderId()); - Actor sender = actorSystem.getActorById(message.getSenderId()); // sender actor id + receivedMessages.add(message.getContent()); + + // Check if this is a system message about a child actor failure + if (message.getSenderId().equals("system") && message.getContent().contains("failed")) { + LOGGER.info("[{}] Handling child failure: {}", getActorId(), message.getContent()); + return; + } + + Actor sender = getActorSystem().getActorById(message.getSenderId()); // sender actor id // Reply of the message if (sender != null && !message.getSenderId().equals(getActorId())) { sender.send(new Message("I got your message ", getActorId())); } } + + /** + * Creates a child actor that will be supervised by this actor. + * + * @return The ID of the created child actor + */ + public String createChildExampleActor() { + ExampleActor2 childActor = new ExampleActor2(getActorSystem()); + return createChildActor(childActor); + } } diff --git a/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor2.java b/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor2.java index 037f96716558..81e60fb5ec81 100644 --- a/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor2.java +++ b/actor-model/src/main/java/com/iluwatar/actormodel/ExampleActor2.java @@ -31,16 +31,26 @@ @Slf4j public class ExampleActor2 extends Actor { - private final ActorSystem actorSystem; @Getter private final List receivedMessages = new ArrayList<>(); public ExampleActor2(ActorSystem actorSystem) { - this.actorSystem = actorSystem; + setActorSystem(actorSystem); } @Override protected void onReceive(Message message) { receivedMessages.add(message.getContent()); LOGGER.info("[{}]Received : {}", getActorId(), message.getContent()); + + // Simulate an error occasionally to demonstrate supervision + if (message.getContent().contains("error")) { + throw new RuntimeException("Simulated error in ExampleActor2"); + } + } + + @Override + protected void handleError(Throwable error) { + LOGGER.error("[{}] Error occurred: {}", getActorId(), error.getMessage()); + super.handleError(error); } } diff --git a/actor-model/src/test/java/com/iluwatar/actor/ActorModelTest.java b/actor-model/src/test/java/com/iluwatar/actormodel/ActorModelTest.java similarity index 53% rename from actor-model/src/test/java/com/iluwatar/actor/ActorModelTest.java rename to actor-model/src/test/java/com/iluwatar/actormodel/ActorModelTest.java index a4a0dee569ab..fe8d2a0e499f 100644 --- a/actor-model/src/test/java/com/iluwatar/actor/ActorModelTest.java +++ b/actor-model/src/test/java/com/iluwatar/actormodel/ActorModelTest.java @@ -22,16 +22,12 @@ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN * THE SOFTWARE. */ -package com.iluwatar.actor; +package com.iluwatar.actormodel; import static org.junit.jupiter.api.Assertions.*; -import com.iluwatar.actormodel.ActorSystem; -import com.iluwatar.actormodel.App; -import com.iluwatar.actormodel.ExampleActor; -import com.iluwatar.actormodel.ExampleActor2; -import com.iluwatar.actormodel.Message; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Assertions; public class ActorModelTest { @Test @@ -49,15 +45,64 @@ public void testMessagePassing() throws InterruptedException { system.startActor(srijan); system.startActor(ansh); - // Ansh recieves a message from Srijan + // Ansh receives a message from Srijan ansh.send(new Message("Hello ansh", srijan.getActorId())); // Wait briefly to allow async processing Thread.sleep(200); - // Check that Srijan received the message + // Check that Ansh received the message assertTrue( ansh.getReceivedMessages().contains("Hello ansh"), "ansh should receive the message from Srijan"); } + + @Test + public void testActorCreation() throws InterruptedException { + ActorSystem system = new ActorSystem(); + + // Create parent actor + ExampleActor parent = new ExampleActor(system); + system.startActor(parent); + + // Parent creates a child actor + String childId = parent.createChildExampleActor(); + + // Get the child actor + Actor child = system.getActorById(childId); + + // Verify child was created + assertNotNull(child, "Child actor should be created"); + assertTrue(child instanceof ExampleActor2, "Child should be an ExampleActor2"); + + // Verify parent-child relationship + Actor parentOfChild = system.getParentActor(childId); + assertEquals(parent.getActorId(), parentOfChild.getActorId(), + "Parent of child should be the parent actor"); + } + + @Test + public void testSupervision() throws InterruptedException { + ActorSystem system = new ActorSystem(); + + // Create parent actor + ExampleActor parent = new ExampleActor(system); + system.startActor(parent); + + // Parent creates a child actor + String childId = parent.createChildExampleActor(); + ExampleActor2 child = (ExampleActor2) system.getActorById(childId); + + // Send a message that will cause an error + child.send(new Message("This will cause an error", parent.getActorId())); + + // Wait for error handling and supervision + Thread.sleep(500); + + // Verify parent received notification about child failure + boolean parentNotified = parent.getReceivedMessages().stream() + .anyMatch(msg -> msg.contains("failed")); + + assertTrue(parentNotified, "Parent should be notified about child failure"); + } }