Skip to content

Commit 9effd56

Browse files
Merge pull request #14 from amazon-mq/md/functional-core
2 parents 79e3b5a + 00235b6 commit 9effd56

File tree

5 files changed

+777
-443
lines changed

5 files changed

+777
-443
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,3 +16,4 @@
1616
/xrefr
1717

1818
rabbitmq_stream_s3.d
19+
.rabbitmq_stream_s3.plt

include/rabbitmq_stream_s3.hrl

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
-define(SEGMENT_HEADER_B, 8).
77
-define(SEGMENT_VERSION, 1).
88
-define(SEGMENT_HEADER, <<"OSIL", ?SEGMENT_VERSION:32/unsigned>>).
9+
-define(SEGMENT_HEADER_HASH, erlang:crc32(?SEGMENT_HEADER)).
910
-define(IDX_HEADER_B, 8).
1011
-define(IDX_VERSION, 1).
1112
-define(IDX_HEADER, ?IDX_HEADER(<<>>)).
@@ -156,3 +157,81 @@
156157
-define(MAX_SEGMENT_SIZE_BYTES, 536_870_912).
157158

158159
-type byte_offset() :: non_neg_integer().
160+
-type checksum() :: non_neg_integer().
161+
162+
%% rabbitmq_stream_s3_log_manifest_machine types:
163+
164+
%% Set by `rabbit_stream_queue:make_stream_conf/1'.
165+
-type writer_ref() :: rabbit_amqqueue:name().
166+
167+
-record(fragment, {
168+
segment_offset :: osiris:offset(),
169+
segment_pos = ?SEGMENT_HEADER_B :: pos_integer(),
170+
%% Number of chunks in prior fragments and number in current fragment.
171+
num_chunks = {0, 0} :: {non_neg_integer(), non_neg_integer()},
172+
first_offset :: osiris:offset() | undefined,
173+
first_timestamp :: osiris:timestamp() | undefined,
174+
last_offset :: osiris:offset() | undefined,
175+
next_offset :: osiris:offset() | undefined,
176+
%% Zero-based increasing integer for sequence number within the segment.
177+
seq_no = 0 :: non_neg_integer(),
178+
%% NOTE: header size is not included.
179+
size = 0 :: non_neg_integer(),
180+
%% TODO: do checksum during upload if undefined.
181+
checksum = ?SEGMENT_HEADER_HASH :: checksum() | undefined
182+
}).
183+
184+
%% Events.
185+
186+
%% The writer has written enough data and the given fragment is ready to be
187+
%% handed off to the manifest.
188+
-record(fragment_available, {writer_ref :: writer_ref(), fragment :: #fragment{}}).
189+
%% The writer notified the manifest that the commit offset has moved forward.
190+
-record(commit_offset_increased, {writer_ref :: writer_ref(), offset :: osiris:offset()}).
191+
-record(fragment_uploaded, {writer_ref :: writer_ref(), info :: #fragment_info{}}).
192+
-record(manifest_uploaded, {dir :: file:filename_all()}).
193+
-record(manifest_rebalanced, {dir :: file:filename_all(), manifest :: #manifest{}}).
194+
-record(manifest_requested, {requester :: gen_server:from(), dir :: file:filename_all()}).
195+
-record(manifest_downloaded, {dir :: file:filename_all(), manifest :: #manifest{} | undefined}).
196+
-record(writer_spawned, {
197+
pid :: pid(),
198+
reply_tag :: gen_server:reply_tag(),
199+
writer_ref :: writer_ref(),
200+
dir :: file:filename_all()
201+
}).
202+
203+
-type event() ::
204+
#fragment_available{}
205+
| #commit_offset_increased{}
206+
| #fragment_uploaded{}
207+
| #manifest_uploaded{}
208+
| #manifest_rebalanced{}
209+
| #manifest_requested{}
210+
| #manifest_downloaded{}
211+
| #writer_spawned{}.
212+
213+
%% Effects.
214+
215+
-record(upload_fragment, {
216+
writer_ref :: writer_ref(), dir :: file:filename_all(), fragment :: #fragment{}
217+
}).
218+
-record(register_offset_listener, {writer_pid :: pid(), offset :: osiris:offset() | -1}).
219+
-record(upload_manifest, {dir :: file:filename_all(), manifest :: #manifest{}}).
220+
-record(rebalance_manifest, {
221+
dir :: file:filename_all(),
222+
kind :: rabbitmq_stream_s3_log_manifest_entry:kind(),
223+
size :: pos_integer(),
224+
new_group :: rabbitmq_stream_s3_log_manifest_entry:entries(),
225+
rebalanced :: rabbitmq_stream_s3_log_manifest_entry:entries(),
226+
manifest :: #manifest{}
227+
}).
228+
-record(download_manifest, {dir :: file:filename_all()}).
229+
-record(reply, {to :: gen_server:from(), response :: term()}).
230+
231+
-type effect() ::
232+
#download_manifest{}
233+
| #rebalance_manifest{}
234+
| #register_offset_listener{}
235+
| #reply{}
236+
| #upload_fragment{}
237+
| #upload_manifest{}.

0 commit comments

Comments
 (0)