Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions apps/opik-backend/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,13 @@ batchOperations:
# Description: The maximal number of ids to be used for IN clause. Find requests with a larger number of ids will
# involve the use of temp tables for querying
maxExperimentInClauseSize: ${BATCH_OPERATIONS_MAX_EXPERIMENT_IN_CLAUSE_SIZE:-5000}
analytics:
# Default: 100
# Description: Chunk size for bulk update operations (traces, spans, threads). ClickHouse performs best with batch sizes of 100-1000 records per query.
bulkUpdateChunkSize: ${BATCH_OPERATIONS_BULK_UPDATE_CHUNK_SIZE:-100}
# Default: 1000
# Description: Buffer size for streaming operations to batch records for efficient processing
streamBufferSize: ${BATCH_OPERATIONS_STREAM_BUFFER_SIZE:-1000}

# Configuration for rate limit. This is not enabled by default for open source installations.
# If enabled, rate limit is applied to creation and update of various entities including traces, spans, projects,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Schema(description = "Request to batch update multiple spans")
public record SpanBatchUpdate(
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of span IDs to update (max 1000)") Set<UUID> ids,
@NotNull @Valid @Schema(description = "Update to apply to all spans") SpanUpdate update,
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Schema(description = "Request to batch update multiple traces")
public record TraceBatchUpdate(
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of trace IDs to update (max 1000)") Set<UUID> ids,
@NotNull @Valid @Schema(description = "Update to apply to all traces") TraceUpdate update,
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.comet.opik.api;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.databind.PropertyNamingStrategies;
import com.fasterxml.jackson.databind.annotation.JsonNaming;
import io.swagger.v3.oas.annotations.media.Schema;
import jakarta.validation.Valid;
import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.NotNull;
import jakarta.validation.constraints.Size;
import lombok.Builder;

import java.util.Set;
import java.util.UUID;

@Builder(toBuilder = true)
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
@Schema(description = "Request to batch update multiple trace threads")
public record TraceThreadBatchUpdate(
@NotNull @NotEmpty @Size(min = 1, max = 1000) @Schema(description = "List of thread model IDs to update (max 1000)") Set<UUID> ids,
@NotNull @Valid @Schema(description = "Update to apply to all threads") TraceThreadUpdate update,
@Schema(description = "If true, merge tags with existing tags instead of replacing them. Default: false") Boolean mergeTags) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import com.comet.opik.api.ProjectStats;
import com.comet.opik.api.Span;
import com.comet.opik.api.SpanBatch;
import com.comet.opik.api.SpanBatchUpdate;
import com.comet.opik.api.SpanSearchStreamRequest;
import com.comet.opik.api.SpanUpdate;
import com.comet.opik.api.filter.FiltersFactory;
Expand Down Expand Up @@ -232,6 +233,28 @@ public Response createSpans(
return Response.noContent().build();
}

@PATCH
@Path("/batch")
@Operation(operationId = "batchUpdateSpans", summary = "Batch update spans", description = "Update multiple spans", responses = {
@ApiResponse(responseCode = "204", description = "No Content"),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
@RateLimited
public Response batchUpdate(
@RequestBody(content = @Content(schema = @Schema(implementation = SpanBatchUpdate.class))) @Valid @NotNull SpanBatchUpdate batchUpdate) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Batch updating '{}' spans on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

spanService.batchUpdate(batchUpdate)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Batch updated '{}' spans on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

return Response.noContent().build();
}

@PATCH
@Path("{id}")
@Operation(operationId = "updateSpan", summary = "Update span by id", description = "Update span by id", responses = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,11 @@
import com.comet.opik.api.Trace;
import com.comet.opik.api.Trace.TracePage;
import com.comet.opik.api.TraceBatch;
import com.comet.opik.api.TraceBatchUpdate;
import com.comet.opik.api.TraceSearchStreamRequest;
import com.comet.opik.api.TraceThread;
import com.comet.opik.api.TraceThreadBatchIdentifier;
import com.comet.opik.api.TraceThreadBatchUpdate;
import com.comet.opik.api.TraceThreadIdentifier;
import com.comet.opik.api.TraceThreadSearchStreamRequest;
import com.comet.opik.api.TraceThreadUpdate;
Expand Down Expand Up @@ -305,6 +307,28 @@ public Response createTraces(
return Response.noContent().build();
}

@PATCH
@Path("/batch")
@Operation(operationId = "batchUpdateTraces", summary = "Batch update traces", description = "Update multiple traces", responses = {
@ApiResponse(responseCode = "204", description = "No Content"),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
@RateLimited
public Response batchUpdate(
@RequestBody(content = @Content(schema = @Schema(implementation = TraceBatchUpdate.class))) @Valid @NotNull TraceBatchUpdate batchUpdate) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Batch updating '{}' traces on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

service.batchUpdate(batchUpdate)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Batch updated '{}' traces on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

return Response.noContent().build();
}

@PATCH
@Path("{id}")
@Operation(operationId = "updateTrace", summary = "Update trace by id", description = "Update trace by id", responses = {
Expand Down Expand Up @@ -787,6 +811,28 @@ public Response closeTraceThread(
return Response.noContent().build();
}

@PATCH
@Path("/threads/batch")
@Operation(operationId = "batchUpdateThreads", summary = "Batch update threads", description = "Update multiple threads", responses = {
@ApiResponse(responseCode = "204", description = "No Content"),
@ApiResponse(responseCode = "400", description = "Bad Request", content = @Content(schema = @Schema(implementation = ErrorMessage.class)))})
@RateLimited
public Response batchUpdateThreads(
@RequestBody(content = @Content(schema = @Schema(implementation = TraceThreadBatchUpdate.class))) @Valid @NotNull TraceThreadBatchUpdate batchUpdate) {

String workspaceId = requestContext.get().getWorkspaceId();

log.info("Batch updating '{}' threads on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

traceThreadService.batchUpdate(batchUpdate)
.contextWrite(ctx -> setRequestContext(ctx, requestContext))
.block();

log.info("Batch updated '{}' threads on workspaceId '{}'", batchUpdate.ids().size(), workspaceId);

return Response.noContent().build();
}

@PATCH
@Path("/threads/{threadModelId}")
@Operation(operationId = "updateThread", summary = "Update thread", description = "Update thread", responses = {
Expand Down
Loading
Loading