WebSockets, Pub/Sub Operations and Real-Time Communication



Websocket


WebSockets serve as a communication protocol that facilitates full-duplex (two-way) interaction between the client and the server.


Once a connection is established between the two, either side can send messages at any time.
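To make "establishing a connection" a little more concrete: a WebSocket connection begins as an HTTP upgrade request, and the server confirms the upgrade by returning a Sec-WebSocket-Accept header derived from the client's Sec-WebSocket-Key. The following is a minimal sketch of that derivation using only the JDK. The class and method names are illustrative, and in a Spring application the framework performs this step for you.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

// Illustrative helper (hypothetical name); not part of the article's application code.
class HandshakeSketch {
    // Fixed GUID defined by RFC 6455 for the opening handshake.
    private static final String WS_GUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

    // Computes the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key:
    // Base64( SHA-1( key + GUID ) ).
    static String acceptKey(String secWebSocketKey) {
        try {
            MessageDigest sha1 = MessageDigest.getInstance("SHA-1");
            byte[] digest = sha1.digest(
                    (secWebSocketKey + WS_GUID).getBytes(StandardCharsets.US_ASCII));
            return Base64.getEncoder().encodeToString(digest);
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform implementation is required to provide SHA-1.
            throw new IllegalStateException("SHA-1 not available", e);
        }
    }
}
```

For the sample key used in RFC 6455, "dGhlIHNhbXBsZSBub25jZQ==", this returns "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=". After the handshake succeeds, the same TCP connection carries WebSocket frames in both directions.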



Key Characteristics of Websocket



- It permits full-duplex, two-way communication between the client and the server.
- It maintains a persistent connection between the two. This differs from long polling, in which the client repeatedly opens new HTTP requests to wait for data.
- It can also be used within an event-driven architecture.



Websockets and Real-Time Communication


WebSockets are well suited to applications that need real-time, two-way communication between client and server. In language translation, for example, WebSockets drive systems like our real-time audio translator, where live audio feeds are processed and translated on the fly. Because the connection stays open, each message avoids the overhead of setting up a new HTTP request, which keeps latency low and makes the translation feel immediate to users.



Pub/Sub Model Overview


The Pub/Sub model is based on the premise that the server broadcasts to a designated topic, while the client must subscribe to that topic in order to receive the broadcasted information on their end.


If the client subscribes after a broadcast, it will not receive the data the server has already sent, so it is essential to subscribe beforehand.


In this model, only the server sends messages to the clients; clients do not send messages back to the server.
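The behaviour described above can be sketched with a tiny in-memory broker. This is not the Spring broker used later in the article. It is a plain-Java illustration, with the hypothetical class name InMemoryBroker, of why a subscriber that arrives after a broadcast misses it.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Illustrative in-memory broker (hypothetical); messages are not stored,
// so only subscribers registered at publish time receive them.
class InMemoryBroker {
    private final Map<String, List<Consumer<String>>> subscribers = new ConcurrentHashMap<>();

    // Registers a subscriber callback for the given topic.
    void subscribe(String topic, Consumer<String> subscriber) {
        subscribers.computeIfAbsent(topic, k -> new CopyOnWriteArrayList<>()).add(subscriber);
    }

    // Delivers the message to current subscribers and returns how many received it.
    int publish(String topic, String message) {
        List<Consumer<String>> targets = subscribers.getOrDefault(topic, List.of());
        targets.forEach(subscriber -> subscriber.accept(message));
        return targets.size();
    }
}
```

If publish("/topic/user", "early") is called before anyone subscribes, it reaches zero subscribers and the message is gone. A client that subscribes afterwards only sees later messages.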



Websocket: Core Components of the Pub/Sub Model


Publisher: The sender responsible for disseminating messages.
Subscriber: The receiver tasked with accepting published messages.
Topic: The named channel to which the Publisher publishes data and to which the Subscriber subscribes in order to receive it.



Code visualisation (Configuration of Websocket as Pub/Sub model)


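import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.simp.config.MessageBrokerRegistry;
import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;
import org.springframework.web.socket.config.annotation.WebSocketMessageBrokerConfigurer;

@EnableWebSocketMessageBroker
@Configuration
public class WebSocketMessageBrokerConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        // In-memory broker that delivers messages to subscribers of /topic/... destinations
        config.enableSimpleBroker("/topic");
    }

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        // Clients connect to /websocket; "*" allows any origin and should be narrowed in production
        registry.addEndpoint("/websocket")
                .setAllowedOriginPatterns("*")
                .withSockJS();
    }
}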

Inject the following class as a dependency so that the server is ready to broadcast:


@Autowired
private SimpMessagingTemplate simpMessagingTemplate;

Now we can broadcast with the following call, which sends the user object to every client subscribed to /topic/user:


simpMessagingTemplate.convertAndSend("/topic/user", user);
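On the client side, receiving that broadcast requires a subscription to the same destination. With the STOMP protocol that this configuration enables, a client subscribes by sending a SUBSCRIBE frame after it has connected. The sketch below builds such a frame as a plain string to show its shape. The class name StompFrames and the subscription id are assumptions for illustration, and in practice a STOMP client library (plus the SockJS layer, when it is enabled) builds and wraps these frames for you.

```java
// Illustrative frame builder (hypothetical); real clients use a STOMP library.
class StompFrames {
    // A STOMP frame is: COMMAND, header lines, a blank line, an optional body, then a NUL byte.
    static String subscribe(String subscriptionId, String destination) {
        return "SUBSCRIBE\n"
                + "id:" + subscriptionId + "\n"
                + "destination:" + destination + "\n"
                + "\n"      // blank line ends the headers; SUBSCRIBE has no body
                + "\0";     // NUL terminates the frame
    }
}
```

Calling StompFrames.subscribe("sub-0", "/topic/user") yields the frame a client would send to start receiving the messages published by convertAndSend above.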

Real Time communication between client and server


In this setup, the server is configured for two-way, real-time communication. Once a connection is established, the client and the server can each send messages at any time, without waiting for the other. Real-world examples include chat applications and online status indicators.


We can also register interceptors to inspect WebSocket requests, for example to validate authorization, and then either let the request proceed or reject it.
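As a rough sketch of the decision such an interceptor makes, the plain-Java class below checks a token carried in the connection URI and returns true to proceed or false to reject. Everything here is an assumption for illustration: the class name HandshakeGate, the query parameter name "token", and the idea of a fixed set of valid tokens. In Spring, logic like this would typically be called from HandshakeInterceptor.beforeHandshake, which likewise returns a boolean to continue or abort the handshake.

```java
import java.net.URI;
import java.util.Set;

// Illustrative authorization check (hypothetical names and token scheme).
class HandshakeGate {
    private final Set<String> validTokens;

    HandshakeGate(Set<String> validTokens) {
        this.validTokens = validTokens;
    }

    // Returns true when the request URI carries a known token, false otherwise.
    boolean allow(URI requestUri) {
        String query = requestUri.getQuery();
        if (query == null) {
            return false; // no query string, so no token was supplied
        }
        for (String pair : query.split("&")) {
            int eq = pair.indexOf('=');
            if (eq > 0 && pair.substring(0, eq).equals("token")) {
                return validTokens.contains(pair.substring(eq + 1));
            }
        }
        return false; // token parameter missing
    }
}
```

A production system would more likely validate a signed token or an authenticated HTTP session than compare against a fixed set, but the proceed-or-halt shape stays the same.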



Code visualisation (Websocket for real time configuration)


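import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        // Route connections on /websocket to the handler; "*" allows any origin
        registry.addHandler(new MyWebSocketHandler(), "/websocket")
                .setAllowedOriginPatterns("*");
    }
}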

Message Handler for client messages


The handler below processes messages the way a chat application would, assuming that one user may have multiple active sessions.



import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.extern.log4j.Log4j2;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.socket.CloseStatus;
import org.springframework.web.socket.TextMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.TextWebSocketHandler;
import org.springframework.web.util.UriComponentsBuilder;

// MessageRequestDto and MessageResponseDto are application DTOs that are not shown in this article.
@Log4j2
@Component
public class MyWebSocketHandler extends TextWebSocketHandler {

    /** Active sessions per username; one user may have multiple sessions. */
    private final Map<String, List<WebSocketSession>> sessionsCache = new ConcurrentHashMap<>();
    private final ObjectMapper objectMapper = new ObjectMapper();

    /**
     * Adds the session to sessionsCache when a connection is established.
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        final String username = extractUsernameFromSession(session);
        if (username == null) {
            // ConcurrentHashMap rejects null keys, so refuse sessions without a username
            session.close(CloseStatus.POLICY_VIOLATION);
            return;
        }
        final List<WebSocketSession> sessions =
                sessionsCache.computeIfAbsent(username, k -> new CopyOnWriteArrayList<>());
        sessions.add(session);
        log.info("Connection established {}", sessions);
    }

    /**
     * Extracts the username query parameter from the session URI; returns null if it is absent.
     */
    private String extractUsernameFromSession(final WebSocketSession webSocketSession) {
        final MultiValueMap<String, String> queryParams = UriComponentsBuilder
                .fromUri(Objects.requireNonNull(webSocketSession.getUri()))
                .build()
                .getQueryParams();
        return queryParams.getFirst("username");
    }

    /**
     * Forwards an incoming message to every active session of the recipient.
     */
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message)
            throws Exception {
        final MessageRequestDto messageRequestDto =
                objectMapper.readValue(message.getPayload(), MessageRequestDto.class);
        final String toUser = messageRequestDto.getToUser();
        final List<WebSocketSession> recipientSessions =
                toUser == null ? null : sessionsCache.get(toUser);
        if (recipientSessions == null || recipientSessions.isEmpty()) {
            log.error("Message sending failed because user {} is not connected", toUser);
            return;
        }
        final String messageJson = objectMapper.writeValueAsString(new MessageResponseDto(
                extractUsernameFromSession(session),
                messageRequestDto.getMessage(),
                messageRequestDto.getTime()));
        // Set to true once at least one session has accepted the message
        final AtomicBoolean sentAtLeastOnce = new AtomicBoolean(false);
        recipientSessions.forEach(webSocketSession -> {
            try {
                webSocketSession.sendMessage(new TextMessage(messageJson));
                sentAtLeastOnce.set(true);
            } catch (IOException e) {
                log.error("Error in sending message", e);
            }
        });
        if (sentAtLeastOnce.get()) {
            log.info("Message {} sent to active sessions of user {}",
                    messageRequestDto.getMessage(), toUser);
        }
    }

    /**
     * Removes the session from sessionsCache when a connection is closed.
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status)
            throws Exception {
        final String username = extractUsernameFromSession(session);
        final List<WebSocketSession> sessions =
                username == null ? null : sessionsCache.get(username);
        if (sessions != null && sessions.remove(session)) {
            log.warn("Connection closed {}", session);
        }
    }
}
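The session bookkeeping in the handler can also be looked at in isolation. The sketch below is a plain-Java registry, with the hypothetical name SessionRegistry and a generic session type so that it runs without Spring. It keeps the one-user-to-many-sessions mapping and additionally drops a user's entry once the last session closes. That cleanup is an assumption on my part and is not something the handler above does.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Illustrative registry (hypothetical); S stands in for WebSocketSession.
class SessionRegistry<S> {
    private final Map<String, List<S>> sessionsByUser = new ConcurrentHashMap<>();

    // Adds a session for the user; compute() keeps list creation and insertion atomic per key.
    void add(String username, S session) {
        sessionsByUser.compute(username, (user, sessions) -> {
            List<S> list = sessions == null ? new CopyOnWriteArrayList<>() : sessions;
            list.add(session);
            return list;
        });
    }

    // Removes a session; returning null from the remapping function deletes an empty entry.
    boolean remove(String username, S session) {
        boolean[] removed = {false};
        sessionsByUser.computeIfPresent(username, (user, sessions) -> {
            removed[0] = sessions.remove(session);
            return sessions.isEmpty() ? null : sessions;
        });
        return removed[0];
    }

    // Snapshot of the user's active sessions (empty if the user is not connected).
    List<S> sessionsOf(String username) {
        return List.copyOf(sessionsByUser.getOrDefault(username, List.of()));
    }

    // A user counts as online while at least one session is registered.
    boolean isOnline(String username) {
        return sessionsByUser.containsKey(username);
    }
}
```

With this shape, an online status indicator reduces to isOnline(username), and a chat message is delivered by iterating over sessionsOf(username).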

Conclusion


In this article, I discussed WebSockets and illustrated how clients and servers can communicate through broadcasting and through real-time, two-way connections. Because the handler tracks every active session, the server knows how many clients are connected at any moment.


We also gain insight into the connection status of each client, which allows the server to communicate with them effectively.

