Workload API
概述
Workload API 是 SPIFFE 规范定义的标准 API,工作负载通过它获取身份凭证。SPIRE Agent 实现了这个 API。
API 端点
Agent 通过 Unix Domain Socket 暴露 Workload API:
默认路径:
/tmp/spire-agent/public/api.sock协议: gRPC
gRPC 服务定义
service SpiffeWorkloadAPI {
// X.509 相关
rpc FetchX509SVID(X509SVIDRequest) returns (stream X509SVIDResponse);
rpc FetchX509Bundles(X509BundlesRequest) returns (stream X509BundlesResponse);
// JWT 相关
rpc FetchJWTSVID(JWTSVIDRequest) returns (JWTSVIDResponse);
rpc FetchJWTBundles(JWTBundlesRequest) returns (stream JWTBundlesResponse);
rpc ValidateJWTSVID(ValidateJWTSVIDRequest) returns (ValidateJWTSVIDResponse);
}
X.509 操作
FetchX509SVID
获取 X.509-SVID,这是一个流式 RPC,会持续推送更新。
请求:
message X509SVIDRequest {}
响应:
message X509SVIDResponse {
repeated X509SVID svids = 1;
repeated X509Bundle federated_bundles = 2;
}
message X509SVID {
string spiffe_id = 1;
bytes x509_svid = 2; // DER 编码的证书链
bytes x509_svid_key = 3; // PKCS#8 编码的私钥
bytes bundle = 4; // DER 编码的信任包
}
Go 示例:
package main
import (
"context"
"log"
"github.com/spiffe/go-spiffe/v2/workloadapi"
)
func main() {
ctx := context.Background()
// 创建 X.509 源
source, err := workloadapi.NewX509Source(ctx,
workloadapi.WithClientOptions(
workloadapi.WithAddr("unix:///tmp/spire-agent/public/api.sock"),
),
)
if err != nil {
log.Fatal(err)
}
defer source.Close()
// 获取 SVID
svid, err := source.GetX509SVID()
if err != nil {
log.Fatal(err)
}
log.Printf("SPIFFE ID: %s", svid.ID)
log.Printf("证书数量: %d", len(svid.Certificates))
log.Printf("过期时间: %v", svid.Certificates[0].NotAfter)
}
FetchX509Bundles
仅获取信任包,不包含 SVID。
Go 示例:
func fetchBundles() {
ctx := context.Background()
source, err := workloadapi.NewBundleSource(ctx)
if err != nil {
log.Fatal(err)
}
defer source.Close()
bundle, err := source.GetBundleForTrustDomain(spiffeid.RequireTrustDomainFromString("example.org"))
if err != nil {
log.Fatal(err)
}
log.Printf("信任包包含 %d 个 CA 证书", len(bundle.X509Authorities()))
}
JWT 操作
FetchJWTSVID
获取 JWT-SVID。
请求:
message JWTSVIDRequest {
repeated string audience = 1; // 必填:目标受众
string spiffe_id = 2; // 可选:请求特定 SPIFFE ID
}
响应:
message JWTSVIDResponse {
repeated JWTSVID svids = 1;
}
message JWTSVID {
string spiffe_id = 1;
string svid = 2; // JWT token
}
Go 示例:
func fetchJWTSVID() {
ctx := context.Background()
client, err := workloadapi.New(ctx,
workloadapi.WithAddr("unix:///tmp/spire-agent/public/api.sock"),
)
if err != nil {
log.Fatal(err)
}
defer client.Close()
svid, err := client.FetchJWTSVID(ctx, jwtsvid.Params{
Audience: "https://api.example.org",
})
if err != nil {
log.Fatal(err)
}
log.Printf("JWT Token: %s", svid.Marshal())
log.Printf("SPIFFE ID: %s", svid.ID)
log.Printf("过期时间: %v", svid.Expiry)
}
FetchJWTBundles
获取 JWT 验证密钥。
ValidateJWTSVID
验证 JWT-SVID。
请求:
message ValidateJWTSVIDRequest {
string audience = 1; // 期望的受众
string svid = 2; // JWT token
}
响应:
message ValidateJWTSVIDResponse {
string spiffe_id = 1; // 验证成功返回 SPIFFE ID
google.protobuf.Struct claims = 2; // JWT 声明
}
Go 示例:
func validateJWT(token string) {
ctx := context.Background()
source, err := workloadapi.NewJWTSource(ctx)
if err != nil {
log.Fatal(err)
}
defer source.Close()
svid, err := jwtsvid.ParseAndValidate(token, source, []string{"https://api.example.org"})
if err != nil {
log.Printf("验证失败: %v", err)
return
}
log.Printf("SPIFFE ID: %s", svid.ID)
log.Printf("声明: %v", svid.Claims)
}
监听更新
X.509 更新监听
func watchX509Updates() {
ctx := context.Background()
watcher := &x509Watcher{}
err := workloadapi.WatchX509Context(ctx, watcher,
workloadapi.WithClientOptions(
workloadapi.WithAddr("unix:///tmp/spire-agent/public/api.sock"),
),
)
if err != nil {
log.Fatal(err)
}
}
type x509Watcher struct{}
func (w *x509Watcher) OnX509ContextUpdate(ctx *workloadapi.X509Context) {
for _, svid := range ctx.SVIDs {
log.Printf("收到 SVID 更新: %s (过期: %v)",
svid.ID, svid.Certificates[0].NotAfter)
}
}
func (w *x509Watcher) OnX509ContextWatchError(err error) {
log.Printf("监听错误: %v", err)
}
JWT Bundle 更新监听
func watchJWTBundles() {
ctx := context.Background()
watcher := &jwtWatcher{}
err := workloadapi.WatchJWTBundles(ctx, watcher)
if err != nil {
log.Fatal(err)
}
}
type jwtWatcher struct{}
func (w *jwtWatcher) OnJWTBundlesUpdate(bundles *jwtbundle.Set) {
log.Printf("收到 JWT Bundle 更新")
}
func (w *jwtWatcher) OnJWTBundlesWatchError(err error) {
log.Printf("监听错误: %v", err)
}
TLS 配置
mTLS 服务端
func startMTLSServer() {
ctx := context.Background()
source, err := workloadapi.NewX509Source(ctx)
if err != nil {
log.Fatal(err)
}
defer source.Close()
// 创建 TLS 配置
tlsConfig := tlsconfig.MTLSServerConfig(
source,
source,
tlsconfig.AuthorizeAny(), // 或使用更严格的授权
)
listener, err := tls.Listen("tcp", ":8443", tlsConfig)
if err != nil {
log.Fatal(err)
}
// 处理连接...
}
mTLS 客户端
func createMTLSClient() *http.Client {
ctx := context.Background()
source, err := workloadapi.NewX509Source(ctx)
if err != nil {
log.Fatal(err)
}
// 只允许特定 SPIFFE ID
authorized := tlsconfig.AuthorizeID(
spiffeid.RequireFromString("spiffe://example.org/server"),
)
tlsConfig := tlsconfig.MTLSClientConfig(source, source, authorized)
return &http.Client{
Transport: &http.Transport{
TLSClientConfig: tlsConfig,
},
}
}
授权策略
允许任意 SPIFFE ID
tlsconfig.AuthorizeAny()
允许特定 SPIFFE ID
tlsconfig.AuthorizeID(spiffeid.RequireFromString("spiffe://example.org/server"))
允许同一信任域
tlsconfig.AuthorizeMemberOf(spiffeid.RequireTrustDomainFromString("example.org"))
自定义授权
tlsconfig.Authorize(func(id spiffeid.ID) error {
if strings.HasPrefix(id.Path(), "/web/") {
return nil
}
return errors.New("unauthorized")
})
错误处理
svid, err := source.GetX509SVID()
if err != nil {
switch {
case errors.Is(err, context.Canceled):
log.Println("操作被取消")
case errors.Is(err, context.DeadlineExceeded):
log.Println("操作超时")
default:
log.Printf("获取 SVID 失败: %v", err)
}
return
}
最佳实践
建议
重用 X509Source: 创建一次,多次使用
监听更新: 使用 Watch 而非轮询
优雅关闭: 确保调用
Close()处理错误: 实现重试逻辑
验证对端: 不要使用
AuthorizeAny()在生产环境