Skip to content

Commit

Permalink
Add a plugin to parse notifications from Windows appdb.dat
Browse files Browse the repository at this point in the history
  • Loading branch information
pyrco committed Sep 14, 2023
1 parent 946d31a commit bea1e9c
Showing 1 changed file with 291 additions and 3 deletions.
294 changes: 291 additions & 3 deletions dissect/target/plugins/os/windows/notifications.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,150 @@
import datetime
from typing import Iterator, Optional
from uuid import UUID

from dissect.cstruct import cstruct
from dissect.sql import sqlite3
from dissect.util.ts import wintimestamp
from flow.record import GroupedRecord

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.descriptor_extensions import UserRecordDescriptorExtension
from dissect.target.helpers.record import create_extended_descriptor
from dissect.target.helpers.record import RecordDescriptor, create_extended_descriptor
from dissect.target.plugin import Plugin, export

appdb_def = """
typedef struct
{
BYTE Magic[4]; // Always b"DNPW"
DWORD Version; // Versions 1 (win8) and 3 (win10) seem to exist
QWORD LastNotificationDate;
DWORD NextNotificationId;
DWORD Unknown;
char Padding[8];
} ChunkHeader; // size: 0x20
typedef struct
{
WORD InUse; // ??
WORD NextTileWrite; // ??
WORD NextToastWrite; // ??
BYTE Flags[2];
} ChunkInfo;
typedef struct
{
QWORD Timestamp1; // ??
QWORD Timestamp2; // Is this time to refresh?
char Uri[1024]; // Is this the correct size?
char Padding[0x818 - 0x410];
} PushDescriptor; // size: 0x818
typedef struct
{
DWORD Id;
DWORD Zero; // ??
QWORD Timestamp;
WORD Unknown;
WORD DataLength;
char Data[DataLength];
char Padding[0x118 - 0x14 - DataLength];
} BadgeContent; // size: 0x118
typedef struct
{
DWORD UniqueId; // ??
DWORD Zero;
QWORD TS_Expiry;
QWORD TS;
BYTE Type; // ??
BYTE Index;
WORD ContentLength;
wchar_t Name[18];
} TileDescriptor; // size: 0x40
typedef struct
{
DWORD UniqueId; // ??
DWORD Zero;
QWORD TS_Expiry;
QWORD TS;
BYTE Type; // ??
BYTE Index;
WORD ContentLength;
wchar_t Name1[17];
wchar_t Name2[17];
} ToastDescriptor; // size: 0x60
typedef struct
{
char Content[0x1400];
} DataXML; // 0x1400
typedef struct
{
ChunkHeader Header; // Only populated for first chunk, else zeroed
ChunkInfo Info;
PushDescriptor Push;
BadgeContent BadgeXml;
TileDescriptor Tiles[5]; // start @ 0x958
DataXML TileXml[5];
// For the in use chunks, 0x14 ToastDiscriptors have an Index, but there
// is space for more. Maybe this is used in case of deleted entries?
ToastDescriptor Toasts[0x14]; // start @ 0x6e98
char Padding1[0x1e00]; // start @ 0x7618
DataXML ToastXml[0x14]; // start @ 0x9418
char Padding2[0x13f8]; // start @ 0x22418
} Chunk; // size: 0x23810
"""

c_appdb = cstruct(endian="<")
c_appdb.load(appdb_def)

APPDB_MAGIC = b"DNPW"
NUM_APPDB_CHUNKS = 256

AppDBChunkRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"windows/notification/appdb_chunk",
[
("datetime", "last_notification_date"),
("varint", "next_notification_id"),
("datetime", "push_ts1"),
("datetime", "push_ts2"),
("uri", "push_uri"),
("varint", "badge_id"),
("datetime", "badge_ts"),
("string", "badge_data"),
],
)

AppDBTileRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"windows/notification/appdb_tile",
[
("varint", "id"),
("datetime", "ts"),
("datetime", "expiry_ts"),
("varint", "type"),
("varint", "index"),
("string", "name"),
("string", "content"),
],
)

AppDBToastRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"windows/notification/appdb_toast",
[
("varint", "id"),
("datetime", "ts"),
("datetime", "expiry_ts"),
("varint", "type"),
("varint", "index"),
("string", "name1"),
("string", "name2"),
("string", "content"),
],
)

WpnDatabaseNotificationRecord = create_extended_descriptor([UserRecordDescriptorExtension])(
"windows/notification/wpndatabase",
[
Expand Down Expand Up @@ -63,11 +198,164 @@ def __init__(self, target):
self.wpndb_files.append((user_details.user, wpndb_file))

if appdb_file.exists():
self.appdb_files.append((user_details.user, appdb_file))
with appdb_file.open(mode="rb") as fp:
chunk = c_appdb.Chunk(fp)
if chunk.Header.Magic == APPDB_MAGIC:
version = chunk.Header.Version
if version == 3:
self.appdb_files.append((user_details.user, appdb_file))
else:
self.target.log.warning(
"Skipping %s: unsupported version %s.",
appdb_file,
version,
)
if version != 1:
self.target.log.warning(
"Unknown appdb version %s in file %s, "
"please consider providing us with a sample.",
version, appdb_file
)


def check_compatible(self) -> None:
if not self.wpndb_files and not self.appdb_files:
raise UnsupportedPluginError("No wpndatabase.db or appdb.dat files found")
raise UnsupportedPluginError("No or incompatible wpndatabase.db or appdb.dat files found")

def _get_appdb_chunk_record(
self,
chunk: c_appdb.Chunk,
last_notification_date: Optional[datetime.datetime],
user: RecordDescriptor,
) -> AppDBChunkRecord:
push_timestamp1 = None
if ts := chunk.Push.Timestamp1:
push_timestamp1 = wintimestamp(ts)
push_timestamp2 = None
if ts := chunk.Push.Timestamp1:
push_timestamp2 = wintimestamp(ts)
push_uri = chunk.Push.Uri.split(b"\x00")[0]
push_uri = push_uri.decode("utf-8", errors="surrogateescape")

badge_ts = None
if ts := chunk.BadgeXml.Timestamp:
badge_ts = wintimestamp(ts)
badge_data = chunk.BadgeXml.Data.decode("utf-8", errors="surrogateescape")

return AppDBChunkRecord(
last_notification_date=last_notification_date,
next_notification_id=chunk.Header.NextNotificationId,
push_ts1=push_timestamp1,
push_ts2=push_timestamp2,
push_uri=push_uri,
badge_id=chunk.BadgeXml.Id,
badge_ts=badge_ts,
badge_data=badge_data,
_target=self.target,
_user=user,
)

def _get_appdb_tile_records(self, chunk: c_appdb.Chunk, user: RecordDescriptor) -> list[AppDBTileRecord]:
tile_records = []
num_tiles = len(chunk.Tiles)

for tile_no in range(num_tiles):
tile = chunk.Tiles[tile_no]

if tile.UniqueId:
tile_ts = None
if ts := tile.TS:
tile_ts = wintimestamp(ts)
tile_expiry_ts = None
if ts := tile.TS_Expiry:
tile_expiry_ts = wintimestamp(ts)
name = tile.Name.strip("\x00")

xml_size = tile.ContentLength
tile_xml = chunk.TileXml[tile_no].Content[:xml_size]
tile_xml = tile_xml.decode("utf-8", errors="surrogateescape")

tile_record = AppDBTileRecord(
id=tile.UniqueId,
ts=tile_ts,
expiry_ts=tile_expiry_ts,
type=tile.Type,
index=tile.Index,
name=name,
content=tile_xml,
_target=self.target,
_user=user,
)

tile_records.append(tile_record)
return tile_records

def _get_appdb_toast_records(self, chunk: c_appdb.Chunk, user: RecordDescriptor) -> list[AppDBToastRecord]:
toast_records = []
num_toasts = len(chunk.Toasts)

for toast_no in range(num_toasts):
toast = chunk.Toasts[toast_no]

if toast.UniqueId:
toast_ts = None
if ts := toast.TS:
toast_ts = wintimestamp(ts)
toast_expiry_ts = None
if ts := toast.TS_Expiry:
toast_expiry_ts = wintimestamp(ts)
name1 = toast.Name1.strip("\x00")
name2 = toast.Name2.strip("\x00")

xml_size = toast.ContentLength
toast_xml = chunk.ToastXml[toast_no].Content[:xml_size]
toast_xml = toast_xml.decode("utf-8", errors="surrogateescape")

toast_record = AppDBToastRecord(
id=toast.UniqueId,
ts=toast_ts,
expiry_ts=toast_expiry_ts,
type=toast.Type,
index=toast.Index,
name1=name1,
name2=name2,
content=toast_xml,
_target=self.target,
_user=user,
)

toast_records.append(toast_record)
return toast_records

@export(record=GroupedRecord)
def appdb(self) -> Iterator[GroupedRecord]:
for user, appdb_file in self.appdb_files:
with appdb_file.open(mode="rb") as fp:
last_notification_date = None
for chunk_no in range(NUM_APPDB_CHUNKS):
chunk = c_appdb.Chunk(fp)

if chunk_no == 0:
last_notification_date = wintimestamp(chunk.Header.LastNotificationDate)

if chunk.Info.InUse == 0:
continue
elif chunk.Info.InUse != 1:
self.target.log.warning(
"Unknown field value %s for chunk.Info.InUse, "
"please consider providing us with a sample.",
chunk.Info.InUse,
)
continue

chunk_record = self._get_appdb_chunk_record(chunk, last_notification_date, user)
tile_records = self._get_appdb_tile_records(chunk, user)
toast_records = self._get_appdb_toast_records(chunk, user)

yield GroupedRecord(
"windows/notification/appdb",
[chunk_record] + tile_records + toast_records,
)

@export(record=[WpnDatabaseNotificationRecord, WpnDatabaseNotificationHandlerRecord])
def wpndatabase(self):
Expand Down

0 comments on commit bea1e9c

Please sign in to comment.