diff --git a/Dockerfile b/Dockerfile index f637490..f53f43a 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # stage: build --------------------------------------------------------- -FROM golang:1.22-alpine as build +FROM golang:1.24-alpine AS build RUN apk add --no-cache gcc musl-dev linux-headers @@ -11,7 +11,11 @@ RUN go mod download COPY . . -RUN go build -o bin/bproxy -ldflags "-s -w" github.com/flashbots/bproxy/cmd +RUN SOURCE_DATE_EPOCH=0 CGO_ENABLED=0 go build \ + -trimpath \ + -ldflags "-s -w -buildid=" \ + -o bin/bproxy \ + github.com/flashbots/bproxy/cmd # stage: run ----------------------------------------------------------- diff --git a/Makefile b/Makefile index e4bee1d..40541db 100644 --- a/Makefile +++ b/Makefile @@ -3,8 +3,9 @@ VERSION := $(VERSION:v%=%) .PHONY: build build: - @CGO_ENABLED=0 go build \ - -ldflags "-X main.version=${VERSION}" \ + @CGO_ENABLED=0 SOURCE_DATE_EPOCH=0 go build \ + -trimpath \ + -ldflags "-s -w -X main.version=${VERSION} -buildid=" \ -o ./bin/bproxy \ github.com/flashbots/bproxy/cmd diff --git a/cmd/serve.go b/cmd/serve.go index a70fc7b..a777cbc 100644 --- a/cmd/serve.go +++ b/cmd/serve.go @@ -23,8 +23,9 @@ const ( func CommandServe(cfg *config.Config) *cli.Command { makeProxyFlags := func( cfg *config.HttpProxy, category string, backendURL, listenAddress string, - ) (flags []cli.Flag, extraMirroredJrpcMethods, peerURLsFlag *cli.StringSlice) { + ) (flags []cli.Flag, extraMirroredJrpcMethods, logMethodsFlag, peerURLsFlag *cli.StringSlice) { extraMirroredJrpcMethods = &cli.StringSlice{} + logMethodsFlag = &cli.StringSlice{} peerURLsFlag = &cli.StringSlice{} flags = []cli.Flag{ @@ -206,6 +207,14 @@ func CommandServe(cfg *config.Config) *cli.Command { Value: 4096, }, + &cli.StringSliceFlag{ // --xxx-log-methods + Category: strings.ToUpper(category), + Destination: logMethodsFlag, + EnvVars: []string{envPrefix + strings.ToUpper(category) + "_LOG_METHODS"}, + Name: category + "-log-methods", + Usage: "only log requests/responses for these jrpc `methods` (empty = log all)", + }, + &cli.DurationFlag{ // --xxx-max-backend-connection-wait-timeout Category: strings.ToUpper(category), Destination: &cfg.MaxBackendConnectionWaitTimeout, @@ -307,7 +316,7 @@ func CommandServe(cfg *config.Config) *cli.Command { return } - authrpcFlags, extraMirroredJrpcMethodsAuthRPC, peerURLsAuthRPC := makeProxyFlags( + authrpcFlags, extraMirroredJrpcMethodsAuthRPC, logMethodsAuthRPC, peerURLsAuthRPC := makeProxyFlags( cfg.Authrpc.HttpProxy, categoryAuthrpc, "http://127.0.0.1:18551", "0.0.0.0:8551", ) @@ -528,7 +537,7 @@ func CommandServe(cfg *config.Config) *cli.Command { }, } - rpcFlags, extraMirroredJrpcMethodsRPC, peerURLsRPC := makeProxyFlags( + rpcFlags, extraMirroredJrpcMethodsRPC, logMethodsRPC, peerURLsRPC := makeProxyFlags( cfg.Rpc.HttpProxy, categoryRPC, "http://127.0.0.1:18545", "0.0.0.0:8545", ) @@ -569,9 +578,11 @@ func CommandServe(cfg *config.Config) *cli.Command { Before: func(_ *cli.Context) error { cfg.Authrpc.PeerURLs = peerURLsAuthRPC.Value() cfg.Authrpc.ExtraMirroredJrpcMethods = extraMirroredJrpcMethodsAuthRPC.Value() + cfg.Authrpc.LogMethods = logMethodsAuthRPC.Value() cfg.Rpc.PeerURLs = peerURLsRPC.Value() cfg.Rpc.ExtraMirroredJrpcMethods = extraMirroredJrpcMethodsRPC.Value() + cfg.Rpc.LogMethods = logMethodsRPC.Value() return cfg.Validate() }, diff --git a/config/http_proxy.go b/config/http_proxy.go index de889e3..bc299f4 100644 --- a/config/http_proxy.go +++ b/config/http_proxy.go @@ -26,6 +26,7 @@ type HttpProxy struct { LogRequestsMaxSize int `yaml:"log_requests_max_size"` LogResponses bool `yaml:"log_responses"` LogResponsesMaxSize int `yaml:"log_responses_max_size"` + LogMethods []string `yaml:"log_methods"` MaxBackendConnectionsPerHost int `yaml:"max_backend_connections_per_host"` MaxBackendConnectionWaitTimeout time.Duration `yaml:"max_client_connection_wait_timeout"` MaxClientConnectionsPerIP int `yaml:"max_client_connections_per_ip"` diff --git a/jrpc/sanitize.go b/jrpc/sanitize.go new file mode 100644 index 0000000..d7f1f7f --- /dev/null +++ b/jrpc/sanitize.go @@ -0,0 +1,103 @@ +package jrpc + +// Sanitize sanitizes a JSON-RPC message by replacing raw transaction data +// with transaction hashes. It handles both single messages and batch requests. +// The input should be an unmarshalled JSON value. +func Sanitize(message any) { + switch msg := message.(type) { + case []any: + // Batch request + for _, item := range msg { + Sanitize(item) + } + case map[string]any: + sanitizeMessage(msg) + } +} + +func sanitizeMessage(message map[string]any) { + method, _ := message["method"].(string) + + if method != "" { + // Request: sanitize params based on method + params, ok := message["params"].([]any) + if !ok { + return + } + + switch method { + case "engine_forkchoiceUpdatedV3": + if len(params) < 2 { + return + } + executionPayload, ok := params[1].(map[string]any) + if !ok { + return + } + sanitizeTransactions(executionPayload, "transactions") + + case "engine_newPayloadV4": + if len(params) == 0 { + return + } + executionPayload, ok := params[0].(map[string]any) + if !ok { + return + } + sanitizeTransactions(executionPayload, "transactions") + + case "eth_sendBundle": + if len(params) == 0 { + return + } + bundleParams, ok := params[0].(map[string]any) + if !ok { + return + } + sanitizeTransactions(bundleParams, "txs") + + case "eth_sendRawTransaction": + for i := range params { + rawTransactionToHash(¶ms[i]) + } + } + return + } + + // Response: check for result.executionPayload.transactions (engine_getPayloadV4) + result, ok := message["result"].(map[string]any) + if !ok { + return + } + executionPayload, ok := result["executionPayload"].(map[string]any) + if !ok { + return + } + sanitizeTransactions(executionPayload, "transactions") +} + +func sanitizeTransactions(obj map[string]any, key string) { + transactions, ok := obj[key].([]any) + if !ok { + return + } + for i := range transactions { + rawTransactionToHash(&transactions[i]) + } +} + +func rawTransactionToHash(transaction *any) { + str, ok := (*transaction).(string) + if !ok { + *transaction = "[error casting tx to string]" + return + } + + _, tx, err := DecodeEthRawTransaction(str) + if err != nil { + *transaction = "[error decoding tx: " + err.Error() + "]" + return + } + + *transaction = tx.Hash().Hex() +} diff --git a/jrpc/sanitize_test.go b/jrpc/sanitize_test.go new file mode 100644 index 0000000..07cbea4 --- /dev/null +++ b/jrpc/sanitize_test.go @@ -0,0 +1,139 @@ +package jrpc + +import ( + "encoding/json" + "math/big" + "strings" + "testing" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/common/hexutil" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" +) + +func createSignedTx(t *testing.T, nonce uint64) (rawTx, txHash string) { + t.Helper() + key, _ := crypto.GenerateKey() + tx := types.NewTx(&types.DynamicFeeTx{ + ChainID: big.NewInt(1), + Nonce: nonce, + GasTipCap: big.NewInt(1e9), + GasFeeCap: big.NewInt(100e9), + Gas: 21000, + To: &common.Address{}, + }) + signed, _ := types.SignTx(tx, types.NewLondonSigner(big.NewInt(1)), key) + bytes, _ := signed.MarshalBinary() + return hexutil.Encode(bytes), signed.Hash().Hex() +} + +func TestSanitize_ReplacesWithHash(t *testing.T) { + rawTx, expectedHash := createSignedTx(t, 0) + + methods := []struct { + name string + input string + path func(any) string + }{ + { + name: "eth_sendRawTransaction", + input: `{"method":"eth_sendRawTransaction","params":["` + rawTx + `"]}`, + path: func(m any) string { return m.(map[string]any)["params"].([]any)[0].(string) }, + }, + { + name: "engine_newPayloadV4", + input: `{"method":"engine_newPayloadV4","params":[{"transactions":["` + rawTx + `"]}]}`, + path: func(m any) string { + return m.(map[string]any)["params"].([]any)[0].(map[string]any)["transactions"].([]any)[0].(string) + }, + }, + { + name: "engine_forkchoiceUpdatedV3", + input: `{"method":"engine_forkchoiceUpdatedV3","params":[{},{"transactions":["` + rawTx + `"]}]}`, + path: func(m any) string { + return m.(map[string]any)["params"].([]any)[1].(map[string]any)["transactions"].([]any)[0].(string) + }, + }, + { + name: "eth_sendBundle", + input: `{"method":"eth_sendBundle","params":[{"txs":["` + rawTx + `"]}]}`, + path: func(m any) string { + return m.(map[string]any)["params"].([]any)[0].(map[string]any)["txs"].([]any)[0].(string) + }, + }, + { + name: "engine_getPayloadV4 response", + input: `{"result":{"executionPayload":{"transactions":["` + rawTx + `"]}}}`, + path: func(m any) string { + return m.(map[string]any)["result"].(map[string]any)["executionPayload"].(map[string]any)["transactions"].([]any)[0].(string) + }, + }, + } + + for _, m := range methods { + t.Run(m.name, func(t *testing.T) { + var msg any + json.Unmarshal([]byte(m.input), &msg) + Sanitize(msg) + if got := m.path(msg); got != expectedHash { + t.Errorf("got %s, want %s", got, expectedHash) + } + }) + } +} + +func TestSanitize_InvalidTxShowsError(t *testing.T) { + input := `{"method":"eth_sendRawTransaction","params":["0xdeadbeef"]}` + var msg any + json.Unmarshal([]byte(input), &msg) + Sanitize(msg) + + got := msg.(map[string]any)["params"].([]any)[0].(string) + if !strings.HasPrefix(got, "[error") { + t.Errorf("expected error message, got %s", got) + } +} + +func TestSanitize_BatchRequest(t *testing.T) { + rawTx, expectedHash := createSignedTx(t, 0) + input := `[{"method":"eth_sendRawTransaction","params":["` + rawTx + `"]},{"method":"eth_blockNumber","params":[]}]` + + var msg any + json.Unmarshal([]byte(input), &msg) + Sanitize(msg) + + batch := msg.([]any) + got := batch[0].(map[string]any)["params"].([]any)[0].(string) + if got != expectedHash { + t.Errorf("got %s, want %s", got, expectedHash) + } +} + +func TestSanitize_PreservesOtherFields(t *testing.T) { + rawTx, expectedHash := createSignedTx(t, 0) + input := `{"method":"engine_newPayloadV4","params":[{"transactions":["` + rawTx + `"],"blockNumber":"0x1"}]}` + + var msg any + json.Unmarshal([]byte(input), &msg) + Sanitize(msg) + + payload := msg.(map[string]any)["params"].([]any)[0].(map[string]any) + if payload["transactions"].([]any)[0] != expectedHash { + t.Error("transaction should be replaced with hash") + } + if payload["blockNumber"] != "0x1" { + t.Error("blockNumber should be preserved") + } +} + +func TestSanitize_UnrelatedMethodUnchanged(t *testing.T) { + input := `{"method":"eth_blockNumber","params":["0x1"]}` + var msg any + json.Unmarshal([]byte(input), &msg) + Sanitize(msg) + + if msg.(map[string]any)["params"].([]any)[0] != "0x1" { + t.Error("unrelated method params should be unchanged") + } +} diff --git a/proxy/http.go b/proxy/http.go index f5b2f4d..1b662c9 100644 --- a/proxy/http.go +++ b/proxy/http.go @@ -15,6 +15,7 @@ import ( "time" "github.com/flashbots/bproxy/config" + "github.com/flashbots/bproxy/jrpc" "github.com/flashbots/bproxy/logutils" "github.com/flashbots/bproxy/metrics" "github.com/flashbots/bproxy/triaged" @@ -40,6 +41,7 @@ type HTTP struct { backendURI *fasthttp.URI extraMirroredJrpcMethods map[string]struct{} + logMethods map[string]struct{} peerURIs map[string]*fasthttp.URI healthcheck *healthcheck @@ -183,6 +185,16 @@ func newHTTP(cfg *httpConfig) (*HTTP, error) { } } + if len(cfg.proxy.LogMethods) > 0 { + p.logMethods = make(map[string]struct{}, len(cfg.proxy.LogMethods)) + for _, method := range cfg.proxy.LogMethods { + method = strings.TrimSpace(method) + if _, known := p.logMethods[method]; !known { + p.logMethods[method] = struct{}{} + } + } + } + return p, nil } @@ -571,7 +583,7 @@ func (p *HTTP) execProxyJob(job *proxyJob) { switch utils.Str(job.req.Header.ContentType()) { case "application/json": job.res.SetStatusCode(fasthttp.StatusAccepted) - job.res.SetBody([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32042,"message":%s}}`, strconv.Quote(err.Error())))) + job.res.SetBody([]byte(fmt.Sprintf(`{"jsonrpc":"2.0","id":%s,"error":{"code":-32042,"message":%s}}`, job.triage.JrpcID, strconv.Quote(err.Error())))) default: job.res.SetBody([]byte(err.Error())) job.res.SetStatusCode(fasthttp.StatusBadGateway) @@ -594,9 +606,10 @@ func (p *HTTP) execProxyJob(job *proxyJob) { } { // add log fields - if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize { - var jsonRequest interface{} + if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.triage.JrpcMethod) { + var jsonRequest any if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { + jrpc.Sanitize(jsonRequest) loggedFields = append(loggedFields, zap.Any("json_request", jsonRequest), ) @@ -608,7 +621,7 @@ func (p *HTTP) execProxyJob(job *proxyJob) { } } - if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize { + if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize && p.shouldLogMethod(job.triage.JrpcMethod) { var body []byte switch utils.Str(job.res.Header.ContentEncoding()) { @@ -624,8 +637,9 @@ func (p *HTTP) execProxyJob(job *proxyJob) { } if body != nil { - var jsonResponse interface{} + var jsonResponse any if err := json.Unmarshal(body, &jsonResponse); err == nil { + jrpc.Sanitize(jsonResponse) loggedFields = append(loggedFields, zap.Any("json_response", jsonResponse), ) @@ -683,9 +697,10 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { zap.Int("response_size", len(job.res.Body())), ) - if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize { - var jsonRequest interface{} + if p.cfg.proxy.LogRequests && len(job.req.Body()) <= p.cfg.proxy.LogRequestsMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { + var jsonRequest any if err := json.Unmarshal(job.req.Body(), &jsonRequest); err == nil { + jrpc.Sanitize(jsonRequest) loggedFields = append(loggedFields, zap.Any("json_request", jsonRequest), ) @@ -701,11 +716,12 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { zap.Int("http_status", job.res.StatusCode()), ) - if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize { + if p.cfg.proxy.LogResponses && len(job.res.Body()) <= p.cfg.proxy.LogResponsesMaxSize && p.shouldLogMethod(job.jrpcMethodForMetrics) { switch utils.Str(job.res.Header.ContentEncoding()) { default: - var jsonResponse interface{} + var jsonResponse any if err := json.Unmarshal(job.res.Body(), &jsonResponse); err == nil { + jrpc.Sanitize(jsonResponse) loggedFields = append(loggedFields, zap.Any("json_response", jsonResponse), ) @@ -717,8 +733,9 @@ func (p *HTTP) execMirrorJob(job *mirrorJob) { case "gzip": if body, err := job.res.BodyGunzip(); err == nil { - var jsonResponse interface{} + var jsonResponse any if err := json.Unmarshal(body, &jsonResponse); err == nil { + jrpc.Sanitize(jsonResponse) loggedFields = append(loggedFields, zap.Any("json_response", jsonResponse), ) @@ -845,6 +862,14 @@ func (p *HTTP) connectionsCount() int { return len(p.connections) } +func (p *HTTP) shouldLogMethod(method string) bool { + if len(p.logMethods) == 0 { + return true // empty = log all + } + _, ok := p.logMethods[method] + return ok +} + func (p *HTTP) backendUnhealthy(ctx context.Context) { l := logutils.LoggerFromContext(ctx) diff --git a/readme.md b/readme.md index d68c9de..c5b6ebc 100644 --- a/readme.md +++ b/readme.md @@ -23,6 +23,7 @@ OPTIONS: --authrpc-healthcheck-threshold-healthy count count of consecutive successful healthchecks to consider authrpc backend to be healthy (default: 2) [$BPROXY_AUTHRPC_HEALTHCHECK_THRESHOLD_HEALTHY] --authrpc-healthcheck-threshold-unhealthy count count of consecutive failed healthchecks to consider authrpc backend to be unhealthy (default: 2) [$BPROXY_AUTHRPC_HEALTHCHECK_THRESHOLD_UNHEALTHY] --authrpc-listen-address host:port host:port for authrpc proxy (default: "0.0.0.0:8551") [$BPROXY_AUTHRPC_LISTEN_ADDRESS] + --authrpc-log-methods methods [ --authrpc-log-methods methods ] only log requests/responses for these jrpc methods (empty = log all) [$BPROXY_AUTHRPC_LOG_METHODS] --authrpc-log-requests whether to log authrpc requests (default: false) [$BPROXY_AUTHRPC_LOG_REQUESTS] --authrpc-log-requests-max-size size do not log authrpc requests larger than size (default: 4096) [$BPROXY_AUTHRPC_LOG_REQUESTS_MAX_SIZE] --authrpc-log-responses whether to log responses to proxied/mirrored authrpc requests (default: false) [$BPROXY_AUTHRPC_LOG_RESPONSES] @@ -100,6 +101,7 @@ OPTIONS: --rpc-healthcheck-threshold-healthy count count of consecutive successful healthchecks to consider rpc backend to be healthy (default: 2) [$BPROXY_RPC_HEALTHCHECK_THRESHOLD_HEALTHY] --rpc-healthcheck-threshold-unhealthy count count of consecutive failed healthchecks to consider rpc backend to be unhealthy (default: 2) [$BPROXY_RPC_HEALTHCHECK_THRESHOLD_UNHEALTHY] --rpc-listen-address host:port host:port for rpc proxy (default: "0.0.0.0:8545") [$BPROXY_RPC_LISTEN_ADDRESS] + --rpc-log-methods methods [ --rpc-log-methods methods ] only log requests/responses for these jrpc methods (empty = log all) [$BPROXY_RPC_LOG_METHODS] --rpc-log-requests whether to log rpc requests (default: false) [$BPROXY_RPC_LOG_REQUESTS] --rpc-log-requests-max-size size do not log rpc requests larger than size (default: 4096) [$BPROXY_RPC_LOG_REQUESTS_MAX_SIZE] --rpc-log-responses whether to log responses to proxied/mirrored rpc requests (default: false) [$BPROXY_RPC_LOG_RESPONSES]