Analyzers¶
Analyzers are the attack signatures that power Grapl’s realtime detection logic.
Though implementing analyzers is simple, we can build extremely powerful and efficient logic to catch all sorts of attacker behaviors.
The Analyzer Base Class¶
To implement an Analyzer we must inherit from the Analyzer abstract base class.
A = TypeVar("A", bound="Analyzer")
class Analyzer(abc.ABC):
def __init__(self, dgraph_client: GraphClient) -> None:
self.dgraph_client = dgraph_client
@classmethod
def build(cls: Type[A], dgraph_client: GraphClient) -> A:
return cls(dgraph_client)
@abc.abstractmethod
def get_queries(self) -> OneOrMany[Queryable]:
pass
@abc.abstractmethod
def on_response(self, response: Viewable, output: Any):
pass
Analyzer.build¶
Returns an instance of your analyzer. This allows you to move dependency management
out of your __init__
.
cls
- the Class for your analyzer, which you should use for construction.
graph_client
- an instance of a GraphClient
@classmethod
def build(cls: Type[A], graph_client: GraphClient) -> A:
return cls(dgraph_client)
Analyzer.get_queries¶
get_queries
is where you define any of your graph signatures, either one or multiple.
All queries returned must have the same type for the root node.
returns
- all signatures to be matched against.
@abc.abstractmethod
def get_queries(self) -> OneOrMany[Queryable]:
pass
Analyzer.on_response¶
on_response
is called if any of the sigantures from get_queries
matched a graph.
This method is where you can perform any subsequent logic that you couldn’t fit into your query, such as hitting an external threatfeed API, performing a count, etc.
response
- Guaranteed to be the Viewable type associated with the Queryable(s) returned
by get_queries
output
- Provides a send
method that takes an ExecutionHit
@abc.abstractmethod
def on_response(self, response: Viewable, output: Any):
pass
SuspiciousSvchost Example¶
Heres an example - we’re going to write some logic to look for suspicious executions
of svchost
.
class SuspiciousSvchost(Analyzer):
def get_queries(self) -> OneOrMany[ProcessQuery]:
invalid_parents = [
Not("services.exe"),
Not("smss.exe"),
Not("ngentask.exe"),
Not("userinit.exe"),
Not("GoogleUpdate.exe"),
Not("conhost.exe"),
Not("MpCmdRun.exe"),
]
return (
ProcessQuery()
.with_process_name(eq=invalid_parents)
.with_children(
ProcessQuery().with_process_name(eq="svchost.exe")
)
)
def on_response(self, response: ProcessView, output: Any):
output.send(
ExecutionHit(
analyzer_name="Suspicious svchost",
node_view=response,
risk_score=75,
)
)
We’ve got a very straightforward Analyzer here. We don’t need any custom build
or init,
and our on_response
contains no logic other than sending out an ExecutionHit.
def get_queries(self) -> OneOrMany[ProcessQuery]:
invalid_parents = [
Not("services.exe"),
Not("smss.exe"),
Not("ngentask.exe"),
Not("userinit.exe"),
Not("GoogleUpdate.exe"),
Not("conhost.exe"),
Not("MpCmdRun.exe"),
]
return (
ProcessQuery()
.with_process_name(eq=invalid_parents)
.with_children(
ProcessQuery().with_process_name(eq="svchost.exe")
)
)
The query is straightforward. We have a curated whitelist of parent processes for svchost.exe.
Any process that is not one of those is considered “invalid”.
ProcessQuery() # Any Process
.with_process_name(eq=invalid_parents) # With an invalid parent process name
.with_children( # With any child processes
ProcessQuery()
.with_process_name(eq="svchost.exe") # With the process name "svchost.exe".
)
Our query is therefor read as:
Any Process, with a process_name that exactly matches invalid_parents
, with any child process,
where the child process_name that exactly matches svchost.exe
.
Adding Context¶
We may want to add some optional context to our query, without requiring that context
for our Analyzer to match. We can do this easily in our on_response
implenentation.
In the on_response
method the response
is going to be the root node of what our query
matched - in our case, this will be some invalid parent of svchost.exe.
Some interesting context might be to get the binary path of that svchost.exe and the parent process of our invalid_parent.
def on_response(self, response: ProcessView, output: Any):
# Let's get the parent of our invalid_parent
response.get_parent()
# And the binary paths for any suspect child processes
for child in response.children:
if child.get_bin_file():
child.bin_file.get_file_path()
output.send(
ExecutionHit(
analyzer_name="Suspicious svchost",
node_view=response,
risk_score=75,
)
)
Unlike with the queries in get_queries', which have to be an exact match, our context is purely optional. We grab the information if it's available, but if it isn't we'll just move on.
If the information is there we’ll have so much more information when this triggers, almost certainly enough to triage this without much investigation.