As we saw in a previous blog post, OSSEC is UDP based. This is great for performance, and can scale to thousands of nodes. However, it carries an inherent reliability problem: UDP is a connection-less protocol, so the OSSEC agent has no guaranteed way of knowing that a particular event has been delivered to the OSSEC server. Instead, the architecture relies on heartbeats and keepalives, but there is still potential for lost events no matter how short the interval between keepalives. In this article we explore a simple Python-based broker solution that introduces some (but not complete) reliability into the OSSEC architecture, at the cost of performance.
The first requirement of the broker solution is that it absolutely does not touch any existing code in the current OSSEC solution. It must interfere as little as possible, so that if there are any updates or changes in OSSEC the broker can either continue to work as normal, or at least be removed and allow OSSEC to work as originally intended. To achieve this, the broker is split into two components: a TCP server which is installed on the same machine as the OSSEC server, and a proxy-like client which is installed on the same machine as the OSSEC agent.
The general idea is that the OSSEC agent is configured to send its traffic to 127.0.0.1 rather than directly to the server. The broker client intercepts the UDP packets (which are kept encrypted and compressed, maintaining end-to-end security), and before sending them on to the OSSEC server, it checks via TCP (reliably) whether the broker server is still reachable and whether the ossec-remoted process is still alive. If the broker server responds, the broker client "releases" the packets and forwards them on to the original OSSEC server. If no answer is received from the broker server, the broker client assumes the server is down and buffers the original UDP packets into a queue. After a while, the OSSEC agent will realise the server is down and pause operations (other than keepalives). When the server comes back online, the broker client replays all the packets that have been buffered, so no events are lost. The general architecture is as follows:

Starting from the client, we have the following code, commented so one can follow along:
#!/usr/bin/python
### example taken and modified from the SocketServer page on the Python doc website
# broker client (itself a small local UDP server)
# to run the example: $ python server.py

import threading
import socket
import SocketServer
import multiprocessing

# This class handles the UDP requests that come in from the OSSEC agent. Here we do
# the main work of the program, that is, we check if the OSSEC server is alive and
# processing logs.
# Each thread sends a TCP request to the OSSEC broker server, and if it gets a reply,
# forwards the original UDP packet from the OSSEC agent to the OSSEC server.
#
# If it fails, i.e. gets no answer, it places the data into a queue for later replay.
#
# If it succeeds, i.e. it gets an answer from the OSSEC broker server, it sends the
# packet onwards, then waits up to 5 seconds for a potential reply from the server.
# If it gets a reply, this is forwarded back to the agent; otherwise the thread is
# killed off to prevent leakage.
class ThreadedUDPRequestHandler(SocketServer.BaseRequestHandler):

    def handle(self):
        # the queue is global so that it is accessible across all threads
        global queue
        # get the sent data, and strip extra whitespace
        data = self.request[0].strip()
        # get the port number
        port = self.client_address[1]
        # get the communication socket
        client_socket = self.request[1]
        # get the client host IP address
        client_address = self.client_address[0]
        # proof of multithreading - more for debug than anything else
        cur_thread = threading.current_thread()
        print "thread %s" % cur_thread.name
        print "received call from client address: %s" % client_address
        print "received data from port [%s]: %s" % (port, data)
        # try to connect to the OSSEC broker server
        try:
            broker_server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            broker_server.settimeout(5)
            ## note the hardcoded IP address, this should be changed
            broker_server.connect(("192.168.10.177", 1514))
            print "sending PING to broker --->"
            # send a ping to the broker server
            broker_server.send("ping")
            # get the response
            broker_response = broker_server.recv(1024)
            broker_server.close()
        # if the broker server connection fails, put the original OSSEC agent
        # packet into an in-memory queue, and exit
        except socket.error:
            print "did not get PONG from server ----"
            queue.put(data)
            return
        # assuming we got a response, we check if all is ok
        # if all is ok, the server will have responded with "pong"
        if broker_response == "pong":
            # the server responded, so first check if there are packets in the queue
            # waiting to be sent; if there are, send them to the OSSEC server over UDP
            while not queue.empty():
                # we have messages to replay
                replay_data = queue.get()
                print "replaying data on to server ---->"
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                s.sendto(replay_data, ('192.168.10.177', 1514))
            print "got PONG from broker ***"
            # once the queue has been emptied, we continue on to send
            # the original packet to the OSSEC server
            try:
                print "sending data on to server ---->"
                s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
                # the OSSEC agent doesn't always expect an answer back from the
                # server, so we set a short timeout; this may need to be tuned if
                # used over a noisy or high-latency network link
                s.settimeout(5)
                s.sendto(data, ('192.168.10.177', 1514))
                # relay any response from the server back to the agent
                data, addr = s.recvfrom(32768)
                print "sending data on to client <----"
                client_socket.sendto(data, self.client_address)
            # a socket timeout means the agent sent a packet which did not
            # require a response, so move along
            except socket.timeout:
                print "timeout... moving on"
        # if the server doesn't send "pong", the machine is reachable but
        # ossec-remoted is down, so again place the packet into the queue
        else:
            print "did not get PONG from server ----"
            # place into queue for later replay
            queue.put(data)


class ThreadedUDPServer(SocketServer.ThreadingMixIn, SocketServer.UDPServer):
    pass


if __name__ == "__main__":
    # set up the connection info: host and port
    # these should always be localhost and 1514 respectively
    HOST, PORT = "localhost", 1514
    queue = multiprocessing.Queue()
    # start up the threaded UDP server on the host and port defined above,
    # with ThreadedUDPRequestHandler handling each request in its own thread
    server = ThreadedUDPServer((HOST, PORT), ThreadedUDPRequestHandler)
    # serve forever - keep on serving requests, do not exit after one
    try:
        server.serve_forever()
    except KeyboardInterrupt:
        server.shutdown()
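To sanity-check the broker client in isolation, one can pretend to be the OSSEC agent and throw a dummy UDP packet at it (a throwaway snippet for illustration only; the payload is meaningless):

import socket
# fire a dummy packet at the local broker client and watch its console output
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
s.sendto("dummy-event", ("127.0.0.1", 1514))
s.close()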
The server is significantly simpler, shown below:
import SocketServer
import subprocess

class TCPHandler(SocketServer.BaseRequestHandler):

    def handle(self):
        # self.request is the TCP socket connected to the client
        self.data = self.request.recv(1024).strip()
        # for logging purposes, write connection information to screen
        print "Connection from: %s : %s" % (self.client_address[0], self.data)
        # we should receive a simple ping; if it's anything else, it's probably not for us...
        if self.data == "ping":
            # check if the ossec-remoted process is still alive...
            proc = subprocess.Popen(['ps', '-elf'], stdout=subprocess.PIPE)
            processes_list = proc.communicate()[0]
            # if we find a process named "ossec-remoted" in the list, all should be
            # fine, so in that case we send back a pong, otherwise not...
            if "ossec-remoted" in processes_list:
                # just send back pong
                self.request.sendall("pong")
            else:
                self.request.sendall("ossec_not_found")

if __name__ == "__main__":
    # main program loop: simply serve the TCP server forever...
    HOST, PORT = "0.0.0.0", 1514
    # create the server, binding to all interfaces on port 1514
    server = SocketServer.TCPServer((HOST, PORT), TCPHandler)
    # activate the server; this will keep running until you
    # interrupt the program with Ctrl-C
    server.serve_forever()
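The server half can likewise be tested on its own with a minimal ping/pong client (a sketch, assuming the broker server IP used in the examples above):

import socket
# connect to the broker server and check the health of ossec-remoted
c = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
c.settimeout(5)
c.connect(("192.168.10.177", 1514))
c.send("ping")
print c.recv(1024)  # "pong" if ossec-remoted is alive, "ossec_not_found" otherwise
c.close()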
Kicking the tires and testing
We use the same troubleshooting techniques we used in the previous blog post.
First we set up the server side, which is quite straightforward: we run the ossec_broker_server.py file, and of course ensure that the OSSEC server process is actually running properly. Next, the client. We start by launching the Python broker client on the Windows machine (assuming Python is installed) and pointing the OSSEC agent at 127.0.0.1:
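On the agent side, the only OSSEC change needed is the server address in ossec.conf (a minimal sketch; a real agent config will contain more than this):

<ossec_config>
  <client>
    <server-ip>127.0.0.1</server-ip>
  </client>
</ossec_config>

With that in place, startup amounts to running the two scripts (the client filename here is arbitrary, whatever the broker client code was saved as):

$ python ossec_broker_server.py    # on the OSSEC server machine
$ python ossec_broker_client.py    # on the agent machine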
We immediately see some output on the OSSEC broker client, something like the following:
We should also check the OSSEC agent logs to make sure it connected successfully to 127.0.0.1:
So far so good… we have communication between the OSSEC agent and the OSSEC server, through the broker. Now, time to test a network interruption. If we simply stop the OSSEC broker server (simulating such an interruption), we should see the OSSEC agent fail to keep communicating with the OSSEC server:
Now, during this interruption (but before the agent's keepalives force a lock on the event viewer, so within a minute on default installs…) we generate some events:
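For example, on a Windows agent one can write test entries into the application event log with the built-in eventcreate utility (the event ID and message here are arbitrary):

C:\> eventcreate /T ERROR /ID 999 /L APPLICATION /D "OSSEC broker test event"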
These events would normally be lost, because the agent has not yet had time to realise there is a disconnection. So we now turn the server back on, and check the OSSEC archive logs to see whether the above events were delivered anyway:
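Assuming <logall>yes</logall> is enabled in the server's ossec.conf, every received event is written to the archive log, so a quick grep is enough (path per a default OSSEC install):

$ grep "OSSEC broker test" /var/ossec/logs/archives/archives.log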
Success! 🙂 There are some improvements to be made, but the principle is sound, provided one can accept the performance overhead that the added reliability introduces.