Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.Assert;
import org.apache.dubbo.remoting.exchange.PortUnificationExchanger;
import org.apache.dubbo.rpc.GracefulShutdown;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ModuleModel;
Expand Down Expand Up @@ -93,6 +94,8 @@ private void doDestroy() {
for (GracefulShutdown gracefulShutdown : gracefulShutdowns) {
gracefulShutdown.readonly();
}
// close all exchangers to reject new requests
PortUnificationExchanger.goaway();

boolean hasModuleBindSpring = false;
// check if any modules are bound to Spring
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.dubbo.remoting.api;

/**
* Event used to trigger graceful shutdown for protocols that support it.
* For HTTP/2 (Triple protocol), this event triggers sending a GOAWAY frame
* to notify clients that the server is shutting down.
*
* <p>This event is fired through Netty's user event mechanism and is handled
* by protocol-specific handlers (e.g., TripleServerConnectionHandler for Triple protocol).
* Protocols that don't support GOAWAY (e.g., Dubbo protocol) will simply ignore this event.
*/
public class GoAwayEvent {

/**
* Singleton instance of GoAwayEvent.
*/
public static final GoAwayEvent INSTANCE = new GoAwayEvent();

private GoAwayEvent() {
// Private constructor to enforce singleton pattern
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -103,4 +103,6 @@ public Map<String, ChannelHandler> getSupportedHandlers() {
// this getter is just used by implementation of this class
return supportedHandlers;
}

public abstract void goaway();
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ public class PortUnificationExchanger {

private static final ErrorTypeAwareLogger log =
LoggerFactory.getErrorTypeAwareLogger(PortUnificationExchanger.class);
private static final ConcurrentMap<String, RemotingServer> servers = new ConcurrentHashMap<>();

private static final ConcurrentMap<String, AbstractPortUnificationServer> servers = new ConcurrentHashMap<>();

public static RemotingServer bind(URL url, ChannelHandler handler) {
ConcurrentHashMapUtils.computeIfAbsent(servers, url.getAddress(), addr -> {
Expand All @@ -52,7 +53,7 @@ public static RemotingServer bind(URL url, ChannelHandler handler) {
});

servers.computeIfPresent(url.getAddress(), (addr, server) -> {
((AbstractPortUnificationServer) server).addSupportedProtocol(url, handler);
server.addSupportedProtocol(url, handler);
return server;
});
return servers.get(url.getAddress());
Expand All @@ -69,9 +70,9 @@ public static AbstractConnectionClient connect(URL url, ChannelHandler handler)
}

public static void close() {
final ArrayList<RemotingServer> toClose = new ArrayList<>(servers.values());
final ArrayList<AbstractPortUnificationServer> toClose = new ArrayList<>(servers.values());
servers.clear();
for (RemotingServer server : toClose) {
for (AbstractPortUnificationServer server : toClose) {
try {
server.close();
} catch (Throwable throwable) {
Expand All @@ -80,8 +81,18 @@ public static void close() {
}
}

public static void goaway() {
for (AbstractPortUnificationServer server : servers.values()) {
try {
server.goaway();
} catch (Throwable throwable) {
log.error(PROTOCOL_ERROR_CLOSE_SERVER, "", "", "Goaway all port unification server failed", throwable);
}
}
}

// for test
public static ConcurrentMap<String, RemotingServer> getServers() {
public static ConcurrentMap<String, AbstractPortUnificationServer> getServers() {
return servers;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,16 @@ public void addSupportedProtocol(URL url, ChannelHandler handler) {
super.addSupportedProtocol(url, ChannelHandlers.wrap(handler, url));
}

@Override
public void goaway() {
// Netty 3 does not support HTTP/2 (Triple protocol), so we only need to mark channels as closing.
// The actual graceful shutdown for Dubbo protocol is handled by sending READONLY_EVENT
// through DubboGracefulShutdown.readonly() before this method is called.
for (Channel channel : getChannels()) {
channel.startClose();
}
}

@Override
public void close() {
if (channel != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.api.GoAwayEvent;
import org.apache.dubbo.remoting.api.WireProtocol;
import org.apache.dubbo.remoting.api.pu.AbstractPortUnificationServer;
import org.apache.dubbo.remoting.transport.dispatcher.ChannelHandlers;
Expand Down Expand Up @@ -87,6 +88,20 @@ public void close() {
}
}

@Override
public void goaway() {
for (Channel channel : getChannels()) {
channel.startClose();
// Fire GoAwayEvent through Netty pipeline to trigger protocol-specific graceful shutdown
// For HTTP/2 (Triple protocol), TripleServerConnectionHandler will handle this event
// and send Http2GoAwayFrame to notify clients
// For other protocols (e.g., Dubbo), this event will be ignored
if (channel instanceof NettyChannel) {
((NettyChannel) channel).getNioChannel().pipeline().fireUserEventTriggered(GoAwayEvent.INSTANCE);
}
}
}

public void bind() throws Throwable {
if (channel == null) {
doOpen();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,10 @@
*/
package org.apache.dubbo.rpc.protocol;

import org.apache.dubbo.common.Parameters;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.remoting.Channel;
import org.apache.dubbo.remoting.ChannelHandler;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.remoting.RemotingException;
import org.apache.dubbo.remoting.RemotingServer;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invocation;
Expand All @@ -33,8 +29,6 @@
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;

import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand Down Expand Up @@ -211,68 +205,4 @@ public Map<String, Object> getAttributes() {
return attributes;
}
}

protected abstract class RemotingServerAdapter implements RemotingServer {

public abstract Object getDelegateServer();

/**
* @return
*/
@Override
public boolean isBound() {
return false;
}

@Override
public Collection<Channel> getChannels() {
return null;
}

@Override
public Channel getChannel(InetSocketAddress remoteAddress) {
return null;
}

@Override
public void reset(Parameters parameters) {}

@Override
public void reset(URL url) {}

@Override
public URL getUrl() {
return null;
}

@Override
public ChannelHandler getChannelHandler() {
return null;
}

@Override
public InetSocketAddress getLocalAddress() {
return null;
}

@Override
public void send(Object message) throws RemotingException {}

@Override
public void send(Object message, boolean sent) throws RemotingException {}

@Override
public void close() {}

@Override
public void close(int timeout) {}

@Override
public void startClose() {}

@Override
public boolean isClosed() {
return false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.apache.dubbo.common.logger.ErrorTypeAwareLogger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.remoting.api.GoAwayEvent;

import java.io.IOException;
import java.net.SocketException;
Expand Down Expand Up @@ -97,7 +98,13 @@ private boolean isQuiteException(Throwable t) {

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
super.userEventTriggered(ctx, evt);
if (evt instanceof GoAwayEvent) {
// Handle GoAwayEvent by initiating graceful shutdown
// This will send HTTP/2 GOAWAY frame to notify clients
close(ctx, ctx.newPromise());
} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
Expand Down
Loading