Linux安装和配置nsq

news/2024/10/30 13:37:40/

NSQ简介

NSQ 是一个实时分布式消息平台,旨在大规模运行,每天处理数十亿条消息。

它提倡没有单点故障的分布式和分散式拓扑结构,实现容错和高可用性,同时保证可靠的消息传递。请看特点和保证。

在操作上,NSQ很容易配置和部署(所有参数都在命令行上指定,编译的二进制文件没有运行时的依赖性)。为了获得最大的灵活性,它与数据格式无关(消息可以是JSON、MsgPack、协议缓冲区或其他任何形式)。官方的Go和Python库是开箱即用的(还有许多其他的客户端库),如果你有兴趣建立自己的库,有一个协议规范。

特点:

  • 追求简单部署

  • 追求高可用、避免单点故障、无中心设计

  • 确保消息送达

  • 生产者消费者自动发现、消费者连接所有生产者、向消费者推的模式

  • 提供 HTTP 接口

  • 提供几乎所有编程语言的客户端开发包

NSQ是由知名短链接服务商bitly用Go语言开发的实时消息处理系统,具有高性能、高可靠、无视单点故障等优点,是一个非常不错的新兴的消息队列解决方案。

源代码地址:https://github.com/nsqio/nsq

下载和安装nsq

官方安装地址:https://nsq.io/overview/quick_start.html

官方下载地址:https://nsq.io/deployment/installing.html

#创建目录,并进入
mkdir -p /data/software
cd /data/software#下载nsq包
wget https://s3.amazonaws.com/bitly-downloads/nsq/nsq-0.3.8.linux-amd64.go1.6.2.tar.gz#解压
tar -zxvf nsq-0.3.8.linux-amd64.go1.6.2.tar.gz#复制到指定目录
cp -r /data/software/nsq-0.3.8.linux-amd64.go1.6.2 /data/beyond/nsq

启动时可配置文件的参考地址:
https://liushuchun.gitbooks.io/mixapi/content/nsq_config.html

启动相关服务


nohup /data/beyond/nsq/bin/nsqd &
#(守护进程;接收,缓存和投递消息给客户端)  如:nsqd -config=/home/nsq/bin/nsqd.cfgnohup /data/beyond/nsq/bin/nsqlookupd &
#(守护进程;为消费者提供运行时发现服务,来查找指定话题(topic)的生产者 nsqd) nohup /data/beyond/nsq/bin/nsqadmin &
#(提供 Web 页面用来实时的管理你的 NSQ 集群。它通过和 nsqlookupd 实例交流,来确定生产者)

指定端口启动
nohup /data/beyond/nsq/bin/nsqadmin --lookupd-http-address localhost:4160 &

指定配置文件启动
nohup /data/beyond/nsq/bin/nsqlookupd -config=/data/beyond/nsq/conf/nsqlookupd.cfg &

nohup /data/beyond/nsq/bin/nsqadmin -config=/data/beyond/nsq/conf/nsqadmin.cfg &

nohup /data/beyond/nsq/bin/nsqd -config=/data/beyond/nsq/conf/nsqd.cfg &

集群管理
记得谁说过,go调用nsq时,只能在代码里配置多个地址,去轮询调用

测试环境的配置,如:192.168.1.6

nsqd.cfg文件
cat nsqd.cfg
#配置如下## enable verbose logging
verbose = false## unique identifier (int) for this worker (will default to a hash of hostname)
id = 66## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.6"## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["192.168.1.6:4160"
]## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"## path to store disk-backed messages
# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600## number of messages per diskqueue fsync
sync_every = 2500## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"## duration to wait before auto-requeing a message
msg_timeout = "60s"## maximum duration before a message will timeout
max_msg_timeout = "15m"## maximum size of a single message in bytes
max_msg_size = 1024768## maximum requeuing timeout for a message
max_req_timeout = "1h"## maximum size of a single command body
max_body_size = 5123840## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"## maximum RDY count for a client
max_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"## toggle sending memory and GC stats to statsd
statsd_mem_stats = true## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [100.0,99.0,95.0
]## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"## path to certificate file
tls_cert = ""## path to private key file
tls_key = ""## set policy on client certificate (require - client must provide certificate,
##  require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority
# tls_root_ca_file = ""## require client TLS upgrades
tls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""## enable deflate feature negotiation (client compression)
deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6## enable snappy feature negotiation (client compression)
snappy = true
nsqlookupd.cfg文件
cat nsqlookupd.cfg
#配置如下## enable verbose logging
verbose = false## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.6"## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
nsqadmin.cfg文件
cat nsqadmin.cfg
#配置如下## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4171"## graphite HTTP address
graphite_url = ""## proxy HTTP requests to graphite
proxy_graphite = false## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = ["192.168.1.6:4161"
]## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
#    "192.168.1.6:4151",
#    "192.168.1.7:4151",
#]

测试环境的配置集群,第二台,如:192.168.1.7

注意: nsqlookupd_tcp_addresses 地址配置的是 192.168.1.6

nsqd.cfg文件
cat nsqd.cfg
#配置如下## enable verbose logging
verbose = false## unique identifier (int) for this worker (will default to a hash of hostname)
id = 77## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "192.168.1.7"## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["192.168.1.6:4160"
]## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"## path to store disk-backed messages
# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600## number of messages per diskqueue fsync
sync_every = 2500## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"## duration to wait before auto-requeing a message
msg_timeout = "60s"## maximum duration before a message will timeout
max_msg_timeout = "15m"## maximum size of a single message in bytes
max_msg_size = 1024768## maximum requeuing timeout for a message
max_req_timeout = "1h"## maximum size of a single command body
max_body_size = 5123840## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"## maximum RDY count for a client
max_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"## toggle sending memory and GC stats to statsd
statsd_mem_stats = true## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [100.0,99.0,95.0
]## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"## path to certificate file
tls_cert = ""## path to private key file
tls_key = ""## set policy on client certificate (require - client must provide certificate,
##  require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority
# tls_root_ca_file = ""## require client TLS upgrades
tls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""## enable deflate feature negotiation (client compression)
deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6## enable snappy feature negotiation (client compression)
snappy = true

其他环境的配置,如: 172.28.15.1

nsqd.cfg 文件
cat nsqd.cfg 
#配置如下## enable verbose logging
verbose = false## unique identifier (int) for this worker (will default to a hash of hostname)
id = 66 ## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4150"## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4151"## <addr>:<port> to listen on for HTTPS clients
# https_address = "0.0.0.0:4152"## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "172.28.15.1"## cluster of nsqlookupd TCP addresses
nsqlookupd_tcp_addresses = ["172.28.15.1:4160"
]## duration to wait before HTTP client connection timeout
http_client_connect_timeout = "2s"## duration to wait before HTTP client request timeout
http_client_request_timeout = "5s"## path to store disk-backed messages
# data_path = "/var/lib/nsq"## number of messages to keep in memory (per topic/channel)
mem_queue_size = 10000## number of bytes per diskqueue file before rolling
max_bytes_per_file = 104857600## number of messages per diskqueue fsync
sync_every = 2500## duration of time per diskqueue fsync (time.Duration)
sync_timeout = "2s"## duration to wait before auto-requeing a message
msg_timeout = "60s"## maximum duration before a message will timeout
max_msg_timeout = "15m"## maximum size of a single message in bytes
max_msg_size = 1024768## maximum requeuing timeout for a message
max_req_timeout = "1h"## maximum size of a single command body
max_body_size = 5123840## maximum client configurable duration of time between client heartbeats
max_heartbeat_interval = "60s"## maximum RDY count for a client
max_rdy_count = 2500## maximum client configurable size (in bytes) for a client output buffer
max_output_buffer_size = 65536## maximum client configurable duration of time between flushing to a client (time.Duration)
max_output_buffer_timeout = "1s"## UDP <addr>:<port> of a statsd daemon for pushing stats
# statsd_address = "127.0.0.1:8125"## prefix used for keys sent to statsd (%s for host replacement)
statsd_prefix = "nsq.%s"## duration between pushing to statsd (time.Duration)
statsd_interval = "60s"## toggle sending memory and GC stats to statsd
statsd_mem_stats = true## message processing time percentiles to keep track of (float)
e2e_processing_latency_percentiles = [100.0,99.0,95.0
]## calculate end to end latency quantiles for this duration of time (time.Duration)
e2e_processing_latency_window_time = "10m"## path to certificate file
tls_cert = ""## path to private key file
tls_key = ""## set policy on client certificate (require - client must provide certificate,
##  require-verify - client must provide verifiable signed certificate)
# tls_client_auth_policy = "require-verify"## set custom root Certificate Authority
# tls_root_ca_file = ""## require client TLS upgrades
tls_required = false## minimum TLS version ("ssl3.0", "tls1.0," "tls1.1", "tls1.2")
tls_min_version = ""## enable deflate feature negotiation (client compression)
deflate = true## max deflate compression level a client can negotiate (> values == > nsqd CPU usage)
max_deflate_level = 6## enable snappy feature negotiation (client compression)
snappy = true
nsqlookupd.cfg文件
cat nsqlookupd.cfg
#配置如下## enable verbose logging
verbose = false## <addr>:<port> to listen on for TCP clients
tcp_address = "0.0.0.0:4160"## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4161"## address that will be registered with lookupd (defaults to the OS hostname)
broadcast_address = "172.28.15.1"## duration of time a producer will remain in the active list since its last ping
inactive_producer_timeout = "300s"## duration of time a producer will remain tombstoned if registration remains
tombstone_lifetime = "45s"
nsqadmin.cfg文件

cat nsqadmin.cfg
#配置如下## <addr>:<port> to listen on for HTTP clients
http_address = "0.0.0.0:4171"## graphite HTTP address
graphite_url = ""## proxy HTTP requests to graphite
proxy_graphite = false## prefix used for keys sent to statsd (%s for host replacement, must match nsqd)
statsd_prefix = "nsq.%s"## format of statsd counter stats
statsd_counter_format = "stats.counters.%s.count"## format of statsd gauge stats
statsd_gauge_format = "stats.gauges.%s"## time interval nsqd is configured to push to statsd (must match nsqd)
statsd_interval = "60s"## HTTP endpoint (fully qualified) to which POST notifications of admin actions will be sent
notification_http_endpoint = ""## nsqlookupd HTTP addresses
nsqlookupd_http_addresses = ["172.28.15.1:4161"
]## nsqd HTTP addresses (optional)
#nsqd_http_addresses = [
#    "192.168.1.6:4151",
#    "192.168.1.7:4151",
#]

http://www.ppmy.cn/news/263545.html

相关文章

投影仪买那个牌子好?投影仪如何选择

去选购投影仪时&#xff0c;还是不要太关注于品牌。龙生九子&#xff0c;各有不同&#xff0c;一个品牌出品的投影仪也会有不同&#xff0c;各有侧重。因此投影仪们哪个牌子好&#xff1f;这个问题还是先看看自己的实际需求&#xff0c;了解自己的需求了&#xff0c;再看投影仪…

该怎么选择投影仪,家用投影仪那个牌子好?

现在的市场&#xff0c;投影仪逐渐替代了家用彩电。人们更喜欢在家里用大屏观影&#xff0c;体验家庭影院的感觉。那么想选择投影仪时&#xff0c;该怎么选择&#xff0c;有什么推荐吗&#xff1f;最近经常收到这样的信息。 在这里我用极米H3S、当贝F3、大眼橙X11&#xff0c;三…

15. 算法之排序算法

前言 排序是在软件开发中经常遇到的需求。比如基于订单的创建时间倒排&#xff0c;基于金额大小排序等等&#xff0c;那么这些排序底层是怎么写的呢&#xff0c;本节&#xff0c;我们就常用排序算法展开介绍。 1. 冒泡排序 1.1 算法思想 冒泡排序是最基础的排序算法。冒泡排…

【手撕Spring源码】深度理解SpringBoot

文章目录 Tomcat内嵌容器Tomcat 基本结构创建Tomcat内嵌容器内嵌Tomcat集成Spring 容器 Boot 自动配置什么是自动配置类自动配置类原理Aop自动配置DataSource自动配置MyBatis自动配置事务自动配置MVC自动配置条件装配 附&#xff1a;注解小总EnableConfigurationPropertiesCond…

自编码器(Autoencoder)应用场景

自编码器&#xff08;Autoencoder&#xff09;是一种无监督学习模型&#xff0c;通常用于特征学习、降维和生成任务。自编码器由编码器&#xff08;Encoder&#xff09;和解码器&#xff08;Decoder&#xff09;两部分组成。编码器将输入数据压缩成较低维度的隐向量&#xff0c…

神经网络单元测试 1 ~14单元

第1讲 单元测试 ‌1. 被誉为“人工智能之父”的科学家是__图灵_。 ‎ 2 ‌下面说法中&#xff0c;正确的是___我们现在实现的几乎都是弱人工智能_____。 ‏ 3 ‎AI的英文缩写是___ Artificial Intelligence_____。 ‎ 4 ‍研究人工智能的目的是让机器___模拟、延伸和扩展人的智…

【MySQL数据库 | 第五篇】DDl操作数据库

目录 &#x1f914;DDL介绍&#xff1a; &#x1f914;语法详解&#xff1a; 1.查询&#xff1a; 1.查询所有数据库&#xff1a; 示例&#xff1a;查询自带数据库 2.查询当前数据库&#xff1a; 2.创建&#xff1a; 示例&#xff1a;创建一个名字叫做itcast的数据库&…

28. 找出字符串中第一个匹配项的下标

2023.6.7 KMP给我人脑cpu干烧了┭┮﹏┭┮ 第一阶段&#xff1a;最长相等前后缀的引入给暴力解法带来的改善 暴力解法&#xff1a; needle每次向前推进一位&#xff0c;然后判断是否与haystack中的相应字串对应&#xff0c;如图所示 当已知needle中不配对位 前面字串 的 最长相…