New perfect server interface

This commit is contained in:
anwinged 2012-04-22 07:06:05 +00:00
parent a32af3237f
commit 493352ac4b
6 changed files with 278 additions and 267 deletions

View File

@ -44,6 +44,7 @@ class MainFrame (wx.Frame):
# WARNING: wxPython code generation isn't supported for this widget yet. # WARNING: wxPython code generation isn't supported for this widget yet.
self.m_params = wxpg.PropertyGridManager(self, self.m_params = wxpg.PropertyGridManager(self,
style = wxpg.PG_TOOLBAR) style = wxpg.PG_TOOLBAR)
self.m_params.AddPage('fp')
bSizer4.Add(self.m_params, 1, wx.EXPAND |wx.ALL, 1) bSizer4.Add(self.m_params, 1, wx.EXPAND |wx.ALL, 1)
bSizer3.Add(bSizer4, 1, wx.EXPAND, 5) bSizer3.Add(bSizer4, 1, wx.EXPAND, 5)

View File

@ -29,14 +29,13 @@ class MainFrame(forms.MainFrame):
def __init__(self): def __init__(self):
forms.MainFrame.__init__(self, None) forms.MainFrame.__init__(self, None)
self.server = s = server.LocalServer() s = server.LocalServer()
self.server.LoadTasksDescriptions() s.LoadModels()
ds = s.GetTasksDescriptions() models = s.GetModels()
models = []
for d in ds:
models.extend(d.GetModelsDescriptions())
model = models[0]
s.Start() s.Start()
self.server = s
model = models[0]
self.m_user_models.Bind(wx.EVT_TREE_SEL_CHANGED, self.m_user_models.Bind(wx.EVT_TREE_SEL_CHANGED,
self.OnModelActivated) self.OnModelActivated)
@ -51,8 +50,6 @@ class MainFrame(forms.MainFrame):
self.Bind(wx.EVT_CLOSE, self.OnClose) self.Bind(wx.EVT_CLOSE, self.OnClose)
self.Bind(wx.EVT_IDLE, self.OnIdle) self.Bind(wx.EVT_IDLE, self.OnIdle)
self.m_params.AddPage('fp')
ov = threading.Thread(target = self.Overseer) ov = threading.Thread(target = self.Overseer)
ov.daemon = 1 ov.daemon = 1
ov.start() ov.start()
@ -66,26 +63,30 @@ class MainFrame(forms.MainFrame):
def Overseer(self): def Overseer(self):
try: try:
um = self.m_user_models
cycle_count = 0
while True: while True:
if True: wx.MutexGuiEnter()
wx.MutexGuiEnter() print 'cycle {:-8} '.format(cycle_count)
#print '-- cycle --' cycle_count += 1
um = self.m_user_models item = um.GetRootItem()
#um.Freeze() while item.IsOk():
item = um.GetRootItem() data = um.GetPyData(item)
while item.IsOk(): if data:
md = um.GetPyData(item) jid = data[1]
job = md.job if md else None
if job and job.IsRunning(): if jid != None and self.server.IsJobChanged(jid):
t = os.path.basename(job.taskd.execpath) tid = self.server.GetJobTID(jid)
p = job.percent * 100 meta = self.server.GetTaskMeta(tid)
#print t, p t = os.path.basename(meta['exec'])
um.SetItemText(item, str(job.GetState()), 1) state = self.server.GetJobState(jid)
um.SetItemText(item, '{}: {:.2F}%'.format(t, p), 2) um.SetItemText(item, str(state[0]), 1)
item = um.GetNext(item) um.SetItemText(item, '{}: {:%}'.format(t, state[1]), 2)
#um.Thaw() print jid, state
wx.MutexGuiLeave()
time.sleep(0.5) item = um.GetNext(item)
wx.MutexGuiLeave()
time.sleep(0.1)
except Exception, e: except Exception, e:
print 'Error in overseer: ', e print 'Error in overseer: ', e
@ -98,9 +99,10 @@ class MainFrame(forms.MainFrame):
data = task.DataDefinition(model) data = task.DataDefinition(model)
child = um.AppendItem(root, 'Default') child = um.AppendItem(root, 'Default')
um.SetPyData(child, data) jid = self.server.CreateJob()
um.SetPyData(child, [data, jid])
def SelectUserModel(self, model_def): def SelectUserModel(self, model_def, jid):
def SelectProperty(param_type): def SelectProperty(param_type):
""" """
@ -142,7 +144,7 @@ class MainFrame(forms.MainFrame):
item = event.GetItem() item = event.GetItem()
data = self.m_user_models.GetPyData(item) data = self.m_user_models.GetPyData(item)
if data: if data:
self.SelectUserModel(data) self.SelectUserModel(data[0], data[1])
def OnParamChanging(self, event): def OnParamChanging(self, event):
#value = event.GetValue() #value = event.GetValue()
@ -159,26 +161,26 @@ class MainFrame(forms.MainFrame):
param = prop.GetClientData() param = prop.GetClientData()
um = self.m_user_models um = self.m_user_models
id = um.GetSelection() id = um.GetSelection()
md = um.GetItemPyData(id) data, jid = um.GetItemPyData(id)
md[param] = value data[param] = value
def OnTest(self, event): def OnTest(self, event):
um = self.m_user_models um = self.m_user_models
id = um.GetSelection() id = um.GetSelection()
md = um.GetItemPyData(id) data, jid = um.GetItemPyData(id)
#wx.MessageBox(md.PackParams())
md.Flush() self.server.LaunchJob(jid, data)
#wx.MessageBox('test')
def OnDuplicate(self, event): def OnDuplicate(self, event):
um = self.m_user_models um = self.m_user_models
id = um.GetSelection() id = um.GetSelection()
title = um.GetItemText(id) title = um.GetItemText(id)
parent = um.GetItemParent(id) parent = um.GetItemParent(id)
md = um.GetItemPyData(id) md, jid = um.GetItemPyData(id)
child = um.AppendItem(parent, title + ' Copy') child = um.AppendItem(parent, title + ' Copy')
um.SetPyData(child, md.Copy()) jid = self.server.CreateJob()
um.SetPyData(child, [md.Copy(), jid])
self.SetStatusText('Copy for "{}" created'.format(title), 0) self.SetStatusText('Copy for "{}" created'.format(title), 0)
def OnIdle(self, event): def OnIdle(self, event):

View File

@ -31,23 +31,19 @@ def WriteToLog(msg):
print msg print msg
def GenerateId(data):
import hashlib
title = data['title']
author = data['author']
id = hashlib.md5(title + author).hexdigest()
return id
class LocalServer: class LocalServer:
""" def __init__(self, conf = 'tasks.conf', workers = 2):
""" """
def __init__(self): """
self.max_workers = 2 self.conf = conf # файл с конфигурацией задач
self.task_descrs = [] self.workers = workers # количество потоков выполнения
self.jobs_queue = [] self.tasks_meta = {} # идентификаор задачи
self.log = None self.models = [] # список моделей
self.running = False self.next_job_id = 0 # очередной идентификатор работы
self.queue_lock = threading.Lock() self.jobs = {} # очередб работ
self.log = None #
self.running = False #
self.queue_lock = threading.Lock()
# init actions # init actions
@ -67,25 +63,14 @@ class LocalServer:
self.log.write(msg + '\n') self.log.write(msg + '\n')
print msg print msg
def Start(self):
self.running = True
for i in xrange(self.max_workers):
worker = Worker(self.jobs_queue, self.queue_lock, self.running)
worker.start()
def Stop(self):
self.running = False
def TestTaskData(self, data): def TestTaskData(self, data):
pass pass
def LoadTasksDescriptions(self, source = 'tasks.conf'): def LoadModels(self):
""" self.tasks_meta = {}
""" self.models = []
self.task_descrs = []
self.WriteToLog('tasks interrogation starts') self.WriteToLog('tasks interrogation starts')
for line in open(source, 'r'): for line in open(self.conf, 'r'):
try: try:
# нормализуем указанный путь # нормализуем указанный путь
line = os.path.normpath(line) line = os.path.normpath(line)
@ -93,14 +78,29 @@ class LocalServer:
# считываем данные через shell (важно для скриптовых языков) # считываем данные через shell (важно для скриптовых языков)
textdata = subprocess.check_output([line, '-i'], shell = True, textdata = subprocess.check_output([line, '-i'], shell = True,
cwd = os.path.dirname(line)) cwd = os.path.dirname(line))
# загружаем данные описания задачи # загружаем данные описания задачи
data = json.loads(textdata) data = json.loads(textdata)
# провряем их на корректность # провряем их на корректность
self.TestTaskData(data) self.TestTaskData(data)
# пакуем все в объект-описание задачи
task_descr = task.TaskDescription(self, line, data) # вычисляем псевдоуникальный идентификатор модели
# добавляем в список описаний tid = hash(data['meta'])
self.task_descrs.append(task_descr) # сохраняем описание задачи
self.tasks_meta[tid] = {
'title': data.get('title', ''),
'author': data.get('author', ''),
'meta': data['meta'],
'exec': line
}
# выделяем описания моделей
ms = data.get('models', {})
for label, data in ms.iteritems():
model_descr = task.DataDescription(None, label, data, tid)
# добавляем в список описаний
self.models.append(model_descr)
self.WriteToLog('Task from "{}" asked'.format(line)) self.WriteToLog('Task from "{}" asked'.format(line))
except IOError, e: except IOError, e:
self.WriteToLog('file "{}" not found'.format(line)) self.WriteToLog('file "{}" not found'.format(line))
@ -109,166 +109,244 @@ class LocalServer:
except ValueError, e: except ValueError, e:
self.WriteToLog('file "{}" not opened, error "{}")'.format(line, e)) self.WriteToLog('file "{}" not opened, error "{}")'.format(line, e))
def GetTasksDescriptions(self): def GetModels(self):
""" return self.models
Return list with task descriptions
""" def GetTaskMeta(self, tid):
return self.task_descrs return self.tasks_meta.get(tid)
#--------------------------------------------------------------------------
def CreateJob(self):
jid = self.next_job_id
self.next_job_id += 1
with self.queue_lock:
self.jobs[jid] = Job()
return jid
def GetJobsCount(self): def GetJobsCount(self):
pass return len(self.jobs)
def GetJob(self, index): def GetJobState(self, jid):
pass job = self.jobs.get(jid)
if job != None:
return job.GetState()
def AddJob(self, taskd, datadump): def IsJobChanged(self, jid):
job = Job(taskd, datadump) job = self.jobs.get(jid)
with self.queue_lock: if job != None:
self.jobs_queue.append(job) return job.IsChanged()
WriteToLog('Job added') else:
return job False
def GetJobResult(self, jid):
job = self.jobs.get(jid)
if job != None:
return job.GetResult()
def GetJobTID(self, jid):
job = self.jobs.get(jid)
if job != None:
return job.tid
def LaunchJob(self, jid, data_def):
job = self.jobs.get(jid)
if job != None:
tid = data_def.DD.tid
datadump = data_def.PackParams()
job.Launch(tid, datadump)
return True
def StopJob(self, jid):
job = self.jobs.get(jid)
if job != None:
job.Stop()
#--------------------------------------------------------------------------
def Start(self):
self.running = True
for i in xrange(self.workers):
worker = Worker(self)
worker.start()
def Stop(self):
self.running = False
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
class Worker(threading.Thread): class Worker(threading.Thread):
number = 0 number = 0
def __init__(self, queue, lock, runflag): def __init__(self, server):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.queue = queue self.server = server
self.lock = lock
self.runflag = runflag
self.daemon = True self.daemon = True
WriteToLog('worker started')
self.id = Worker.number self.id = Worker.number
Worker.number += 1 Worker.number += 1
WriteToLog('worker started')
def FindNextJob(self):
with self.server.queue_lock:
for jid, job in self.server.jobs.iteritems():
# если нашли ожидающую вызова работу
if job.state == JOB_READY:
job.state = JOB_RUNNING # пометим, как запущенную
WriteToLog('Job ({}) found'.format(jid))
return job
return None
def ProcessJob(self, job):
try:
execpath = self.server.GetTaskMeta(job.tid)['exec']
# запускаем процесс на выполнение
proc = subprocess.Popen([execpath, '-r'], shell = True,
stdin = subprocess.PIPE, stdout = subprocess.PIPE,
stderr = subprocess.STDOUT, cwd = os.path.dirname(execpath))
job.proc = proc
# передаем стартовые параметры
proc.stdin.write(job.datadump + '\n')
proc.stdin.flush()
# пока процесс не завершится (или его не прибьют)
while proc.poll() == None:
msg = proc.stdout.readline()
self.ProcessMessage(job, msg)
if not self.server.running:
proc.kill()
raise KeyError
except Exception, e:
WriteToLog('Job loop failed: ' + str(e))
job.Finish(JOB_STOPPED)
else:
job.Finish(JOB_COMPLETED, 1.0)
def ProcessMessage(self, job, msg):
try:
# разбираем полученный ответ
data = json.loads(msg)
# извлекаем оттуда ответ
ans = data['answer']
# ответ получен ок или предупреждение
# записываем значение прогресса, если имеется
if ans == 'ok' or ans == 'warning':
job.percent = data.get('value', 0.0)
# в ответе пришел результат вычислений
# помещаем в секцию результата
elif ans == 'result':
job.result = data['result']
# произошла ошибка
elif ans == 'error':
WriteToLog('Error! ' + msg)
# недокументированный ответ приложения
else:
pass
# возможно, комментарий прольет свет на проблему
job.comment = data.get('comment', '')
# почему изменяем флаг состояния здесь в конце?
# потому как только после правильной обработки сообщения
# мы можем быть уверены, что состояние действительно изменилось
job.ChangeState()
except KeyError as e:
pass
except ValueError as e:
pass
def Cycle(self): def Cycle(self):
job = None
# найти следующее готовое к выполнению задание # найти следующее готовое к выполнению задание
with self.lock: job = self.FindNextJob()
for j in self.queue:
if not j.IsBusy():
job = j
job.SetBusy()
break
# и, если нашли, приступаем к выполнению # и, если нашли, приступаем к выполнению
if job: if job:
WriteToLog("{} started!".format(self.id)) WriteToLog("{} started!".format(self.id))
job.Start(self.runflag) self.ProcessJob(job)
WriteToLog("{} finished!".format(self.id)) WriteToLog("{} finished!".format(self.id))
else: else:
time.sleep(1) time.sleep(1)
def run(self): def run(self):
while True: while True:
if not self.runflag: if not self.server.running:
return return
self.Cycle() self.Cycle()
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
JOB_READY = 0 JOB_READY = 0
JOB_BUSY = 1 JOB_RUNNING = 1
JOB_RUNNING = 2 JOB_STOPPED = 2
JOB_STOPPED = 3 JOB_COMPLETED = 3
JOB_COMPLETED = 4
JOB_DROPPED = 5
class Job: class Job:
def __init__(self, taskd, datadump): def __init__(self):
self.taskd = taskd self.tid = None
self.datad = datadump self.datadump = None
self.state = JOB_READY self.state = JOB_STOPPED # состояние выполнения работы
self.percent = 0.0 self.percent = -1.0 # прогресс (от 0.0 до 1.0 или -1.0)
self.comment = '' self.comment = '' # комментарий к ходу выполнения
self.result = None self.result = None # результат вычислений
self.proc = None self.proc = None # ссылка на субпроцесс
self.client_data = None self.state_id = 0
self.last_state_id = 0
def ProcessMsg(self, msg): def ChangeState(self):
# разбираем полученный ответ self.state_id += 1
data = json.loads(msg)
# извлекаем оттуда ответ
ans = data['answer']
# ответ получен ок или предупреждение
# записываем значение прогресса, если имеется
if ans == 'ok' or ans == 'warning':
self.percent = data.get('value', 0.0)
# в ответе пришел результат вычислений
# помещаем в секцию результата
elif ans == 'result':
self.result = data['result']
# произошла ошибка
elif ans == 'error':
WriteToLog('Error! ' + msg)
# недокументированный ответ приложения
else:
pass
# возможно, комментарий прольет свет на проблему
self.comment = data.get('comment', '')
def GetState(self):
self.last_state_id = self.state_id
return (self.state, self.percent, self.comment)
def Start(self, runflag): def IsChanged(self):
try: return self.state_id != self.last_state_id
self.state = JOB_RUNNING
execpath = self.taskd.execpath
# запускаем процесс на выполнение
self.proc = subprocess.Popen([execpath, '-r'], shell = True,
stdin = subprocess.PIPE, stdout = subprocess.PIPE,
stderr = subprocess.STDOUT, cwd = os.path.dirname(execpath))
# передаем стартовые параметры
istream = self.proc.stdin
ostream = self.proc.stdout
istream.write(self.datad + '\n')
istream.flush()
# пока процесс не завершится (или его не прибьют)
while self.proc.poll() == None:
try:
msg = ostream.readline()
#msg = msg.strip()
self.ProcessMsg(msg)
if not runflag: def Launch(self, tid, datadump):
self.Stop() self.tid = tid
# todo вписать исключения, которые относятся к JSON & dict self.datadump = datadump
except Exception, e: self.state = JOB_READY
#WriteToLog('Income msg failed: ' + str(e)) self.ChangeState()
pass
self.state = JOB_COMPLETED
except Exception, e:
WriteToLog('Job loop failed: ' + str(e))
self.state = JOB_STOPPED
def SetBusy(self):
self.state = JOB_BUSY
def IsBusy(self):
return self.state != JOB_READY
def IsRunning(self):
return self.state == JOB_BUSY or self.state == JOB_RUNNING
def IsFinished(self):
return self.state == JOB_COMPLETED or self.state == JOB_STOPPED
def IsComplete(self):
return self.GetStatus() == JOB_COMPLETE
def Stop(self): def Stop(self):
WriteToLog('Try to kill') WriteToLog('Try to kill')
if self.proc and self.proc.poll() == None: if self.proc and self.proc.poll() == None:
self.proc.kill() self.proc.kill()
self.ChangeState()
WriteToLog('Job killed') WriteToLog('Job killed')
def GetState(self): def Finish(self, state, percent = None):
return self.state self.proc = None
self.state = state
if percent:
self.percent = percent
self.ChangeState()
def GetResult(self): def GetResult(self):
return self.result return self.result
#-------------------------------------------------------------------------------
import time, random
from pprint import pprint
def main(): def main():
pass s = LocalServer(workers = 2)
s.LoadModels()
s.Start()
models = s.GetModels()
model = models[0]
md = task.DataDefinition(model)
md['d'] = 10
md['r'] = 3.14
slots = [ s.CreateJob() for i in xrange(5) ]
for jid in slots:
md['n'] = random.randint(20, 30)
print jid, md['n']
s.LaunchJob(jid, md)
time.sleep(30)
for jid in slots:
pprint(s.GetJobResult(jid))
print ''
if __name__ == '__main__': if __name__ == '__main__':
main() main()

View File

@ -12,29 +12,7 @@
# -*- coding: UTF-8 -*- # -*- coding: UTF-8 -*-
import copy import copy
import json
class TaskDescription:
"""
Description of the task. Task runs on server.
"""
def __init__(self, server, execpath, data):
"""
``server`` is owner of task process
``execpath`` - path to task executable
``data`` is parsed data presentation about models, methods
and meta information
"""
self.server = server
self.execpath = execpath
self.data = data
self.models = []
for label, data in self.data['models'].iteritems():
self.models.append(DataDescription(None, label, data, self))
def GetModelsDescriptions(self):
return self.models
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
@ -68,14 +46,15 @@ class Parameter:
#------------------------------------------------------------------------------- #-------------------------------------------------------------------------------
class DataDescription: class DataDescription:
def __init__(self, parent, label, data, taskd): def __init__(self, parent, label, data, tid):
self.parent = parent self.parent = parent
self.label = label self.label = label
self.data = data self.data = data
self.taskd = taskd self.tid = tid
# создание описаний параметров # создание описаний параметров
self.pdata = self.data.get('params', {}) self.pdata = self.data.get('params', {})
# заменяем текстовое описание на объект-параметр
for label in self.pdata: for label in self.pdata:
par = Parameter(self.pdata[label]) par = Parameter(self.pdata[label])
self.pdata[label] = par self.pdata[label] = par
@ -83,7 +62,7 @@ class DataDescription:
self.specs = [] self.specs = []
# рекурсивное создание описаний спецификаций # рекурсивное создание описаний спецификаций
for label, data in self.data.get('spec', {}).iteritems(): for label, data in self.data.get('spec', {}).iteritems():
self.specs.append(DataDescription(self, label, data, self.taskd)) self.specs.append(DataDescription(self, label, data, self.tid))
def GetLabel(self): def GetLabel(self):
return self.label return self.label
@ -142,50 +121,3 @@ class DataDefinition:
package.reverse() package.reverse()
return json.dumps(package) return json.dumps(package)
def Flush(self):
server = self.DD.taskd.server
datadump = self.PackParams()
self.job = server.AddJob(self.DD.taskd, datadump)
#-------------------------------------------------------------------------------
import server, json, time
from pprint import pprint
def main():
s = server.LocalServer()
s.LoadTasksDescriptions()
ds = s.GetTasksDescriptions()
models = []
for d in ds:
models.extend(d.GetModelsDescriptions())
model = models[0]
mdef = DataDefinition(model)
pprint(mdef.DD.data)
mdef['r'] = 3.14
mdef['n'] = 5
mdef['d'] = 0
mdef2 = mdef.Copy()
mdef2['d'] = 30
p = mdef.PackParams()
pprint(p)
p = mdef2.PackParams()
pprint(p)
#mdef.Flush()
mdef2.Flush()
time.sleep(1)
mdef2.job.Stop()
time.sleep(5)
print mdef2.job.GetState()
print 'RESULT'
#pprint(mdef.job.result)
pprint(mdef2.job.result)
if __name__ == '__main__':
main()

View File

@ -1,6 +1,7 @@
{ {
"title": "Example task", "title": "Example task",
"author": "Anton Vakhrushev", "author": "Anton Vakhrushev",
"meta": "av-example-task",
"models": { "models": {

View File

@ -42,10 +42,7 @@ def result(r):
return json.dumps({ return json.dumps({
"answer": "result", "answer": "result",
"result": { "result": {
"table": { "table": [[ {"x": "double"}, {"y": "double"} ]] + r
"head": [{"x": "double"}, {"y": "double"}],
"body": r
}
} }
}) })
@ -81,7 +78,7 @@ def main():
res.append([l, y]) res.append([l, y])
write(answer(l / r)) write(answer(l / r))
l += h l += h
time.sleep(0.2) #time.sleep(0.1)
write(result(res)) write(result(res))