+ 3
Pub Sub Pattern - Java
Hi guys. I have some problems here. I want to develop a message hub. I have a Map<String, Set<Subscriber>> subscribersTopicMap = new HashMap<String, Set<Subscriber>>(); to have a topic with a set of subs, Queue<Message> messagesQueue = new LinkedList<Message>(); a queue for messages, and the the methods to add and remove subscriber and publisher. The connections are done via Tcp and Udp, we are working with Json Objects. All methods are working, but I am having some problems about broadcast the messages to multiple subscribers. Is there someone that implemented something like this before? I have some doubts about the way I should implement the channel. If someone can help, let me know and I can show you the code. Thanks
3 ответов
+ 2
Could you put an example of code that should broadcast multiple messages in code playground and then include it here? I need to see how it's being implemented as I can't really tell what's wrong from explanation alone.
0
Let me try to explain then. We are receiving a json string from udpclient like {"type": pub, "topic": music, "payload": {"name": Nirvana}}, for publisher, and, {"type": sub, "topic": music} for subscribers. After read the commands, our server will connect to PubSubService: if(type.equalsIgnoreCase("pub")) {
String topic = jsonObj.getString("topic");
JSONObject payload = jsonObj.getJSONObject("payload");
Message message = new Message(topic, payload);
publisher.publish(message, pubSubService);
for(Integer port : portSet) {
if(port == subscriber.getPort(port)) {
subscriber.getMessagesForSubscriberOfTopic(topic, pubSubService);
String response = subscriber.getMessages().toString();
sendData = response.getBytes();
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
clientIP, port);
udpServerSocket.send(sendPacket);
}.
Here, after call the publish, we add a message to a queue in the PubSubService. I was trying with HashSet<Integer> portSet, send the messages to all subs of the topic, but is not happening.
}else if (type.equalsIgnoreCase("sub")) {
String topic = jsonObj.getString("topic");
int clientport = receivePacket.getPort();
//portSet.add(clientport);
portSet.add(subscriber.getPort(clientport));
subscriber.addSubscriber(clientIP, clientport, topic, pubSubService);
subscriber.getMessagesForSubscriberOfTopic(topic, pubSubService);
subscriber.printMessages();
String response = "Added with Sucess";
sendData = response.getBytes();
DatagramPacket sendPacket = new DatagramPacket(sendData, sendData.length,
clientIP, clientport);
udpServerSocket.send(sendPacket);
}
0
and broadcast:
public void broadcast(){
if(messagesQueue.isEmpty()){
System.out.println("No messages from publishers to display");
}else{
while(!messagesQueue.isEmpty()){
Message message = messagesQueue.remove();
String topic = message.getTopic();
Set<Subscriber> subscribersOfTopic = subscribersTopicMap.get(topic);
for(Subscriber subscriber : subscribersOfTopic){
//add broadcasted message to subscribers message queue
List<Message> subscriberMessages = subscriber.getSubscriberMessages();
subscriberMessages.add(message);
subscriber.setSubscriberMessages(subscriberMessages);
}