Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ChangeLog.txt
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,9 @@
- Plugins can now use the `next_s` and `next_ms` members of the tick event data
struct to set a minimum interval that the broker will wait before calling the
tick callback again.
- Add `mosquitto_plugin_publish()` and `mosquitto_plugin_publish_copy()`.
These allow plugins to publish messages on the broker that will also be
consumed by other plugins in their message-in events.

# Plugins
- Add acl-file plugin.
Expand Down
91 changes: 88 additions & 3 deletions include/mosquitto/broker.h
Original file line number Diff line number Diff line change
Expand Up @@ -804,9 +804,9 @@ mosq_EXPORT int mosquitto_apply_on_all_clients(int (*FUNC_client_functor)(const
*
* This function allows a plugin to publish a message. Messages published in
* this way are treated as coming from the broker and so will not be passed to
* `mosquitto_auth_acl_check(, MOSQ_ACL_WRITE, , )` for checking. Read access
* will be enforced as normal for individual clients when they are due to
* receive the message.
* the MOSQ_EVT_ACL_CHECK nor MOSQ_EVT_MESSAGE_IN events. Read access will be
* enforced as normal for individual clients when they are due to receive the
* message.
*
* It can be used to send messages to all clients that have a matching
* subscription, or to a single client whether or not it has a matching
Expand Down Expand Up @@ -883,6 +883,91 @@ mosq_EXPORT int mosquitto_broker_publish_copy(
bool retain,
mosquitto_property *properties);


/* Function: mosquitto_plugin_publish
*
* Publish a message from within a plugin.
*
* This function allows a plugin to publish a message. Messages published in
* this way are treated as coming from the broker and so will not be passed to
* the MOSQ_EVT_ACL_CHECK event. Read access will be enforced as normal for
* individual clients when they are due to receive the message.
*
* The important difference compared to mosquitto_broker_publish* is that
* messages from this function follow the same flow as messages from clients
* after they have been accepted. This means that messages do go through the
* MOSQ_EVT_MESSAGE_IN event. If your plugin generates new messages in its
* MOSQ_EVT_MESSAGE_IN event, it is important to ensure you do not create an
* infinite loop.
*
* Parameters:
* topic - message topic
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
* payload - payload bytes. If payloadlen > 0 this must not be NULL. Must
* be allocated on the heap. Will be freed by mosquitto after use if the
* function returns success.
* qos - message QoS to use.
* retain - should retain be set on the message. This does not apply if
* clientid is non-NULL.
* properties - MQTT v5 properties to attach to the message. If the function
* returns success, then properties is owned by the broker and
* will be freed at a later point.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if topic is NULL, if payloadlen < 0, if payloadlen > 0
* and payload is NULL, if qos is not 0, 1, or 2.
* MOSQ_ERR_NOMEM - on out of memory
*/
mosq_EXPORT int mosquitto_plugin_publish(
const char *topic,
int payloadlen,
void *payload,
int qos,
bool retain,
mosquitto_property *properties);


/* Function: mosquitto_plugin_publish_copy
*
* Publish a message from within a plugin.
*
* This function is identical to mosquitto_plugin_publish, except that a copy
* of `payload` is taken.
*
* Parameters:
* topic - message topic
* payloadlen - payload length in bytes. Can be 0 for an empty payload.
* payload - payload bytes. If payloadlen > 0 this must not be NULL.
* Memory remains the property of the calling function.
* qos - message QoS to use.
* retain - should retain be set on the message. This does not apply if
* clientid is non-NULL.
* properties - MQTT v5 properties to attach to the message. If the function
* returns success, then properties is owned by the broker and
* will be freed at a later point.
*
* Returns:
* MOSQ_ERR_SUCCESS - on success
* MOSQ_ERR_INVAL - if topic is NULL, if payloadlen < 0, if payloadlen > 0
* and payload is NULL, if qos is not 0, 1, or 2.
* MOSQ_ERR_NOMEM - on out of memory
*/
mosq_EXPORT int mosquitto_plugin_publish_copy(
const char *topic,
int payloadlen,
const void *payload,
int qos,
bool retain,
mosquitto_property *properties);


/* =========================================================================
*
* Section: Authentication functions
*
* ========================================================================= */

/* Function: mosquitto_complete_basic_auth
*
* Complete a delayed authentication request.
Expand Down
1 change: 1 addition & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ add_executable(mosquitto
../plugins/acl-file/acl_check.c
../plugins/acl-file/acl_parse.c
../lib/alias_mosq.c ../lib/alias_mosq.h
base_msg.c
bridge.c bridge_topic.c
broker_control.c
conf.c
Expand Down
1 change: 1 addition & 0 deletions src/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ endif

OBJS= mosquitto.o \
acl_file.o \
base_msg.o \
bridge.o \
bridge_topic.o \
broker_control.o \
Expand Down
56 changes: 56 additions & 0 deletions src/base_msg.c
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
Copyright (c) 2009-2021 Roger Light <[email protected]>

All rights reserved. This program and the accompanying materials
are made available under the terms of the Eclipse Public License 2.0
and Eclipse Distribution License v1.0 which accompany this distribution.

The Eclipse Public License is available at
https://www.eclipse.org/legal/epl-2.0/
and the Eclipse Distribution License is available at
http://www.eclipse.org/org/documents/edl-v10.php.

SPDX-License-Identifier: EPL-2.0 OR BSD-3-Clause

Contributors:
Roger Light - initial implementation and documentation.
*/

#include "mosquitto_broker_internal.h"

/* This is using the uthash internal hh.prev/hh.next pointers to make our own
* double linked list. This function must not be used on a base_msg that is
* stored in a hash table. */
void base_msg__dl_append(struct mosquitto__base_msg **head, struct mosquitto__base_msg *add_msg)
{
if(*head){
add_msg->hh.prev = db.plugin_msgs->hh.prev;
((struct mosquitto__base_msg *)(*head)->hh.prev)->hh.next = add_msg;
(*head)->hh.prev = add_msg;
add_msg->hh.next = NULL;
} else {
(*head) = add_msg;
(*head)->hh.prev = (*head);
(*head)->hh.next = NULL;
}
}

/* This is using the uthash internal hh.prev/hh.next pointers to make our own
* double linked list. This function must not be used on a base_msg that is
* stored in a hash table. */
void base_msg__dl_delete(struct mosquitto__base_msg **head, struct mosquitto__base_msg *del_msg)
{
if(del_msg->hh.prev == del_msg){
*head = NULL;
}else if(del_msg == *head){
((struct mosquitto__base_msg *)del_msg->hh.next)->hh.prev = del_msg->hh.prev;
*head = del_msg->hh.next;
}else{
((struct mosquitto__base_msg *)del_msg->hh.prev)->hh.next = del_msg->hh.next;
if(del_msg->hh.next){
((struct mosquitto__base_msg *)del_msg->hh.next)->hh.prev = del_msg->hh.prev;
}else{
(*head)->hh.prev = del_msg->hh.prev;
}
}
}
6 changes: 5 additions & 1 deletion src/database.c
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ bool db__ready_for_flight(struct mosquitto *context, enum mosquitto_msg_directio
bool valid_bytes;
bool valid_count;

if(!context){
return true;
}

if(dir == mosq_md_out){
msgs = &context->msgs_out;
}else{
Expand Down Expand Up @@ -1514,7 +1518,7 @@ int db__message_write_queued_in(struct mosquitto *context)
struct mosquitto__client_msg *client_msg, *tmp;
int rc;

if(context->state != mosq_cs_active){
if(!context || context->state != mosq_cs_active){
return MOSQ_ERR_SUCCESS;
}

Expand Down
69 changes: 45 additions & 24 deletions src/handle_publish.c
Original file line number Diff line number Diff line change
Expand Up @@ -64,32 +64,18 @@ static int process_bad_message(struct mosquitto *context, struct mosquitto__base
}


int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
static int handle__accepted_publish_plugin(struct mosquitto__base_msg *base_msg)
{
int rc;
int rc2;
struct mosquitto__base_msg *stored = NULL;
struct mosquitto__client_msg *cmsg_stored = NULL;

{
rc = plugin__handle_message_in(context, &base_msg->data);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
context->id, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic,
(long)base_msg->data.payloadlen);
return sub__messages_queue("plugin", base_msg->data.topic, base_msg->data.qos, base_msg->data.retain, &base_msg);
}

return process_bad_message(context, base_msg, MQTT_RC_NOT_AUTHORIZED);
}else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Rejected PUBLISH from %s, quota exceeded.", context->id);

return process_bad_message(context, base_msg, MQTT_RC_QUOTA_EXCEEDED);
}else if(rc != MOSQ_ERR_SUCCESS){
db__msg_store_free(base_msg);
return rc;
}
}
static int handle__accepted_publish_client(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
{
struct mosquitto__client_msg *cmsg_stored = NULL;
struct mosquitto__base_msg *stored = NULL;
int rc;
int rc2;

if(base_msg->data.qos > 0){
db__message_store_find(context, base_msg->data.source_mid, &cmsg_stored);
Expand All @@ -107,7 +93,7 @@ int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_m
}

if(!cmsg_stored){
if(base_msg->data.qos > 0 && context->msgs_in.inflight_quota == 0){
if(base_msg->data.qos > 0 && context && context->msgs_in.inflight_quota == 0){
/* Client isn't allowed any more incoming messages, so fail early */
db__msg_store_free(base_msg);
return MOSQ_ERR_RECEIVE_MAXIMUM_EXCEEDED;
Expand Down Expand Up @@ -195,6 +181,41 @@ int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_m
}


int handle__accepted_publish(struct mosquitto *context, struct mosquitto__base_msg *base_msg, uint16_t mid, int dup, uint32_t *message_expiry_interval)
{
int rc;
const char *clientid = context?context->id:"plugin";

/* context == NULL happens if mosquitto_plugin_publish*() is used message */

{
rc = plugin__handle_message_in(context, &base_msg->data);
if(rc == MOSQ_ERR_ACL_DENIED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Denied PUBLISH from %s (d%d, q%d, r%d, m%d, '%s', ... (%ld bytes))",
clientid, dup, base_msg->data.qos, base_msg->data.retain, base_msg->data.source_mid, base_msg->data.topic,
(long)base_msg->data.payloadlen);

return process_bad_message(context, base_msg, MQTT_RC_NOT_AUTHORIZED);
}else if(rc == MOSQ_ERR_QUOTA_EXCEEDED){
log__printf(NULL, MOSQ_LOG_DEBUG,
"Rejected PUBLISH from %s, quota exceeded.", clientid);

return process_bad_message(context, base_msg, MQTT_RC_QUOTA_EXCEEDED);
}else if(rc != MOSQ_ERR_SUCCESS){
db__msg_store_free(base_msg);
return rc;
}
}

if(context){
return handle__accepted_publish_client(context, base_msg, mid, dup, message_expiry_interval);
}else{
return handle__accepted_publish_plugin(base_msg);
}
}


int handle__publish(struct mosquitto *context)
{
uint8_t dup;
Expand Down
1 change: 1 addition & 0 deletions src/http_api.c
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ static int check_access(struct mosquitto__listener *listener, struct MHD_Connect
char *buf = "Not authorised\n";
struct MHD_Response *response = MHD_create_response_from_buffer(strlen(buf), (void *)buf, MHD_RESPMEM_MUST_COPY);
MHD_queue_basic_auth_fail_response(connection, "Mosquitto API", response);
MHD_destroy_response(response);

return MOSQ_ERR_AUTH;
}
Expand Down
2 changes: 2 additions & 0 deletions src/linker-aix.syms
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ mosquitto_persist_client_update
mosquitto_persist_retain_msg_delete
mosquitto_persist_retain_msg_set
mosquitto_persistence_location
mosquitto_plugin_publish
mosquitto_plugin_publish_copy
mosquitto_plugin_set_info
mosquitto_property_add_binary
mosquitto_property_add_byte
Expand Down
2 changes: 2 additions & 0 deletions src/linker-macosx.syms
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ _mosquitto_persist_client_update
_mosquitto_persist_retain_msg_delete
_mosquitto_persist_retain_msg_set
_mosquitto_persistence_location
_mosquitto_plugin_publish
_mosquitto_plugin_publish_copy
_mosquitto_plugin_set_info
_mosquitto_property_add_binary
_mosquitto_property_add_byte
Expand Down
2 changes: 2 additions & 0 deletions src/linker.syms
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@
mosquitto_persist_retain_msg_delete;
mosquitto_persist_retain_msg_set;
mosquitto_persistence_location;
mosquitto_plugin_publish;
mosquitto_plugin_publish_copy;
mosquitto_plugin_set_info;
mosquitto_property_add_binary;
mosquitto_property_add_byte;
Expand Down
Loading
Loading