Skip to content

Commit ccd50c5

Browse files
committed
feat: add subscriptions management
Signed-off-by: moxiaoying <[email protected]>
1 parent 8961496 commit ccd50c5

File tree

3 files changed

+133
-0
lines changed

3 files changed

+133
-0
lines changed

pulsar-admin/src/main/java/io/github/protocol/pulsar/admin/jdk/PersistentTopics.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
package io.github.protocol.pulsar.admin.jdk;
22

3+
import com.fasterxml.jackson.core.type.TypeReference;
4+
import io.github.openfacade.http.HttpResponse;
5+
import io.github.protocol.pulsar.admin.common.JacksonService;
6+
7+
import java.io.IOException;
8+
import java.util.List;
9+
import java.util.concurrent.ExecutionException;
10+
311
public class PersistentTopics extends BaseTopicsImpl {
412

513
private static final String BASE_URL_PERSISTENT_DOMAIN = "/admin/v2" + "/persistent";
@@ -11,4 +19,58 @@ public PersistentTopics(InnerHttpClient httpClient) {
1119
public String getDomainBaseUrl() {
1220
return BASE_URL_PERSISTENT_DOMAIN;
1321
}
22+
23+
public void createSubscription(String tenant, String namespace, String encodedTopic, String subscriptionName,
24+
boolean replicated, boolean authoritative, SubscriptionMessageId messageId)
25+
throws PulsarAdminException {
26+
String url = String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic,
27+
subscriptionName);
28+
try {
29+
HttpResponse response =
30+
httpClient.put(url, messageId, "replicated", String.valueOf(replicated), "authoritative",
31+
String.valueOf(authoritative));
32+
if (response.statusCode() != 204) {
33+
throw new PulsarAdminException(
34+
String.format("failed to create subscription %s for topic %s/%s/%s, status code %s, body : %s",
35+
subscriptionName, tenant, namespace, encodedTopic, response.statusCode(),
36+
response.bodyAsString()));
37+
}
38+
} catch (IOException | InterruptedException | ExecutionException e) {
39+
throw new PulsarAdminException(e);
40+
}
41+
}
42+
43+
public void deleteSubscription(String tenant, String namespace, String encodedTopic, String subName, boolean force,
44+
boolean authoritative) throws PulsarAdminException {
45+
String url =
46+
String.format("%s/%s/%s/%s/subscription/%s", getDomainBaseUrl(), tenant, namespace, encodedTopic, subName);
47+
try {
48+
HttpResponse response =
49+
httpClient.delete(url, "force", String.valueOf(force), "authoritative", String.valueOf(authoritative));
50+
if (response.statusCode() != 204) {
51+
throw new PulsarAdminException(
52+
String.format("failed to delete subscription %s of topic %s/%s/%s, status code %s, body : %s",
53+
subName, tenant, namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
54+
}
55+
} catch (IOException | InterruptedException | ExecutionException e) {
56+
throw new PulsarAdminException(e);
57+
}
58+
}
59+
60+
public List<String> getSubscriptions(String tenant, String namespace, String encodedTopic, boolean authoritative)
61+
throws PulsarAdminException {
62+
String url = String.format("%s/%s/%s/%s/subscriptions", getDomainBaseUrl(), tenant, namespace, encodedTopic);
63+
try {
64+
HttpResponse response = httpClient.get(url, "authoritative", String.valueOf(authoritative));
65+
if (response.statusCode() != 200) {
66+
throw new PulsarAdminException(
67+
String.format("failed to get subscriptions of topic %s/%s/%s, status code %s, body : %s", tenant,
68+
namespace, encodedTopic, response.statusCode(), response.bodyAsString()));
69+
}
70+
return JacksonService.toRefer(response.body(), new TypeReference<List<String>>() {
71+
});
72+
} catch (IOException | InterruptedException | ExecutionException e) {
73+
throw new PulsarAdminException(e);
74+
}
75+
}
1476
}
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
package io.github.protocol.pulsar.admin.jdk;
2+
3+
import lombok.EqualsAndHashCode;
4+
import lombok.Getter;
5+
import lombok.NoArgsConstructor;
6+
import lombok.Setter;
7+
8+
import java.util.Map;
9+
10+
@Getter
11+
@Setter
12+
@NoArgsConstructor
13+
@EqualsAndHashCode
14+
public class SubscriptionMessageId {
15+
16+
private Integer batchIndex = -1;
17+
18+
private Long entryId = -1L;
19+
20+
private Long ledgerId = -1L;
21+
22+
private Integer partitionIndex = -1;
23+
24+
private Map<String, String> properties = null;
25+
26+
public static SubscriptionMessageId earliest() {
27+
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
28+
return subscriptionMessageId;
29+
}
30+
31+
public static SubscriptionMessageId latest() {
32+
SubscriptionMessageId subscriptionMessageId = new SubscriptionMessageId();
33+
subscriptionMessageId.setEntryId(Long.MAX_VALUE);
34+
subscriptionMessageId.setLedgerId(Long.MAX_VALUE);
35+
subscriptionMessageId.setPartitionIndex(Integer.MAX_VALUE);
36+
return subscriptionMessageId;
37+
}
38+
}

pulsar-admin/src/test/java/io/github/protocol/pulsar/admin/jdk/PersistentTopicsTest.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import java.util.Arrays;
1111
import java.util.HashSet;
12+
import java.util.List;
1213
import java.util.TreeMap;
1314

1415
public class PersistentTopicsTest extends BaseTest {
@@ -197,4 +198,36 @@ public void getPartitionedStatsTest(PulsarAdmin pulsarAdmin) throws PulsarAdminE
197198
Assertions.assertNotNull(pulsarAdmin.persistentTopics().getPartitionedStats(tenant, namespace, topic, false));
198199
}
199200

201+
@Test
202+
public void subscriptionTest() throws PulsarAdminException {
203+
String namespace = RandomUtil.randomString();
204+
String topic = RandomUtil.randomString();
205+
String subscriptionNameLatest = RandomUtil.randomString();
206+
String subscriptionNameEarliest = RandomUtil.randomString();
207+
208+
// Create namespace and topic
209+
pulsarAdmin.namespaces().createNamespace(tenant, namespace);
210+
pulsarAdmin.persistentTopics().createNonPartitionedTopic(tenant, namespace, topic, false, null);
211+
212+
// Create subscription with message ID latest and earliest
213+
pulsarAdmin.persistentTopics()
214+
.createSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false,
215+
SubscriptionMessageId.latest());
216+
pulsarAdmin.persistentTopics()
217+
.createSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false,
218+
SubscriptionMessageId.earliest());
219+
220+
// Verify subscription was created
221+
List<String> subscriptions = pulsarAdmin.persistentTopics().getSubscriptions(tenant, namespace, topic, false);
222+
Assertions.assertTrue(subscriptions.contains(subscriptionNameEarliest),
223+
"Should contain subscription created with message ID");
224+
Assertions.assertTrue(subscriptions.contains(subscriptionNameLatest),
225+
"Should contain subscription created with message ID");
226+
227+
// Clean up
228+
pulsarAdmin.persistentTopics()
229+
.deleteSubscription(tenant, namespace, topic, subscriptionNameEarliest, false, false);
230+
pulsarAdmin.persistentTopics()
231+
.deleteSubscription(tenant, namespace, topic, subscriptionNameLatest, false, false);
232+
}
200233
}

0 commit comments

Comments
 (0)