Skip to content

Commit 7c7dfbf

Browse files
committed
fix: segcore collection schema update not concurrent safe. (#45337)
relate: #45345 Signed-off-by: aoiasd <[email protected]>
1 parent 3203119 commit 7c7dfbf

File tree

3 files changed

+28
-10
lines changed

3 files changed

+28
-10
lines changed

internal/core/src/common/Schema.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
#pragma once
1818

19+
#include <atomic>
1920
#include <cstdint>
2021
#include <memory>
2122
#include <optional>
@@ -358,5 +359,5 @@ class Schema {
358359
};
359360

360361
using SchemaPtr = std::shared_ptr<Schema>;
361-
362+
using SafeSchemaPtr = std::atomic<SchemaPtr*>;
362363
} // namespace milvus

internal/core/src/segcore/Collection.cpp

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ Collection::parse_schema(const void* schema_proto_blob,
7070
const uint64_t version) {
7171
Assert(schema_proto_blob != nullptr);
7272

73-
if (version <= schema_->get_schema_version()) {
73+
if (version <= get_schema_version()) {
7474
return;
7575
}
7676

@@ -79,14 +79,9 @@ Collection::parse_schema(const void* schema_proto_blob,
7979

8080
AssertInfo(suc, "parse schema proto failed");
8181

82-
auto old_schema = schema_;
83-
84-
schema_ = Schema::ParseFrom(collection_schema);
85-
schema_->set_schema_version(version);
86-
87-
if (old_schema) {
88-
schema_->UpdateLoadFields(old_schema->load_fields());
89-
}
82+
auto new_schema = Schema::ParseFrom(collection_schema);
83+
new_schema->set_schema_version(version);
84+
set_schema(new_schema);
9085
}
9186

9287
} // namespace milvus::segcore

internal/core/src/segcore/Collection.h

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
#pragma once
1313

1414
#include <memory>
15+
#include <shared_mutex>
1516
#include <string>
1617

1718
#include "common/Schema.h"
@@ -36,9 +37,29 @@ class Collection {
3637
public:
3738
SchemaPtr
3839
get_schema() {
40+
std::shared_lock lock(schema_mutex_);
3941
return schema_;
4042
}
4143

44+
uint64_t
45+
get_schema_version() {
46+
std::shared_lock lock(schema_mutex_);
47+
return schema_->get_schema_version();
48+
}
49+
50+
void
51+
set_schema(SchemaPtr& new_schema) {
52+
std::unique_lock lock(schema_mutex_);
53+
auto old_schema = schema_;
54+
if (new_schema->get_schema_version() > schema_->get_schema_version()) {
55+
schema_ = new_schema;
56+
}
57+
58+
if (old_schema) {
59+
schema_->UpdateLoadFields(old_schema->load_fields());
60+
}
61+
}
62+
4263
IndexMetaPtr&
4364
get_index_meta() {
4465
return index_meta_;
@@ -57,6 +78,7 @@ class Collection {
5778
private:
5879
std::string collection_name_;
5980
SchemaPtr schema_;
81+
std::shared_mutex schema_mutex_;
6082
IndexMetaPtr index_meta_;
6183
};
6284

0 commit comments

Comments
 (0)