Skip to content

Commit a97e2bb

Browse files
committed
support publications tags, resolve todos
1 parent 476ace6 commit a97e2bb

File tree

3 files changed

+33
-62
lines changed

3 files changed

+33
-62
lines changed

centrifuge/client.py

Lines changed: 30 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -346,14 +346,12 @@ async def _create_connection(self) -> bool:
346346
await self._schedule_reconnect()
347347
return False
348348
except Exception as e:
349-
# TODO: think on better error handling here.
350349
if self.state != ClientState.CONNECTING:
351350
return False
352351
await self._close_transport_conn()
353352
handler = self.events.on_error
354-
# TODO: think on better error code here.
355353
await handler(
356-
ErrorContext(code=_code_number(_ErrorCode.TRANSPORT_CLOSED), error=e),
354+
ErrorContext(code=_code_number(_ErrorCode.CONNECT_ERROR), error=e),
357355
)
358356
await self._schedule_reconnect()
359357
return False
@@ -372,7 +370,7 @@ async def _create_connection(self) -> bool:
372370
handler = self.events.on_error
373371
await handler(
374372
ErrorContext(
375-
code=_code_number(_ErrorCode.CONNECT_REPLY_ERROR),
373+
code=_code_number(_ErrorCode.CONNECT_ERROR),
376374
error=ReplyError(code, message, temporary),
377375
),
378376
)
@@ -499,22 +497,12 @@ async def _process_server_subs(self, subs: Dict[str, Dict[str, Any]]):
499497
del self._server_subs[channel]
500498

501499
async def _process_server_publication(self, channel: str, pub: Any):
502-
handler = self.events.on_publication
503-
info = pub.get("info")
504-
client_info = self._extract_client_info(info) if info else None
505-
offset = int(pub.get("offset", 0))
506-
await handler(
507-
ServerPublicationContext(
508-
channel=channel,
509-
pub=Publication(
510-
offset=offset,
511-
data=self._decode_data(pub.get("data")),
512-
info=client_info,
513-
),
514-
)
500+
publication = self._publication_from_proto(pub)
501+
await self.events.on_publication(
502+
ServerPublicationContext(channel=channel, pub=publication)
515503
)
516-
if offset > 0:
517-
self._server_subs[channel].offset = offset
504+
if publication.offset > 0:
505+
self._server_subs[channel].offset = publication.offset
518506

519507
def _clear_connecting_state(self) -> None:
520508
self._reconnect_attempts = 0
@@ -736,14 +724,12 @@ async def _subscribe(self, channel):
736724
await sub._schedule_resubscribe()
737725
return None
738726
except Exception as e:
739-
# TODO: think on better error handling here.
740727
if sub.state != SubscriptionState.SUBSCRIBING:
741728
return None
742729
handler = sub.events.on_error
743-
# TODO: think on better error code here.
744730
await handler(
745731
SubscriptionErrorContext(
746-
code=_code_number(_ErrorCode.TRANSPORT_CLOSED),
732+
code=_code_number(_ErrorCode.SUBSCRIBE_ERROR),
747733
error=e,
748734
),
749735
)
@@ -763,7 +749,7 @@ async def _subscribe(self, channel):
763749
handler = sub.events.on_error
764750
await handler(
765751
SubscriptionErrorContext(
766-
code=_code_number(_ErrorCode.SUBSCRIBE_REPLY_ERROR),
752+
code=_code_number(_ErrorCode.SUBSCRIBE_ERROR),
767753
error=ReplyError(code, message, temporary),
768754
),
769755
)
@@ -1001,15 +987,8 @@ async def history(
1001987

1002988
publications = []
1003989
for pub in reply["history"].get("publications", []):
1004-
info = pub.get("info")
1005-
client_info = self._extract_client_info(info) if info else None
1006-
publications.append(
1007-
Publication(
1008-
offset=int(pub.get("offset", 0)),
1009-
data=self._decode_data(pub.get("data")),
1010-
info=client_info,
1011-
),
1012-
)
990+
publication = self._publication_from_proto(pub)
991+
publications.append(publication)
1013992

1014993
return HistoryResult(
1015994
epoch=reply["history"].get("epoch", ""),
@@ -1325,6 +1304,17 @@ async def _listen(self) -> None:
13251304

13261305
await self._disconnect(disconnect_code, disconnect_reason, reconnect)
13271306

1307+
def _publication_from_proto(self, pub: Any) -> Publication:
1308+
info = pub.get("info")
1309+
client_info = self._extract_client_info(info) if info else None
1310+
offset = int(pub.get("offset", 0))
1311+
return Publication(
1312+
offset=offset,
1313+
data=self._decode_data(pub.get("data")),
1314+
info=client_info,
1315+
tags=pub.get("tags", {}),
1316+
)
1317+
13281318

13291319
class Subscription:
13301320
"""
@@ -1580,20 +1570,10 @@ async def _move_subscribed(self, subscribe: Dict[str, Any]) -> None:
15801570
if publications:
15811571
on_publication_handler = self.events.on_publication
15821572
for pub in publications:
1583-
info = pub.get("info")
1584-
client_info = self._client._extract_client_info(info) if info else None
1585-
offset = int(pub.get("offset", 0))
1586-
await on_publication_handler(
1587-
PublicationContext(
1588-
pub=Publication(
1589-
offset=offset,
1590-
data=self._client._decode_data(pub.get("data")),
1591-
info=client_info,
1592-
),
1593-
),
1594-
)
1595-
if offset > 0:
1596-
self._offset = offset
1573+
publication = self._client._publication_from_proto(pub)
1574+
await on_publication_handler(PublicationContext(pub=publication))
1575+
if publication.offset > 0:
1576+
self._offset = publication.offset
15971577

15981578
self._clear_subscribing_state()
15991579

@@ -1624,20 +1604,10 @@ async def _resubscribe(self) -> None:
16241604
await self._client._resubscribe(self)
16251605

16261606
async def _process_publication(self, pub: Any) -> None:
1627-
info = pub.get("info")
1628-
client_info = self._client._extract_client_info(info) if info else None
1629-
offset = int(pub.get("offset", 0))
1630-
await self.events.on_publication(
1631-
PublicationContext(
1632-
pub=Publication(
1633-
offset=offset,
1634-
data=self._client._decode_data(pub.get("data")),
1635-
info=client_info,
1636-
),
1637-
),
1638-
)
1639-
if offset > 0:
1640-
self._offset = offset
1607+
publication = self._client._publication_from_proto(pub)
1608+
await self.events.on_publication(PublicationContext(pub=publication))
1609+
if publication.offset > 0:
1610+
self._offset = publication.offset
16411611

16421612
def _need_recover(self):
16431613
return self._recover

centrifuge/codes.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,8 @@ class _ErrorCode(Enum):
6060
TRANSPORT_WRITE_ERROR = 10
6161
CONNECTION_CLOSED = 11
6262
BAD_CONFIGURATION = 12
63-
CONNECT_REPLY_ERROR = 13
64-
SUBSCRIBE_REPLY_ERROR = 14
63+
CONNECT_ERROR = 13
64+
SUBSCRIBE_ERROR = 14
6565

6666
# Errors with code > 100 are errors from server.
6767
TOKEN_EXPIRED = 109

centrifuge/types.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ class Publication:
4242
offset: int
4343
data: Any
4444
info: Optional[ClientInfo]
45+
tags: Dict[str, str]
4546

4647

4748
@dataclass

0 commit comments

Comments
 (0)