Skip to content

Commit 430bd84

Browse files
committed
Add mosquitto_plugin_publish*()
These allow plugins to publish messages on the broker that can be consumed by other plugins in the message-in event
1 parent c12eeaf commit 430bd84

15 files changed

+393
-51
lines changed

ChangeLog.txt

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@
149149
- Plugins can now use the `next_s` and `next_ms` members of the tick event data
150150
struct to set a minimum interval that the broker will wait before calling the
151151
tick callback again.
152+
- Add `mosquitto_plugin_publish()` and `mosquitto_plugin_publish_copy()`.
153+
These allow plugins to publish messages on the broker that will also be
154+
consumed by other plugins in their message-in events.
152155

153156
# Plugins
154157
- Add acl-file plugin.

include/mosquitto/broker.h

Lines changed: 88 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -804,9 +804,9 @@ mosq_EXPORT int mosquitto_apply_on_all_clients(int (*FUNC_client_functor)(const
804804
*
805805
* This function allows a plugin to publish a message. Messages published in
806806
* this way are treated as coming from the broker and so will not be passed to
807-
* `mosquitto_auth_acl_check(, MOSQ_ACL_WRITE, , )` for checking. Read access
808-
* will be enforced as normal for individual clients when they are due to
809-
* receive the message.
807+
* the MOSQ_EVT_ACL_CHECK nor MOSQ_EVT_MESSAGE_IN events. Read access will be
808+
* enforced as normal for individual clients when they are due to receive the
809+
* message.
810810
*
811811
* It can be used to send messages to all clients that have a matching
812812
* subscription, or to a single client whether or not it has a matching
@@ -883,6 +883,91 @@ mosq_EXPORT int mosquitto_broker_publish_copy(
883883
bool retain,
884884
mosquitto_property *properties);
885885

886+
887+
/* Function: mosquitto_plugin_publish
888+
*
889+
* Publish a message from within a plugin.
890+
*
891+
* This function allows a plugin to publish a message. Messages published in
892+
* this way are treated as coming from the broker and so will not be passed to
893+
* the MOSQ_EVT_ACL_CHECK event. Read access will be enforced as normal for
894+
* individual clients when they are due to receive the message.
895+
*
896+
* The important difference compared to mosquitto_broker_publish* is that
897+
* messages from this function follow the same flow as messages from clients
898+
* after they have been accepted. This means that messages do go through the
899+
* MOSQ_EVT_MESSAGE_IN event. If your plugin generates new messages in its
900+
* MOSQ_EVT_MESSAGE_IN event, it is important to ensure you do not create an
901+
* infinite loop.
902+
*
903+
* Parameters:
904+
* topic - message topic
905+
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
906+
* payload - payload bytes. If payloadlen > 0 this must not be NULL. Must
907+
* be allocated on the heap. Will be freed by mosquitto after use if the
908+
* function returns success.
909+
* qos - message QoS to use.
910+
* retain - should retain be set on the message. This does not apply if
911+
* clientid is non-NULL.
912+
* properties - MQTT v5 properties to attach to the message. If the function
913+
* returns success, then properties is owned by the broker and
914+
* will be freed at a later point.
915+
*
916+
* Returns:
917+
* MOSQ_ERR_SUCCESS - on success
918+
* MOSQ_ERR_INVAL - if topic is NULL, if payloadlen < 0, if payloadlen > 0
919+
* and payload is NULL, if qos is not 0, 1, or 2.
920+
* MOSQ_ERR_NOMEM - on out of memory
921+
*/
922+
mosq_EXPORT int mosquitto_plugin_publish(
923+
const char *topic,
924+
int payloadlen,
925+
void *payload,
926+
int qos,
927+
bool retain,
928+
mosquitto_property *properties);
929+
930+
931+
/* Function: mosquitto_plugin_publish_copy
932+
*
933+
* Publish a message from within a plugin.
934+
*
935+
* This function is identical to mosquitto_plugin_publish, except that a copy
936+
* of `payload` is taken.
937+
*
938+
* Parameters:
939+
* topic - message topic
940+
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
941+
* payload - payload bytes. If payloadlen > 0 this must not be NULL.
942+
* Memory remains the property of the calling function.
943+
* qos - message QoS to use.
944+
* retain - should retain be set on the message. This does not apply if
945+
* clientid is non-NULL.
946+
* properties - MQTT v5 properties to attach to the message. If the function
947+
* returns success, then properties is owned by the broker and
948+
* will be freed at a later point.
949+
*
950+
* Returns:
951+
* MOSQ_ERR_SUCCESS - on success
952+
* MOSQ_ERR_INVAL - if topic is NULL, if payloadlen < 0, if payloadlen > 0
953+
* and payload is NULL, if qos is not 0, 1, or 2.
954+
* MOSQ_ERR_NOMEM - on out of memory
955+
*/
956+
mosq_EXPORT int mosquitto_plugin_publish_copy(
957+
const char *topic,
958+
int payloadlen,
959+
const void *payload,
960+
int qos,
961+
bool retain,
962+
mosquitto_property *properties);
963+
964+
965+
/* =========================================================================
966+
*
967+
* Section: Authentication functions
968+
*
969+
* ========================================================================= */
970+
886971
/* Function: mosquitto_complete_basic_auth
887972
*
888973
* Complete a delayed authentication request.

src/database.c

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_directio
4040
bool valid_bytes;
4141
bool valid_count;
4242

43+
if(!context){
44+
return true;
45+
}
46+
4347
if(dir == mosq_md_out){
4448
msgs = &context->msgs_out;
4549
}else{
@@ -1514,7 +1518,7 @@ int db__message_write_queued_in(struct mosquitto *context)
15141518
struct mosquitto__client_msg *client_msg, *tmp;
15151519
int rc;
15161520

1517-
if(context->state != mosq_cs_active){
1521+
if(!context || context->state != mosq_cs_active){
15181522
return MOSQ_ERR_SUCCESS;
15191523
}
15201524

src/handle_publish.c

Lines changed: 45 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -64,32 +64,18 @@ static int process_bad_message(struct mosquitto *context, struct mosquitto__base
6464
}
6565

6666

67-
int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
67+
static int handle__accepted_publish_plugin(struct mosquitto__base_msg *base_msg)
6868
{
69-
int rc;
70-
int rc2;
71-
struct mosquitto__base_msg *stored = NULL;
72-
struct mosquitto__client_msg *cmsg_stored = NULL;
73-
74-
{
75-
rc = plugin__handle_message_in(context, &base_msg->data);
76-
if(rc == MOSQ_ERR_ACL_DENIED){
77-
log__printf(NULL, MOSQ_LOG_DEBUG,
78-
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
79-
context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic,
80-
(long)base_msg->data.payloadlen);
69+
return sub__messages_queue("plugin", base_msg->data.topic, base_msg->data.qos, base_msg->data.retain, &base_msg);
70+
}
8171

82-
return process_bad_message(context, base_msg, MQTT_RC_NOT_AUTHORIZED);
83-
}else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
84-
log__printf(NULL, MOSQ_LOG_DEBUG,
85-
"Rejected PUBLISH from %s, quota exceeded.", context->id);
8672

87-
return process_bad_message(context, base_msg, MQTT_RC_QUOTA_EXCEEDED);
88-
}else if(rc != MOSQ_ERR_SUCCESS){
89-
db__msg_store_free(base_msg);
90-
return rc;
91-
}
92-
}
73+
static int handle__accepted_publish_client(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
74+
{
75+
struct mosquitto__client_msg *cmsg_stored = NULL;
76+
struct mosquitto__base_msg *stored = NULL;
77+
int rc;
78+
int rc2;
9379

9480
if(base_msg->data.qos > 0){
9581
db__message_store_find(context, base_msg->data.source_mid, &cmsg_stored);
@@ -107,7 +93,7 @@ int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_m
10793
}
10894

10995
if(!cmsg_stored){
110-
if(base_msg->data.qos > 0 && context->msgs_in.inflight_quota == 0){
96+
if(base_msg->data.qos > 0 && context && context->msgs_in.inflight_quota == 0){
11197
/* Client isn't allowed any more incoming messages, so fail early */
11298
db__msg_store_free(base_msg);
11399
return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED;
@@ -195,6 +181,41 @@ int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_m
195181
}
196182

197183

184+
int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
185+
{
186+
int rc;
187+
const char *clientid = context?context->id:"plugin";
188+
189+
/* context == NULL happens if mosquitto_plugin_publish*() is used message */
190+
191+
{
192+
rc = plugin__handle_message_in(context, &base_msg->data);
193+
if(rc == MOSQ_ERR_ACL_DENIED){
194+
log__printf(NULL, MOSQ_LOG_DEBUG,
195+
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
196+
clientid, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic,
197+
(long)base_msg->data.payloadlen);
198+
199+
return process_bad_message(context, base_msg, MQTT_RC_NOT_AUTHORIZED);
200+
}else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
201+
log__printf(NULL, MOSQ_LOG_DEBUG,
202+
"Rejected PUBLISH from %s, quota exceeded.", clientid);
203+
204+
return process_bad_message(context, base_msg, MQTT_RC_QUOTA_EXCEEDED);
205+
}else if(rc != MOSQ_ERR_SUCCESS){
206+
db__msg_store_free(base_msg);
207+
return rc;
208+
}
209+
}
210+
211+
if(context){
212+
return handle__accepted_publish_client(context, base_msg, mid, dup, message_expiry_interval);
213+
}else{
214+
return handle__accepted_publish_plugin(base_msg);
215+
}
216+
}
217+
218+
198219
int handle__publish(struct mosquitto *context)
199220
{
200221
uint8_t dup;

src/linker-aix.syms

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,8 @@ mosquitto_persist_client_update
3939
mosquitto_persist_retain_msg_delete
4040
mosquitto_persist_retain_msg_set
4141
mosquitto_persistence_location
42+
mosquitto_plugin_publish
43+
mosquitto_plugin_publish_copy
4244
mosquitto_plugin_set_info
4345
mosquitto_property_add_binary
4446
mosquitto_property_add_byte

src/linker-macosx.syms

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ _mosquitto_persist_client_update
4040
_mosquitto_persist_retain_msg_delete
4141
_mosquitto_persist_retain_msg_set
4242
_mosquitto_persistence_location
43+
_mosquitto_plugin_publish
44+
_mosquitto_plugin_publish_copy
4345
_mosquitto_plugin_set_info
4446
_mosquitto_property_add_binary
4547
_mosquitto_property_add_byte

src/linker.syms

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
mosquitto_persist_retain_msg_delete;
4242
mosquitto_persist_retain_msg_set;
4343
mosquitto_persistence_location;
44+
mosquitto_plugin_publish;
45+
mosquitto_plugin_publish_copy;
4446
mosquitto_plugin_set_info;
4547
mosquitto_property_add_binary;
4648
mosquitto_property_add_byte;

src/loop.c

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -115,32 +115,36 @@ static void read_message_expiry_interval(mosquitto_property **proplist, uint32_t
115115
static void queue_plugin_msgs(void)
116116
{
117117
struct mosquitto__base_msg *base_msg, *base_tmp;
118-
struct mosquitto *context;
119118
uint32_t message_expiry;
120119

121120
for(base_msg = db.plugin_msgs; base_msg && (base_tmp = base_msg->hh.next, 1); base_msg = base_tmp){
122121
base_msg__dl_delete(&db.plugin_msgs, base_msg);
123122

124123
read_message_expiry_interval(&base_msg->data.properties, &message_expiry);
125124

126-
if(base_msg->data.source_id){
127-
HASH_FIND(hh_id, db.contexts_by_id, base_msg->data.source_id, strlen(base_msg->data.source_id), context);
128-
mosquitto_FREE(base_msg->data.source_id);
129-
if(context){
130-
single_publish(context, base_msg, message_expiry);
125+
if(base_msg->origin == mosq_mo_plugin){
126+
handle__accepted_publish(NULL, base_msg, 0, 0, &message_expiry);
127+
}else{
128+
if(base_msg->data.source_id){
129+
struct mosquitto *context;
130+
HASH_FIND(hh_id, db.contexts_by_id, base_msg->data.source_id, strlen(base_msg->data.source_id), context);
131+
mosquitto_FREE(base_msg->data.source_id);
132+
if(context){
133+
single_publish(context, base_msg, message_expiry);
134+
}else{
135+
db__msg_store_free(base_msg);
136+
}
131137
}else{
138+
db__messages_easy_queue(NULL,
139+
base_msg->data.topic,
140+
base_msg->data.qos,
141+
base_msg->data.payloadlen,
142+
base_msg->data.payload,
143+
base_msg->data.retain,
144+
message_expiry,
145+
&base_msg->data.properties);
132146
db__msg_store_free(base_msg);
133147
}
134-
}else{
135-
db__messages_easy_queue(NULL,
136-
base_msg->data.topic,
137-
base_msg->data.qos,
138-
base_msg->data.payloadlen,
139-
base_msg->data.payload,
140-
base_msg->data.retain,
141-
message_expiry,
142-
&base_msg->data.properties);
143-
db__msg_store_free(base_msg);
144148
}
145149
}
146150
}

src/mosquitto_broker_internal.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ typedef int (*FUNC_auth_plugin_psk_key_get_v2)(void *, const char *, const char
102102
enum mosquitto_msg_origin {
103103
mosq_mo_client = 0,
104104
mosq_mo_broker = 1,
105+
mosq_mo_plugin = 2,
105106
};
106107

107108
struct mosquitto__plugin_lib {
@@ -693,6 +694,7 @@ int handle__publish(struct mosquitto *context);
693694
int handle__subscribe(struct mosquitto *context);
694695
int handle__unsubscribe(struct mosquitto *context);
695696
int handle__auth(struct mosquitto *context);
697+
int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval);
696698

697699
/* ============================================================
698700
* Database handling

0 commit comments

Comments
 (0)