1. 背景

分布式跑批数据,但是跑批数据很大的情况下,需要将数据库中的任务分批处理

2. 方案

(1)跑批时每个任务向redis 有序集合中插入记录

redis> zadd redis_zset 100 192.168.0.1

(integer) 1

redis> zadd redis_zset 100 192.168.0.2

(integer) 1

redis>  zadd redis_zset 100 192.168.0.3

(integer) 1

redis> ZRANGE redis_zset 0 -1

1) "192.168.0.1"

2) "192.168.0.2"

3) "192.168.0.3"

redis> zcard redis_zset

(integer) 3

redis> ZREVRANK redis_zset 192.168.0.1

(integer) 2

redis> ZREVRANK redis_zset 192.168.0.2

(integer) 0

redis> ZREVRANK redis_zset 192.168.0.3

(integer) 1

redis zset 命令见 https://www.runoob.com/redis/redis-sorted-sets.html  注意区分ZREVRANK和zrank命令

实际KEY:  redis_zset

  1. 然后等待2s ,有序集合中插入值

  2. 获取每个ip在有序集合中的序号
  3. 按序号分别查询数据库中的数据,分别处理,将数据平均分成三等分

    mysql> select status,count(*) from table1 where status='0';

    +--------+----------+

    | status | count(*) |

    +--------+----------+

    0      |   739119 |

    +--------+----------+

    mysql> select count(*) from table1 where status='0' and mod(id,3)=0;

    +----------+

    | count(*) |

    +----------+

    |   246357 |

    +----------+

    1 row in set (0.33 sec)

    mysql> select count(*) from table1 where status='0' and mod(id,3)=1;

    +----------+

    | count(*) |

    +----------+

    |   246356 |

    +----------+

    1 row in set (0.30 sec)

    mysql> select count(*) from table1 where status='0' and mod(id,3)=2;

    +----------+

    | count(*) |

    +----------+

    |   246357 |

    +----------+

    1 row in set (0.30 sec)

  4. 手动执行时需要注意执行顺序
    192.168.0.1
    192.168.0.2
    192.168.0.3
    

  5. 其他跑批任务也可参考该算法将数据分开执行
  6. 关键代码实现
def load_balancing(self, redis_zset_key, value):
        '''
        mysql负载
        :param redis_zset_key:
        :param value:
        :return:
        '''
        mod_len = self.redisCli.zcard(redis_zset_key)
        mod_index = self.redisCli.zrevrank(redis_zset_key, value)
        return mod_len, mod_index
@staticmethod
    def get_host_ip():
        '''
        获取本机内网ip
        :return:
        '''
        #获取本机电脑名
        myname = socket.getfqdn(socket.gethostname())
        #获取本机ip
        myaddr = socket.gethostbyname(myname)
        return myaddr
sql = self.query_no_check_dlp_file_sql
mod_len, mod_value = self.load_balance()
if mod_len and (mod_value or mod_value==0) and mod_len != 0:
     sql = "%s and %s" % (sql, "mod(id,%s)=%s" % (mod_len, mod_value))
return sql

记录一下

Logo

更多推荐