当前位置: 首页 > >

python作业图书管理系统-Python使用Redis实现作业调度系统(超简单)

发布时间:

概述


Redis是一个开源,先进的key-value存储,并用于构建高性能,可扩展的Web应用程序的完美解决方案。


Redis从它的许多竞争继承来的三个主要特点:


Redis数据库完全在内存中,使用磁盘仅用于持久性。


相比许多键值数据存储,Redis拥有一套较为丰富的数据类型。


Redis可以将数据复制到任意数量的从服务器。


Redis 优势


异常快速:Redis的速度非常快,每秒能执行约11万集合,每秒约81000+条记录。


支持丰富的数据类型:Redis支持最大多数开发人员已经知道像列表,集合,有序集合,散列数据类型。这使得它非常容易解决各种各样的问题,因为我们知道哪些问题是可以处理通过它的数据类型更好。


操作都是原子性:所有Redis操作是原子的,这保证了如果两个客户端同时访问的Redis服务器将获得更新后的值。


多功能实用工具:Redis是一个多实用的工具,可以在多个用例如缓存,消息,队列使用(Redis原生支持发布/订阅),任何短暂的数据,应用程序,如Web应用程序会话,网页命中计数等。


步入主题:


Redis作为内存数据库的一个典型代表,已经在很多应用场景中被使用,这里仅就Redis的pub/sub功能来说说怎样通过此功能来实现一个简单的作业调度系统。这里只是想展现一个简单的想法,所以还是有很多需要考虑的东西没有包括在这个例子中,比如错误处理,持久化等。


下面是实现上的想法


MyMaster:集群的master节点程序,负责产生作业,派发作业和获取执行结果。


MySlave:集群的计算节点程序,每个计算节点一个,负责获取作业并运行,并将结果发送会master节点。


channel CHANNEL_DISPATCH:每个slave节点订阅一个channel,比如“CHANNEL_DISPATCH_[idx或机器名]”,master会向此channel中publish被dispatch的作业。


channel CHANNEL_RESULT:用来保存作业结果的channel,master和slave共享此channel,master订阅此channel来获取作业运行结果,每个slave负责将作业执行结果发布到此channel中。


Master代码


#!/usr/bin/env python


# -*- coding: utf-8 -*-


import time


import threading


import random


import redis


REDIS_HOST = "localhost"


REDIS_PORT = 6379


REDIS_DB = 0


CHANNEL_DISPATCH = "CHANNEL_DISPATCH"


CHANNEL_RESULT = "CHANNEL_RESULT"


class MyMaster():


def __init__(self):


pass


def start(self):


MyServerResultHandleThread().start()


MyServerDispatchThread().start()


class MyServerDispatchThread(threading.Thread):


def __init__(self):


threading.Thread.__init__(self)


def run(self):


r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)


for i in range(1, 100):


channel = CHANNEL_DISPATCH + "_" + str(random.randint(1, 3))


print("Dispatch job %s to %s" % (str(i), channel))


ret = r.publish(channel, str(i))


if ret == 0:


print("Dispatch job %s failed." % str(i))


time.sleep(5)


class MyServerResultHandleThread(threading.Thread):


def __init__(self):


threading.Thread.__init__(self)


def run(self):


r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)


p = r.pubsub()


p.subscribe(CHANNEL_RESULT)


for message in p.listen():


if message["type"] != "message":


continue


print("Received finished job %s" % message["data"])


if __name__ == "__main__":


MyMaster().start()


time.sleep(10000)


说明


MyMaster类 - master主程序,用来启动dispatch和resulthandler的线程


MyServerDispatchThread类 - 派发作业线程,产生作业并派发到计算节点


MyServerResultHandleThread类 - 作业运行结果处理线程,从channel里获取作业结果并显示


Slave代码


#!/usr/bin/env python


# -*- coding: utf-8 -*-


from datetime import datetime


import time


import threading


import random


import redis


REDIS_HOST = "localhost"


REDIS_PORT = 6379


REDIS_DB = 0


CHANNEL_DISPATCH = "CHANNEL_DISPATCH"


CHANNEL_RESULT = "CHANNEL_RESULT"


class MySlave():


def __init__(self):


pass


def start(self):


for i in range(1, 4):


MyJobWorkerThread(CHANNEL_DISPATCH + "_" + str(i)).start()


class MyJobWorkerThread(threading.Thread):


def __init__(self, channel):


threading.Thread.__init__(self)


self.channel = channel


def run(self):


r = redis.StrictRedis(host=REDIS_HOST, port=REDIS_PORT, db=REDIS_DB)


p = r.pubsub()


p.subscribe(self.channel)


for message in p.listen():


if message["type"] != "message":


continue


print("%s: Received dispatched job %s " % (self.channel, message["data"]))


print("%s: Run dispatched job %s " % (self.channel, message["data"]))


time.sleep(2)


print("%s: Send finished job %s " % (self.channel, message["data"]))


ret = r.publish(CHANNEL_RESULT, message["data"])


if ret == 0:


print("%s: Send finished job %s failed." % (self.channel, message["data"]))


if __name__ == "__main__":


MySlave().start()


time.sleep(10000)


说明


MySlave类 - slave节点主程序,用来启动MyJobWorkerThread的线程


MyJobWorkerThread类 - 从channel里获取派发的作业并将运行结果发送回master


测试


首先运行MySlave来定义派发作业channel。


然后运行MyMaster派发作业并显示执行结果。


有关Python使用Redis实现作业调度系统(超简单),小编就给大家介绍这么多,希望对大家有所帮助!






相关资源:用Redis做一个简单的消息队列



友情链接: