getPaths() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public String toString() {
+ return toStringHelper(this).add("serviceName", serviceName).toString();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 1fac05012fe8..27a5ce3b7b79 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -101,6 +101,7 @@
import org.apache.iotdb.db.schemaengine.schemaregion.attribute.update.GeneralRegionAttributeSecurityService;
import org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.template.ClusterTemplateManager;
+import org.apache.iotdb.db.service.externalservice.ExternalServiceManagementService;
import org.apache.iotdb.db.service.metrics.DataNodeMetricsHelper;
import org.apache.iotdb.db.service.metrics.IoTDBInternalLocalReporter;
import org.apache.iotdb.db.storageengine.StorageEngine;
@@ -468,6 +469,8 @@ private void pullAndCheckSystemConfigurations() throws StartupException {
* 5. All Pipe information
*
*
6. All TTL information
+ *
+ *
7. All ExternalService information
*/
protected void storeRuntimeConfigurations(
List configNodeLocations, TRuntimeConfiguration runtimeConfiguration)
@@ -489,6 +492,12 @@ protected void storeRuntimeConfigurations(
/* Store triggerInformationList */
getTriggerInformationList(runtimeConfiguration.getAllTriggerInformation());
+ /* Store externalServiceEntryList */
+ resourcesInformationHolder.setExternalServiceEntryList(
+ runtimeConfiguration.isSetAllUserDefinedServiceInfo()
+ ? runtimeConfiguration.getAllUserDefinedServiceInfo()
+ : Collections.emptyList());
+
/* Store pipeInformationList */
getPipeInformationList(runtimeConfiguration.getAllPipeInformation());
@@ -1179,6 +1188,26 @@ private void getTriggerInformationList(List allTriggerInformation) {
}
}
+ private void prepareExternalServiceResources() throws StartupException {
+ long startTime = System.currentTimeMillis();
+ if (resourcesInformationHolder.getExternalServiceEntryList() == null
+ || resourcesInformationHolder.getExternalServiceEntryList().isEmpty()) {
+ return;
+ }
+
+ try {
+ ExternalServiceManagementService.getInstance()
+ .restoreUserDefinedServices(resourcesInformationHolder.getExternalServiceEntryList());
+ ExternalServiceManagementService.getInstance().restoreRunningServiceInstance();
+ } catch (Exception e) {
+ throw new StartupException(e);
+ }
+
+ logger.info(
+ "Prepare external-service resources successfully, which takes {} ms.",
+ System.currentTimeMillis() - startTime);
+ }
+
private void preparePipeResources() throws StartupException {
long startTime = System.currentTimeMillis();
PipeDataNodeAgent.runtime().preparePipeResources(resourcesInformationHolder);
@@ -1287,6 +1316,7 @@ private void initProtocols() throws StartupException {
if (IoTDBRestServiceDescriptor.getInstance().getConfig().isEnableRestService()) {
registerManager.register(RestService.getInstance());
}
+ prepareExternalServiceResources();
if (PipeConfig.getInstance().getPipeAirGapReceiverEnabled()) {
registerManager.register(PipeDataNodeAgent.receiver().airGap());
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
index 22d35777f378..d127f9da898f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/ResourcesInformationHolder.java
@@ -19,6 +19,7 @@
package org.apache.iotdb.db.service;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
import org.apache.iotdb.commons.pipe.agent.plugin.meta.PipePluginMeta;
import org.apache.iotdb.commons.trigger.TriggerInformation;
import org.apache.iotdb.commons.udf.UDFInformation;
@@ -37,6 +38,8 @@ public class ResourcesInformationHolder {
/** store the list when registering in config node for preparing pipe plugin related resources */
private List pipePluginMetaList;
+ private List externalServiceEntryList;
+
public static int getJarNumOfOneRpc() {
return JAR_NUM_OF_ONE_RPC;
}
@@ -64,4 +67,12 @@ public List getPipePluginMetaList() {
public void setPipePluginMetaList(List pipePluginMetaList) {
this.pipePluginMetaList = pipePluginMetaList;
}
+
+ public void setExternalServiceEntryList(List externalServiceEntryList) {
+ this.externalServiceEntryList = externalServiceEntryList;
+ }
+
+ public List getExternalServiceEntryList() {
+ return externalServiceEntryList;
+ }
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
new file mode 100644
index 000000000000..7bed3ffe4711
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/BuiltinExternalServices.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.externalservice;
+
+import java.util.function.Supplier;
+
+public enum BuiltinExternalServices {
+ MQTT(
+ "MQTT",
+ "org.apache.iotdb.externalservice.Mqtt",
+ // IoTDBDescriptor.getInstance().getConfig()::isEnableMQTTService
+ () -> false),
+ REST(
+ "REST",
+ "org.apache.iotdb.externalservice.Rest",
+ // IoTDBRestServiceDescriptor.getInstance().getConfig()::isEnableRestService
+ () -> false);
+
+ private final String serviceName;
+ private final String className;
+ private final Supplier enabledFunction;
+
+ BuiltinExternalServices(String serviceName, String className, Supplier enabledFunction) {
+ this.serviceName = serviceName;
+ this.className = className;
+ this.enabledFunction = enabledFunction;
+ }
+
+ public String getServiceName() {
+ return serviceName;
+ }
+
+ public String getClassName() {
+ return className;
+ }
+
+ public boolean isEnabled() {
+ return enabledFunction.get();
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceClassLoader.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceClassLoader.java
new file mode 100644
index 000000000000..976fecfa4112
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceClassLoader.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.externalservice;
+
+import org.apache.iotdb.commons.file.SystemFileFactory;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class ExternalServiceClassLoader extends URLClassLoader {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ExternalServiceClassLoader.class);
+
+ private final String libRoot;
+
+ public ExternalServiceClassLoader(String libRoot) throws IOException {
+ super(new URL[0]);
+ this.libRoot = libRoot;
+ LOGGER.info("External Service lib root: {}", libRoot);
+ addURLs();
+ }
+
+ private void addURLs() throws IOException {
+ try (Stream pathStream =
+ Files.walk(SystemFileFactory.INSTANCE.getFile(libRoot).toPath())) {
+ for (Path path :
+ pathStream.filter(path -> !path.toFile().isDirectory()).collect(Collectors.toList())) {
+ super.addURL(path.toUri().toURL());
+ }
+ }
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementException.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementException.java
new file mode 100644
index 000000000000..dc259ec3c64c
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.externalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+
+public class ExternalServiceManagementException extends IoTDBRuntimeException {
+
+ public ExternalServiceManagementException(TSStatus status) {
+ super(status);
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
new file mode 100644
index 000000000000..71f0df341a68
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/externalservice/ExternalServiceManagementService.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.service.externalservice;
+
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceEntry;
+import org.apache.iotdb.common.rpc.thrift.TExternalServiceListResp;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.client.exception.ClientManagerException;
+import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
+import org.apache.iotdb.commons.externalservice.ServiceInfo;
+import org.apache.iotdb.confignode.rpc.thrift.TCreateExternalServiceReq;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClient;
+import org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
+import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.externalservice.api.IExternalService;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.thrift.TException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.stream.Collectors;
+
+import static com.google.common.base.Preconditions.checkState;
+import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.RUNNING;
+import static org.apache.iotdb.commons.externalservice.ServiceInfo.State.STOPPED;
+
+public class ExternalServiceManagementService {
+ @GuardedBy("lock")
+ private final Map serviceInfos;
+
+ private final String libRoot;
+
+ private final ReadWriteLock lock = new ReentrantReadWriteLock();
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(ExternalServiceManagementService.class);
+
+ private ExternalServiceManagementService(String libRoot) {
+ this.serviceInfos = new HashMap<>();
+ restoreBuiltInServices();
+ this.libRoot = libRoot;
+ }
+
+ public Iterator showService(int dataNodeId)
+ throws ClientManagerException, TException {
+ try {
+ lock.readLock().lock();
+
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TExternalServiceListResp resp = client.showExternalService(dataNodeId);
+ if (resp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(resp.getStatus().message, resp.getStatus().code);
+ }
+ return resp.getExternalServiceInfos().iterator();
+ }
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public void createService(String serviceName, String className)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ if (serviceInfos.containsKey(serviceName)) {
+ TSStatus status = new TSStatus(TSStatusCode.EXTERNAL_SERVICE_ALREADY_EXIST.getStatusCode());
+ status.setMessage(
+ String.format("Failed to create External Service %s, it already exists!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+
+ // 2. persist on CN
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TSStatus status =
+ client.createExternalService(
+ new TCreateExternalServiceReq(QueryId.getDataNodeId(), serviceName, className));
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+ }
+
+ // 3. modify memory info
+ serviceInfos.put(
+ serviceName,
+ new ServiceInfo(serviceName, className, ServiceInfo.ServiceType.USER_DEFINED));
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void startService(String serviceName) throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to start External Service %s, because it is not existed!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+
+ // 2. call start method of ServiceInstance, create if Instance was not created
+ if (serviceInfo.getState() == RUNNING) {
+ return;
+ } else {
+ // The state is STOPPED
+ if (serviceInfo.getServiceInstance() == null) {
+ // lazy create Instance
+ serviceInfo.setServiceInstance(
+ createExternalServiceInstance(serviceName, serviceInfo.getClassName()));
+ }
+ serviceInfo.getServiceInstance().start();
+ }
+
+ // 3. persist on CN if service is user-defined, rollback if failed
+ if ((serviceInfo.getServiceType() == ServiceInfo.ServiceType.USER_DEFINED)) {
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TSStatus status = client.startExternalService(QueryId.getDataNodeId(), serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().stop();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+ }
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(RUNNING);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private IExternalService createExternalServiceInstance(String serviceName, String className) {
+ // close ClassLoader automatically to release the file handle
+ try (ExternalServiceClassLoader classLoader = new ExternalServiceClassLoader(libRoot); ) {
+ return (IExternalService)
+ Class.forName(className, true, classLoader).getDeclaredConstructor().newInstance();
+ } catch (IOException
+ | InstantiationException
+ | InvocationTargetException
+ | NoSuchMethodException
+ | IllegalAccessException
+ | ClassNotFoundException
+ | ClassCastException e) {
+ TSStatus status =
+ new TSStatus(TSStatusCode.EXTERNAL_SERVICE_INSTANCE_CREATE_ERROR.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to start External Service %s, because its instance can not be constructed successfully. Exception: %s",
+ serviceName, e));
+ LOGGER.warn(status.getMessage(), e);
+ throw new ExternalServiceManagementException(status);
+ }
+ }
+
+ public void stopService(String serviceName) throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to stop External Service %s, because it is not existed!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+
+ // 2. call stop method of ServiceInstance
+ if (serviceInfo.getState() == STOPPED) {
+ return;
+ } else {
+ // The state is RUNNING
+ stopService(serviceInfo);
+ }
+
+ // 3. persist on CN if service is user-defined, rollback if failed
+ if ((serviceInfo.getServiceType() == ServiceInfo.ServiceType.USER_DEFINED)) {
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID); ) {
+ TSStatus status = client.stopExternalService(QueryId.getDataNodeId(), serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ serviceInfo.getServiceInstance().start();
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+ }
+ }
+
+ // 4. modify memory info
+ serviceInfo.setState(STOPPED);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ private void stopService(ServiceInfo serviceInfo) {
+ checkState(
+ serviceInfo.getServiceInstance() != null,
+ "External Service instance is null when state is RUNNING!",
+ serviceInfo.getServiceName());
+ serviceInfo.getServiceInstance().stop();
+ }
+
+ public void dropService(String serviceName, boolean forcedly)
+ throws ClientManagerException, TException {
+ try {
+ lock.writeLock().lock();
+
+ // 1. validate
+ ServiceInfo serviceInfo = serviceInfos.get(serviceName);
+ if (serviceInfo == null) {
+ TSStatus status = new TSStatus(TSStatusCode.NO_SUCH_EXTERNAL_SERVICE.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to drop External Service %s, because it is not existed!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+ if (serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN) {
+ TSStatus status =
+ new TSStatus(TSStatusCode.CANNOT_DROP_BUILTIN_EXTERNAL_SERVICE.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to drop External Service %s, because it is BUILT-IN!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+
+ // 2. stop or fail when service are not stopped
+ if (serviceInfo.getState() == STOPPED) {
+ // do nothing
+ } else {
+ // The state is RUNNING
+ if (forcedly) {
+ try {
+ stopService(serviceInfo);
+ } catch (Exception e) {
+ // record errMsg if exception occurs during the stop of service
+ LOGGER.warn(
+ "Failed to stop External Service %s because %s. It will be drop forcedly",
+ serviceName, e.getMessage());
+ }
+ } else {
+ TSStatus status =
+ new TSStatus(TSStatusCode.CANNOT_DROP_RUNNING_EXTERNAL_SERVICE.getStatusCode());
+ status.setMessage(
+ String.format(
+ "Failed to drop External Service %s, because it is RUNNING!", serviceName));
+ throw new ExternalServiceManagementException(status);
+ }
+ }
+
+ // 3. persist on CN
+ try (ConfigNodeClient client =
+ ConfigNodeClientManager.getInstance().borrowClient(ConfigNodeInfo.CONFIG_REGION_ID)) {
+ TSStatus status = client.dropExternalService(QueryId.getDataNodeId(), serviceName);
+ if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+ throw new IoTDBRuntimeException(status.message, status.code);
+ }
+ }
+
+ // 4. modify memory info
+ serviceInfos.remove(serviceName);
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ public void restoreRunningServiceInstance() {
+ // Needn't use lock here, we use this method when active DN and there is no concurrent risk
+ serviceInfos
+ .values()
+ .forEach(
+ serviceInfo -> {
+ // start services with RUNNING state
+ if (serviceInfo.getState() == RUNNING) {
+ IExternalService serviceInstance =
+ createExternalServiceInstance(
+ serviceInfo.getServiceName(), serviceInfo.getClassName());
+ serviceInfo.setServiceInstance(serviceInstance);
+ serviceInstance.start();
+ }
+ });
+ }
+
+ public void restoreUserDefinedServices(List serviceEntryList) {
+ // Needn't use lock here, we use this method when active DN and there is no concurrent risk
+ for (TExternalServiceEntry serviceEntry : serviceEntryList) {
+ serviceInfos.put(
+ serviceEntry.getServiceName(),
+ new ServiceInfo(
+ serviceEntry.getServiceName(),
+ serviceEntry.getClassName(),
+ ServiceInfo.ServiceType.USER_DEFINED,
+ ServiceInfo.State.deserialize(serviceEntry.getState())));
+ }
+ }
+
+ private void restoreBuiltInServices() {
+ for (BuiltinExternalServices builtinExternalService : BuiltinExternalServices.values()) {
+ serviceInfos.put(
+ builtinExternalService.getServiceName(),
+ new ServiceInfo(
+ builtinExternalService.getServiceName(),
+ builtinExternalService.getClassName(),
+ ServiceInfo.ServiceType.BUILTIN,
+ builtinExternalService.isEnabled() ? RUNNING : STOPPED));
+ }
+ }
+
+ public List getBuiltInServices() {
+ try {
+ lock.readLock().lock();
+ return serviceInfos.values().stream()
+ .filter(serviceInfo -> serviceInfo.getServiceType() == ServiceInfo.ServiceType.BUILTIN)
+ .map(
+ serviceInfo ->
+ new TExternalServiceEntry(
+ serviceInfo.getServiceName(),
+ serviceInfo.getClassName(),
+ serviceInfo.getState().getValue(),
+ QueryId.getDataNodeId(),
+ ServiceInfo.ServiceType.BUILTIN.getValue()))
+ .collect(Collectors.toList());
+ } finally {
+ lock.readLock().unlock();
+ }
+ }
+
+ public static ExternalServiceManagementService getInstance() {
+ return ExternalServiceManagementServiceHolder.INSTANCE;
+ }
+
+ private static class ExternalServiceManagementServiceHolder {
+
+ private static final ExternalServiceManagementService INSTANCE =
+ new ExternalServiceManagementService(
+ IoTDBDescriptor.getInstance().getConfig().getExternalServiceDir());
+
+ private ExternalServiceManagementServiceHolder() {}
+ }
+}
diff --git a/iotdb-core/node-commons/pom.xml b/iotdb-core/node-commons/pom.xml
index 0bf2c01bfb56..6e54c6b0b4cf 100644
--- a/iotdb-core/node-commons/pom.xml
+++ b/iotdb-core/node-commons/pom.xml
@@ -45,6 +45,11 @@
common
${tsfile.version}