Algoritmo de Bully
Em sistemas distribuídos, diversos algoritmos necessitam que um processo funcione como coordenador, inicializador, sequenciador, enfirm, ter um papel especial. No caso de falha deste processo coordenador, pode acontecer o comprometimento das tarefas sendo realizadas, então um novo coordenador deve assumir. Para a escolha do novo coordenador existem alguns algoritmos especiais, chamados algoritmos de eleição. Um destes algoritmos é o Algoritmo de Bully. O algoritmo de Bully, desenvolvido por Garcia-Mollina em 1982, assume que um processo sabe a prioridade de todos os outros processos no sistema. Quando um processo Pi detecta que o coordenador não está mais respondendo a um pedido de serviço, ele inicia uma eleição da seguinte forma:
- Pi envia uma mensagem de Eleição para todos os processos com prioridade maior do que a sua.
- Se nenhum processo responde, Pi vence a eleição e torna-se o coordenador, avisando a todos os outros processos com menor prioridade que ele é o novo coordenador.
A implementação
Este código foi desenvolvido para uma disciplina de Sistemas Distribuídos de uma Pós-Graduação que cursei. Foi desenvolvido usando-se sockets Unix, assim o PID é o valor da prioridade, quanto maior o PID maior a prioridade. Para testar é só executar o script várias vezes e depois matar um deles. Os outros processos iniciarão a eleição na procura pelo processo com maior prioridade (PID maior do que o seu). O processo com PID maior assumirá como coordenador e avisará aos outros que a eleição acabou e que ele é o novo coordenador. O curioso foi que o professor não queria aceitar o programa porque não era desenvolvido em C, a linguagem que ele solicitou. Depois de mostrar o código e o programa em execução ele mudou de idéia e aceitou o programa. E eu tirei A. Interessante exemplo de programação científica usando python.
O código
1 #!/usr/bin/python
2 import socket
3 import os
4 import sys
5
6 ## Variaveis globais
7 mySock = 0
8 ## Path onde estao os sockets
9 SPATH='/tmp/bully'
10
11 ## Funcao q manda mensagem
12 def sendMsg(dest,msg):
13 global mySock
14 try:
15 ns = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
16 ns.connect((dest))
17 ns.send(msg+';'+mySock)
18 ns.close()
19 return 1
20 except:
21 return 0
22 ## Funcao q fica se comunicando com o coordenador para ver se esta ativo
23 def verificaCoord(coord):
24 global SPATH
25 global mySock
26
27 while True:
28 ## Verifica se o coordenador esta ativo
29 status = sendMsg(coord,'1')
30 if status == 0:
31 print 'Coodenador parado '+str(coord)+'. Iniciando Eleicao'
32 list = os.listdir(SPATH)
33 for i in list:
34 if int(i) > int(mySock):
35 status = sendMsg(i,'E')
36 if status == 0:
37 print 'Erro enviando E ao processo '+i
38 break
39 else:
40 print 'Conectou no coordenador '+coord
41 return 0
42
43 def main():
44 ## Variaveis
45 fCord = 1
46 BUF=1024
47
48 global mySock
49 global SPATH
50 try:
51 ## Se recebeu um argumento he um processo recussitado
52 mySock = sys.argv[1]
53 ## Testa se existem processos com prioridade maior
54 list = os.listdir(SPATH)
55 for i in list:
56 if int(i) > int(mySock):
57 status = sendMsg(i,'1')
58 if status == 1:
59 fCord = 0 ## Se houver processos maiores, nao he coordenador
60 continue
61 ## Se for o maior anuncia a todos
62 if fCord == 1:
63 for i in list:
64 if int(i) < int(mySock):
65 status = sendMsg(i,'C')
66 except:
67 ## Nome do socket = pid do processo
68 mySock=str(os.getpid())
69 fCord = 0
70 print 'Iniciando. PID='+str(mySock)
71 ## Verifica se existe o diretorio para os sockets
72 try:
73 os.chdir(SPATH)
74 except:
75 ## Se nao existe cria o diretorio
76 os.mkdir(SPATH)
77 try:
78 ## Cria o socket
79 s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
80 s.bind((mySock)) ## Atribui um nome ao socket
81 s.listen(1) ## Estabelece fila para conexoes
82 ## Inicialmente o menor PID he o coordenador
83 list = os.listdir(SPATH)
84 ## Ordena os pids e pega o primeiro
85 list.sort(cmp)
86 if list:
87 COORD=list[0]
88 else:
89 COORD=mySock
90 ## Nao he o coordenador
91 if COORD != mySock and fCord == 0:
92 ## Divide o processo
93 childPID = os.fork()
94 if childPID == 0:
95 ## O filho permanece testando o coordenador
96 verificaCoord(COORD)
97 #sys.exit(1)
98 else:
99 COORD = mySock
100 print 'Eu sou o coordenador '+str(mySock)
101 ## Todos os processos pais permanecem em wait, aguardando conexao
102 conn, addr = s.accept()
103 data = conn.recv(BUF)
104 while True:
105 if data:
106 msg = data.split(';')
107 ## Se a mensagem recebida for de eleicao
108 if msg[0] == 'E':
109 print 'Recebido mensagem '+msg[0]+' de PID='+str(msg[1])
110 ## Enviar msg para pid menor parar eleicao
111 sendMsg(msg[1],'PE')
112 #procura se tem alguem maior q ele
113 list = os.listdir(SPATH)
114 maior = 1
115 for i in list:
116 if int(i) > int(mySock):
117 #print 'Enviar para PID '+i
118 status = sendMsg(i,'E')
119 if status == 1:
120 maior = 0
121 #se for o maior envia msg para todos como novo coord.
122 if maior == 1:
123 print 'Eu sou o novo Coordenador '+str(mySock)
124 os.kill(childPID,9)
125 for i in list:
126 if int(i) < int(mySock):
127 print 'enviando C para '+str(i)
128 send = sendMsg(i,'C')
129
130 ## Recebeu msg para parar eleicao
131 if msg[0] == 'PE':
132 print 'Recebida mensagem '+msg[0]+' de PID='+str(msg[1])
133 ## Recebeu msg de verificacao de conexao
134 #if msg[0] == '1':
135 # print 'Recebida mensagem '+msg[0]+' de PID='+str(msg[1])
136 ## Recebeu msg de coordenador
137 if msg[0] == 'C':
138 print 'Novo coordenador '+msg[1]
139 os.kill(childPID,9)
140 childPID = os.fork()
141 if childPID == 0:
142 ## Novo filho gerado monitora o novo coordenador
143 verificaCoord(msg[1])
144 #sys.exit(1)
145 data = conn.recv(BUF)
146 else:
147 conn, addr = s.accept()
148 data = conn.recv(BUF)
149 except:
150 print "Unexpected error:", sys.exc_info()[0]
151 s.close()
152 os.remove(SPATH+'/'+str(mySock))
153 print 'Processo '+str(mySock)+' parando'
154
155
156 ## Invoca a funcao main na inicializacao do programa
157 if __name__ == '__main__':
158 main()
Links