Skip to content

Commit ec61da5

Browse files
committed
support clustered index
1 parent aba6988 commit ec61da5

21 files changed

+762
-141
lines changed

pom.xml

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@
5757
<protobuf.version>3.18.0</protobuf.version>
5858
<log4j.version>1.2.17</log4j.version>
5959
<slf4j.version>1.7.16</slf4j.version>
60-
<grpc.version>1.60.0</grpc.version>
60+
<grpc.version>1.69.0</grpc.version>
6161
<netty.tcnative.version>2.0.34.Final</netty.tcnative.version>
6262
<gson.version>2.8.9</gson.version>
6363
<powermock.version>1.6.6</powermock.version>
@@ -164,6 +164,11 @@
164164
<artifactId>netty-tcnative-boringssl-static</artifactId>
165165
<version>${netty.tcnative.version}</version>
166166
</dependency>
167+
<dependency>
168+
<groupId>io.netty</groupId>
169+
<artifactId>netty-all</artifactId>
170+
<version>4.1.100.Final</version>
171+
</dependency>
167172
<dependency>
168173
<groupId>io.grpc</groupId>
169174
<artifactId>grpc-testing</artifactId>

src/main/java/org/tikv/common/Snapshot.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import java.util.stream.Collectors;
2929
import javax.annotation.Nonnull;
3030
import org.tikv.common.columnar.TiChunk;
31+
import org.tikv.common.handle.Handle;
3132
import org.tikv.common.key.Key;
3233
import org.tikv.common.meta.TiDAGRequest;
3334
import org.tikv.common.meta.TiTimestamp;
@@ -102,6 +103,7 @@ public Iterator<TiChunk> tableReadChunk(
102103
return getTiChunkIterator(dagRequest, tasks, getSession(), numOfRows);
103104
}
104105
}
106+
105107
/**
106108
* Issue a table read request
107109
*
@@ -126,7 +128,7 @@ public Iterator<Row> tableReadRow(TiDAGRequest dagRequest, long physicalId) {
126128
*/
127129
private Iterator<Row> tableReadRow(TiDAGRequest dagRequest, List<RegionTask> tasks) {
128130
if (dagRequest.isDoubleRead()) {
129-
Iterator<Long> iter = getHandleIterator(dagRequest, tasks, getSession());
131+
Iterator<Handle> iter = getHandleIterator(dagRequest, tasks, getSession());
130132
return new IndexScanIterator(this, dagRequest, iter);
131133
} else {
132134
return getRowIterator(dagRequest, tasks, getSession());
@@ -141,7 +143,7 @@ private Iterator<Row> tableReadRow(TiDAGRequest dagRequest, List<RegionTask> tas
141143
* @param tasks RegionTask of the coprocessor request to send
142144
* @return Row iterator to iterate over resulting rows
143145
*/
144-
public Iterator<Long> indexHandleRead(TiDAGRequest dagRequest, List<RegionTask> tasks) {
146+
public Iterator<Handle> indexHandleRead(TiDAGRequest dagRequest, List<RegionTask> tasks) {
145147
return getHandleIterator(dagRequest, tasks, session);
146148
}
147149

src/main/java/org/tikv/common/codec/Codec.java

Lines changed: 47 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -32,10 +32,18 @@
3232
import org.joda.time.LocalDate;
3333
import org.joda.time.LocalDateTime;
3434
import org.tikv.common.ExtendedDateTime;
35+
import org.tikv.common.exception.CodecException;
3536
import org.tikv.common.exception.ConvertOverflowException;
3637
import org.tikv.common.exception.InvalidCodecFormatException;
3738
import org.tikv.common.exception.TypeException;
3839
import org.tikv.common.exception.UnsupportedSyntaxException;
40+
import org.tikv.common.types.BytesType;
41+
import org.tikv.common.types.DataType;
42+
import org.tikv.common.types.DecimalType;
43+
import org.tikv.common.types.IntegerType;
44+
import org.tikv.common.types.JsonType;
45+
import org.tikv.common.types.RealType;
46+
import org.tikv.common.types.TimeType;
3947

4048
public class Codec {
4149

@@ -57,6 +65,41 @@ public static boolean isNullFlag(int flag) {
5765
return flag == NULL_FLAG;
5866
}
5967

68+
public static Object decodeOne(byte[] colData) {
69+
if (colData.length <= 1) {
70+
throw new CodecException("invalid encoded column data, length <=1");
71+
}
72+
int flag = colData[0];
73+
DataType tp;
74+
switch (flag) {
75+
case INT_FLAG:
76+
case UINT_FLAG:
77+
case VARINT_FLAG:
78+
case UVARINT_FLAG:
79+
tp = IntegerType.BIGINT;
80+
break;
81+
case FLOATING_FLAG:
82+
tp = RealType.DOUBLE;
83+
break;
84+
case BYTES_FLAG:
85+
case COMPACT_BYTES_FLAG:
86+
tp = BytesType.TEXT;
87+
break;
88+
case DECIMAL_FLAG:
89+
tp = DecimalType.DECIMAL;
90+
break;
91+
case DURATION_FLAG:
92+
tp = TimeType.TIME;
93+
break;
94+
case JSON_FLAG:
95+
tp = JsonType.JSON;
96+
break;
97+
default:
98+
throw new CodecException("Unknown type");
99+
}
100+
return tp.decode(new CodecDataInput(colData));
101+
}
102+
60103
public static class IntegerCodec {
61104

62105
private static long flipSignBit(long v) {
@@ -603,10 +646,10 @@ public static void writeDateTimeProto(
603646
* Read datetime from packed Long encoded as unsigned var-len integer converting into specified
604647
* timezone
605648
*
606-
* @see DateTimeCodec#fromPackedLong(long, DateTimeZone)
607649
* @param cdi codec buffer input
608650
* @param tz timezone to interpret datetime parts
609651
* @return decoded ExtendedDateTime using provided timezone
652+
* @see DateTimeCodec#fromPackedLong(long, DateTimeZone)
610653
*/
611654
public static ExtendedDateTime readFromUVarInt(CodecDataInput cdi, DateTimeZone tz) {
612655
return DateTimeCodec.fromPackedLong(IntegerCodec.readUVarLong(cdi), tz);
@@ -615,10 +658,10 @@ public static ExtendedDateTime readFromUVarInt(CodecDataInput cdi, DateTimeZone
615658
/**
616659
* Read datetime from packed Long as unsigned fixed-len integer
617660
*
618-
* @see DateTimeCodec#fromPackedLong(long, DateTimeZone)
619661
* @param cdi codec buffer input
620662
* @param tz timezone to interpret datetime parts
621663
* @return decoded ExtendedDateTime using provided timezone
664+
* @see DateTimeCodec#fromPackedLong(long, DateTimeZone)
622665
*/
623666
public static ExtendedDateTime readFromUInt(CodecDataInput cdi, DateTimeZone tz) {
624667
return DateTimeCodec.fromPackedLong(IntegerCodec.readULong(cdi), tz);
@@ -731,9 +774,9 @@ public static void writeDateProto(CodecDataOutput cdo, Date date, DateTimeZone t
731774
* Read date from packed Long encoded as unsigned var-len integer converting into specified
732775
* timezone
733776
*
734-
* @see DateCodec#fromPackedLong(long)
735777
* @param cdi codec buffer input
736778
* @return decoded DateTime using provided timezone
779+
* @see DateCodec#fromPackedLong(long)
737780
*/
738781
public static LocalDate readFromUVarInt(CodecDataInput cdi) {
739782
return DateCodec.fromPackedLong(IntegerCodec.readUVarLong(cdi));
@@ -742,9 +785,9 @@ public static LocalDate readFromUVarInt(CodecDataInput cdi) {
742785
/**
743786
* Read date from packed Long as unsigned fixed-len integer
744787
*
745-
* @see DateCodec#fromPackedLong(long)
746788
* @param cdi codec buffer input
747789
* @return decoded DateTime using provided timezone
790+
* @see DateCodec#fromPackedLong(long)
748791
*/
749792
public static LocalDate readFromUInt(CodecDataInput cdi) {
750793
return DateCodec.fromPackedLong(IntegerCodec.readULong(cdi));

src/main/java/org/tikv/common/codec/CodecDataInput.java

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,23 @@
1717

1818
package org.tikv.common.codec;
1919

20+
import static org.tikv.common.codec.Codec.BYTES_FLAG;
21+
import static org.tikv.common.codec.Codec.COMPACT_BYTES_FLAG;
22+
import static org.tikv.common.codec.Codec.DECIMAL_FLAG;
23+
import static org.tikv.common.codec.Codec.DURATION_FLAG;
24+
import static org.tikv.common.codec.Codec.FLOATING_FLAG;
25+
import static org.tikv.common.codec.Codec.INT_FLAG;
26+
import static org.tikv.common.codec.Codec.NULL_FLAG;
27+
import static org.tikv.common.codec.Codec.UINT_FLAG;
28+
2029
import com.google.protobuf.ByteString;
21-
import java.io.*;
30+
import java.io.ByteArrayInputStream;
31+
import java.io.DataInput;
32+
import java.io.DataInputStream;
33+
import java.io.IOException;
34+
import java.io.InputStream;
2235
import javax.annotation.Nonnull;
36+
import org.tikv.common.exception.CodecException;
2337

2438
public class CodecDataInput implements DataInput {
2539
protected final DataInputStream inputStream;
@@ -191,6 +205,48 @@ public String readUTF() {
191205
}
192206
}
193207

208+
/**
209+
* peek the first encoded value and return its length
210+
*
211+
* @return first encoded value
212+
*/
213+
public int cutOne() {
214+
if (available() < 1) {
215+
throw new CodecException("invalid encoded key");
216+
}
217+
int flag = readByte();
218+
int a1 = this.available();
219+
220+
switch (flag) {
221+
case NULL_FLAG:
222+
case INT_FLAG:
223+
case UINT_FLAG:
224+
case FLOATING_FLAG:
225+
case DURATION_FLAG:
226+
Codec.RealCodec.readDouble(this);
227+
break;
228+
case BYTES_FLAG:
229+
Codec.BytesCodec.readBytes(this);
230+
break;
231+
case COMPACT_BYTES_FLAG:
232+
Codec.BytesCodec.readCompactBytes(this);
233+
break;
234+
case DECIMAL_FLAG:
235+
Codec.DecimalCodec.readDecimal(this);
236+
break;
237+
// case VARINT_FLAG:
238+
// l = peekVarint(b);
239+
// case UVARINT_FLAG:
240+
// l = peekUvarint(b);
241+
// case JSON_FLAG:
242+
// l = json.PeekBytesAsJSON(b);
243+
default:
244+
throw new CodecException("invalid encoded key flag " + flag);
245+
}
246+
int a2 = this.available();
247+
return a1 - a2 + 1;
248+
}
249+
194250
public int peekByte() {
195251
mark(currentPos());
196252
int b = readByte() & 0xFF;

src/main/java/org/tikv/common/codec/TableCodec.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@
1919

2020
import java.util.List;
2121
import org.tikv.common.exception.CodecException;
22+
import org.tikv.common.handle.CommonHandle;
23+
import org.tikv.common.handle.Handle;
24+
import org.tikv.common.handle.IntHandle;
2225
import org.tikv.common.meta.TiColumnInfo;
2326
import org.tikv.common.meta.TiTableInfo;
2427
import org.tikv.common.row.Row;
@@ -42,7 +45,7 @@ public static byte[] encodeRow(
4245
return TableCodecV1.encodeRow(columnInfos, values, isPkHandle);
4346
}
4447

45-
public static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) {
48+
public static Object[] decodeObjects(byte[] value, Handle handle, TiTableInfo tableInfo) {
4649
if (value.length == 0) {
4750
throw new CodecException("Decode fails: value length is zero");
4851
}
@@ -52,7 +55,7 @@ public static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tabl
5255
return TableCodecV1.decodeObjects(value, handle, tableInfo);
5356
}
5457

55-
public static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) {
58+
public static Row decodeRow(byte[] value, Handle handle, TiTableInfo tableInfo) {
5659
if (value.length == 0) {
5760
throw new CodecException("Decode fails: value length is zero");
5861
}
@@ -62,7 +65,10 @@ public static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) {
6265
return TableCodecV1.decodeRow(value, handle, tableInfo);
6366
}
6467

65-
public static long decodeHandle(byte[] value) {
66-
return new CodecDataInput(value).readLong();
68+
public static Handle decodeHandle(byte[] value, boolean isCommonHandle) {
69+
if (isCommonHandle) {
70+
return new CommonHandle(value);
71+
}
72+
return new IntHandle(new CodecDataInput(value).readLong());
6773
}
6874
}

src/main/java/org/tikv/common/codec/TableCodecV1.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.util.HashMap;
2121
import java.util.List;
2222
import org.tikv.common.codec.Codec.IntegerCodec;
23+
import org.tikv.common.handle.Handle;
2324
import org.tikv.common.meta.TiColumnInfo;
2425
import org.tikv.common.meta.TiTableInfo;
2526
import org.tikv.common.row.ObjectRowImpl;
@@ -51,7 +52,7 @@ protected static byte[] encodeRow(
5152
return cdo.toBytes();
5253
}
5354

54-
protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) {
55+
protected static Object[] decodeObjects(byte[] value, Handle handle, TiTableInfo tableInfo) {
5556
if (handle == null && tableInfo.isPkHandle()) {
5657
throw new IllegalArgumentException("when pk is handle, handle cannot be null");
5758
}
@@ -85,7 +86,7 @@ protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo t
8586
return res;
8687
}
8788

88-
protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) {
89+
protected static Row decodeRow(byte[] value, Handle handle, TiTableInfo tableInfo) {
8990
return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo));
9091
}
9192
}

src/main/java/org/tikv/common/codec/TableCodecV2.java

Lines changed: 28 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,10 @@
2020
import java.util.ArrayList;
2121
import java.util.HashMap;
2222
import java.util.List;
23+
import org.tikv.common.handle.Handle;
2324
import org.tikv.common.meta.TiColumnInfo;
25+
import org.tikv.common.meta.TiIndexColumn;
26+
import org.tikv.common.meta.TiIndexInfo;
2427
import org.tikv.common.meta.TiTableInfo;
2528
import org.tikv.common.row.ObjectRowImpl;
2629
import org.tikv.common.row.Row;
@@ -51,7 +54,7 @@ protected static byte[] encodeRow(
5154
return encoder.encode(columnInfoList, valueList);
5255
}
5356

54-
protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo tableInfo) {
57+
protected static Object[] decodeObjects(byte[] value, Handle handle, TiTableInfo tableInfo) {
5558
if (handle == null && tableInfo.isPkHandle()) {
5659
throw new IllegalArgumentException("when pk is handle, handle cannot be null");
5760
}
@@ -60,12 +63,33 @@ protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo t
6063
HashMap<Long, Object> decodedDataMap = new HashMap<>(colSize);
6164
org.tikv.common.codec.RowV2 rowV2 = org.tikv.common.codec.RowV2.createNew(value);
6265

66+
TiIndexInfo pk = tableInfo.getPrimaryKey();
67+
68+
if (pk != null) {
69+
List<TiColumnInfo> cols = new ArrayList<>();
70+
for (TiIndexColumn indexColumn : pk.getIndexColumns()) {
71+
TiColumnInfo col = tableInfo.getColumn(indexColumn.getOffset());
72+
cols.add(col);
73+
}
74+
if (tableInfo.isPkHandle()) {
75+
assert cols.size() == 1;
76+
decodedDataMap.put(cols.get(0).getId(), handle.data()[0]);
77+
}
78+
if (tableInfo.isCommonHandle()) {
79+
for (int i = 0; i < cols.size(); i++) {
80+
decodedDataMap.put(cols.get(i).getId(), handle.data()[i]);
81+
}
82+
}
83+
}
84+
6385
for (TiColumnInfo col : tableInfo.getColumns()) {
64-
if (col.isPrimaryKey() && tableInfo.isPkHandle()) {
86+
if (decodedDataMap.containsKey(col.getId())) {
87+
continue;
88+
} else if (col.isPrimaryKey() && tableInfo.isPkHandle()) {
6589
decodedDataMap.put(col.getId(), handle);
6690
continue;
6791
}
68-
org.tikv.common.codec.RowV2.ColIDSearchResult searchResult = rowV2.findColID(col.getId());
92+
RowV2.ColIDSearchResult searchResult = rowV2.findColID(col.getId());
6993
if (searchResult.isNull) {
7094
// current col is null, nothing should be added to decodedMap
7195
continue;
@@ -78,7 +102,6 @@ protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo t
78102
decodedDataMap.put(col.getId(), d);
79103
}
80104
}
81-
82105
Object[] res = new Object[colSize];
83106

84107
// construct Row with Map<ColumnID, Data> & handle
@@ -90,7 +113,7 @@ protected static Object[] decodeObjects(byte[] value, Long handle, TiTableInfo t
90113
return res;
91114
}
92115

93-
protected static Row decodeRow(byte[] value, Long handle, TiTableInfo tableInfo) {
116+
protected static Row decodeRow(byte[] value, Handle handle, TiTableInfo tableInfo) {
94117
return ObjectRowImpl.create(decodeObjects(value, handle, tableInfo));
95118
}
96119
}

0 commit comments

Comments
 (0)