1515 */
1616package io .micronaut .scheduling .processor ;
1717
18- import io .micronaut .context .ApplicationContext ;
1918import io .micronaut .context .BeanContext ;
20- import io .micronaut .context .Qualifier ;
2119import io .micronaut .context .bind .DefaultExecutableBeanContextBinder ;
2220import io .micronaut .context .bind .ExecutableBeanContextBinder ;
2321import io .micronaut .context .event .StartupEvent ;
2422import io .micronaut .context .exceptions .NoSuchBeanException ;
2523import io .micronaut .context .processor .ExecutableMethodProcessor ;
2624import io .micronaut .core .annotation .AnnotationValue ;
25+ import io .micronaut .core .annotation .Internal ;
2726import io .micronaut .core .bind .BoundExecutable ;
2827import io .micronaut .core .convert .ConversionService ;
2928import io .micronaut .core .type .Argument ;
4645
4746import java .io .Closeable ;
4847import java .time .Duration ;
48+ import java .util .ArrayList ;
4949import java .util .List ;
50- import java .util .Map ;
5150import java .util .Optional ;
5251import java .util .Queue ;
53- import java .util .concurrent .ConcurrentHashMap ;
5452import java .util .concurrent .ConcurrentLinkedDeque ;
5553import java .util .concurrent .ExecutorService ;
5654import java .util .concurrent .ScheduledExecutorService ;
5755import java .util .concurrent .ScheduledFuture ;
58- import java .util .concurrent .atomic .AtomicBoolean ;
5956
6057/**
6158 * A {@link ExecutableMethodProcessor} for the {@link Scheduled} annotation.
6259 *
6360 * @author graemerocher
6461 * @since 1.0
6562 */
63+ @ Internal
6664@ Singleton
67- public class ScheduledMethodProcessor implements ExecutableMethodProcessor <Scheduled >, Closeable {
65+ public final class ScheduledMethodProcessor implements ExecutableMethodProcessor <Scheduled >, Closeable {
6866
6967 private static final Logger LOG = LoggerFactory .getLogger (TaskScheduler .class );
7068 private static final String MEMBER_FIXED_RATE = "fixedRate" ;
@@ -78,50 +76,38 @@ public class ScheduledMethodProcessor implements ExecutableMethodProcessor<Sched
7876 private final BeanContext beanContext ;
7977 private final ConversionService conversionService ;
8078 private final Queue <ScheduledFuture <?>> scheduledTasks = new ConcurrentLinkedDeque <>();
81- private final Map <ScheduledDefinition , Runnable > scheduledMethods = new ConcurrentHashMap <>();
79+ private final List <ScheduledDefinition <?>> scheduledMethodsDefinitions = new ArrayList <>();
8280 private final TaskExceptionHandler <?, ?> taskExceptionHandler ;
83- private volatile boolean started = false ;
8481
8582 /**
8683 * @param beanContext The bean context for DI of beans annotated with @Inject
8784 * @param conversionService To convert one type to another
8885 * @param taskExceptionHandler The default task exception handler
8986 */
90- @ SuppressWarnings ("OptionalUsedAsFieldOrParameterType" )
91- public ScheduledMethodProcessor (BeanContext beanContext , Optional <ConversionService > conversionService , TaskExceptionHandler <?, ?> taskExceptionHandler ) {
87+ public ScheduledMethodProcessor (BeanContext beanContext , ConversionService conversionService , TaskExceptionHandler <?, ?> taskExceptionHandler ) {
9288 this .beanContext = beanContext ;
93- this .conversionService = conversionService . orElse ( ConversionService . SHARED ) ;
89+ this .conversionService = conversionService ;
9490 this .taskExceptionHandler = taskExceptionHandler ;
9591 }
9692
9793 @ Override
98- public void process (BeanDefinition <?> beanDefinition , ExecutableMethod <?, ?> method ) {
99- if (beanContext instanceof ApplicationContext ) {
100- ScheduledDefinition scheduledDefinition = new ScheduledDefinition (beanDefinition , method );
101- Runnable runnable = new ScheduleTaskRunnable (scheduledDefinition );
102- // process may be called during or after scheduleTasks. we need to guard against that.
103- if (scheduledMethods .putIfAbsent (scheduledDefinition , runnable ) == null && started ) {
104- runnable .run ();
105- }
106- }
94+ public <B > void process (BeanDefinition <B > beanDefinition , ExecutableMethod <B , ?> method ) {
95+ scheduledMethodsDefinitions .add (new ScheduledDefinition (beanDefinition , method ));
10796 }
10897
10998 /**
11099 * On startup event listener that schedules the active tasks.
111- * @param startupEvent The startup event.
100+ * @param ignore The startup event.
112101 */
113102 @ EventListener
114- void scheduleTasks (@ SuppressWarnings ("unused" ) StartupEvent startupEvent ) {
115- started = true ;
116- for (Runnable runnable : scheduledMethods .values ()) {
117- runnable .run ();
118- }
103+ void scheduleTasks (StartupEvent ignore ) {
104+ scheduledMethodsDefinitions .parallelStream ().forEach (this ::scheduleTask );
105+ scheduledMethodsDefinitions .clear ();
119106 }
120107
121- @ SuppressWarnings ("unchecked" )
122- private void scheduleTask (ScheduledDefinition scheduledDefinition ) {
123- ExecutableMethod <?, ?> method = scheduledDefinition .method ();
124- BeanDefinition <?> beanDefinition = scheduledDefinition .definition ();
108+ private <B > void scheduleTask (ScheduledDefinition <B > scheduledDefinition ) {
109+ ExecutableMethod <B , ?> method = scheduledDefinition .method ();
110+ BeanDefinition <B > beanDefinition = scheduledDefinition .definition ();
125111 List <AnnotationValue <Scheduled >> scheduledAnnotations = method .getAnnotationValuesByType (Scheduled .class );
126112 for (AnnotationValue <Scheduled > scheduledAnnotation : scheduledAnnotations ) {
127113 String fixedRate = scheduledAnnotation .stringValue (MEMBER_FIXED_RATE ).orElse (null );
@@ -148,26 +134,26 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
148134 Runnable task = () -> {
149135 try {
150136 ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder ();
151- BoundExecutable <? , ?> boundExecutable = binder .bind (method , beanContext );
152- Object bean = beanContext .getBean (( Argument < Object >) beanDefinition . asArgument (), ( Qualifier < Object >) beanDefinition . getDeclaredQualifier () );
137+ BoundExecutable <B , ?> boundExecutable = binder .bind (method , beanContext );
138+ B bean = beanContext .getBean (beanDefinition );
153139 AnnotationValue <Scheduled > finalAnnotationValue = scheduledAnnotation ;
154140 if (finalAnnotationValue instanceof EvaluatedAnnotationValue <Scheduled > evaluated ) {
155141 finalAnnotationValue = evaluated .withArguments (bean , boundExecutable .getBoundArguments ());
156142 }
157143 boolean shouldRun = finalAnnotationValue .booleanValue (MEMBER_CONDITION ).orElse (true );
158144 if (shouldRun ) {
159145 try {
160- (( BoundExecutable < Object , Object >) boundExecutable ) .invoke (bean );
146+ boundExecutable .invoke (bean );
161147 } catch (Throwable e ) {
162- handleException (( Class < Object >) beanDefinition .getBeanType (), bean , e );
148+ handleException (beanDefinition .getBeanType (), bean , e );
163149 }
164150 }
165151 } catch (NoSuchBeanException noSuchBeanException ) {
166152 // ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
167153 // is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
168154 LOG .debug ("Scheduled job skipped for context shutdown: {}.{}" , beanDefinition .getBeanType ().getSimpleName (), method .getDescription (true ));
169155 } catch (Exception e ) {
170- TaskExceptionHandler finalHandler = findHandler (beanDefinition .getBeanType (), e );
156+ TaskExceptionHandler < B , Throwable > finalHandler = findHandler (beanDefinition .getBeanType (), e );
171157 finalHandler .handleCreationFailure (beanDefinition , e );
172158 }
173159 };
@@ -201,7 +187,6 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
201187 new SchedulerConfigurationException (method , "Invalid fixed delay definition: " + fixedDelay )
202188 );
203189
204-
205190 if (LOG .isDebugEnabled ()) {
206191 LOG .debug ("Scheduling fixed delay task [{}] for method: {}" , duration , method );
207192 }
@@ -218,13 +203,13 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
218203 }
219204 }
220205
221- private void handleException (Class <Object > beanType , Object bean , Throwable e ) {
222- TaskExceptionHandler <Object , Throwable > finalHandler = findHandler (beanType , e );
206+ private < B > void handleException (Class <B > beanType , B bean , Throwable e ) {
207+ TaskExceptionHandler <B , Throwable > finalHandler = findHandler (beanType , e );
223208 finalHandler .handle (bean , e );
224209 }
225210
226211 @ SuppressWarnings ("unchecked" )
227- private TaskExceptionHandler <Object , Throwable > findHandler (Class <? > beanType , Throwable e ) {
212+ private < B > TaskExceptionHandler <B , Throwable > findHandler (Class <B > beanType , Throwable e ) {
228213 return beanContext .findBean (Argument .of (TaskExceptionHandler .class , beanType , e .getClass ()))
229214 .orElse (this .taskExceptionHandler );
230215 }
@@ -239,31 +224,12 @@ public void close() {
239224 }
240225 }
241226 } finally {
242- this .scheduledTasks .clear ();
243- this .scheduledMethods .clear ();
227+ scheduledTasks .clear ();
244228 }
245229 }
246230
247- private record ScheduledDefinition (
248- BeanDefinition <?> definition ,
249- ExecutableMethod <?, ?> method ) { }
250-
251- /**
252- * This Runnable calls {@link #scheduleTask(ScheduledDefinition)} exactly once, even if invoked
253- * multiple times from multiple threads.
254- */
255- private class ScheduleTaskRunnable extends AtomicBoolean implements Runnable {
256- private final ScheduledDefinition definition ;
257-
258- ScheduleTaskRunnable (ScheduledDefinition definition ) {
259- this .definition = definition ;
260- }
261-
262- @ Override
263- public void run () {
264- if (compareAndSet (false , true )) {
265- scheduleTask (definition );
266- }
267- }
231+ private record ScheduledDefinition <B >(BeanDefinition <B > definition ,
232+ ExecutableMethod <B , ?> method ) {
268233 }
234+
269235}
0 commit comments