Skip to content

Commit a7ba3c0

Browse files
authored
Merge pull request #40 from montevideo-tech/feature/fetch-control-messages
fetch control message
2 parents 9c00d6b + 0db7661 commit a7ba3c0

File tree

3 files changed

+190
-6
lines changed

3 files changed

+190
-6
lines changed

lib/transport/client.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ export class Client {
4848

4949
// Send the setup message.
5050
await setup.send.client({
51-
versions: [Setup.Version.DRAFT_06],
51+
versions: [Setup.Version.DRAFT_07],
5252
role: this.config.role,
5353
})
5454

lib/transport/control.ts

Lines changed: 188 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import { Reader, Writer } from "./stream"
33
export type Message = Subscriber | Publisher
44

55
// Sent by subscriber
6-
export type Subscriber = Subscribe | Unsubscribe | AnnounceOk | AnnounceError
6+
export type Subscriber = Subscribe | Unsubscribe | AnnounceOk | AnnounceError | Fetch | FetchCancel
77

88
export function isSubscriber(m: Message): m is Subscriber {
99
return (
@@ -12,7 +12,7 @@ export function isSubscriber(m: Message): m is Subscriber {
1212
}
1313

1414
// Sent by publisher
15-
export type Publisher = SubscribeOk | SubscribeError | SubscribeDone | Announce | Unannounce
15+
export type Publisher = SubscribeOk | SubscribeError | SubscribeDone | Announce | Unannounce | FetchOk | FetchError
1616

1717
export function isPublisher(m: Message): m is Publisher {
1818
return (
@@ -39,6 +39,10 @@ export enum Msg {
3939
AnnounceError = "announce_error",
4040
Unannounce = "unannounce",
4141
GoAway = "go_away",
42+
Fetch = "fetch",
43+
FetchCancel = "fetch_cancel",
44+
FetchOk = "fetch_ok",
45+
FetchError = "fetch_error",
4246
}
4347

4448
enum Id {
@@ -56,6 +60,10 @@ enum Id {
5660
AnnounceError = 0x8,
5761
Unannounce = 0x9,
5862
GoAway = 0x10,
63+
Fetch = 0x16,
64+
FetchCancel = 0x17,
65+
FetchOk = 0x18,
66+
FetchError = 0x19,
5967
}
6068

6169
export interface Subscribe {
@@ -158,6 +166,42 @@ export interface Unannounce {
158166
namespace: string[]
159167
}
160168

169+
export interface Fetch {
170+
kind: Msg.Fetch
171+
id: bigint
172+
namespace: string[]
173+
name: string
174+
subscriber_priority: number
175+
group_order: GroupOrder
176+
start_group: number
177+
start_object: number
178+
end_group: number
179+
end_object: number
180+
params?: Parameters
181+
}
182+
183+
export interface FetchOk {
184+
kind: Msg.FetchOk
185+
id: bigint
186+
group_order: number
187+
end_of_track: number
188+
largest_group_id: bigint
189+
largest_object_id: bigint
190+
params?: Parameters
191+
}
192+
193+
export interface FetchError {
194+
kind: Msg.FetchError
195+
id: bigint
196+
code: bigint
197+
reason: string
198+
}
199+
200+
export interface FetchCancel {
201+
kind: Msg.FetchCancel
202+
id: bigint
203+
}
204+
161205
export class Stream {
162206
private decoder: Decoder
163207
private encoder: Encoder
@@ -244,9 +288,15 @@ export class Decoder {
244288
return Msg.Unannounce
245289
case Id.GoAway:
246290
return Msg.GoAway
291+
case Id.Fetch:
292+
return Msg.Fetch
293+
case Id.FetchCancel:
294+
return Msg.FetchCancel
295+
case Id.FetchOk:
296+
return Msg.FetchOk
297+
case Id.FetchError:
298+
return Msg.FetchError
247299
}
248-
249-
throw new Error(`unknown control message type: ${t}`)
250300
}
251301

252302
async message(): Promise<Message> {
@@ -259,7 +309,6 @@ export class Decoder {
259309
case Msg.SubscribeError:
260310
return this.subscribe_error()
261311
case Msg.SubscribeDone:
262-
return this.subscribe_done()
263312
case Msg.Unsubscribe:
264313
return this.unsubscribe()
265314
case Msg.Announce:
@@ -272,6 +321,14 @@ export class Decoder {
272321
return this.announce_error()
273322
case Msg.GoAway:
274323
throw new Error("TODO: implement go away")
324+
case Msg.Fetch:
325+
return this.fetch()
326+
case Msg.FetchCancel:
327+
return this.fetchCancel()
328+
case Msg.FetchOk:
329+
return this.fetchOk()
330+
case Msg.FetchError:
331+
return this.fetchError()
275332
}
276333
}
277334

@@ -450,6 +507,50 @@ export class Decoder {
450507
namespace: await this.r.tuple(),
451508
}
452509
}
510+
511+
private async fetch(): Promise<Fetch> {
512+
return {
513+
kind: Msg.Fetch,
514+
id: await this.r.u62(),
515+
namespace: await this.r.tuple(),
516+
name: await this.r.string(),
517+
subscriber_priority: await this.r.u8(),
518+
group_order: await this.decodeGroupOrder(),
519+
start_group: await this.r.u53(),
520+
start_object: await this.r.u53(),
521+
end_group: await this.r.u53(),
522+
end_object: await this.r.u53(),
523+
params: await this.parameters(),
524+
}
525+
}
526+
527+
private async fetchCancel(): Promise<FetchCancel> {
528+
return {
529+
kind: Msg.FetchCancel,
530+
id: await this.r.u62(),
531+
}
532+
}
533+
534+
private async fetchOk(): Promise<FetchOk> {
535+
return {
536+
kind: Msg.FetchOk,
537+
id: await this.r.u62(),
538+
group_order: await this.r.u8(),
539+
end_of_track: await this.r.u8(),
540+
largest_group_id: await this.r.u62(),
541+
largest_object_id: await this.r.u62(),
542+
params: await this.parameters(),
543+
}
544+
}
545+
546+
private async fetchError(): Promise<FetchError> {
547+
return {
548+
kind: Msg.FetchError,
549+
id: await this.r.u62(),
550+
code: await this.r.u62(),
551+
reason: await this.r.string(),
552+
}
553+
}
453554
}
454555

455556
export class Encoder {
@@ -479,6 +580,14 @@ export class Encoder {
479580
return this.announce_error(m)
480581
case Msg.Unannounce:
481582
return this.unannounce(m)
583+
case Msg.Fetch:
584+
return this.fetch(m)
585+
case Msg.FetchCancel:
586+
return this.fetchCancel(m)
587+
case Msg.FetchOk:
588+
return this.fetchOk(m)
589+
case Msg.FetchError:
590+
return this.fetchError(m)
482591
}
483592
}
484593

@@ -669,4 +778,78 @@ export class Encoder {
669778

670779
return this.w.concatBuffer(paramFields)
671780
}
781+
782+
async fetch(f: Fetch) {
783+
const buffer = new Uint8Array(8)
784+
785+
const msgData = this.w.concatBuffer([
786+
this.w.setVint62(buffer, f.id),
787+
this.w.encodeTuple(buffer, f.namespace),
788+
this.w.encodeString(buffer, f.name),
789+
this.w.setUint8(buffer, f.subscriber_priority),
790+
this.w.setUint8(buffer, f.group_order),
791+
this.w.setVint53(buffer, f.start_group),
792+
this.w.setVint53(buffer, f.start_object),
793+
this.w.setVint53(buffer, f.end_group),
794+
this.w.setVint53(buffer, f.end_object),
795+
this.encodeParameters(buffer, f.params),
796+
])
797+
798+
const messageType = this.w.setVint53(buffer, Id.Fetch)
799+
const messageLength = this.w.setVint53(buffer, msgData.length)
800+
801+
for (const elem of [messageType, messageLength, msgData]) {
802+
await this.w.write(elem)
803+
}
804+
}
805+
806+
async fetchCancel(fc: FetchCancel) {
807+
const buffer = new Uint8Array(8)
808+
809+
const msgData = this.w.concatBuffer([this.w.setVint62(buffer, fc.id)])
810+
811+
const messageType = this.w.setVint53(buffer, Id.FetchCancel)
812+
const messageLength = this.w.setVint53(buffer, msgData.length)
813+
814+
for (const elem of [messageType, messageLength, msgData]) {
815+
await this.w.write(elem)
816+
}
817+
}
818+
819+
async fetchOk(fo: FetchOk) {
820+
const buffer = new Uint8Array(8)
821+
822+
const msgData = this.w.concatBuffer([
823+
this.w.setVint62(buffer, fo.id),
824+
this.w.setUint8(buffer, fo.group_order),
825+
this.w.setUint8(buffer, fo.end_of_track),
826+
this.w.setVint62(buffer, fo.largest_group_id),
827+
this.w.setVint62(buffer, fo.largest_object_id),
828+
this.encodeParameters(buffer, fo.params),
829+
])
830+
831+
const messageType = this.w.setVint53(buffer, Id.FetchOk)
832+
const messageLength = this.w.setVint53(buffer, msgData.length)
833+
834+
for (const elem of [messageType, messageLength, msgData]) {
835+
await this.w.write(elem)
836+
}
837+
}
838+
839+
async fetchError(fe: FetchError) {
840+
const buffer = new Uint8Array(8)
841+
842+
const msgData = this.w.concatBuffer([
843+
this.w.setVint62(buffer, fe.id),
844+
this.w.setVint62(buffer, fe.code),
845+
this.w.encodeString(buffer, fe.reason),
846+
])
847+
848+
const messageType = this.w.setVint53(buffer, Id.FetchError)
849+
const messageLength = this.w.setVint53(buffer, msgData.length)
850+
851+
for (const elem of [messageType, messageLength, msgData]) {
852+
await this.w.write(elem)
853+
}
854+
}
672855
}

lib/transport/setup.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export enum Version {
1111
DRAFT_04 = 0xff000004,
1212
DRAFT_05 = 0xff000005,
1313
DRAFT_06 = 0xff000006,
14+
DRAFT_07 = 0xff000007,
1415
KIXEL_00 = 0xbad00,
1516
KIXEL_01 = 0xbad01,
1617
}

0 commit comments

Comments
 (0)