What NATS Does
- Publish/Subscribe: Send messages to subjects; any number of subscribers can consume them
- Request/Reply: RPC-style communication with automatic reply correlation
- Queue Groups: Load balance messages across worker instances
- JetStream: Persistent streaming with at-least-once delivery and replay
- Key-Value Store: Distributed KV store built on JetStream
- Object Store: Large object/file storage on JetStream
- Clustering: Horizontal scaling with automatic routing and failover
- Leaf Nodes: Extend NATS to edge locations without running a full cluster there
- Multi-Tenancy: Accounts and users with isolated subjects
- Security: TLS, JWT auth, NKeys, and operator-based multi-tenancy
Architecture
┌──────────────┐     ┌──────────────┐     ┌──────────────┐
│  Publishers  │────▶│     NATS     │────▶│ Subscribers  │
│              │     │    Server    │     │              │
└──────────────┘     │     (Go)     │     └──────────────┘
                     └──────┬───────┘
                            │
                  ┌─────────┼─────────┐
                  │         │         │
           ┌──────┴──┐  ┌───┴───┐ ┌───┴───┐
           │JetStream│  │  KV   │ │Object │
           │(Streams)│  │ Store │ │ Store │
           └─────────┘  └───────┘ └───────┘
Self-Hosting
Docker Compose
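services:
  nats:
    image: nats:latest
    # -sd points JetStream storage at the mounted volume; without it the data is not written to /data
    command: -js -sd /data -m 8222
    ports:
      - "4222:4222" # Client port
      - "6222:6222" # Cluster port
      - "8222:8222" # HTTP monitoring
    volumes:
      - nats-data:/data
volumes:
  nats-data:
Cluster (3 Nodes)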
services:
  # JetStream in clustered mode needs a unique server name on every node (-name)
  nats-1:
    image: nats:latest
    command: -js -name=nats-1 -cluster_name=mycluster -cluster=nats://0.0.0.0:6222 -routes=nats://nats-2:6222,nats://nats-3:6222
    ports:
      - "4222:4222"
  nats-2:
    image: nats:latest
    command: -js -name=nats-2 -cluster_name=mycluster -cluster=nats://0.0.0.0:6222 -routes=nats://nats-1:6222,nats://nats-3:6222
  nats-3:
    image: nats:latest
    command: -js -name=nats-3 -cluster_name=mycluster -cluster=nats://0.0.0.0:6222 -routes=nats://nats-1:6222,nats://nats-2:6222
Messaging Patterns
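Every pattern in this section routes on subjects, and subscriptions may use wildcards such as `orders.*`. As a rough illustration of the matching rules (`*` stands for exactly one dot-separated token, `>` for one or more trailing tokens), here is a minimal sketch. The function name `subjectMatches` is hypothetical, and this is not the server's own matching code.

```javascript
// Toy subject matcher: "*" matches exactly one token, ">" matches one or more trailing tokens.
// Illustrative only; not taken from nats-server or the nats client.
function subjectMatches(pattern, subject) {
  const p = pattern.split(".");
  const s = subject.split(".");
  for (let i = 0; i < p.length; i++) {
    if (p[i] === ">") {
      // ">" must be the last pattern token and must consume at least one subject token
      return i === p.length - 1 && s.length > i;
    }
    if (i >= s.length) return false; // subject ran out of tokens
    if (p[i] !== "*" && p[i] !== s[i]) return false; // literal token mismatch
  }
  return p.length === s.length; // reject subjects with extra trailing tokens
}

console.log(subjectMatches("orders.*", "orders.new")); // true
console.log(subjectMatches("orders.*", "orders.eu.new")); // false
console.log(subjectMatches("orders.>", "orders.eu.new")); // true
```

So a subscriber on `orders.*` sees `orders.new` but would miss a deeper subject like `orders.eu.new`.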
Publish/Subscribe
// Node.js publisher
import { connect, StringCodec } from "nats";
const nc = await connect({ servers: "nats://localhost:4222" });
const sc = StringCodec();
nc.publish("orders.new", sc.encode(JSON.stringify({ id: 1, total: 99.99 })));

// Subscriber
const sub = nc.subscribe("orders.*");
for await (const msg of sub) {
  console.log(`Received: ${sc.decode(msg.data)} on ${msg.subject}`);
}
Request/Reply
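Reply correlation in request/reply rests on a per-request reply subject: the requester listens on a unique inbox subject, and the responder publishes its answer to whatever reply subject came with the message. The toy in-memory bus below sketches that idea. `ToyBus` and its methods are hypothetical and only mimic the shape of the client API; the real client does all of this inside `nc.request()`.

```javascript
// Toy in-memory bus showing how a reply subject ties a response to its request.
// Hypothetical class; exact-subject matching only, no network, no timeouts.
class ToyBus {
  constructor() {
    this.handlers = new Map(); // subject -> array of callbacks
    this.nextInbox = 0;
  }
  subscribe(subject, handler) {
    if (!this.handlers.has(subject)) this.handlers.set(subject, []);
    this.handlers.get(subject).push(handler);
  }
  unsubscribe(subject) {
    this.handlers.delete(subject);
  }
  publish(subject, data, reply) {
    for (const h of this.handlers.get(subject) ?? []) h({ subject, data, reply });
  }
  request(subject, data) {
    return new Promise((resolve) => {
      const inbox = `_INBOX.${++this.nextInbox}`; // unique reply subject per request
      this.subscribe(inbox, (msg) => {
        this.unsubscribe(inbox); // one reply is enough
        resolve(msg.data);
      });
      this.publish(subject, data, inbox);
    });
  }
}

const bus = new ToyBus();
// Responder: answers on the reply subject carried by the request
bus.subscribe("math.add", (msg) => {
  const [a, b] = msg.data.split(",").map(Number);
  bus.publish(msg.reply, String(a + b));
});

bus.request("math.add", "1,2").then((result) => console.log(result)); // prints 3
```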
// Client (request)
const response = await nc.request("math.add", sc.encode("1,2"), { timeout: 1000 });
console.log(`Result: ${sc.decode(response.data)}`);

// Server (reply)
const sub = nc.subscribe("math.add");
for await (const msg of sub) {
  const [a, b] = sc.decode(msg.data).split(",").map(Number);
  msg.respond(sc.encode((a + b).toString()));
}
Queue Groups (Load Balancing)
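The difference from plain publish/subscribe is who receives each message: every ordinary subscriber gets its own copy, while within a queue group only one member does. A toy model of that rule is sketched here. The `deliver` helper and the subscription objects are hypothetical, subjects are compared literally (no wildcards), and the member is picked at random.

```javascript
// Toy delivery rule: plain subscribers all receive the message,
// each queue group hands it to exactly one of its members.
function deliver(subscriptions, subject, data) {
  const groups = new Map(); // queue name -> members subscribed to this subject
  for (const sub of subscriptions) {
    if (sub.subject !== subject) continue;
    if (!sub.queue) {
      sub.handler(data); // plain subscriber: fan-out
      continue;
    }
    if (!groups.has(sub.queue)) groups.set(sub.queue, []);
    groups.get(sub.queue).push(sub);
  }
  for (const members of groups.values()) {
    const pick = members[Math.floor(Math.random() * members.length)];
    pick.handler(data); // one member per group
  }
}

const received = { audit: 0, workerA: 0, workerB: 0 };
const subscriptions = [
  { subject: "jobs.resize", handler: () => received.audit++ },
  { subject: "jobs.resize", queue: "workers", handler: () => received.workerA++ },
  { subject: "jobs.resize", queue: "workers", handler: () => received.workerB++ },
];
for (let i = 0; i < 100; i++) deliver(subscriptions, "jobs.resize", `job-${i}`);
console.log(received); // audit is 100; workerA + workerB add up to 100
```

The split between the two workers varies from run to run, but no job is handled twice inside the group.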
// Multiple workers share a subscription
const sub = nc.subscribe("jobs.*", { queue: "workers" });
// Each message goes to one randomly chosen member of the "workers" queue group
JetStream (Persistent Streaming)
Create Stream
# CLI
nats stream add orders --subjects "orders.*" --storage file --retention workqueue

// Or via code
import { RetentionPolicy, StorageType } from "nats";
const jsm = await nc.jetstreamManager();
await jsm.streams.add({
  name: "orders",
  subjects: ["orders.*"],
  storage: StorageType.File,
  retention: RetentionPolicy.Workqueue,
});
Publish & Consume
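At-least-once delivery means a message stays pending until the consumer acknowledges it, so a failed handler leads to a redelivery rather than a lost message. The loop below is a toy model of that contract, not JetStream code. `consumeAtLeastOnce` and its `maxDeliver` cap are hypothetical names for this sketch.

```javascript
// Toy redelivery loop: a message is re-queued until its handler succeeds (ack)
// or it has been delivered maxDeliver times.
function consumeAtLeastOnce(messages, handler, maxDeliver = 5) {
  const pending = messages.map((data) => ({ data, deliveries: 0 }));
  const acked = [];
  while (pending.length > 0) {
    const msg = pending.shift();
    msg.deliveries++;
    let outcome = "nak";
    try {
      handler(msg.data, msg.deliveries);
      outcome = "ack";
    } catch {
      // handler failed: treat it as a nak
    }
    if (outcome === "ack") acked.push(msg);
    else if (msg.deliveries < maxDeliver) pending.push(msg); // redeliver later
  }
  return acked;
}

// Handler that fails the first time it sees "b"
const result = consumeAtLeastOnce(["a", "b", "c"], (data, attempt) => {
  if (data === "b" && attempt === 1) throw new Error("transient failure");
});
console.log(result.map((m) => `${m.data}:${m.deliveries}`)); // [ 'a:1', 'c:1', 'b:2' ]
```

Because a message can arrive more than once, handlers such as `processOrder` are safest when they are idempotent.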
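import { AckPolicy } from "nats";
const js = nc.jetstream();
// Publish (resolves once the stream has stored the message)
await js.publish("orders.new", sc.encode(JSON.stringify(order)));
// Durable consumer (remembers position). It must exist before consumers.get();
// jsm is the JetStreamManager from the Create Stream snippet.
await jsm.consumers.add("orders", { durable_name: "order-processor", ack_policy: AckPolicy.Explicit });
const consumer = await js.consumers.get("orders", "order-processor");
const messages = await consumer.consume();
for await (const msg of messages) {
  try {
    await processOrder(JSON.parse(sc.decode(msg.data)));
    msg.ack(); // Mark as processed
  } catch (e) {
    msg.nak(); // Ask the server to redeliver
  }
}
Key-Value Store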
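Since the KV store is built on JetStream, a bucket can be pictured as a stream in which every put appends a message on a per-key subject and a get reads the latest message for that subject. The class below is a toy model of that layering, with hypothetical names (`ToyKv`); it keeps everything in memory and leaves out deletes, history limits, and watchers.

```javascript
// Toy KV bucket layered on an append-only log.
// Each put appends to subject "$KV.<bucket>.<key>"; get returns the newest entry for the key.
class ToyKv {
  constructor(bucket) {
    this.bucket = bucket;
    this.stream = []; // append-only log of { seq, subject, value }
  }
  put(key, value) {
    const seq = this.stream.length + 1;
    this.stream.push({ seq, subject: `$KV.${this.bucket}.${key}`, value });
    return seq; // the stream sequence doubles as the entry's revision
  }
  get(key) {
    const subject = `$KV.${this.bucket}.${key}`;
    for (let i = this.stream.length - 1; i >= 0; i--) {
      if (this.stream[i].subject === subject) {
        return { key, value: this.stream[i].value, revision: this.stream[i].seq };
      }
    }
    return null; // key was never written
  }
}

const toyKv = new ToyKv("config");
toyKv.put("database.host", "postgres.internal");
toyKv.put("database.port", "5432");
toyKv.put("database.host", "postgres-2.internal");
console.log(toyKv.get("database.host"));
// { key: 'database.host', value: 'postgres-2.internal', revision: 3 }
```

Older values remain in the log, which is what makes replaying a key's history and watching for changes possible.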
const js = nc.jetstream();
const kv = await js.views.kv("config");
// Put
await kv.put("database.host", sc.encode("postgres.internal"));
await kv.put("database.port", sc.encode("5432"));
// Get (returns null if the key does not exist)
const entry = await kv.get("database.host");
console.log(entry ? sc.decode(entry.value) : "not found"); // postgres.internal
// Watch for changes
const watcher = await kv.watch();
for await (const e of watcher) {
  console.log(`${e.key} = ${sc.decode(e.value)}`);
}
NATS vs Alternatives
| Feature | NATS | Kafka | RabbitMQ | Redis Streams |
|---|---|---|---|---|
| Open Source | Yes (Apache-2.0) | Yes (Apache-2.0) | Yes (MPL-2.0) | BSD through 7.2; relicensed in later releases |
| Throughput (approx., workload-dependent) | 10M+ msg/s | 1M+ msg/s | 50K msg/s | 1M msg/s |
| Latency (approx.) | <1ms | ~10ms | ~1ms | <1ms |
| Persistence | JetStream | Built-in | Built-in | Built-in |
| Exactly-once | JetStream (deduplication + double ack) | Yes (transactions) | No (at-least-once) | No |
| Request/Reply | Native | Manual | RPC support | Pub/sub |
| Clustering | Auto | ZooKeeper/KRaft | Manual | Redis Cluster |
| RAM per instance (approx.) | ~5MB | ~1GB | ~200MB | ~100MB |
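FAQ
Q: How do I choose between NATS and Kafka? A: Kafka suits high-throughput big-data stream processing (logs, event sourcing) and workloads with strict ordering requirements. NATS is a better fit for communication between microservices, request/reply, and low-latency needs. NATS is lighter; Kafka is more feature-complete.
Q: How does JetStream perform? A: JetStream can reach 200K+ msg/s of persisted writes on SSD. That is lower than pure in-memory mode (10M+ msg/s), but it is enough for most production scenarios.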
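To put the 200K msg/s persisted-write figure in terms of disk load, a back-of-the-envelope calculation can help. The sketch below assumes an average message size of 1,000 bytes, which is an illustrative assumption and not a number from this article; `sizing` is a hypothetical helper.

```javascript
// Rough sizing for persisted writes: messages per second times bytes per message.
// msgBytes is an assumed average payload size; overhead and replication are ignored.
function sizing(msgPerSec, msgBytes) {
  const bytesPerSec = msgPerSec * msgBytes;
  return {
    mbPerSec: bytesPerSec / 1e6, // sustained write rate in MB/s
    gbPerDay: (bytesPerSec * 86400) / 1e9, // data written per day in GB
  };
}

console.log(sizing(200_000, 1_000)); // { mbPerSec: 200, gbPerDay: 17280 }
```

Under that assumption the stream would write about 200 MB/s, so retention limits matter as much as raw throughput.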
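Q: Is it suitable for IoT? A: Very much so. Leaf Nodes let edge devices connect to a local NATS server, which then syncs to the central cluster over an encrypted channel. A single NATS server can handle millions of connections.
Sources and Credits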
- GitHub: nats-io/nats-server — 19.5K+ ⭐ | Apache-2.0
- Website: nats.io