Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ final class MapperMethodProcessor implements ExecutableMethodProcessor<Mapper> {
}

@Override
public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
public <B> void process(BeanDefinition<B> beanDefinition, ExecutableMethod<B, ?> method) {
Class<?>[] argumentTypes = method.getArgumentTypes();
if (method.hasDeclaredAnnotation(Mapper.class) && argumentTypes.length == 1) {
Class<Object> toType = (Class<Object>) method.getReturnType().getType();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
*/
package io.micronaut.scheduling.processor;

import io.micronaut.context.ApplicationContext;
import io.micronaut.context.BeanContext;
import io.micronaut.context.Qualifier;
import io.micronaut.context.bind.DefaultExecutableBeanContextBinder;
import io.micronaut.context.bind.ExecutableBeanContextBinder;
import io.micronaut.context.event.StartupEvent;
import io.micronaut.context.exceptions.NoSuchBeanException;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.bind.BoundExecutable;
import io.micronaut.core.convert.ConversionService;
import io.micronaut.core.type.Argument;
Expand All @@ -46,23 +45,22 @@

import java.io.Closeable;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* A {@link ExecutableMethodProcessor} for the {@link Scheduled} annotation.
*
* @author graemerocher
* @since 1.0
*/
@Internal
@Singleton
public class ScheduledMethodProcessor implements ExecutableMethodProcessor<Scheduled>, Closeable {

Expand All @@ -78,50 +76,38 @@ public class ScheduledMethodProcessor implements ExecutableMethodProcessor<Sched
private final BeanContext beanContext;
private final ConversionService conversionService;
private final Queue<ScheduledFuture<?>> scheduledTasks = new ConcurrentLinkedDeque<>();
private final Map<ScheduledDefinition, Runnable> scheduledMethods = new ConcurrentHashMap<>();
private final List<ScheduledDefinition<?>> scheduledMethodsDefinitions = new ArrayList<>();
private final TaskExceptionHandler<?, ?> taskExceptionHandler;
private volatile boolean started = false;

/**
* @param beanContext The bean context for DI of beans annotated with @Inject
* @param conversionService To convert one type to another
* @param taskExceptionHandler The default task exception handler
*/
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
public ScheduledMethodProcessor(BeanContext beanContext, Optional<ConversionService> conversionService, TaskExceptionHandler<?, ?> taskExceptionHandler) {
public ScheduledMethodProcessor(BeanContext beanContext, ConversionService conversionService, TaskExceptionHandler<?, ?> taskExceptionHandler) {
this.beanContext = beanContext;
this.conversionService = conversionService.orElse(ConversionService.SHARED);
this.conversionService = conversionService;
this.taskExceptionHandler = taskExceptionHandler;
}

@Override
public void process(BeanDefinition<?> beanDefinition, ExecutableMethod<?, ?> method) {
if (beanContext instanceof ApplicationContext) {
ScheduledDefinition scheduledDefinition = new ScheduledDefinition(beanDefinition, method);
Runnable runnable = new ScheduleTaskRunnable(scheduledDefinition);
// process may be called during or after scheduleTasks. we need to guard against that.
if (scheduledMethods.putIfAbsent(scheduledDefinition, runnable) == null && started) {
runnable.run();
}
}
public <B> void process(BeanDefinition<B> beanDefinition, ExecutableMethod<B, ?> method) {
scheduledMethodsDefinitions.add(new ScheduledDefinition(beanDefinition, method));
}

/**
* On startup event listener that schedules the active tasks.
* @param startupEvent The startup event.
* @param ignore The startup event.
*/
@EventListener
void scheduleTasks(@SuppressWarnings("unused") StartupEvent startupEvent) {
started = true;
for (Runnable runnable : scheduledMethods.values()) {
runnable.run();
}
void scheduleTasks(StartupEvent ignore) {
scheduledMethodsDefinitions.parallelStream().forEach(this::scheduleTask);
scheduledMethodsDefinitions.clear();
}

@SuppressWarnings("unchecked")
private void scheduleTask(ScheduledDefinition scheduledDefinition) {
ExecutableMethod<?, ?> method = scheduledDefinition.method();
BeanDefinition<?> beanDefinition = scheduledDefinition.definition();
private <B> void scheduleTask(ScheduledDefinition<B> scheduledDefinition) {
ExecutableMethod<B, ?> method = scheduledDefinition.method();
BeanDefinition<B> beanDefinition = scheduledDefinition.definition();
List<AnnotationValue<Scheduled>> scheduledAnnotations = method.getAnnotationValuesByType(Scheduled.class);
for (AnnotationValue<Scheduled> scheduledAnnotation : scheduledAnnotations) {
String fixedRate = scheduledAnnotation.stringValue(MEMBER_FIXED_RATE).orElse(null);
Expand All @@ -148,26 +134,26 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
Runnable task = () -> {
try {
ExecutableBeanContextBinder binder = new DefaultExecutableBeanContextBinder();
BoundExecutable<?, ?> boundExecutable = binder.bind(method, beanContext);
Object bean = beanContext.getBean((Argument<Object>) beanDefinition.asArgument(), (Qualifier<Object>) beanDefinition.getDeclaredQualifier());
BoundExecutable<B, ?> boundExecutable = binder.bind(method, beanContext);
B bean = beanContext.getBean(beanDefinition);
AnnotationValue<Scheduled> finalAnnotationValue = scheduledAnnotation;
if (finalAnnotationValue instanceof EvaluatedAnnotationValue<Scheduled> evaluated) {
finalAnnotationValue = evaluated.withArguments(bean, boundExecutable.getBoundArguments());
}
boolean shouldRun = finalAnnotationValue.booleanValue(MEMBER_CONDITION).orElse(true);
if (shouldRun) {
try {
((BoundExecutable<Object, Object>) boundExecutable).invoke(bean);
boundExecutable.invoke(bean);
} catch (Throwable e) {
handleException((Class<Object>) beanDefinition.getBeanType(), bean, e);
handleException(beanDefinition.getBeanType(), bean, e);
}
}
} catch (NoSuchBeanException noSuchBeanException) {
// ignore: a timing issue can occur when the context is being shutdown. If a scheduled job runs and the context
// is shutdown and available beans cleared then the bean is no longer available. The best thing to do here is just ignore the failure.
LOG.debug("Scheduled job skipped for context shutdown: {}.{}", beanDefinition.getBeanType().getSimpleName(), method.getDescription(true));
} catch (Exception e) {
TaskExceptionHandler finalHandler = findHandler(beanDefinition.getBeanType(), e);
TaskExceptionHandler<B, Throwable> finalHandler = findHandler(beanDefinition.getBeanType(), e);
finalHandler.handleCreationFailure(beanDefinition, e);
}
};
Expand Down Expand Up @@ -201,7 +187,6 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
new SchedulerConfigurationException(method, "Invalid fixed delay definition: " + fixedDelay)
);


if (LOG.isDebugEnabled()) {
LOG.debug("Scheduling fixed delay task [{}] for method: {}", duration, method);
}
Expand All @@ -218,13 +203,13 @@ private void scheduleTask(ScheduledDefinition scheduledDefinition) {
}
}

private void handleException(Class<Object> beanType, Object bean, Throwable e) {
TaskExceptionHandler<Object, Throwable> finalHandler = findHandler(beanType, e);
private <B> void handleException(Class<B> beanType, B bean, Throwable e) {
TaskExceptionHandler<B, Throwable> finalHandler = findHandler(beanType, e);
finalHandler.handle(bean, e);
}

@SuppressWarnings("unchecked")
private TaskExceptionHandler<Object, Throwable> findHandler(Class<?> beanType, Throwable e) {
private <B> TaskExceptionHandler<B, Throwable> findHandler(Class<B> beanType, Throwable e) {
return beanContext.findBean(Argument.of(TaskExceptionHandler.class, beanType, e.getClass()))
.orElse(this.taskExceptionHandler);
}
Expand All @@ -239,31 +224,12 @@ public void close() {
}
}
} finally {
this.scheduledTasks.clear();
this.scheduledMethods.clear();
scheduledTasks.clear();
}
}

private record ScheduledDefinition(
BeanDefinition<?> definition,
ExecutableMethod<?, ?> method) { }

/**
* This Runnable calls {@link #scheduleTask(ScheduledDefinition)} exactly once, even if invoked
* multiple times from multiple threads.
*/
private class ScheduleTaskRunnable extends AtomicBoolean implements Runnable {
private final ScheduledDefinition definition;

ScheduleTaskRunnable(ScheduledDefinition definition) {
this.definition = definition;
}

@Override
public void run() {
if (compareAndSet(false, true)) {
scheduleTask(definition);
}
}
private record ScheduledDefinition<B>(BeanDefinition<B> definition,
ExecutableMethod<B, ?> method) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ BeanMethodElement executable() {
default @NonNull
BeanMethodElement executable(boolean processOnStartup) {
annotate(Executable.class, (builder) ->
builder.member("processOnStartup", processOnStartup)
builder.member(Executable.MEMBER_PROCESS_ON_STARTUP, processOnStartup)
);
return this;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@
import io.micronaut.context.annotation.Executable;
import io.micronaut.context.annotation.Property;
import io.micronaut.context.annotation.Value;
import io.micronaut.context.processor.ExecutableMethodProcessor;
import io.micronaut.core.annotation.AnnotationClassValue;
import io.micronaut.core.annotation.AnnotationMetadata;
import io.micronaut.core.annotation.AnnotationUtil;
import io.micronaut.core.annotation.AnnotationValue;
import io.micronaut.core.annotation.Internal;
import io.micronaut.core.annotation.NextMajorVersion;
import io.micronaut.core.annotation.NonNull;
Expand Down Expand Up @@ -75,6 +77,28 @@ class DeclaredBeanElementCreator extends AbstractBeanElementCreator {
protected DeclaredBeanElementCreator(ClassElement classElement, VisitorContext visitorContext, boolean isAopProxy) {
super(classElement, visitorContext);
this.isAopProxy = isAopProxy;

wantOfIncorrectUseOfExecutableMethodProcessor(classElement, visitorContext);
}

private void wantOfIncorrectUseOfExecutableMethodProcessor(ClassElement classElement, VisitorContext visitorContext) {
Map<String, ClassElement> processor = classElement.getTypeArguments(ExecutableMethodProcessor.class);
if (processor == null || processor.isEmpty()) {
return;
}
ClassElement annotation = processor.get("A");
if (annotation != null) {
AnnotationValue<Executable> executable = annotation.getAnnotation(Executable.class);
if (executable != null && executable.booleanValue(Executable.MEMBER_PROCESS_ON_STARTUP).orElse(false)) {
return; // Correct ExecutableMethodProcessor should have @Executable(processOnStartup=true)
}
}
String message = "ExecutableMethodProcessor is supposed to be used with an annotation that has @Executable(processOnStartup = true). In the future version this will be an error.";
visitorContext.warn(
message,
classElement
);
classElement.annotate(Deprecated.class, builder -> builder.value(message));
}

@Override
Expand Down Expand Up @@ -395,7 +419,7 @@ private boolean visitAopAndExecutableMethod(BeanDefinitionVisitor visitor, Metho
return false;
}
// This method requires pre-processing. See Executable#processOnStartup()
boolean preprocess = methodElement.isTrue(Executable.class, "processOnStartup");
boolean preprocess = methodElement.isTrue(Executable.class, Executable.MEMBER_PROCESS_ON_STARTUP);
if (preprocess) {
visitor.setRequiresMethodProcessing(true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,7 @@ private void configureInjectionPoints(BeanDefinitionVisitor beanDefinitionWriter
executableMethod,
visitorContext
);
if (executableMethod.getAnnotationMetadata().isTrue(Executable.class, "processOnStartup")) {
if (executableMethod.getAnnotationMetadata().isTrue(Executable.class, Executable.MEMBER_PROCESS_ON_STARTUP)) {
beanDefinitionWriter.setRequiresMethodProcessing(true);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import io.micronaut.context.annotation.EachBean;
import io.micronaut.context.annotation.EachProperty;
import io.micronaut.context.annotation.Infrastructure;
import io.micronaut.context.annotation.Executable;
import io.micronaut.context.annotation.InjectScope;
import io.micronaut.context.annotation.Parallel;
import io.micronaut.context.annotation.Parameter;
Expand Down Expand Up @@ -632,6 +633,7 @@ public final class BeanDefinitionWriter implements ClassOutputWriter, BeanDefini
private static final Constructor<AbstractExecutableMethod> ABSTRACT_EXECUTABLE_METHOD_CONSTRUCTOR = ReflectionUtils.getRequiredInternalConstructor(AbstractExecutableMethod.class, Class.class, String.class);
private static final Method GET_TYPE_PARAMETERS_METHOD = ReflectionUtils.getRequiredInternalMethod(TypeVariableResolver.class, "getTypeParameters");
private static final Method ARGUMENT_OF_METHOD = ReflectionUtils.getRequiredInternalMethod(Argument.class, "of", Class.class);
private static final Method BD_GET_INDEXES_OF_EXECUTABLE_METHODS_FOR_PROCESSING = ReflectionUtils.getRequiredInternalMethod(AbstractInitializableBeanDefinition.class, "getIndexesOfExecutableMethodsForProcessing");
private static final Method GET_INDEXES_METHOD = ReflectionUtils.getRequiredMethod(BeanDefinitionReference.class, "getIndexes");
private static final Method IS_PARALLEL_METHOD = ReflectionUtils.getRequiredMethod(BeanDefinitionReference.class, "isParallel");
private static final Method IS_ASSIGNABLE_METHOD = ReflectionUtils.getRequiredMethod(Class.class, "isAssignableFrom", Class.class);
Expand Down Expand Up @@ -1330,6 +1332,19 @@ public void visitBeanDefinitionEnd() {

loadTypeMethods.values().forEach(classDefBuilder::addMethod);

if (requiresMethodProcessing() && executableMethodsDefinitionWriter != null) {
int methodsCount = executableMethodsDefinitionWriter.getMethodsCount();
List<ExpressionDef> expressions = new ArrayList<>(methodsCount);
for (int i = 0; i < methodsCount; i++) {
MethodElement method = executableMethodsDefinitionWriter.getMethodByIndex(i);
if (method.booleanValue(Executable.class, Executable.MEMBER_PROCESS_ON_STARTUP).orElse(false)) {
expressions.add(TypeDef.Primitive.INT.constant(i));
}
}
classDefBuilder.addMethod(MethodDef.override(BD_GET_INDEXES_OF_EXECUTABLE_METHODS_FOR_PROCESSING)
.build((aThis, methodParameters) -> TypeDef.Primitive.INT.array().instantiate(expressions).returning()));
}

output = new LinkedHashMap<>();
// Generate the bytecode in the round it's being invoked
generateFiles(classDefBuilder.build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -457,4 +457,25 @@ private static String toTypeString(ClassElement p) {
}
return name;
}

/**
* Retrieves the total count of methods.
*
* @return The number of methods available.
* @since 5.0
*/
public int getMethodsCount() {
return methodDispatchWriter.getDispatchTargets().size();
}

/**
* Retrieves the `MethodElement` at the specified index.
*
* @param index The index of the method to retrieve.
* @return The `MethodElement` corresponding to the specified index.
* @since 5.0
*/
public MethodElement getMethodByIndex(int index) {
return methodDispatchWriter.getDispatchTargets().get(index).getMethodElement();
}
}
Loading
Loading