Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
ac1fe9a
Add clear dataset operator that clears the dataset that is connected …
andreas-schultz Oct 8, 2024
237a1dc
Fix missing parameter in clear function
andreas-schultz Oct 9, 2024
8fb075f
Mark 'clear before execution' parameter as deprecated set default to …
andreas-schultz Oct 10, 2024
ceca6cf
Add clear output dataset button to transform and linking execution tab
andreas-schultz Oct 11, 2024
b8530a9
Merge remote-tracking branch 'origin/develop' into feature/datasetCle…
robertisele Oct 29, 2024
1d4bd2d
If a dataset is cleared, its cache should be reloaded.
robertisele Oct 29, 2024
c8a66c8
Improve reports for clear dataset operator to make it more clear what…
robertisele Oct 29, 2024
0cc7c11
Merge remote-tracking branch 'origin/develop' into feature/datasetCle…
robertisele Aug 4, 2025
fa50fb5
Adaptations following merge of develop into feature branch
robertisele Aug 4, 2025
6d236a6
Adaptations clear methods of new datasets for clear dataset changes.
robertisele Aug 4, 2025
3a36128
Merge remote-tracking branch 'origin/develop' into feature/datasetCle…
robertisele Dec 4, 2025
23cefae
Fix dataset clear button component
andreas-schultz Dec 8, 2025
16f4348
Return empty path list for JSON and XML datasets when the resource do…
andreas-schultz Dec 11, 2025
d2542e7
Merge remote-tracking branch 'origin/develop' into feature/datasetCle…
robertisele Feb 9, 2026
7b3533b
refactor: update metadata type from IMetaData to IMetadata in Dataset…
robertisele Feb 9, 2026
a382693
Merge remote-tracking branch 'origin/feature/datasetClearOperator-CME…
robertisele Feb 9, 2026
50236f1
Remove duplicated endpoint and replace usages in UI code
andreas-schultz Feb 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions silk-core/src/main/scala/org/silkframework/config/Port.scala
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,7 @@ case class FixedNumberOfInputs(ports: Seq[Port]) extends InputPorts
case class FlexibleNumberOfInputs(portDefinition: Port = FlexibleSchemaPort(),
min: Int = 0,
max: Option[Int] = None) extends InputPorts

object InputPorts {
  /** Shared constant for operators/tasks that do not accept any input connections.
    * Replaces repeated `FixedNumberOfInputs(Seq.empty)` instantiations across the code base. */
  final val NoInputPorts = FixedNumberOfInputs(Seq.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ object SilkVocab {

val RestTaskResultResponseBody: String = RestTaskResult + "/responseBody"

// Clear dataset
val ClearDatasetType: String = namespace + "ClearDatasetType"

// Empty table
val EmptySchemaType: String = namespace + "EmptySchemaType"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,5 @@ trait DataSink extends CloseableDataset {
*
* @param force Forces the clearing of the dataset. E.g. even when "clear before execution" flag is not set.
*/
def clear(force: Boolean = false)(implicit userContext: UserContext): Unit
def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = { }
}
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ case class DatasetSpec[+DatasetType <: Dataset](plugin: DatasetType,
/** Datasets don't define input schemata, because any data can be written to them. */
override def inputPorts: InputPorts = {
if(readOnly || characteristics.readOnly) {
FixedNumberOfInputs(Seq.empty)
InputPorts.NoInputPorts
} else if(characteristics.supportsMultipleWrites) {
FlexibleNumberOfInputs()
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ trait DirtyTrackingFileDataSink extends DataSink {
DirtyTrackingFileDataSink.addUpdatedFile(resource.name)
super.close()
}

// Stackable modification (abstract override): mark the backing file as updated so that
// dirty-file tracking notices the clear, then delegate to the underlying sink's clear.
abstract override def clear(force: Boolean)(implicit userContext: UserContext): Unit = {
  DirtyTrackingFileDataSink.addUpdatedFile(resource.name)
  super.clear(force)
}
}

object DirtyTrackingFileDataSink {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package org.silkframework.dataset.operations

import org.silkframework.config._
import org.silkframework.entity.EntitySchema
import org.silkframework.execution.EmptyEntityHolder
import org.silkframework.execution.local.LocalEntities
import org.silkframework.runtime.plugin.annotations.Plugin

@Plugin(
  id = "clearDataset",
  label = "Clear dataset",
  description =
    """Clears the dataset that is connected to the output of this operator."""
)
case class ClearDatasetOperator() extends CustomTask {

  /**
    * The input ports and their schemata.
    * This operator takes no inputs; it only emits a clear instruction.
    */
  override def inputPorts: InputPorts = InputPorts.NoInputPorts

  /**
    * The output port and its schema.
    * None, if this operator does not generate any output.
    */
  override def outputPort: Option[Port] = Some(FixedSchemaPort(ClearDatasetOperator.clearDatasetSchema))
}

object ClearDatasetOperator {
  // Marker schema (type URI only, no typed paths) identifying entities as a clear-dataset instruction.
  private val clearDatasetSchema = EntitySchema(SilkVocab.ClearDatasetType, IndexedSeq.empty)

  /** Empty entity table whose schema signals the consuming dataset executor to clear its dataset. */
  case class ClearDatasetTable(task: Task[TaskSpec]) extends LocalEntities with EmptyEntityHolder {
    override def entitySchema: EntitySchema = clearDatasetSchema
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package org.silkframework.dataset.operations

import org.silkframework.config.{Task, TaskSpec}
import org.silkframework.dataset.operations.ClearDatasetOperator.ClearDatasetTable
import org.silkframework.execution.local.{LocalEntities, LocalExecution, LocalExecutor}
import org.silkframework.execution.{ExecutionReport, ExecutionReportUpdater, ExecutorOutput, SimpleExecutionReport}
import org.silkframework.runtime.activity.ActivityContext
import org.silkframework.runtime.plugin.PluginContext

/** Executes a clear dataset operator.
  * Note: this executor only generates the clear instruction; the actual clearing is done by the
  * dataset executor that consumes the emitted [[ClearDatasetTable]]. */
case class ClearDatasetOperatorLocalExecutor() extends LocalExecutor[ClearDatasetOperator] {

  override def execute(task: Task[ClearDatasetOperator],
                       inputs: Seq[LocalEntities],
                       output: ExecutorOutput,
                       execution: LocalExecution,
                       context: ActivityContext[ExecutionReport])
                      (implicit pluginContext: PluginContext): Option[LocalEntities] = {
    // Report completion immediately — generating the instruction is this operator's whole job.
    context.value.update(SimpleExecutionReport(
      task = task,
      summary = Seq.empty,
      warnings = Seq.empty,
      error = None,
      isDone = true,
      entityCount = 1,
      operation = Some("generate clear instruction"),
      operationDesc = "clear instruction generated"
    ))
    Some(ClearDatasetTable(task))
  }
}

/** Execution report updater used while a dataset is being cleared.
  *
  * @param task    The dataset task being cleared; its label is shown in the report.
  * @param context Activity context the report is written to.
  */
case class ClearDatasetOperatorExecutionReportUpdater(task: Task[TaskSpec],
                                                      context: ActivityContext[ExecutionReport]) extends ExecutionReportUpdater {

  override def operationLabel: Option[String] = Some("clear dataset")

  override def entityLabelSingle: String = "dataset"
  override def entityLabelPlural: String = "datasets"
  override def entityProcessVerb: String = "cleared"

  // A clear affects a single dataset, so update the report on every counted entity.
  override def minEntitiesBetweenUpdates: Int = 1

  override def additionalFields(): Seq[(String, String)] = Seq(
    "Cleared dataset" -> task.fullLabel
  )
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import org.silkframework.config.{Prefixes, Task, TaskSpec}
import org.silkframework.dataset.CloseableDataset.using
import org.silkframework.dataset.DatasetSpec.{EntitySinkWrapper, GenericDatasetSpec}
import org.silkframework.dataset._
import org.silkframework.dataset.operations.ClearDatasetOperator.ClearDatasetTable
import org.silkframework.dataset.operations.ClearDatasetOperatorExecutionReportUpdater
import org.silkframework.dataset.bulk.{BulkResourceBasedDataset, ZipWritableResource}
import org.silkframework.dataset.rdf._
import org.silkframework.dataset.sql.SqlDataset
Expand Down Expand Up @@ -161,6 +163,8 @@ abstract class LocalDatasetExecutor[DatasetType <: Dataset] extends DatasetExecu
uploadFilesViaGraphStore(dataset, files.typedEntities, UploadFilesViaGspReportUpdater(dataset, context))
case SparqlUpdateEntitySchema(queries) =>
executeSparqlUpdateQueries(dataset, queries, execution)
case _: ClearDatasetTable =>
executeClearDataset(dataset)
case SqlUpdateEntitySchema(queries) =>
executeSqlStatement(dataset, queries, execution)
case et: LocalEntities =>
Expand Down Expand Up @@ -263,6 +267,17 @@ abstract class LocalDatasetExecutor[DatasetType <: Dataset] extends DatasetExecu
}
}

/** Clears all data of the given dataset and records the operation in the execution report.
  *
  * @throws RuntimeException if the dataset is configured as read-only.
  */
private def executeClearDataset(dataset: Task[DatasetSpec[DatasetType]])
                               (implicit userContext: UserContext, context: ActivityContext[ExecutionReport]): Unit = {
  if(dataset.readOnly) {
    throw new RuntimeException(s"Cannot clear dataset '${dataset.fullLabel}', because it is configured as read-only.")
  }
  val executionReport = ClearDatasetOperatorExecutionReportUpdater(dataset, context)
  // force = true: clear even when the dataset's own 'clear before execution' flag is not set.
  dataset.entitySink.clear(force = true)
  executionReport.increaseEntityCounter()
  executionReport.executionDone()
}

/** Buffers queries to make prediction about how many queries will be executed.
*
* @param bufferSize max size of queries that should be buffered
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import org.silkframework.config.Task.GenericTaskFormat
import org.silkframework.config.TaskSpec.TaskSpecXmlFormat
import org.silkframework.dataset.DatasetSpec.{DatasetSpecFormat, DatasetTaskXmlFormat}
import org.silkframework.dataset.VariableDataset
import org.silkframework.dataset.operations.{ClearDatasetOperator, ClearDatasetOperatorLocalExecutor}
import org.silkframework.dataset.operations.{AddProjectFilesOperator, DeleteFilesOperator, GetProjectFilesOperator, LocalAddProjectFilesOperatorExecutor, LocalDeleteFilesOperatorExecutor, LocalGetProjectFilesOperatorExecutor}
import org.silkframework.entity.EntitySchema.EntitySchemaFormat
import org.silkframework.entity.ValueType
Expand Down Expand Up @@ -49,6 +50,8 @@ class CorePlugins extends PluginModule {
classOf[LocalDeleteFilesOperatorExecutor] ::
classOf[GetProjectFilesOperator] ::
classOf[LocalGetProjectFilesOperatorExecutor] ::
classOf[ClearDatasetOperator] ::
classOf[ClearDatasetOperatorLocalExecutor] ::
Nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ case class CsvDataset (
quoteEscapeCharacter: String = "\"",
@Param(label = "ZIP file regex", value = "If the input resource is a ZIP file, files inside the file are filtered via this regex.", advanced = true)
override val zipFileRegex: String = CsvDataset.defaultZipFileRegex,
@Param(label = "Delete file before workflow execution",
value = "If set to true this will clear the specified file before executing a workflow that writes to it.",
@Param(label = "Delete file before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear the specified file before executing a workflow that writes to it.",
advanced = true)
clearBeforeExecution: Boolean = false,
@Param(label = "Trim whitespace and non-printable characters.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.silkframework.util.Uri
import java.io.{File, IOException}
import java.util.logging.Logger

class CsvSink(val resource: WritableResource, settings: CsvSettings) extends DataSink with DirtyTrackingFileDataSink {
class CsvSink(val resource: WritableResource, settings: CsvSettings) extends DirtyTrackingFileDataSink {
private val log: Logger = Logger.getLogger(getClass.getName)

@volatile
Expand Down Expand Up @@ -56,6 +56,7 @@ class CsvSink(val resource: WritableResource, settings: CsvSettings) extends Dat
case e: IOException =>
log.warning("IO exception occurred when deleting CRC file: " + e.getMessage)
}
super.clear(force)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ class JsonSink (val resource: WritableResource,
*/
override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
  // Deleting the backing resource removes all previously written JSON content.
  resource.delete()
  // Delegate so that stacked sink traits can also react to the clear.
  super.clear(force)
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,11 @@ class JsonSourceStreaming(taskId: Identifier, resource: Resource, basePath: Stri

/** Creates a JSON parser over the resource.
  * A missing resource (e.g. after the dataset has been cleared) is treated as an empty JSON object. */
protected def createParser(): JsonParser = {
  val jsonFactory = new JsonFactoryBuilder().configure(StreamReadFeature.AUTO_CLOSE_SOURCE, true).build()
  if(!resource.exists) {
    jsonFactory.createParser("{}")
  } else {
    jsonFactory.createParser(resource.inputStream)
  }
}

override def retrieve(entitySchema: EntitySchema, limit: Option[Int])(implicit context: PluginContext): EntityHolder = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@ import org.silkframework.runtime.plugin.annotations.{Param, Plugin}
categories = Array(DatasetCategories.embedded),
description = "A Dataset that holds all data in-memory."
)
case class InMemoryDataset(@Param(label = "Clear graph before workflow execution",
value = "If set to true this will clear this dataset before it is used in a workflow execution.")
clearGraphBeforeExecution: Boolean = true) extends RdfDataset with TripleSinkDataset {
case class InMemoryDataset(@Param(label = "Clear graph before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear this dataset before it is used in a workflow execution.",
advanced = true)
clearGraphBeforeExecution: Boolean = false) extends RdfDataset with TripleSinkDataset {

private val model = ModelFactory.createDefaultModel()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ import org.silkframework.runtime.plugin.annotations.{Param, Plugin}
@Plugin(id = "rdf", label = "RDF in-memory", description = "A Dataset where all entities are given directly in the configuration.")
case class RdfInMemoryDataset(data: String,
format: String,
@Param(label = "Clear graph before workflow execution",
value = "If set to true this will clear the specified graph before executing a workflow that writes to it.")
clearBeforeExecution: Boolean = true) extends RdfDataset with TripleSinkDataset {
@Param(label = "Clear graph before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to true this will clear the specified graph before executing a workflow that writes to it.",
advanced = true)
clearBeforeExecution: Boolean = false) extends RdfDataset with TripleSinkDataset {

private lazy val model = ModelFactory.createDefaultModel
model.read(new StringReader(data), null, format)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,9 @@ case class SparqlDataset(
strategy: EntityRetrieverStrategy = EntityRetrieverStrategy.parallel,
@Param("Enforces the correct ordering of values, if set to `true` (default).")
useOrderBy: Boolean = true,
@Param(label = "Clear graph before workflow execution",
value = "If set to `true`, this will clear the specified graph before executing a workflow that writes into it.")
@Param(label = "Clear graph before workflow execution (deprecated)",
value = "This is deprecated, use the 'Clear dataset' operator instead to clear a dataset in a workflow. If set to `true`, this will clear the specified graph before executing a workflow that writes into it.",
advanced = true)
clearGraphBeforeExecution: Boolean = false,
@Param(
label = "SPARQL query timeout (ms)",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ case class SparqlUpdateCustomTask(

override def inputPorts: InputPorts = {
if(isStaticTemplate) {
FixedNumberOfInputs(Seq.empty)
InputPorts.NoInputPorts
} else {
FixedNumberOfInputs(Seq(FixedSchemaPort(expectedInputSchema)))
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ class LocalSparqlUpdateExecutorTest extends AnyFlatSpec with Matchers with TestW

class DummyTaskSpec(params: Map[String, String]) extends CustomTask {

override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts

override def outputPort: Option[Port] = None

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,6 @@ class XmlSink(val resource: WritableResource,
*/
override def clear(force: Boolean = false)(implicit userContext: UserContext): Unit = {
  // Deleting the backing resource removes all previously written XML content.
  resource.delete()
  // Delegate so that stacked sink traits can also react to the clear.
  super.clear(force)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,9 @@ class XmlSourceStreaming(file: Resource, basePath: String, uriPattern: String) e
paths.put(Nil, 0)
val idx = new AtomicInteger(1)
var currentPath = List[String]()
if(!file.exists) {
return Seq.empty
}
val inputStream = file.inputStream
try {
val reader: XMLStreamReader = initStreamReader(inputStream)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ case class AutoCompletableTestPlugin(@Param(value = "Some param", autoCompletion
autoCompleteValueWithLabels = true, allowOnlyAutoCompletedValues = true, autoCompletionDependsOnParameters = Array("otherParam"))
completableParam: String,
otherParam: String) extends CustomTask {
override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts
override def outputPort: Option[Port] = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ object BlockingTask {

/** Task that blocks until externally released. */
case class BlockingTask() extends CustomTask {
override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts
override def outputPort: Option[Port] = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ class DatasetApi @Inject() () extends InjectedController with UserContextActions
new ApiResponse(
responseCode = "404",
description = "If the specified project or dataset has not been found."
),
new ApiResponse(
responseCode = "409",
description = "If the dataset is currently configured as read-only. The user needs to change the config before trying again."
)
)
)
Expand All @@ -101,7 +105,7 @@ class DatasetApi @Inject() () extends InjectedController with UserContextActions
val sink = dataset.data.entitySink
sink.clear(force = true)
val typeCache = dataset.activity[TypesCache].control
// This will throw an exception if the previous cache execution as failed.
// This will throw an exception if the previous cache execution has failed.
Try(typeCache.waitUntilFinished())
Try(typeCache.start())
NoContent
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import org.silkframework.rule.TransformSpec
import org.silkframework.runtime.activity.UserContext
import org.silkframework.runtime.plugin.{ParameterValues, PluginContext}
import org.silkframework.runtime.serialization.ReadContext
import org.silkframework.runtime.validation.{BadUserInputException, RequestException}
import org.silkframework.runtime.validation.{BadUserInputException, ConflictRequestException, RequestException}
import org.silkframework.util.Uri
import org.silkframework.workbench.Context
import org.silkframework.workbench.utils.ErrorResult
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ class ActivityApiTest extends PlaySpec with ConfigTestTrait with IntegrationTest
}

case class MessageTask(message: String) extends CustomTask {
override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts
override def outputPort: Option[Port] = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ object CountingTask {
}
/** Task that counts its executions. */
case class CountingTask() extends CustomTask {
override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts
override def outputPort: Option[Port] = None
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,7 +759,7 @@ trait WorkspaceProviderTestTrait extends AnyFlatSpec with Matchers with MockitoS

@Plugin(id = "WorkspaceProviderTestTask", label = "test task")
case class TestCustomTask(stringParam: String, numberParam: Int) extends CustomTask {
override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts
override def outputPort: Option[Port] = None
}

Expand All @@ -776,7 +776,7 @@ object WorkspaceProviderTestPlugins {
throw new FailingTaskException("Failed!")
}

override def inputPorts: InputPorts = FixedNumberOfInputs(Seq.empty)
override def inputPorts: InputPorts = InputPorts.NoInputPorts

override def outputPort: Option[Port] = None
}
Expand Down
Loading