Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions cmd/kcp/kcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,26 @@ func main() {

// manually extract root directory from flags first as it influence all other flags
rootDir := ".kcp"
additionalMappingsFile := ""
for i, f := range os.Args {
if f == "--root-directory" {
if i < len(os.Args)-1 {
rootDir = os.Args[i+1]
} // else let normal flag processing fail
} else if strings.HasPrefix(f, "--root-directory=") {
rootDir = strings.TrimPrefix(f, "--root-directory=")
} else if f == "--miniproxy-mapping-file" {
if i < len(os.Args)-1 {
additionalMappingsFile = os.Args[i+1]
} // else let normal flag processing fail
} else if strings.HasPrefix(f, "--miniproxy-mapping-file") {
additionalMappingsFile = strings.TrimPrefix(f, "--mapping-file=")
}
}

kcpOptions := options.NewOptions(rootDir)
kcpOptions.Server.GenericControlPlane.Logs.Verbosity = logsapiv1.VerbosityLevel(2)
kcpOptions.Server.Extra.AdditionalMappingsFile = additionalMappingsFile

startCmd := &cobra.Command{
Use: "start",
Expand Down
12 changes: 12 additions & 0 deletions cmd/kcp/options/generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ import (

type GenericOptions struct {
RootDirectory string
MappingFile string
}

func NewGeneric(rootDir string) *GenericOptions {
return &GenericOptions{
RootDirectory: rootDir,
MappingFile: "",
}
}

func (o *GenericOptions) AddFlags(fss *cliflag.NamedFlagSets) {
fs := fss.FlagSet("KCP")
fs.StringVar(&o.RootDirectory, "root-directory", o.RootDirectory, "Root directory. Set to \"\" to disable file (e.g. certificates) generation in a root directory.")
fs.StringVar(&o.MappingFile, "miniproxy-mapping-file", o.MappingFile, "DEVELOPMENT ONLY. Path to additional mapping file to be used by mini-front-proxy. This should not be used in production. For production usecase use front-proxy component instead.")
}

func (o *GenericOptions) Complete() (*GenericOptions, error) {
Expand All @@ -55,6 +58,15 @@ func (o *GenericOptions) Complete() (*GenericOptions, error) {
return nil, err
}
}
if o.MappingFile != "" {
if !filepath.IsAbs(o.MappingFile) {
pwd, err := os.Getwd()
if err != nil {
return nil, err
}
o.MappingFile = filepath.Join(pwd, o.MappingFile)
}
}

return o, nil
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/proxy/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,17 @@ func shardHandler(index index.Index, proxy http.Handler) http.HandlerFunc {
return
}

shardURLString, found, errCode := index.LookupURL(clusterPath)
if errCode != 0 {
http.Error(w, "Not available.", errCode)
result, found := index.LookupURL(clusterPath)
if result.ErrorCode != 0 {
http.Error(w, "Not available.", result.ErrorCode)
return
}
if !found {
logger.WithValues("clusterPath", clusterPath).V(4).Info("Unknown cluster path")
responsewriters.Forbidden(req.Context(), attributes, w, req, kcpauthorization.WorkspaceAccessNotPermittedReason, kubernetesscheme.Codecs)
return
}
shardURL, err := url.Parse(shardURLString)
shardURL, err := url.Parse(result.URL)
if err != nil {
responsewriters.InternalError(w, req, err)
return
Expand Down
14 changes: 10 additions & 4 deletions pkg/proxy/index/index_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ const (
)

type Index interface {
LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int)
LookupURL(path logicalcluster.Path) (index.Result, bool)
}

type ClusterClientGetter func(shard *corev1alpha1.Shard) (kcpclientset.ClusterInterface, error)
Expand Down Expand Up @@ -291,10 +291,16 @@ func (c *Controller) stopShard(shardName string) {
delete(c.shardLogicalClusterInformers, shardName)
}

func (c *Controller) LookupURL(path logicalcluster.Path) (url string, found bool, errorCode int) {
func (c *Controller) LookupURL(path logicalcluster.Path) (index.Result, bool) {
r, found := c.state.LookupURL(path)
if found && r.ErrorCode != 0 {
return r.URL, found, r.ErrorCode
return index.Result{
URL: r.URL,
ErrorCode: r.ErrorCode,
}, found
}
return r.URL, found, 0
return index.Result{
URL: r.URL,
ErrorCode: 0,
}, found
}
129 changes: 14 additions & 115 deletions pkg/proxy/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,121 +23,34 @@ import (
"net/http/httputil"
"net/url"
"os"
"path"
"strings"

"github.com/kcp-dev/logicalcluster/v3"

"k8s.io/component-base/metrics/legacyregistry"
"k8s.io/klog/v2"
"sigs.k8s.io/yaml"

"github.com/kcp-dev/kcp/pkg/proxy/index"
proxyoptions "github.com/kcp-dev/kcp/pkg/proxy/options"
"github.com/kcp-dev/kcp/pkg/server/proxy"
)

// PathMapping describes how to route traffic from a path to a backend server.
// Each Path is registered with the DefaultServeMux with a handler that
// delegates to the specified backend.
type PathMapping struct {
Path string `json:"path"`
Backend string `json:"backend"`
BackendServerCA string `json:"backend_server_ca"`
ProxyClientCert string `json:"proxy_client_cert"`
ProxyClientKey string `json:"proxy_client_key"`
UserHeader string `json:"user_header,omitempty"`
GroupHeader string `json:"group_header,omitempty"`
ExtraHeaderPrefix string `json:"extra_header_prefix"`
}

type HttpHandler struct {
index index.Index
mapping []httpHandlerMapping
defaultHandler http.Handler
}

// httpHandlerMapping is used to route traffic to the correct backend server.
// Higher weight means that the mapping is more specific and should be matched first.
type httpHandlerMapping struct {
weight int
path string
handler http.Handler
}

func (h *HttpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
// mappings are used to route traffic to the correct backend server.
// It should not have `/clusters` as prefix because that is handled by the
// shardHandler or mounts. Logic is as follows:
// 1. We detect URL for the request and find the correct handler. URL can be
// shard based, virtual workspace or mount. First two are covered by r.URL,
// where mounts are covered by annotation on the workspace with the mount path.
// 2. If mountpoint is found, we rewrite the URL to resolve, else use one in
// request to match with mappings.
// 3. Iterate over mappings and find the one that matches the URL. If found,
// use the handler for that mapping, else use default handler - kcp.
// Mappings are done from most specific to least specific:
// Example: /clusters/cluster1/ will be matched before /clusters/
for _, m := range h.mapping {
url, errorCode := h.resolveURL(r)
if errorCode != 0 {
http.Error(w, http.StatusText(errorCode), errorCode)
return
}
if strings.HasPrefix(url, m.path) {
m.handler.ServeHTTP(w, r)
return
}
}

h.defaultHandler.ServeHTTP(w, r)
}

func (h *HttpHandler) resolveURL(r *http.Request) (string, int) {
// if we don't match any of the paths, use the default behavior - request
var cs = strings.SplitN(strings.TrimLeft(r.URL.Path, "/"), "/", 3)
if len(cs) < 2 || cs[0] != "clusters" {
return r.URL.Path, 0
}

clusterPath := logicalcluster.NewPath(cs[1])
if !clusterPath.IsValid() {
return r.URL.Path, 0
}

u, found, errCode := h.index.LookupURL(clusterPath)
if errCode != 0 {
return "", errCode
}
if found {
u, err := url.Parse(u)
if err == nil && u != nil {
u.Path = strings.TrimSuffix(u.Path, "/")
r.URL.Path = path.Join(u.Path, strings.Join(cs[2:], "/")) // override request prefix and keep kube api contextual suffix
return u.Path, 0
}
}

return r.URL.Path, 0
}

func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index) (http.Handler, error) {
mappingData, err := os.ReadFile(o.MappingFile)
if err != nil {
return nil, fmt.Errorf("failed to read mapping file %q: %w", o.MappingFile, err)
}

var mapping []PathMapping
var mapping []proxy.PathMapping
if err = yaml.Unmarshal(mappingData, &mapping); err != nil {
return nil, fmt.Errorf("failed to unmarshal mapping file %q: %w", o.MappingFile, err)
}

handlers := HttpHandler{
index: index,
mapping: []httpHandlerMapping{
handlers := proxy.HttpHandler{
Index: index,
Mappings: proxy.HttpHandlerMappings{
{
weight: 0,
path: "/metrics",
handler: legacyregistry.Handler(),
Weight: 0,
Path: "/metrics",
Handler: legacyregistry.Handler(),
},
},
}
Expand Down Expand Up @@ -185,31 +98,17 @@ func NewHandler(ctx context.Context, o *proxyoptions.Options, index index.Index)

logger.V(2).WithValues("path", m.Path).Info("adding handler")
if m.Path == "/" {
handlers.defaultHandler = handler
handlers.DefaultHandler = handler
} else {
handlers.mapping = append(handlers.mapping, httpHandlerMapping{
weight: len(m.Path),
path: m.Path,
handler: handler,
handlers.Mappings = append(handlers.Mappings, proxy.HttpHandlerMapping{
Weight: len(m.Path),
Path: m.Path,
Handler: handler,
})
}
}

handlers.mapping = sortMappings(handlers.mapping)
handlers.Mappings.Sort()

return &handlers, nil
}

func sortMappings(mappings []httpHandlerMapping) []httpHandlerMapping {
// sort mappings by weight
// higher weight means that the mapping is more specific and should be matched first
// Example: /clusters/cluster1/ will be matched before /clusters/
for i := range mappings {
for j := range mappings {
if mappings[i].weight > mappings[j].weight {
mappings[i], mappings[j] = mappings[j], mappings[i]
}
}
}
return mappings
}
11 changes: 10 additions & 1 deletion pkg/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,16 @@ func NewConfig(ctx context.Context, opts kcpserveroptions.CompletedOptions) (*Co
apiHandler = mux

apiHandler = filters.WithAuditInit(apiHandler) // Must run before any audit annotation is made
apiHandler = WithLocalProxy(apiHandler, opts.Extra.ShardName, opts.Extra.ShardBaseURL, c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(), c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters())
apiHandler, err = WithLocalProxy(apiHandler,
opts.Extra.ShardName,
opts.Extra.ShardBaseURL,
opts.Extra.AdditionalMappingsFile,
c.KcpSharedInformerFactory.Tenancy().V1alpha1().Workspaces(),
c.KcpSharedInformerFactory.Core().V1alpha1().LogicalClusters(),
)
if err != nil {
panic(err) // shouldn't happen due to flag validation
}
apiHandler = WithInClusterServiceAccountRequestRewrite(apiHandler)
apiHandler = kcpfilters.WithAcceptHeader(apiHandler)
apiHandler = WithUserAgent(apiHandler)
Expand Down
Loading
Loading