Skip to content

Commit 6189a02

Browse files
Merge pull request #15 from amazon-mq/md/dialyze
2 parents 98d0c1d + 1303b3d commit 6189a02

File tree

4 files changed

+68
-41
lines changed

4 files changed

+68
-41
lines changed

.github/workflows/build-test.yaml

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,5 +41,13 @@ jobs:
4141
- uses: actions/checkout@v5
4242
with:
4343
path: rabbitmq-server/deps/rabbitmq_stream_s3
44-
- run: make -C ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3
45-
- run: make -C ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3 tests
44+
- uses: actions/cache@v4
45+
with:
46+
path: ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3/.rabbitmq_stream_s3.plt
47+
key: plt-${{ matrix.rmq-version }}-otp-${{ matrix.otp-version }}-elixir-${{ matrix.elixir-version }}
48+
- name: make
49+
run: make -C ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3
50+
- name: tests
51+
run: make -C ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3 tests
52+
- name: dialyze
53+
run: make -C ${{ github.workspace }}/rabbitmq-server/deps/rabbitmq_stream_s3 dialyze

Makefile

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@ PROJECT = rabbitmq_stream_s3
22
PROJECT_DESCRIPTION = RabbitMQ S3 plugin
33
PROJECT_MOD = rabbitmq_stream_s3_app
44

5-
DEPS = rabbit_common rabbit osiris
5+
DEPS = rabbit_common rabbit osiris rabbitmq_aws
66
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers
77

88
DEP_EARLY_PLUGINS = rabbit_common/mk/rabbitmq-early-plugin.mk
99
DEP_PLUGINS = rabbit_common/mk/rabbitmq-plugin.mk
1010

11+
PLT_APPS += ssl
12+
1113
include ../../rabbitmq-components.mk
1214
include ../../erlang.mk

src/rabbitmq_stream_s3_api.erl

Lines changed: 39 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -23,40 +23,41 @@
2323
-include_lib("rabbit_common/include/rabbit.hrl").
2424
-include_lib("kernel/include/logger.hrl").
2525

26-
-define(DEFAULT_OPTS, []).
27-
28-
-spec put_object(term(), binary(), binary(), binary()) ->
26+
-spec put_object(rabbitmq_aws:connection_handle(), binary(), binary(), iodata()) ->
2927
ok | {error, term()}.
3028
put_object(Handle, Bucket, Key, Object) ->
3129
put_object(Handle, Bucket, Key, Object, []).
3230

33-
-spec put_object(term(), binary(), binary(), binary(), list()) ->
31+
-spec put_object(rabbitmq_aws:connection_handle(), binary(), binary(), iodata(), list()) ->
3432
ok | {error, term()}.
35-
put_object(Handle, Bucket, Key, Object, Opts) ->
33+
put_object(Handle, Bucket, Key, Object, Opts) when is_list(Opts) ->
3634
Path = object_path(Bucket, Key),
37-
Headers =
35+
{Headers, Opts1} =
3836
case proplists:get_value(crc32, Opts, []) of
3937
[] ->
40-
[];
38+
{[], Opts};
4139
Checksum when is_integer(Checksum) ->
4240
C = base64:encode_to_string(<<Checksum:32/unsigned>>),
43-
[{"x-amz-checksum-crc32", C}]
41+
O = proplists:delete(crc32, Opts),
42+
{[{"x-amz-checksum-crc32", C}], O}
4443
end,
45-
case rabbitmq_aws:put(Handle, Path, Object, Headers, [?DEFAULT_OPTS | Opts]) of
44+
case rabbitmq_aws:put(Handle, Path, Object, Headers, Opts1) of
4645
{ok, {_Headers, <<>>}} ->
4746
ok;
4847
Error ->
4948
{error, Error}
5049
end.
5150

52-
-spec get_object(term(), binary(), binary()) ->
51+
-spec get_object(rabbitmq_aws:connection_handle(), binary(), binary()) ->
5352
{ok, binary()} | {error, term()}.
5453
get_object(Handle, Bucket, Key) ->
5554
get_object(Handle, Bucket, Key, []).
5655

56+
-spec get_object(rabbitmq_aws:connection_handle(), binary(), binary(), list()) ->
57+
{ok, binary()} | {error, term()}.
5758
get_object(Handle, Bucket, Key, Opts) ->
5859
Path = object_path(Bucket, Key),
59-
case rabbitmq_aws:get(Handle, Path, [], [?DEFAULT_OPTS | Opts]) of
60+
case rabbitmq_aws:get(Handle, Path, [], Opts) of
6061
{ok, {_Headers, Body}} ->
6162
{ok, Body};
6263
{error, "Not Found", _} ->
@@ -65,27 +66,33 @@ get_object(Handle, Bucket, Key, Opts) ->
6566
{error, Error}
6667
end.
6768

68-
-spec get_object_attributes(term(), binary(), binary()) ->
69-
{ok, proplists:proplist()} | {error, term()}.
69+
-spec get_object_attributes(rabbitmq_aws:connection_handle(), binary(), binary()) ->
70+
{ok, [{binary(), binary()}]} | {error, term()}.
7071
get_object_attributes(Handle, Bucket, Key) ->
7172
get_object_attributes(Handle, Bucket, Key, []).
7273
get_object_attributes(Handle, Bucket, Key, Opts) ->
7374
get_object_attributes(Handle, Bucket, Key, [], Opts).
7475

75-
-spec get_object_attributes(term(), binary(), binary(), [string()], [term()]) ->
76-
{ok, proplists:proplist()} | {error, term()}.
76+
-spec get_object_attributes(
77+
rabbitmq_aws:connection_handle(),
78+
binary(),
79+
binary(),
80+
[binary()],
81+
[term()]
82+
) ->
83+
{ok, [{binary(), binary()}]} | {error, term()}.
7784
get_object_attributes(Handle, Bucket, Key, Attributes, Opts) ->
7885
Path = object_path(Bucket, Key),
79-
case rabbitmq_aws:request(Handle, head, Path, <<"">>, [], [?DEFAULT_OPTS | Opts]) of
86+
case rabbitmq_aws:request(Handle, head, Path, <<"">>, [], Opts) of
8087
{ok, {Headers, _Body}} ->
81-
{ok, parse_head_response_headers(Headers, Attributes)};
88+
{ok, filter_attributes(Headers, Attributes)};
8289
{error, "Not Found", _} ->
8390
{error, not_found};
8491
Error ->
8592
{error, Error}
8693
end.
8794

88-
-spec get_object_with_range(term(), binary(), binary(), range_spec()) ->
95+
-spec get_object_with_range(rabbitmq_aws:connection_handle(), binary(), binary(), range_spec()) ->
8996
{ok, binary()} | {error, term()}.
9097
get_object_with_range(Handle, Bucket, Key, RangeSpec) ->
9198
get_object_with_range(Handle, Bucket, Key, RangeSpec, []).
@@ -94,7 +101,7 @@ get_object_with_range(Handle, Bucket, Key, RangeSpec0, Opts) ->
94101
Path = object_path(Bucket, Key),
95102
RangeValue = range_specifier(RangeSpec0),
96103
Headers = [{"Range", lists:flatten(["bytes=" | RangeValue])}],
97-
case rabbitmq_aws:get(Handle, Path, Headers, [?DEFAULT_OPTS | Opts]) of
104+
case rabbitmq_aws:get(Handle, Path, Headers, Opts) of
98105
{ok, {_Headers, Body}} ->
99106
{ok, Body};
100107
{error, "Not Found", _} ->
@@ -112,14 +119,21 @@ range_specifier(SuffixLen) when is_integer(SuffixLen) andalso SuffixLen < 0 ->
112119
%% ~b will format the '-' for us.
113120
io_lib:format("~b", [SuffixLen]).
114121

115-
-spec get_object_size(term(), binary(), binary()) ->
116-
integer() | {error, term()}.
122+
-spec get_object_size(rabbitmq_aws:connection_handle(), binary(), binary()) ->
123+
{ok, integer()} | {error, term()}.
117124
get_object_size(Handle, Bucket, Key) ->
118125
get_object_size(Handle, Bucket, Key, []).
119126

127+
-spec get_object_size(rabbitmq_aws:connection_handle(), binary(), binary(), [term()]) ->
128+
{ok, integer()} | {error, term()}.
120129
get_object_size(Handle, Bucket, Key, Opts) ->
121-
{ok, [{_, Size}]} = get_object_attributes(Handle, Bucket, Key, [<<"content-length">>], Opts),
122-
binary_to_integer(Size).
130+
case get_object_attributes(Handle, Bucket, Key, [<<"content-length">>], Opts) of
131+
{ok, Attributes} ->
132+
[{<<"content-length">>, Size}] = Attributes,
133+
{ok, binary_to_integer(Size)};
134+
Error ->
135+
Error
136+
end.
123137

124138
object_path(Bucket, Key) ->
125139
BucketStr = ensure_string(Bucket),
@@ -131,9 +145,8 @@ ensure_string(Binary) when is_binary(Binary) ->
131145
ensure_string(List) when is_list(List) ->
132146
List.
133147

134-
parse_head_response_headers(Headers, Attributes) ->
135-
filter_attributes(Headers, Attributes).
136-
148+
-spec filter_attributes([{binary(), binary()}], [binary()]) ->
149+
[{binary(), binary()}].
137150
filter_attributes(Headers, []) ->
138151
Headers;
139152
filter_attributes(Headers, Attributes) ->

src/rabbitmq_stream_s3_log_reader.erl

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -36,10 +36,10 @@
3636
}).
3737

3838
-record(state, {
39-
connection :: pid(),
39+
connection :: rabbitmq_aws:connection_handle(),
4040
buffer = <<>> :: binary(),
41-
offset_start :: byte_offset(),
42-
offset_end :: byte_offset(),
41+
offset_start :: byte_offset() | undefined,
42+
offset_end :: byte_offset() | undefined,
4343
read_size :: pos_integer(),
4444
bucket :: binary(),
4545
object :: binary(),
@@ -242,11 +242,11 @@ chunk_iterator(#?MODULE{mode = Local0} = State0, Credit, PrevIter) ->
242242
iterator_next(#remote_iterator{next_offset = NextOffset0, data = Data0} = Iter0) ->
243243
case Data0 of
244244
?REC_MATCH_SIMPLE(Len, Rem0) ->
245-
<<Record:Len/binary, Rem>> = Rem0,
245+
<<Record:Len/binary, Rem/binary>> = Rem0,
246246
Iter = Iter0#remote_iterator{next_offset = NextOffset0 + 1, data = Rem},
247247
{{NextOffset0, Record}, Iter};
248248
?REC_MATCH_SUBBATCH(CompType, NumRecs, UncompressedLen, Len, Rem0) ->
249-
<<BatchData:Len/binary, Rem>> = Rem0,
249+
<<BatchData:Len/binary, Rem/binary>> = Rem0,
250250
Record = {batch, NumRecs, CompType, UncompressedLen, BatchData},
251251
Iter = Iter0#remote_iterator{next_offset = NextOffset0 + NumRecs, data = Rem},
252252
{{NextOffset0, Record}, Iter};
@@ -266,7 +266,7 @@ start_link(Reader, Bucket, Key) ->
266266
init({Reader, Bucket, Key}) ->
267267
erlang:monitor(process, Reader),
268268
{ok, Connection} = rabbitmq_aws:open_connection("s3", []),
269-
Size = rabbitmq_stream_s3_api:get_object_size(
269+
{ok, Size} = rabbitmq_stream_s3_api:get_object_size(
270270
Connection,
271271
Bucket,
272272
Key,
@@ -314,7 +314,7 @@ terminate(_Reason, #state{connection = Connection}) ->
314314

315315
format_status(#{state := #state{buffer = Buffer} = State0} = Status0) ->
316316
%% Avoid formatting the buffer - it can be large.
317-
Size = lists:flatten(io_lib:format("~b bytes", [byte_size(Buffer)])),
317+
Size = iolist_to_binary(io_lib:format("~b bytes", [byte_size(Buffer)])),
318318
Status0#{state := State0#state{buffer = Size}}.
319319

320320
code_change(_, _, State) ->
@@ -472,9 +472,7 @@ read_header2(
472472
next_offset = NextChId0 + NumRecords,
473473
position = NextPosition
474474
},
475-
read_header(Remote);
476-
{retry_with, Filter} ->
477-
read_header(Remote0#remote{filter = Filter})
475+
read_header(Remote)
478476
end;
479477
false ->
480478
%% skip and recurse
@@ -560,9 +558,15 @@ convert_remote_to_local(#?MODULE{
560558
init_local_reader(first, Config).
561559

562560
%% TODO: make this generic for timestamps too.
563-
-spec find_fragment_for_offset(osiris:offset(), #manifest{}, file:filename_all()) ->
561+
-spec find_fragment_for_offset(
562+
osiris:offset(),
563+
#manifest{} | rabbitmq_stream_s3_binary_array:array(),
564+
file:filename_all()
565+
) ->
564566
{ok, ChunkId :: osiris:offset(), byte_offset(), Fragment :: osiris:offset()}.
565567
find_fragment_for_offset(Offset, #manifest{entries = Entries}, Dir) ->
568+
find_fragment_for_offset(Offset, Entries, Dir);
569+
find_fragment_for_offset(Offset, Entries, Dir) ->
566570
RootIdx0 =
567571
rabbitmq_stream_s3_binary_array:partition_point(
568572
fun(?ENTRY(O, _T, _K, _S, _N, _)) -> Offset > O end,
@@ -619,7 +623,7 @@ find_fragment_for_offset(Offset, #manifest{entries = Entries}, Dir) ->
619623
_:70,
620624
GroupEntries/binary
621625
>> = get_group(Dir, Kind, EntryOffset),
622-
find_fragment_for_offset(Dir, Offset, GroupEntries)
626+
find_fragment_for_offset(Offset, GroupEntries, Dir)
623627
end.
624628

625629
saturating_decr(0) -> 0;

0 commit comments

Comments
 (0)