分布式跑批数据方案
1. 背景分布式跑批数据,但是跑批数据很大的情况下,需要将数据库中的任务分批处理2. 方案(1)跑批时每个任务向redis 有序集合中插入记录redis> zadd cy_test100192.168.0.1(integer)1redis> zadd cy_test100192.168.0.2(integer)1redis> zadd cy_test100192.168.0.3(
1. 背景
分布式跑批数据,但是跑批数据很大的情况下,需要将数据库中的任务分批处理
2. 方案
(1)跑批时每个任务向redis 有序集合中插入记录
|
redis zset 命令见 https://www.runoob.com/redis/redis-sorted-sets.html 注意区分ZREVRANK和zrank命令
实际KEY: redis_zset
-
然后等待2s ,有序集合中插入值
- 获取每个ip在有序集合中的序号
-
按序号分别查询数据库中的数据,分别处理,将数据平均分成三等分
mysql> select status,count(*) from table1 where status=
'0'
;
+--------+----------+
| status | count(*) |
+--------+----------+
|
0
|
739119
|
+--------+----------+
mysql> select count(*) from table1 where status=
'0'
and mod(id,
3
)=
0
;
+----------+
| count(*) |
+----------+
|
246357
|
+----------+
1
row in set (
0.33
sec)
mysql> select count(*) from table1 where status=
'0'
and mod(id,
3
)=
1
;
+----------+
| count(*) |
+----------+
|
246356
|
+----------+
1
row in set (
0.30
sec)
mysql> select count(*) from table1 where status=
'0'
and mod(id,
3
)=
2
;
+----------+
| count(*) |
+----------+
|
246357
|
+----------+
1
row in set (
0.30
sec)
- 手动执行时需要注意执行顺序
192.168.0.1
192.168.0.2
192.168.0.3
- 其他跑批任务也可参考该算法将数据分开执行
- 关键代码实现
def load_balancing(self, redis_zset_key, value):
'''
mysql负载
:param redis_zset_key:
:param value:
:return:
'''
mod_len = self.redisCli.zcard(redis_zset_key)
mod_index = self.redisCli.zrevrank(redis_zset_key, value)
return mod_len, mod_index
@staticmethod
def get_host_ip():
'''
获取本机内网ip
:return:
'''
#获取本机电脑名
myname = socket.getfqdn(socket.gethostname())
#获取本机ip
myaddr = socket.gethostbyname(myname)
return myaddr
sql = self.query_no_check_dlp_file_sql
mod_len, mod_value = self.load_balance()
if mod_len and (mod_value or mod_value==0) and mod_len != 0:
sql = "%s and %s" % (sql, "mod(id,%s)=%s" % (mod_len, mod_value))
return sql
记录一下
更多推荐
所有评论(0)