Files
a301_server/internal/chain/client.go
tolelom feb8ec96ad
All checks were successful
Server CI/CD / lint-and-build (push) Successful in 21s
Server CI/CD / deploy (push) Successful in 56s
feat: 체인 클라이언트 멀티노드 페일오버 (SPOF 해결)
CHAIN_NODE_URLS 환경변수(쉼표 구분)로 복수 노드 지정 가능.
Client.Call()이 네트워크/HTTP 오류 시 다음 노드로 자동 전환.
RPC 레벨 오류(트랜잭션 실패 등)는 즉시 반환 (페일오버 미적용).
기존 CHAIN_NODE_URL 단일 설정은 하위 호환 유지.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-18 17:31:46 +09:00

261 lines
7.3 KiB
Go

package chain
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"sync/atomic"
"time"
)
// rpcRequest is the JSON-RPC 2.0 request envelope that is marshalled and
// posted to a chain node.
type rpcRequest struct {
	JSONRPC string `json:"jsonrpc"` // protocol version marker
	ID      int64  `json:"id"`      // request identifier
	Method  string `json:"method"`  // remote method name
	Params  any    `json:"params"`  // method arguments, marshalled as given
}
// rpcResponse is the JSON-RPC 2.0 response envelope decoded from a node reply.
// Result is kept as raw JSON so it can be decoded into a caller-supplied type
// later; Error is non-nil when the reply carried an "error" member.
type rpcResponse struct {
	JSONRPC string          `json:"jsonrpc"`
	ID      any             `json:"id"`
	Result  json.RawMessage `json:"result,omitempty"`
	Error   *rpcError       `json:"error,omitempty"`
}
// rpcError is the "error" member of a JSON-RPC 2.0 response.
type rpcError struct {
	Code    int    `json:"code"`
	Message string `json:"message"`
}

// Error implements the error interface, rendering the value as
// "RPC error <code>: <message>".
func (e *rpcError) Error() string {
	const layout = "RPC error %d: %s"
	return fmt.Sprintf(layout, e.Code, e.Message)
}
// Client is a JSON-RPC 2.0 client for the TOL Chain node.
//
// It can be configured with several node URLs. When a request fails with a
// network or HTTP error, the client retries against the next URL in the list.
// RPC-level errors (transaction failures, etc.) are handed back to the caller
// right away: they describe a logical failure rather than an unavailable
// node, so no failover is attempted for them.
type Client struct {
	nodeURLs []string      // candidate node endpoints
	http     *http.Client  // HTTP client used for every request
	idSeq    atomic.Int64  // source of JSON-RPC request IDs
	next     atomic.Uint64 // round-robin index; modulo len(nodeURLs) it selects the node tried first
}
// NewClient creates a client for one or more chain node URLs.
// When multiple URLs are provided, failed requests fall over to the next URL.
//
// The URL list is copied, so the caller may keep using or modifying a slice
// it passed as NewClient(urls...) without affecting the client. Requests made
// through the returned client use an HTTP client with a 10-second timeout.
//
// NewClient panics if no URL is given.
func NewClient(nodeURLs ...string) *Client {
	if len(nodeURLs) == 0 {
		panic("chain.NewClient: at least one node URL is required")
	}
	// A variadic call made with an existing slice shares that slice's backing
	// array; take a private copy so later writes by the caller cannot change
	// (or race with reads of) the node list.
	urls := make([]string, len(nodeURLs))
	copy(urls, nodeURLs)
	return &Client{
		nodeURLs: urls,
		http:     &http.Client{Timeout: 10 * time.Second},
	}
}
// Call invokes a JSON-RPC method and unmarshals the result into out.
//
// Nodes are tried in list order, starting from the client's current preferred
// node, each at most once per call. An RPC-level error (*rpcError) is returned
// as is, without trying other nodes. Any other failure makes the client move
// on to the next node; once every node has failed, the last error is returned
// wrapped in "all chain nodes unreachable".
func (c *Client) Call(method string, params any, out any) error {
	n := uint64(len(c.nodeURLs))
	base := c.next.Load()
	start := base % n
	var lastErr error
	for i := uint64(0); i < n; i++ {
		url := c.nodeURLs[(start+i)%n]
		err := c.callNode(url, method, params, out)
		if err == nil {
			return nil
		}
		// RPC-level error (e.g. tx execution failure): return immediately,
		// retrying on another node would give the same result.
		if _, isRPC := err.(*rpcError); isRPC {
			return err
		}
		// Network / HTTP error: remember it and prefer the next node from now
		// on. The index is advanced only if it still points at the node that
		// just failed; otherwise several calls failing on the same node at
		// the same time would each bump it and could skip past a healthy
		// node, back onto the broken one.
		lastErr = err
		c.next.CompareAndSwap(base+i, base+i+1)
	}
	return fmt.Errorf("all chain nodes unreachable: %w", lastErr)
}
// callNode performs one JSON-RPC request against a single node URL.
//
// It returns nil on success (after decoding the result into out, when out is
// non-nil), the node's *rpcError when the reply carries an "error" member, or
// a wrapped error for marshalling, transport, HTTP-status, size-limit and
// decoding failures.
func (c *Client) callNode(nodeURL, method string, params any, out any) error {
	const (
		// maxResponseBytes caps how much of a reply is buffered and decoded.
		maxResponseBytes = 10 * 1024 * 1024
		// maxDrainBytes caps how much of a rejected reply is read before closing.
		maxDrainBytes = 64 * 1024
	)

	reqBody := rpcRequest{
		JSONRPC: "2.0",
		ID:      c.idSeq.Add(1),
		Method:  method,
		Params:  params,
	}
	data, err := json.Marshal(reqBody)
	if err != nil {
		return fmt.Errorf("marshal RPC request: %w", err)
	}

	resp, err := c.http.Post(nodeURL, "application/json", bytes.NewReader(data))
	if err != nil {
		return fmt.Errorf("RPC network error (%s): %w", nodeURL, err)
	}
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		// Read the (bounded) rest of the body before it is closed so the
		// transport can keep the connection for reuse. A failure while
		// draining does not change the error reported to the caller.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxDrainBytes))
		return fmt.Errorf("RPC HTTP error (%s): status %d", nodeURL, resp.StatusCode)
	}

	// Read one byte past the limit so an oversized reply is reported as such
	// instead of being truncated and failing later with a confusing JSON error.
	body, err := io.ReadAll(io.LimitReader(resp.Body, maxResponseBytes+1))
	if err != nil {
		return fmt.Errorf("read RPC response: %w", err)
	}
	if len(body) > maxResponseBytes {
		return fmt.Errorf("read RPC response: body exceeds %d bytes", maxResponseBytes)
	}

	var rpcResp rpcResponse
	if err := json.Unmarshal(body, &rpcResp); err != nil {
		return fmt.Errorf("unmarshal RPC response: %w", err)
	}
	if rpcResp.Error != nil {
		// Returned unwrapped so callers can recognise it by type.
		return rpcResp.Error
	}
	if out != nil {
		if err := json.Unmarshal(rpcResp.Result, out); err != nil {
			return fmt.Errorf("unmarshal RPC result: %w", err)
		}
	}
	return nil
}
// --- Typed convenience methods ---
// BalanceResult is the decoded result of the "getBalance" RPC method.
type BalanceResult struct {
	Address string `json:"address"`
	Balance uint64 `json:"balance"`
	Nonce   uint64 `json:"nonce"`
}
// GetBalance calls the "getBalance" RPC method for address and decodes the
// reply into a BalanceResult.
//
// The returned pointer is never nil, even when err is non-nil; in that case
// its contents must not be relied upon.
func (c *Client) GetBalance(address string) (*BalanceResult, error) {
	balance := new(BalanceResult)
	params := map[string]string{"address": address}
	err := c.Call("getBalance", params, balance)
	return balance, err
}
// GetAsset calls the "getAsset" RPC method with the given id and returns the
// result as undecoded JSON.
func (c *Client) GetAsset(id string) (json.RawMessage, error) {
	var (
		raw    json.RawMessage
		params = map[string]string{"id": id}
	)
	err := c.Call("getAsset", params, &raw)
	return raw, err
}
// GetAssetsByOwner calls the "getAssetsByOwner" RPC method with the given
// owner, offset and limit, and returns the result as undecoded JSON.
func (c *Client) GetAssetsByOwner(owner string, offset, limit int) (json.RawMessage, error) {
	params := map[string]any{
		"owner":  owner,
		"offset": offset,
		"limit":  limit,
	}
	var raw json.RawMessage
	err := c.Call("getAssetsByOwner", params, &raw)
	return raw, err
}
// GetInventory calls the "getInventory" RPC method for owner and returns the
// result as undecoded JSON.
func (c *Client) GetInventory(owner string) (json.RawMessage, error) {
	var (
		raw    json.RawMessage
		params = map[string]string{"owner": owner}
	)
	err := c.Call("getInventory", params, &raw)
	return raw, err
}
// GetActiveListings calls the "getActiveListings" RPC method with the given
// offset and limit, and returns the result as undecoded JSON.
func (c *Client) GetActiveListings(offset, limit int) (json.RawMessage, error) {
	params := map[string]any{
		"offset": offset,
		"limit":  limit,
	}
	var raw json.RawMessage
	err := c.Call("getActiveListings", params, &raw)
	return raw, err
}
// GetListing calls the "getListing" RPC method with the given id and returns
// the result as undecoded JSON.
func (c *Client) GetListing(id string) (json.RawMessage, error) {
	var (
		raw    json.RawMessage
		params = map[string]string{"id": id}
	)
	err := c.Call("getListing", params, &raw)
	return raw, err
}
// SendTxResult is the decoded result of the "sendTx" RPC method.
type SendTxResult struct {
	TxID string `json:"tx_id"` // identifier later passed to getTxStatus
}
// SendTx calls the "sendTx" RPC method, passing tx as the request parameters,
// and decodes the reply into a SendTxResult.
//
// The returned pointer is never nil, even when err is non-nil; in that case
// its contents must not be relied upon.
func (c *Client) SendTx(tx any) (*SendTxResult, error) {
	sent := new(SendTxResult)
	err := c.Call("sendTx", tx, sent)
	return sent, err
}
// TxStatusResult mirrors the indexer.TxResult from the TOL Chain node.
// It is the decoded result of the "getTxStatus" RPC method.
type TxStatusResult struct {
	TxID        string `json:"tx_id"`
	BlockHeight int64  `json:"block_height"`
	Success     bool   `json:"success"` // false means the transaction executed but failed
	Error       string `json:"error"`   // used as the TxError message when Success is false
}
// GetTxStatus queries the execution result of a transaction via the
// "getTxStatus" RPC method.
//
// It returns a nil result with a nil error if the transaction has not been
// included in a block yet, and a nil result with the error if the call fails.
func (c *Client) GetTxStatus(txID string) (*TxStatusResult, error) {
	// A pointer target lets a JSON null reply decode to a nil status.
	var status *TxStatusResult
	params := map[string]string{"tx_id": txID}
	if err := c.Call("getTxStatus", params, &status); err != nil {
		return nil, err
	}
	return status, nil
}
// TxError is returned when a transaction was included in a block but
// execution failed.
type TxError struct {
	TxID    string // identifier of the failed transaction
	Message string // failure description
}

// Error implements the error interface, rendering the value as
// "transaction <TxID> failed: <Message>".
func (e *TxError) Error() string {
	const layout = "transaction %s failed: %s"
	return fmt.Sprintf(layout, e.TxID, e.Message)
}
// DefaultTxTimeout is the default timeout for WaitForTx. PoA block intervals
// are typically a few seconds, so 15s provides ample margin.
const DefaultTxTimeout time.Duration = 15 * time.Second
// SendTxAndWait sends a transaction and waits for block confirmation.
// It is SendTx followed by WaitForTx on the returned transaction ID, for the
// common fire-and-confirm pattern.
//
// A failure to send is returned wrapped with the prefix "send tx: " and a nil
// result; otherwise the result and error are exactly those of WaitForTx.
func (c *Client) SendTxAndWait(tx any, timeout time.Duration) (*TxStatusResult, error) {
	sent, sendErr := c.SendTx(tx)
	if sendErr != nil {
		return nil, fmt.Errorf("send tx: %w", sendErr)
	}
	txID := sent.TxID
	return c.WaitForTx(txID, timeout)
}
// WaitForTx polls getTxStatus until the transaction is included in a block or
// the timeout is reached.
//
// It returns the confirmed TxStatusResult on success; the result together
// with a *TxError if the transaction executed but failed; or a nil result and
// an error if a status query fails (wrapped with the prefix "getTxStatus: ")
// or the transaction is still unconfirmed when the timeout expires.
//
// The status is always queried at least once, even for a zero or negative
// timeout. Polling starts at a 200ms interval that grows by half after each
// attempt up to 1s, and a pause is never longer than the time left before the
// deadline.
func (c *Client) WaitForTx(txID string, timeout time.Duration) (*TxStatusResult, error) {
	const (
		initialInterval = 200 * time.Millisecond
		maxInterval     = time.Second
	)
	deadline := time.Now().Add(timeout)
	interval := initialInterval
	for {
		result, err := c.GetTxStatus(txID)
		if err != nil {
			return nil, fmt.Errorf("getTxStatus: %w", err)
		}
		if result != nil {
			if !result.Success {
				return result, &TxError{TxID: txID, Message: result.Error}
			}
			return result, nil
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil, fmt.Errorf("transaction %s not confirmed within %s", txID, timeout)
		}
		// Cap the pause at the time that is left: sleeping a full interval
		// here would make the call overshoot the requested timeout. The
		// capped sleep is followed by one final status query at the deadline.
		pause := interval
		if pause > remaining {
			pause = remaining
		}
		time.Sleep(pause)

		// Increase interval up to 1s to reduce polling pressure.
		if interval < maxInterval {
			interval = interval * 3 / 2
			if interval > maxInterval {
				interval = maxInterval
			}
		}
	}
}