Algoritmo de Bully
Em sistemas distribuídos, diversos algoritmos necessitam que um processo funcione como coordenador, inicializador, sequenciador, enfirm, ter um papel especial. No caso de falha deste processo coordenador, pode acontecer o comprometimento das tarefas sendo realizadas, então um novo coordenador deve assumir. Para a escolha do novo coordenador existem alguns algoritmos especiais, chamados algoritmos de eleição. Um destes algoritmos é o Algoritmo de Bully. O algoritmo de Bully, desenvolvido por Garcia-Mollina em 1982, assume que um processo sabe a prioridade de todos os outros processos no sistema. Quando um processo Pi detecta que o coordenador não está mais respondendo a um pedido de serviço, ele inicia uma eleição da seguinte forma:
- Pi envia uma mensagem de Eleição para todos os processos com prioridade maior do que a sua.
- Se nenhum processo responde, Pi vence a eleição e torna-se o coordenador, avisando a todos os outros processos com menor prioridade que ele é o novo coordenador.
A implementação
Este código foi desenvolvido para uma disciplina de Sistemas Distribuídos de uma Pós-Graduação que cursei. Foi desenvolvido usando-se sockets Unix, assim o PID é o valor da prioridade, quanto maior o PID maior a prioridade. Para testar é só executar o script várias vezes e depois matar um deles. Os outros processos iniciarão a eleição na procura pelo processo com maior prioridade (PID maior do que o seu). O processo com PID maior assumirá como coordenador e avisará aos outros que a eleição acabou e que ele é o novo coordenador. O curioso foi que o professor não queria aceitar o programa porque não era desenvolvido em C, a linguagem que ele solicitou. Depois de mostrar o código e o programa em execução ele mudou de idéia e aceitou o programa. E eu tirei A. Interessante exemplo de programação científica usando python.
O código
1 #!/usr/bin/python
2 import socket
3 import os
4 import exceptions
5 import sys
6
7 ## Variaveis globais
8 mySock = 0
9 ## Path onde estao os sockets
10 SPATH='/tmp/bully'
11
12 ## Funcao q manda mensagem
13 def sendMsg(dest,msg):
14 global mySock
15 try:
16 ns = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
17 ns.connect((dest))
18 ns.send(msg+';'+mySock)
19 ns.close()
20 return 1
21 except:
22 return 0
23 ## Funcao q fica se comunicando com o coordenador para ver se esta ativo
24 def verificaCoord(coord):
25 global SPATH
26 global mySock
27
28 while 1:
29 ## Verifica se o coordenador esta ativo
30 status = sendMsg(coord,'1')
31 if status == 0:
32 print 'Coodenador parado '+str(coord)+'. Iniciando Eleicao'
33 list = os.listdir(SPATH)
34 for i in list:
35 if int(i) > int(mySock):
36 status = sendMsg(i,'E')
37 if status == 0:
38 print 'Erro enviando E ao processo '+i
39 break
40 else:
41 print 'Conectou no coordenador '+coord
42 return 0
43
44 def main():
45 ## Variaveis
46 fCord = 1
47 BUF=1024
48
49 global mySock
50 global SPATH
51 try:
52 ## Se recebeu um argumento he um processo recussitado
53 mySock = sys.argv[1]
54 ## Testa se existem processos com prioridade maior
55 list = os.listdir(SPATH)
56 for i in list:
57 if int(i) > int(mySock):
58 status = sendMsg(i,'1')
59 if status == 1:
60 fCord = 0 ## Se houver processos maiores, nao he coordenador
61 continue
62 ## Se for o maior anuncia a todos
63 if fCord == 1:
64 for i in list:
65 if int(i) < int(mySock):
66 status = sendMsg(i,'C')
67 except:
68 ## Nome do socket = pid do processo
69 mySock=str(os.getpid())
70 fCord = 0
71 print 'Iniciando. PID='+str(mySock)
72 ## Verifica se existe o diretorio para os sockets
73 try:
74 os.chdir(SPATH)
75 except:
76 ## Se nao existe cria o diretorio
77 os.mkdir(SPATH)
78 try:
79 ## Cria o socket
80 s = socket.socket(socket.AF_UNIX,socket.SOCK_STREAM)
81 s.bind((mySock)) ## Atribui um nome ao socket
82 s.listen(1) ## Estabelece fila para conexoes
83 ## Inicialmente o menor PID he o coordenador
84 list = os.listdir(SPATH)
85 ## Ordena os pids e pega o primeiro
86 list.sort(cmp)
87 if list:
88 COORD=list[0]
89 else:
90 COORD=mySock
91 ## Nao he o coordenador
92 if COORD != mySock and fCord == 0:
93 ## Divide o processo
94 childPID = os.fork()
95 if childPID == 0:
96 ## O filho permanece testando o coordenador
97 verificaCoord(COORD)
98 #sys.exit(1)
99 else:
100 COORD = mySock
101 print 'Eu sou o coordenador '+str(mySock)
102 ## Todos os processos pais permanecem em wait, aguardando conexao
103 conn, addr = s.accept()
104 data = conn.recv(BUF)
105 while 1:
106 if data:
107 msg = data.split(';')
108 ## Se a mensagem recebida for de eleicao
109 if msg[0] == 'E':
110 print 'Recebido mensagem '+msg[0]+' de PID='+str(msg[1])
111 ## Enviar msg para pid menor parar eleicao
112 sendMsg(msg[1],'PE')
113 #procura se tem alguem maior q ele
114 list = os.listdir(SPATH)
115 maior = 1
116 for i in list:
117 if int(i) > int(mySock):
118 #print 'Enviar para PID '+i
119 status = sendMsg(i,'E')
120 if status == 1:
121 maior = 0
122 #se for o maior envia msg para todos como novo coord.
123 if maior == 1:
124 print 'Eu sou o novo Coordenador '+str(mySock)
125 os.kill(childPID,9)
126 for i in list:
127 if int(i) < int(mySock):
128 print 'enviando C para '+str(i)
129 send = sendMsg(i,'C')
130
131 ## Recebeu msg para parar eleicao
132 if msg[0] == 'PE':
133 print 'Recebida mensagem '+msg[0]+' de PID='+str(msg[1])
134 ## Recebeu msg de verificacao de conexao
135 #if msg[0] == '1':
136 # print 'Recebida mensagem '+msg[0]+' de PID='+str(msg[1])
137 ## Recebeu msg de coordenador
138 if msg[0] == 'C':
139 print 'Novo coordenador '+msg[1]
140 os.kill(childPID,9)
141 childPID = os.fork()
142 if childPID == 0:
143 ## Novo filho gerado monitora o novo coordenador
144 verificaCoord(msg[1])
145 #sys.exit(1)
146 data = conn.recv(BUF)
147 else:
148 conn, addr = s.accept()
149 data = conn.recv(BUF)
150 except:
151 print "Unexpected error:", sys.exc_info()[0]
152 s.close()
153 os.remove(SPATH+'/'+str(mySock))
154 print 'Processo '+str(mySock)+' parando'
155
156
157 ## Invoca a funcao main na inicializacao do programa
158 if __name__ == '__main__':
159 main()
Links