Skip to content

Commit 80e375c

Browse files
committed
feat: add subscriptions management
Signed-off-by: moxiaoying <[email protected]>
1 parent 619aaa7 commit 80e375c

File tree

3 files changed

+127
-1
lines changed

3 files changed

+127
-1
lines changed

pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package io.github.protocol.pulsar.admin.jdk;
22

3+
import com.fasterxml.jackson.core.type.TypeReference;
4+
import io.github.openfacade.http.HttpResponse;
5+
import io.github.protocol.pulsar.admin.common.JacksonService;
6+
7+
import java.io.IOException;
8+
import java.util.List;
9+
import java.util.concurrent.ExecutionException;
10+
311
public class PersistentTopics extends BaseTopicsImpl {
412

513
private static final String BASE_URL_PERSISTENT_DOMAIN = "/admin/v2" + "/persistent";
@@ -11,4 +19,56 @@ public PersistentTopics(InnerHttpClient httpClient) {
1119
public String getDomainBaseUrl() {
1220
return BASE_URL_PERSISTENT_DOMAIN;
1321
}
22+
23+
24+
public void createSubscription(String tenant, String namespace, String encodedTopic, String subscriptionName,
25+
boolean replicated, boolean authoritative, SubscriptionMessageId messageId) throws PulsarAdminException {
26+
String url = String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, subscriptionName);
27+
try {
28+
HttpResponse response = httpClient.put(url, messageId,
29+
"replicated", String.valueOf(replicated),
30+
"authoritative", String.valueOf(authoritative));
31+
if (response.statusCode() != 204) {
32+
throw new PulsarAdminException(
33+
String.format("failed to create subscription %s for topic %s/%s/%s, status code %s, body : %s",
34+
subscriptionName, tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
35+
}
36+
} catch (IOException | InterruptedException | ExecutionException e) {
37+
throw new PulsarAdminException(e);
38+
}
39+
}
40+
41+
public void deleteSubscription(String tenant, String namespace, String encodedTopic, String subName,
42+
boolean force, boolean authoritative) throws PulsarAdminException {
43+
String url = String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, subName);
44+
try {
45+
HttpResponse response = httpClient.delete(url,
46+
"force", String.valueOf(force),
47+
"authoritative", String.valueOf(authoritative));
48+
if (response.statusCode() != 204) {
49+
throw new PulsarAdminException(
50+
String.format("failed to delete subscription %s of topic %s/%s/%s, status code %s, body : %s",
51+
subName, tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
52+
}
53+
} catch (IOException | InterruptedException | ExecutionException e) {
54+
throw new PulsarAdminException(e);
55+
}
56+
}
57+
58+
public List<String> getSubscriptions(String tenant, String namespace, String encodedTopic, boolean authoritative)
59+
throws PulsarAdminException {
60+
String url = String.format("%s/%s/%s/%s/subscriptions", getDomainBaseUrl(), tenant, namespace, encodedTopic);
61+
try {
62+
HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative));
63+
if (response.statusCode() != 200) {
64+
throw new PulsarAdminException(
65+
String.format("failed to get subscriptions of topic %s/%s/%s, status code %s, body : %s",
66+
tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
67+
}
68+
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
69+
});
70+
} catch (IOException | InterruptedException | ExecutionException e) {
71+
throw new PulsarAdminException(e);
72+
}
73+
}
1474
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.github.protocol.pulsar.admin.jdk;
2+
3+
import lombok.EqualsAndHashCode;
4+
import lombok.Getter;
5+
import lombok.NoArgsConstructor;
6+
import lombok.Setter;
7+
8+
import java.util.Map;
9+
10+
@Getter
11+
@Setter
12+
@NoArgsConstructor
13+
@EqualsAndHashCode
14+
public class SubscriptionMessageId {
15+
16+
private Integer batchIndex = -1;
17+
18+
private Long entryId = -1L;
19+
20+
private Long ledgerId = -1L;
21+
22+
private Integer partitionIndex = -1;
23+
24+
private Map<String, String> properties = null;
25+
26+
public static SubscriptionMessageId earliest() {
27+
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
28+
return subscriptionMessageId;
29+
}
30+
31+
public static SubscriptionMessageId latest() {
32+
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
33+
subscriptionMessageId.setEntryId(Long.MAX_VALUE);
34+
subscriptionMessageId.setLedgerId(Long.MAX_VALUE);
35+
subscriptionMessageId.setPartitionIndex(Integer.MAX_VALUE);
36+
return subscriptionMessageId;
37+
}
38+
}

pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010

1111
import java.util.Arrays;
1212
import java.util.HashSet;
13+
import java.util.List;
1314
import java.util.TreeMap;
1415

1516
public class PersistentTopicsTest {
@@ -198,7 +199,34 @@ public void getPartitionedStatsTest() throws PulsarAdminException {
198199
String topic = RandomUtil.randomString();
199200
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
200201
pulsarAdmin.persistentTopics().createPartitionedTopic(tenant, namespace, topic, 2, false);
201-
Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false));
202+
Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace,
203+
topic, false));
202204
}
203205

206+
@Test
207+
public void subscriptionTest() throws PulsarAdminException {
208+
String namespace = RandomUtil.randomString();
209+
String topic = RandomUtil.randomString();
210+
String subscriptionNameLatest = RandomUtil.randomString();
211+
String subscriptionNameEarliest = RandomUtil.randomString();
212+
213+
// Create namespace and topic
214+
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
215+
pulsarAdmin.persistentTopics().createNonPartitionedTopic(tenant, namespace, topic, false, null);
216+
217+
// Create subscription with message ID latest and earliest
218+
pulsarAdmin.persistentTopics().createSubscription(
219+
tenant, namespace, topic, subscriptionNameLatest, false, false, SubscriptionMessageId.latest());
220+
pulsarAdmin.persistentTopics().createSubscription(
221+
tenant, namespace, topic, subscriptionNameEarliest, false, false, SubscriptionMessageId.earliest());
222+
223+
// Verify subscription was created
224+
List<String> subscriptions = pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, topic, false);
225+
Assertions.assertTrue(subscriptions.contains(subscriptionNameEarliest), "Should contain subscription created with message ID");
226+
Assertions.assertTrue(subscriptions.contains(subscriptionNameLatest), "Should contain subscription created with message ID");
227+
228+
// Clean up
229+
pulsarAdmin.persistentTopics().deleteSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false);
230+
pulsarAdmin.persistentTopics().deleteSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false);
231+
}
204232
}

0 commit comments

Comments
 (0)