diff --git a/dissect/target/plugins/os/windows/notifications.py b/dissect/target/plugins/os/windows/notifications.py index 9b0f97dab5..d8524d30ee 100644 --- a/dissect/target/plugins/os/windows/notifications.py +++ b/dissect/target/plugins/os/windows/notifications.py @@ -97,26 +97,42 @@ APPDB_MAGIC = b"DNPW" NUM_APPDB_CHUNKS = 256 -AppDBChunkRecord = create_extended_descriptor([UserRecordDescriptorExtension])( - "windows/notification/appdb_chunk", +AppDBRecord = create_extended_descriptor([UserRecordDescriptorExtension])( + "windows/notification/appdb", [ ("datetime", "timestamp"), + ("varint", "version"), ("varint", "next_notification_id"), + ], +) + +AppDBPushRecord = create_extended_descriptor([UserRecordDescriptorExtension])( + "windows/notification/appdb/push", + [ ("datetime", "push_ts1"), ("datetime", "push_ts2"), + ("varint", "chunk_num"), ("uri", "push_uri"), - ("varint", "badge_id"), + ], +) + +AppDBBadgeRecord = create_extended_descriptor([UserRecordDescriptorExtension])( + "windows/notification/appdb/badge", + [ ("datetime", "badge_ts"), + ("varint", "badge_id"), + ("varint", "chunk_num"), ("string", "badge_data"), ], ) AppDBTileRecord = create_extended_descriptor([UserRecordDescriptorExtension])( - "windows/notification/appdb_tile", + "windows/notification/appdb/tile", [ - ("varint", "id"), ("datetime", "arrival_time"), ("datetime", "expiry_time"), + ("varint", "id"), + ("varint", "chunk_num"), ("varint", "type"), ("varint", "index"), ("string", "name"), @@ -125,11 +141,12 @@ ) AppDBToastRecord = create_extended_descriptor([UserRecordDescriptorExtension])( - "windows/notification/appdb_toast", + "windows/notification/appdb/toast", [ - ("varint", "id"), ("datetime", "arrival_time"), ("datetime", "expiry_time"), + ("varint", "id"), + ("varint", "chunk_num"), ("varint", "type"), ("varint", "index"), ("string", "name1"), @@ -220,37 +237,79 @@ def check_compatible(self) -> None: def _get_appdb_chunk_record( self, chunk: c_appdb.Chunk, - timestamp: Optional[datetime.datetime], user: WindowsUserRecord, - ) -> AppDBChunkRecord: - push_timestamp1 = None - if ts := chunk.Push.Timestamp1: - push_timestamp1 = wintimestamp(ts) - push_timestamp2 = None - if ts := chunk.Push.Timestamp2: - push_timestamp2 = wintimestamp(ts) - push_uri = chunk.Push.Uri.split(b"\x00")[0] - push_uri = push_uri.decode("utf-8", errors="surrogateescape") - - badge_ts = None - if ts := chunk.BadgeXml.Timestamp: - badge_ts = wintimestamp(ts) - badge_data = chunk.BadgeXml.Data.decode("utf-8", errors="surrogateescape") - - return AppDBChunkRecord( - timestamp=timestamp, + ) -> AppDBRecord: + chunk_timestamp = None + if ts := chunk.Header.Timestamp: + chunk_timestamp = wintimestamp(ts) + + return AppDBRecord( + timestamp=chunk_timestamp, + version=chunk.Header.Version, next_notification_id=chunk.Header.NextNotificationId, - push_ts1=push_timestamp1, - push_ts2=push_timestamp2, - push_uri=push_uri, - badge_id=chunk.BadgeXml.Id, - badge_ts=badge_ts, - badge_data=badge_data, - _target=self.target, - _user=user, ) - def _get_appdb_tile_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord) -> list[AppDBTileRecord]: + def _get_appdb_push_record( + self, + chunk: c_appdb.Chunk, + chunk_num: int, + user: WindowsUserRecord, + ) -> Optional[AppDBPushRecord]: + badge_record = None + push_uri = chunk.Push.Uri.split(b"\x00")[0] + push_uri = push_uri.decode("utf-8", errors="surrogateescape") + + if push_uri: + push_timestamp1 = None + if ts := chunk.Push.Timestamp1: + push_timestamp1 = wintimestamp(ts) + push_timestamp2 = None + if ts := chunk.Push.Timestamp2: + push_timestamp2 = wintimestamp(ts) + + badge_record = AppDBPushRecord( + push_ts1=push_timestamp1, + push_ts2=push_timestamp2, + chunk_num=chunk_num, + push_uri=push_uri, + _target=self.target, + _user=user, + ) + + return badge_record + + def _get_appdb_badge_record( + self, + chunk: c_appdb.Chunk, + chunk_num: int, + user: WindowsUserRecord, + ) -> Optional[AppDBBadgeRecord]: + badge_record = None + badge_id = chunk.Badge.Id + + if badge_id: + badge_ts = None + if ts := chunk.Badge.Timestamp: + badge_ts = wintimestamp(ts) + badge_data = chunk.Badge.Data.decode("utf-8", errors="surrogateescape") + + badge_record = AppDBBadgeRecord( + badge_id=badge_id, + badge_ts=badge_ts, + chunk_num=chunk_num, + badge_data=badge_data, + _target=self.target, + _user=user, + ) + + return badge_record + + def _get_appdb_tile_records( + self, + chunk: c_appdb.Chunk, + chunk_num: int, + user: WindowsUserRecord, + ) -> list[AppDBTileRecord]: tile_records = [] num_tiles = len(chunk.Tiles) @@ -271,9 +330,10 @@ def _get_appdb_tile_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord) tile_xml = tile_xml.decode("utf-8", errors="surrogateescape") tile_record = AppDBTileRecord( - id=tile.UniqueId, arrival_time=tile_arrival_time, expiry_time=tile_expiry_time, + id=tile.UniqueId, + chunk_num=chunk_num, type=tile.Type, index=tile.Index, name=name, @@ -285,7 +345,12 @@ def _get_appdb_tile_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord) tile_records.append(tile_record) return tile_records - def _get_appdb_toast_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord) -> list[AppDBToastRecord]: + def _get_appdb_toast_records( + self, + chunk: c_appdb.Chunk, + chunk_num: int, + user: WindowsUserRecord, + ) -> list[AppDBToastRecord]: toast_records = [] num_toasts = len(chunk.Toasts) @@ -307,9 +372,10 @@ def _get_appdb_toast_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord toast_xml = toast_xml.decode("utf-8", errors="surrogateescape") toast_record = AppDBToastRecord( - id=toast.UniqueId, arrival_time=toast_arrival_time, expiry_time=toast_expiry_time, + id=toast.UniqueId, + chunk_num=chunk_num, type=toast.Type, index=toast.Index, name1=name1, @@ -322,17 +388,13 @@ def _get_appdb_toast_records(self, chunk: c_appdb.Chunk, user: WindowsUserRecord toast_records.append(toast_record) return toast_records - @export(record=[AppDBChunkRecord, AppDBTileRecord, AppDBToastRecord]) + @export(record=[AppDBRecord, AppDBPushRecord, AppDBBadgeRecord, AppDBTileRecord, AppDBToastRecord]) def appdb(self) -> Iterator[GroupedRecord]: for user, appdb_file in self.appdb_files: with appdb_file.open(mode="rb") as fp: - timestamp = None - for chunk_no in range(NUM_APPDB_CHUNKS): + for chunk_num in range(NUM_APPDB_CHUNKS): chunk = c_appdb.Chunk(fp) - if chunk_no == 0: - timestamp = wintimestamp(chunk.Header.Timestamp) - if chunk.Info.InUse == 0: continue elif chunk.Info.InUse != 1: @@ -343,14 +405,22 @@ def appdb(self) -> Iterator[GroupedRecord]: ) continue - chunk_record = self._get_appdb_chunk_record(chunk, timestamp, user) - tile_records = self._get_appdb_tile_records(chunk, user) - toast_records = self._get_appdb_toast_records(chunk, user) + if chunk_num == 0: + yield self._get_appdb_chunk_record(chunk, user) + + push_record = self._get_appdb_push_record(chunk, chunk_num, user) + if push_record: + yield push_record + + badge_record = self._get_appdb_badge_record(chunk, chunk_num, user) + if badge_record: + yield badge_record + + for tile_record in self._get_appdb_tile_records(chunk, chunk_num, user): + yield tile_record - yield GroupedRecord( - "windows/notification/appdb", - [chunk_record] + tile_records + toast_records, - ) + for toast_record in self._get_appdb_toast_records(chunk, chunk_num, user): + yield toast_record @export(record=[WpnDatabaseNotificationRecord, WpnDatabaseNotificationHandlerRecord]) def wpndatabase(self): diff --git a/tests/test_plugins_os_windows_notifications.py b/tests/test_plugins_os_windows_notifications.py index 01826518e2..4b11e2d919 100644 --- a/tests/test_plugins_os_windows_notifications.py +++ b/tests/test_plugins_os_windows_notifications.py @@ -1,3 +1,5 @@ +from collections import defaultdict + from flow.record.fieldtypes import datetime from dissect.target.filesystem import VirtualFilesystem @@ -17,66 +19,69 @@ def test_notifications_appdb(target_win_users: Target, fs_win: VirtualFilesystem test_file = absolute_path("data/plugins/os/windows/notifications/appdb.dat.v3.gz") appdb_file = fsutil.join(USER_DIR, NOTIFICATIONS_DIR, "appdb.dat") fs_win.map_file(appdb_file, test_file, compression="gzip") - target_win_users.add_plugin(NotificationsPlugin) - records = list(target_win_users.notifications.appdb()) - assert len(records) == 18 + records_by_type = defaultdict(list) + for record in target_win_users.notifications.appdb(): + record_type = record._desc.name + records_by_type[record_type].append(record) + + records_by_type_chunk = defaultdict(lambda: defaultdict(list)) + for record_type, records in records_by_type.items(): + if record_type != "windows/notification/appdb": + for record in records: + chunk_num = record.chunk_num + records_by_type_chunk[record_type][chunk_num].append(record) + + assert len(records_by_type.get("windows/notification/appdb", [])) == 1 + assert len(records_by_type.get("windows/notification/appdb/push", [])) == 1 + assert len(records_by_type.get("windows/notification/appdb/badge", [])) == 2 + assert len(records_by_type.get("windows/notification/appdb/tile", [])) == 48 + assert len(records_by_type.get("windows/notification/appdb/toast", [])) == 35 + + appdb_record = records_by_type["windows/notification/appdb"][0] + assert appdb_record.timestamp == datetime("2016-06-02T10:00:34.019495+00:00") + assert appdb_record.version == 3 + assert appdb_record.next_notification_id == 517 + + push_record = records_by_type_chunk["windows/notification/appdb/push"][8][0] + assert push_record.push_ts1 == datetime("2016-06-19T07:37:38+00:00") + assert push_record.push_ts2 == datetime("2016-05-20T07:41:45.883432+00:00") + assert push_record.push_uri == ( + "https://db5.notify.windows.com/?token=AwYAAAA%2bwMdZymXtvB9uG3YbJZX4U" + "CXwsLBJA7it1REPu58SjiQ8%2bnxg%2bfk8vKU%2bQQPG5ZglOuCq%2fkArOGxBJr9z7G" + "K%2bQFwyrTcaOyptsKNF2f%2fllCPmmGwXsFAFjS%2fkdC678PQ%3d" + ) - record = records[0] - assert len(record.records) == 6 + badge_record = records_by_type_chunk["windows/notification/appdb/badge"][3][0] + assert badge_record.badge_ts is None + assert badge_record.badge_id == 0x7A + assert badge_record.badge_data.startswith(" None: test_file = absolute_path("data/plugins/os/windows/notifications/wpndatabase.db")