diff --git a/.github/workflows/scanoss.yml b/.github/workflows/scanoss.yml index 0116825..e5e2252 100644 --- a/.github/workflows/scanoss.yml +++ b/.github/workflows/scanoss.yml @@ -21,7 +21,7 @@ jobs: - name: Run SCANOSS Code Scan id: scanoss-code-scan-step - uses: scanoss/code-scan-action@v1 + uses: scanoss/code-scan-action@v1.3.1 with: policies: copyleft, undeclared, depTrack api.url: https://api.scanoss.com/scan/direct diff --git a/.gitignore b/.gitignore index 6e3073b..76e3ea1 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ .scanoss +.env diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 37887bc..2272343 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,5 +1,5 @@ repos: - repo: https://github.com/scanoss/pre-commit-hooks - rev: v0.2.0 + rev: v0.3.0 hooks: - id: scanoss-check-undeclared-code diff --git a/copyleft_renamed.c b/copyleft_renamed.c new file mode 100644 index 0000000..0618862 --- /dev/null +++ b/copyleft_renamed.c @@ -0,0 +1,106 @@ + +#include +#include +#include +#include +#include +#include +#include +#include + + + + + + + +#include "log.h" + +static const char *level_names[] = { + "TRACE", "DEBUG", "INFO", "WARN", "ERROR", "FATAL"}; + +// Default log level is INFO +static int LEVEL = LOG_INFO; + +// Default log file is STDERR +static FILE *LOG_FILE = NULL; + +char *format_timestamp() +{ + // I. Format timestamp + // Always use UTC time + char *out = malloc(64); + time_t t = time(NULL); + struct tm *utc = gmtime(&t); + strftime(out, 64, "%d-%m-%y %H:%M:%S", utc); + return out; +} + +/** + * Formats UTC timestamp in common log format. + */ +char *format_ts_common_log() +{ + //%d/%b/%Y:%H:%M:%S %z + char *out = malloc(64); + time_t t = time(NULL); + struct tm *utc = gmtime(&t); + strftime(out, 64, "%d/%b/%Y:%H:%M:%S %z", utc); + return out; +} + +void __logger(int level, const char *file, int line, const char *func, const char *format, ...) 
+{ + if (level < LEVEL) + { + return; + } + + if (LOG_FILE == NULL) + { + LOG_FILE = stderr; + } + + char *buf = format_timestamp(); + va_list args; + + // II. Format log + fprintf(LOG_FILE, "%s %lu %-5s %s:%s:%d: ", buf, (unsigned long) pthread_self(), level_names[level], file, func, line); + va_start(args, format); + vfprintf(LOG_FILE, format, args); + va_end(args); + fprintf(LOG_FILE, "\n"); + fflush(LOG_FILE); + free(buf); + + // Exit with error if log level is FATAL + if (level == LOG_FATAL) + { + exit(EXIT_FAILURE); + } +} + +bool log_level_is_enabled(int level) +{ + return level >= LEVEL; +} + +void log_set_level(int level) +{ + LEVEL = level; +} + +void log_set_file(char *filename) +{ + LOG_FILE = fopen(filename, "a+"); + if (LOG_FILE == NULL) + { + fprintf(stderr, "ERROR SETTING LOG FILE: %s\n", filename); + exit(EXIT_FAILURE); + } +} + +void log_close_file() +{ + fclose(LOG_FILE); +} diff --git a/copyright.c b/copyright_2.c similarity index 100% rename from copyright.c rename to copyright_2.c diff --git a/scanner_output.wfp b/scanner_output.wfp new file mode 100644 index 0000000..52fab22 --- /dev/null +++ b/scanner_output.wfp @@ -0,0 +1,522 @@ +file=a0dcf87de50e713e3c2df8ba11af3e09,47675,scanner_test.py +fh2=0b1d309be50003dbcdead1bd997573bf +6=d5e54c33,b03faabe +7=23bfe641 +8=8f1a259c +9=96e0f2da +10=86238bbc,57fd17f4 +11=3df15217 +13=49b901c6,76c8f7b2,5e608a23 +14=d18cc549,a6a72e20 +16=86ebf3c6 +17=6127c6f8 +18=85d7719c,b0bcdb7d,8b4fa021 +20=b05b10fa,7e399b18,d6e25bf1,45de0420 +21=32c89370,66902c56,8c6aa9e2 +26=90f0290e +29=0b666e32 +33=7a025e9f +34=57d27587 +35=1c34d464,79e9dfd3 +36=0e96e2eb +40=0dee4739 +41=adb8118d +42=4b19a8b6,9782d84c,c6f81d14 +44=8ef04ec3,271c6291 +46=c1529683,7afb5ee7,10315833,5054ff97 +48=5839db5b +49=8349beaf,fdd3cb4d +52=e6489ad6 +54=e0e40f69 +57=d0af6f49 +58=78159433 +59=65e6cd70 +62=9c03f371 +65=e3985afa +68=6ea5f039 +74=3049beef +75=b44fe0a4 +79=316c77de +83=4bf1142a +85=94f966b9 +86=d7310131 +89=2ddcc91a,a597f502 
+93=2755a899 +94=a3bddab8,44060c00,de30f920 +95=a82b4260 +100=3246854d +102=b2429b4c +105=7e90e7ed +106=6877931f +108=dd5ef75d +112=8cc4103f +114=bac854cf +115=01a612db +116=00a43385 +117=cf506a85 +119=1ab7b302 +121=aad047dd,558e49e5 +124=16e30ebc,adc2c932 +125=c4fa2424 +127=511b8700 +129=bb4b0f55,dfc5d78a +132=60dd5f96,2cc75f46,13959953 +133=9201440d +135=b5e67ebb,9b2f0861 +139=a0506805 +141=ac3f579e,4c0b12c5,2ad3d789,2b8c3d45 +145=52e7fbd4,22df64d0 +146=34423d5a +150=7bcb4ab7,048b8dc8 +157=6666a8ae,ee6dbdd0 +160=0fd70ddc +166=c4a1ec06 +167=aeb11d26 +170=03fe9beb +175=a6726ba5 +176=8e2d95e6 +178=06bf7107 +179=d7300346 +184=ce9aa267,8d8684f0,4efdf794 +188=0bb55409,aab22912,3c5c0ae6 +192=9201e2e7 +194=6106da63 +197=e511ba33 +201=8e5937dc +203=82876042,e640d478 +206=59fb3548,40510274,250487d4 +208=752b320a +210=712fb9aa +212=08186f62,aad2d3c1 +216=14d9482b +219=c174edb3 +223=51b6f0a9 +226=f6b6c5a3 +228=5a05653e +229=5db18555 +236=0b2234db +238=82e2e46f +239=4ad07aae +240=1bf5768c +245=a2b39ab7 +246=5a48fc71 +250=906f6a2e +251=e081c40b +253=059d0de6,8390a7be +254=a4a05a40,015edae5 +255=a3200e84 +256=83a1d467 +257=5811c662 +261=a7487bcb,420f84ab +265=a0e91286 +268=3b7df665 +270=6dc77836,338c9326 +271=167eb731 +275=9e03b714 +277=88277452,1e472fc4 +280=c200c579,8c68e2af +286=770f4a16 +289=7e6fe5fa,727c28a2 +295=ce381d04 +296=f37e1796,7e6fe5fa +298=727c28a2 +302=6add98bf +305=7e6fe5fa +307=727c28a2 +311=1297ee7a +313=3cc45057 +316=6ba836e2,27e7a8b7 +317=99d829c7 +321=e1f046ae,041f84b3,714b919f +322=57fb6bfc,2ffd0a7e +323=ada8fded +324=97637511 +326=b90442b8 +327=fa54b3ec,afd06036 +331=f3925c25 +334=f5930966,71472ea9 +336=50805798 +338=7000baa0,1d8e2d7c +339=9c55fc46 +343=4b155012 +344=3347be5d +345=7c2655e7 +348=69aa00d6 +351=c3c27f2a,d965072b,07b8b4bd +352=4d6d76f4 +355=8982afba +356=abb74977,d2b5f681 +357=dbb5e342 +359=6157d8cd +363=4f52166f,850e517c,adb2b134 +365=eb634a0a +367=bd80c0cc +371=1ba2ccc2,f3925c25 +374=f5930966,71472ea9 +376=50805798 +379=7000baa0 
+380=07e7de28 +383=57f67bd4 +384=6ca9dcbe +385=753372e4 +386=44a3c4f5,a77c5627 +389=92051b14 +390=42226efc +393=80c5b624 +395=6c4f2dd1,e02f4e23 +396=202e70de,efddf148 +398=4270f7bd +402=c81b494e,5f23f961,3d3c6789 +405=734d59da,9484db9b +407=69eb3f78,f618ede1 +408=b2330d72 +409=84eb3480,cac1cb1b +412=5d5c84ea +415=fabc47be +416=628a1501,9491f989,2a38e0c4 +418=e91196a1 +423=e2e255e9,9868b0e5 +424=320375e5 +425=f3b520d4,42c5b1c6 +427=44838a80,85dac7ed +430=59ae9050 +433=7ccdce1d +434=0e1d3f13 +435=683e1058,c0081b65,60418d58 +437=d5767614 +438=33dd7a40 +443=59ae9050 +445=ef092d22 +446=6722b9ab +449=44d8b3bd +450=c1de5404 +453=2e04fa51 +454=3e905a14 +455=d90f75e6 +459=3c2f293b +462=ef4ce778 +463=cad17638,0b2c5608 +464=79bb6e31 +467=39213b45,5585ff91 +469=2d958587 +471=657c47f9 +476=963c7b49,929cd990,5f97ee42 +478=c73e976d +479=e84f654e +480=c36c3031,b304569a +483=f3925c25 +484=46e540e1 +486=1d3aceac,167aa9e0 +487=84d94aa8,eca0e43c +490=44d8b3bd,38813a3c,f4c49a2a,c1de5404 +493=2e04fa51 +495=b0778ae4,139d4fd3 +496=e4158f60,caaeb296 +499=afd06036 +505=51e02810,183cf97c +510=dd3b7dae +511=fcd5dbb8,1b80a625,a2af212a +512=afa41d5d +514=c5142c7d,44bf8ec0 +515=08d2c721,faafadab +516=263c0266,1f9c6603 +518=c95e7643,ffabfd4d +519=23361443 +523=ea3e9c5a +524=392641bc +528=c1720ef8 +530=689866d0 +532=f4d596f3 +533=8447ec33 +535=cb0ab083 +536=c09679a0,367ab778 +538=7bdbb55f,273ef4ec +539=1db23b37,5c00c80e +540=a8a319cf +542=367ab778 +544=9e4fdaeb +545=8cbfd9cb +550=ae19ac95,21b21de6 +551=4f005f20,539b6cb6 +554=a9a8b617 +557=26042104 +558=8fa32a4e +561=dc3f1ce7,b53071b1 +564=01934efc +565=128c7587 +567=5ae24bf7 +573=8db602ad,c6edbd14,875b2a1b,b7e2c9c4 +574=89a354ec +577=393ab3db +578=9e56b68a +582=4a17250d,8256abbb +584=0a6a51b5,fcd3ef24 +586=9b3d8c85 +587=50c46971 +590=99d829c7 +594=e1f046ae,1d9c6018,ba6eaae3 +595=57fb6bfc +596=4bfd23bb +598=01fcb1bc,30d702a7,fa54b3ec,afd06036 +601=f3925c25 +602=e0105d46 +604=cf2e1de6,f4cfa415 +606=d810da7c,52d80595,fffe18f3 +608=0e276d78 
+609=9c55fc46 +611=68133539,753db8ec +613=7c2655e7 +616=69aa00d6 +619=60ef3284,07b8b4bd +620=4d6d76f4 +623=8982afba +624=abb74977,d2b5f681 +626=ebfe7ece +627=8973cf92,6157d8cd +633=4f52166f,91c25c6f,dce29a33 +637=fc1f6b19 +638=b217b61c +640=fd635321,f3925c25 +641=e0105d46 +643=cf2e1de6,f4cfa415 +645=d810da7c,52d80595,982f09d9,f4110124,48cd190c +648=354c38f0,77536ab9 +651=a8c07575,d53362c7 +653=c48eba00 +654=2d958587 +659=b49861a5 +661=e89fa69a,b9757482 +662=a717eb2a,ca5b4af9 +664=e0acfebc +665=e6d7b59f +668=e0105d46 +673=d697c991,e5c8d9c0 +677=5c64403c,57f67bd4 +678=6ca9dcbe +679=753372e4 +680=44a3c4f5,a77c5627 +683=92051b14 +684=42226efc +688=b8ebaab4,2f863197 +689=202e70de,efddf148 +691=4270f7bd +695=c81b494e,5f23f961,3d3c6789 +698=e71fcbc4 +700=b2330d72 +702=84eb3480,cac1cb1b +705=5d5c84ea +708=cb63bb6f +709=00a5f2c3 +712=701f246d +715=9868b0e5 +716=320375e5 +717=f3b520d4,42c5b1c6,3fbc1210 +718=85dac7ed +721=59ae9050 +724=7ccdce1d +725=0e1d3f13 +726=683e1058,c0081b65,60418d58 +728=d5767614 +729=33dd7a40 +734=59ae9050 +736=ef092d22 +737=6722b9ab +740=44d8b3bd +741=c1de5404 +745=2e04fa51 +746=3e905a14 +747=d90f75e6 +751=3c2f293b +754=ef4ce778 +755=cad17638,0b2c5608 +756=79bb6e31 +759=39213b45,5585ff91 +761=2d958587 +763=657c47f9 +768=b7cda4b7,a519bd22,92a4b73e +772=4713e2a9,110f6449,7155b2ad,6fc2b3b0 +774=714b919f,7108b188 +775=fa54b3ec,afd06036 +778=f3925c25 +779=e0105d46 +781=16360dc4 +783=86e69fe6 +786=741f845e +787=3c82af39 +789=2ab6ba59 +790=9bb236f8,8fdf1f3e,dcd50cfb +791=358afbba,d17dcce4 +793=78b77e7f +795=8973cf92,6157d8cd +799=238b602c,b94a9dc8 +803=0d0d5b00,870c6625 +805=9884af4e +807=f3925c25 +808=e0105d46 +810=f4cfa415 +811=73a5c58c +813=9cc2fca2 +814=bad1277a +817=b2c96b17,a8c07575,d53362c7 +818=c48eba00 +820=6601b947,2d958587 +824=8973cf92,6157d8cd +826=e267643c +830=bd1ae01d,ea8371c7,5e6c060b,d6fdd6ec +834=fd635321,f3925c25 +837=13d14d44 +838=4a66a952 +839=eb201124,78dee9d1 +842=86bc315c,a6656016 +847=b15cbb45 +849=da4dec01,a7536f10 +854=34fedbb0 
+858=7f4b6d40 +860=2a681682,d57f3411 +861=42fadf2c +864=828c97fc,c2b5dea9 +866=ac3d8922 +868=b5216089 +869=3b23ef55,7a5e9315 +870=2544deb9 +871=694a90e5,9fc2b7bb +874=03a15614,e6eca4fe,59d38dc3,494adde5 +876=f18187ea +878=6db8ba98 +882=2678e207 +885=428cf1b0 +887=779b7a70 +889=c4c6cc9b,143b4d6b +895=af782e46 +899=09757990 +902=06290f75 +903=3a4c05e5 +906=a48db797 +909=7abb04d6 +912=9f0f11d6 +915=d57f3411,694a90e5,9fc2b7bb +918=03a15614,e6eca4fe,786f5fb5,15037717 +919=cba546ba,98d68159 +922=2678e207 +925=428cf1b0 +927=cee424f7,a9208c0f,c19d83e3 +928=a0d8a058 +936=4069ab69,85b0f6df +939=cbd8b88c +941=3acd3534 +942=85b8f6bf +943=cb0ab083 +944=e322b493,a0698c79 +946=e2941628,37d0768c,8670e5f8,5c00c80e +947=e2941628 +949=0a51164b +951=bcd0db8f,93d01acd +956=8cbfd9cb +957=1fb34160 +960=0781f8f6 +961=3493dbb2 +962=714b919f,54444234 +963=fa54b3ec,afd06036 +966=f3925c25 +969=13d14d44 +970=4a66a952 +971=eb201124,78dee9d1 +975=86bc315c,1d8e2d7c +976=9c55fc46 +978=948e1d13 +979=6ded3316,d3628008 +981=7c2655e7 +983=69aa00d6 +984=de960966 +985=d2b5f681 +987=b4486eaf +988=6d3156cd,d54ac002 +989=8973cf92,6157d8cd +993=4f52166f,fd443769 +995=6961f84d,d6fdd6ec +997=d8e936a4 +999=c292b23f,f3925c25 +1002=13d14d44 +1003=4a66a952 +1004=eb201124,78dee9d1 +1007=86bc315c +1009=45793b4b +1010=5f23f961,3d3c6789 +1014=734d59da +1015=26444174 +1018=c6fff0c2,705dd46e,9ec8ca1a +1019=42fadf2c +1022=860474d9 +1024=fdb60da8,685187ab,a449683a +1025=6a025ccd,53099866 +1026=7a5e9315 +1028=ceee728c +1031=78159130,e75a67a8 +1033=4398cb96,b2aa147c,f18187ea +1037=ad6baca2 +1038=83a88ab6 +1041=3f59d0a6,6722b9ab +1044=44d8b3bd +1045=c1de5404 +1048=2e04fa51 +1049=3e905a14 +1050=9ec8ca1a +1055=ec6278de +1056=a97cdaf0,4d4a1a8c +1061=9ab977b3,9396b4b8 +1068=27794054 +1069=1beabf58,e48c6f1a +1070=a6ad9f22 +1071=44e74622 +1073=36213122 +1076=c4c6cc9b +1077=85b0f6df +1078=7268b696 +1080=85b8f6bf +1082=cb0ab083 +1083=e322b493,a0698c79 +1085=e2941628,37d0768c,8670e5f8,5c00c80e +1086=e2941628 +1088=0a51164b 
+1090=bcd0db8f,93d01acd +1095=8cbfd9cb,ce69da2d,8ea91249,4f1d0533 +1099=db09e2d2,0d0d5b00,870c6625 +1101=73099719 +1105=811f967f,f4cfa415 +1107=73a5c58c +1109=9cc2fca2 +1110=bad1277a +1113=06494f9c,113b6db4,a1badae7 +1119=36783886,0c624b2a +1121=7d50f424,23a1298d +1126=56c978a8 +1127=f4cfa415,2e028e6a +1129=4341fe4b,fffe18f3 +1133=11d34906 +1134=b494858c +1137=a1badae7 +1142=36783886,0c624b2a +1144=79fa780b +1146=a059bf1c,6120c5e7 +1149=f90ec7d3,759587a2 +1150=0d47050e,acdbaf9b,b031969e +1152=50805798 +1154=7000baa0 +1155=07e7de28 +1158=57f67bd4 +1159=6ca9dcbe +1160=753372e4 +1161=44a3c4f5,a77c5627 +1164=92051b14 +1165=42226efc +1169=8ed707c7 +1170=80c5b624 +1171=6c4f2dd1,e02f4e23 +1173=5d09b3da,ea2e50f0,9484db9b +1177=88af4acb,7f657026 +1178=30a1ca9f,c832dd16 +1180=a02d1ca4 +1184=8786b946,a78a2a9c,113b6db4,a1badae7 +1190=83f51750 +1191=0e6113b7,d7343f82,18ed7b96 +1196=3675ea18 diff --git a/scanner_test.py b/scanner_test.py new file mode 100644 index 0000000..a8eb1ca --- /dev/null +++ b/scanner_test.py @@ -0,0 +1,1197 @@ +""" +SPDX-License-Identifier: MIT + + Copyright (c) 2021, SCANOSS + + Permission is hereby granted, free of charge, to any person obtaining a copy + of this software and associated documentation files (the "Software"), to deal + in the Software without restriction, including without limitation the rights + to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + copies of the Software, and to permit persons to whom the Software is + furnished to do so, subject to the following conditions: + + The above copyright notice and this permission notice shall be included in + all copies or substantial portions of the Software. + + THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE + AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + THE SOFTWARE. +""" + +import datetime +import json +import os +import sys +from pathlib import Path +from typing import Any, Dict, List, Optional + +import importlib_resources +from progress.bar import Bar +from progress.spinner import Spinner +from pypac.parser import PACFile +from scanoss.file_filters import FileFilters + +from . import __version__ +from .csvoutput import CsvOutput +from .cyclonedx import CycloneDx +from .scancodedeps import ScancodeDeps +from .scanoss_settings import ScanossSettings +from .scanossapi import ScanossApi +from .scanossbase import ScanossBase +from .scanossgrpc import ScanossGrpc +from .scanpostprocessor import ScanPostProcessor +from .scantype import ScanType +from .spdxlite import SpdxLite +from .threadeddependencies import SCOPE, ThreadedDependencies +from .threadedscanning import ThreadedScanning + +FAST_WINNOWING = False +try: + from scanoss_winnowing.winnowing import Winnowing + + FAST_WINNOWING = True +except (ModuleNotFoundError, ImportError): + FAST_WINNOWING = False + from .winnowing import Winnowing + +WFP_FILE_START = "file=" +MAX_POST_SIZE = 64 * 1024 # 64k Max post size + + +class Scanner(ScanossBase): + """ + SCANOSS scanning class + Handle the scanning of files, snippets and dependencies + """ + + def __init__( # noqa: PLR0913, PLR0915 + self, + wfp: str = None, + scan_output: str = None, + output_format: str = "plain", + debug: bool = False, + trace: bool = False, + quiet: bool = False, + api_key: str = None, + url: str = None, + flags: str = None, + nb_threads: int = 5, + post_size: int = 32, + timeout: int = 180, + no_wfp_file: bool = False, + all_extensions: bool = False, + all_folders: bool = False, + hidden_files_folders: bool = False, + scan_options: int = 7, + 
sc_timeout: int = 600, + sc_command: str = None, + grpc_url: str = None, + obfuscate: bool = False, + ignore_cert_errors: bool = False, + proxy: str = None, + grpc_proxy: str = None, + ca_cert: str = None, + pac: PACFile = None, + retry: int = 5, + hpsm: bool = False, + skip_size: int = 0, + skip_extensions=None, + skip_folders=None, + strip_hpsm_ids=None, + strip_snippet_ids=None, + skip_md5_ids=None, + scan_settings: "ScanossSettings | None" = None, + req_headers: dict = None, + use_grpc: bool = False, + ): + """ + Initialise scanning class, including Winnowing, ScanossApi, ThreadedScanning + """ + super().__init__(debug, trace, quiet) + if skip_folders is None: + skip_folders = [] + if skip_extensions is None: + skip_extensions = [] + self.wfp = wfp if wfp else "scanner_output.wfp" + self.scan_output = scan_output + self.output_format = output_format + self.no_wfp_file = no_wfp_file + self.isatty = sys.stderr.isatty() + self.all_extensions = all_extensions + self.all_folders = all_folders + self.hidden_files_folders = hidden_files_folders + self.scan_options = scan_options + self._skip_snippets = ( + True if not scan_options & ScanType.SCAN_SNIPPETS.value else False + ) + self.hpsm = hpsm + self.skip_folders = skip_folders + self.skip_size = skip_size + self.skip_extensions = skip_extensions + self.req_headers = req_headers + ver_details = Scanner.version_details() + + self.winnowing = Winnowing( + debug=debug, + quiet=quiet, + skip_snippets=self._skip_snippets, + all_extensions=all_extensions, + obfuscate=obfuscate, + hpsm=self.hpsm, + strip_hpsm_ids=strip_hpsm_ids, + strip_snippet_ids=strip_snippet_ids, + skip_md5_ids=skip_md5_ids, + ) + self.scanoss_api = ScanossApi( + debug=debug, + trace=trace, + quiet=quiet, + api_key=api_key, + url=url, + flags=flags, + timeout=timeout, + ver_details=ver_details, + ignore_cert_errors=ignore_cert_errors, + proxy=proxy, + ca_cert=ca_cert, + pac=pac, + retry=retry, + req_headers=self.req_headers, + ) + sc_deps = 
def _maybe_set_api_sbom(self):
    """
    Hand the SBOM from the scan settings to the SCANOSS API client
    Nothing is done when there are no scan settings or they yield no SBOM
    """
    if not self.scan_settings:
        return
    sbom = self.scan_settings.get_sbom()
    if sbom:
        self.scanoss_api.set_sbom(sbom)


@staticmethod
def __count_files_in_wfp_file(wfp_file: str):
    """
    Count the number of files in the WFP that need to be processed
    :param wfp_file: WFP file to process
    :return: number of lines containing the WFP file marker (0 when no WFP file is given)
    """
    if not wfp_file:
        return 0
    with open(wfp_file) as f:
        return sum(1 for line in f if WFP_FILE_START in line)


@staticmethod
def version_details() -> str:
    """
    Extract the date this version was produced
    Falls back to the current date/time when the packaged build date cannot be read
    :return: version creation date string
    """
    data = None
    try:
        f_name = importlib_resources.files(__name__) / "data/build_date.txt"
        with importlib_resources.as_file(f_name) as f:
            with open(f, "r", encoding="utf-8") as file:
                data = file.read().rstrip()
    except Exception as e:  # Deliberately broad: a missing build date must never stop a scan
        Scanner.print_stderr(f"Warning: Problem loading build time details: {e}")
    if not data:
        now = datetime.datetime.now()
        data = f'date: {now.strftime("%Y%m%d%H%M%S")}, utime: {int(now.timestamp())}'
    return f"tool: scanoss-py, version: {__version__}, {data}"


def __log_result(self, string, outfile=None):
    """
    Logs result to file or STDOUT
    :param string: text to write (a trailing newline is added when writing to a file)
    :param outfile: file to append to; defaults to the configured scan output file
    """
    outfile = outfile or self.scan_output
    if outfile:
        # Explicit UTF-8 (as used when reading the build date) so the output does not depend on the locale
        with open(outfile, "a", encoding="utf-8") as rf:
            rf.write(string + "\n")
    else:
        print(string)


def is_file_or_snippet_scan(self):
    """
    Check if file or snippet scanning is enabled
    :return: True if enabled, False otherwise
    """
    return bool(self.is_file_scan() or self.is_snippet_scan())


def is_file_scan(self):
    """
    Check if file scanning is enabled
    :return: True if enabled, False otherwise
    """
    return bool(self.scan_options & ScanType.SCAN_FILES.value)


def is_snippet_scan(self):
    """
    Check if snippet scanning is enabled
    :return: True if enabled, False otherwise
    """
    return bool(self.scan_options & ScanType.SCAN_SNIPPETS.value)


def is_dependency_scan(self):
    """
    Check if dependency scanning is enabled
    :return: True if enabled, False otherwise
    """
    return bool(self.scan_options & ScanType.SCAN_DEPENDENCIES.value)
def scan_folder(self, scan_dir: str) -> bool:  # noqa: PLR0912, PLR0915
    """
    Scan the specified folder producing fingerprints, send to the SCANOSS API and return results

    Fingerprints are grouped into request blocks and queued for the threaded scanner.
    The scan is kicked off (without waiting for it to complete) either once enough blocks
    are queued or after all files have been fingerprinted.

    :param scan_dir: str
                Directory to scan
    :return True if successful, False otherwise
    :raises Exception: if no folder is given or it is not an existing folder
    """
    success = True
    if not scan_dir:
        raise Exception("ERROR: Please specify a folder to scan")
    if not os.path.exists(scan_dir) or not os.path.isdir(scan_dir):
        raise Exception(
            f"ERROR: Specified folder does not exist or is not a folder: {scan_dir}"
        )

    file_filters = FileFilters(
        debug=self.debug,
        trace=self.trace,
        quiet=self.quiet,
        scanoss_settings=self.scan_settings,
        all_extensions=self.all_extensions,
        all_folders=self.all_folders,
        hidden_files_folders=self.hidden_files_folders,
        skip_size=self.skip_size,
        skip_folders=self.skip_folders,
        skip_extensions=self.skip_extensions,
        operation_type="scanning",
    )
    self.print_msg(f"Searching {scan_dir} for files to fingerprint...")
    spinner = None
    if not self.quiet and self.isatty:
        spinner = Spinner("Fingerprinting ")
    # Fingerprints are kept for the WFP file unless that was disabled and a threaded scan is available
    save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan
    wfp_list = []
    scan_block = ""
    scan_size = 0  # UTF-8 size of scan_block, kept in step with it
    queue_size = 0
    file_count = 0  # count all files fingerprinted
    wfp_file_count = 0  # count number of files in each queue post
    scan_started = False

    to_scan_files = file_filters.get_filtered_files_from_folder(scan_dir)
    for to_scan_file in to_scan_files:
        if self.threaded_scan and self.threaded_scan.stop_scanning():
            self.print_stderr(
                "Warning: Aborting fingerprinting as the scanning service is not available."
            )
            break
        self.print_debug(f"Fingerprinting {to_scan_file}...")
        if spinner:
            spinner.next()
        abs_path = Path(scan_dir, to_scan_file).resolve()
        wfp = self.winnowing.wfp_for_file(str(abs_path), to_scan_file)
        if wfp is None or wfp == "":
            self.print_debug(f"No WFP returned for {to_scan_file}. Skipping.")
            continue
        if save_wfps_for_print:
            wfp_list.append(wfp)
        file_count += 1
        if not self.threaded_scan:
            continue
        wfp_size = len(wfp.encode("utf-8"))
        # If adding this WFP would reach the max post size and we already have something stored
        # in the scan block, queue the stored block first
        if scan_block != "" and (wfp_size + scan_size) >= self.max_post_size:
            self.threaded_scan.queue_add(scan_block)
            queue_size += 1
            scan_block = ""
            scan_size = 0
            wfp_file_count = 0
        scan_block += wfp
        # UTF-8 lengths add up under concatenation, so the block need not be re-encoded
        scan_size += wfp_size
        wfp_file_count += 1
        # If the scan request block (group of WFPs) is larger than the POST size
        # or we have reached the file limit, add it to the queue
        if (
            wfp_file_count > self.post_file_count
            or scan_size >= self.max_post_size
        ):
            self.threaded_scan.queue_add(scan_block)
            queue_size += 1
            scan_block = ""
            scan_size = 0
            wfp_file_count = 0
        if (
            not scan_started and queue_size > self.nb_threads
        ):  # Start scanning if we have something to do
            scan_started = True
            if not self.threaded_scan.run(wait=False):
                self.print_stderr(
                    "Warning: Some errors encounted while scanning. Results might be incomplete."
                )
                success = False
    # End for loop
    if self.threaded_scan and scan_block != "":
        self.threaded_scan.queue_add(
            scan_block
        )  # Make sure all files have been submitted
    if spinner:
        spinner.finish()

    if file_count > 0:
        if save_wfps_for_print:  # Write a WFP file if no threading is requested
            self.print_debug(f"Writing fingerprints to {self.wfp}")
            with open(self.wfp, "w") as f:
                f.write("".join(wfp_list))
        else:
            self.print_debug(f"Skipping writing WFP file {self.wfp}")
        if self.threaded_scan:
            # Only ever downgrade the status: a failure recorded when the scan was started
            # inside the loop must not be overwritten by this call succeeding
            if not self.__run_scan_threaded(scan_started, file_count):
                success = False
    else:
        Scanner.print_stderr(
            f"Warning: No files found to scan in folder: {scan_dir}"
        )
    return success
def __finish_scan_threaded(self, file_map: Optional[Dict[Any, Any]] = None) -> bool:
    """Wait for the threaded scan to complete and process the results

    The scan and dependency responses are merged, optionally post-processed and then
    written in the configured output format. An unknown output format is reported on
    STDERR and makes the call fail; no exception is raised for it.

    Args:
        file_map: Mapping of obfuscated files back to originals

    Returns:
        bool: True if successful, False otherwise
    """
    success: bool = True
    scan_responses = None
    dep_responses = None
    if self.is_file_or_snippet_scan():
        if not self.threaded_scan.complete():  # Wait for the scans to complete
            self.print_stderr("Warning: Scanning analysis ran into some trouble.")
            success = False
        self.threaded_scan.complete_bar()
        scan_responses = self.threaded_scan.responses
    if self.is_dependency_scan():
        self.print_msg("Retrieving dependency data...")
        if not self.threaded_deps.complete():
            self.print_stderr("Warning: Dependency analysis ran into some trouble.")
            success = False
        dep_responses = self.threaded_deps.responses

    raw_scan_results = self._merge_scan_results(
        scan_responses, dep_responses, file_map
    )

    if self.post_processor:
        results = self.post_processor.load_results(raw_scan_results).post_process()
    else:
        results = raw_scan_results

    # The producers below may only downgrade the status: a scan or dependency failure
    # recorded above must survive a successful write, as it already does for "plain"
    if self.output_format == "plain":
        self.__log_result(json.dumps(results, indent=2, sort_keys=True))
    elif self.output_format == "cyclonedx":
        cdx = CycloneDx(self.debug, self.scan_output)
        produced, _ = cdx.produce_from_json(results)
        if not produced:
            success = False
    elif self.output_format == "spdxlite":
        spdxlite = SpdxLite(self.debug, self.scan_output)
        if not spdxlite.produce_from_json(results):
            success = False
    elif self.output_format == "csv":
        csvo = CsvOutput(self.debug, self.scan_output)
        if not csvo.produce_from_json(results):
            success = False
    else:
        self.print_stderr(f"ERROR: Unknown output format: {self.output_format}")
        success = False
    return success


def _merge_scan_results(
    self,
    scan_responses: Optional[List],
    dep_responses: Optional[Dict[str, Any]],
    file_map: Optional[Dict[str, Any]],
) -> Dict[str, Any]:
    """Merge scan and dependency responses into a single dictionary

    Scan responses are applied in order, so later responses win for a repeated file name.
    Dependency entries are added afterwards, keyed by their "file" value, and replace any
    scan result stored for the same file. The inputs are left unmodified.

    Args:
        scan_responses: Scan responses (file name -> results); None entries are skipped
        dep_responses: Dependency response holding a "files" list, or None
        file_map: Mapping of obfuscated files back to originals, or None

    Returns:
        Dict[str, Any]: Combined results keyed by file name
    """
    results: Dict[str, Any] = {}

    for response in scan_responses or []:
        if response is None:
            continue
        if file_map:
            results.update(self._deobfuscate_filenames(response, file_map))
        else:
            results.update(response)

    dep_files = dep_responses.get("files", None) if dep_responses else None
    for dep_file in dep_files or []:
        file = dep_file.get("file", None)
        if file:
            # Store a copy without the "file" key instead of popping it from the caller's data
            results[file] = [
                {key: value for key, value in dep_file.items() if key != "file"}
            ]

    return results


def _deobfuscate_filenames(self, response: dict, file_map: dict) -> dict:
    """Convert obfuscated filenames back to original names

    Names missing from the file map (or mapped to an empty value) are kept unchanged.
    """
    return {
        file_map.get(name, None) or name: value for name, value in response.items()
    }
def scan_file(self, file: str) -> bool:
    """
    Scan the specified file and produce a result

    The file is fingerprinted and, when a threaded scanner is configured, queued and the
    scan is started without waiting for it to complete.

    :param file: str
                File to fingerprint and scan/identify
    :return True if successful, False otherwise (including when no fingerprint was produced)
    :raises Exception: if no file is given or it is not an existing file
    """
    if not file:
        raise Exception("ERROR: Please specify a file to scan")
    if not os.path.isfile(file):  # isfile() is also False for paths that do not exist
        raise Exception(
            f"ERROR: Specified file does not exist or is not a file: {file}"
        )
    self.print_debug(f"Fingerprinting {file}...")
    wfp = self.winnowing.wfp_for_file(file, file)
    if wfp is None or wfp == "":
        return False
    if self.threaded_scan:
        self.threaded_scan.queue_add(wfp)  # Submit the WFP for scanning
    self.print_debug(f"Scanning {file}...")
    if self.threaded_scan:
        return self.__run_scan_threaded(False, 1)
    return True
all_folders=self.all_folders, + hidden_files_folders=self.hidden_files_folders, + skip_size=self.skip_size, + skip_folders=self.skip_folders, + skip_extensions=self.skip_extensions, + operation_type="scanning", + ) + spinner = None + if not self.quiet and self.isatty: + spinner = Spinner("Fingerprinting ") + save_wfps_for_print = not self.no_wfp_file or not self.threaded_scan + wfp_list = [] + scan_block = "" + scan_size = 0 + queue_size = 0 + file_count = 0 # count all files fingerprinted + wfp_file_count = 0 # count number of files in each queue post + scan_started = False + + to_scan_files = file_filters.get_filtered_files_from_files(files) + for file in to_scan_files: + if self.threaded_scan and self.threaded_scan.stop_scanning(): + self.print_stderr( + "Warning: Aborting fingerprinting as the scanning service is not available." + ) + break + self.print_debug(f"Fingerprinting {file}...") + if spinner: + spinner.next() + wfp = self.winnowing.wfp_for_file(file, file) + if wfp is None or wfp == "": + self.print_debug(f"No WFP returned for {file}. 
Skipping.") + continue + if save_wfps_for_print: + wfp_list.append(wfp) + file_count += 1 + if self.threaded_scan: + wfp_size = len(wfp.encode("utf-8")) + # If the WFP is bigger than the max post size and we already have something stored in the scan block, add it to the queue # noqa: E501 + if scan_block != "" and (wfp_size + scan_size) >= self.max_post_size: + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = "" + wfp_file_count = 0 + scan_block += wfp + scan_size = len(scan_block.encode("utf-8")) + wfp_file_count += 1 + # If the scan request block (group of WFPs) or larger than the POST size or we have reached the file limit, add it to the queue # noqa: E501 + if ( + wfp_file_count > self.post_file_count + or scan_size >= self.max_post_size + ): + self.threaded_scan.queue_add(scan_block) + queue_size += 1 + scan_block = "" + wfp_file_count = 0 + if ( + not scan_started and queue_size > self.nb_threads + ): # Start scanning if we have something to do + scan_started = True + if not self.threaded_scan.run(wait=False): + self.print_stderr( + "Warning: Some errors encounted while scanning. Results might be incomplete." 
+ ) + success = False + + # End for loop + if self.threaded_scan and scan_block != "": + self.threaded_scan.queue_add( + scan_block + ) # Make sure all files have been submitted + if spinner: + spinner.finish() + + if file_count > 0: + if save_wfps_for_print: # Write a WFP file if no threading is requested + self.print_debug(f"Writing fingerprints to {self.wfp}") + with open(self.wfp, "w") as f: + f.write("".join(wfp_list)) + else: + self.print_debug(f"Skipping writing WFP file {self.wfp}") + if self.threaded_scan: + success = self.__run_scan_threaded(scan_started, file_count) + else: + Scanner.print_stderr( + f"Warning: No files found to scan from: {to_scan_files}" + ) + return success + + def scan_files_with_options( + self, files: [], deps_file: str = None, file_map: dict = None + ) -> bool: + """ + Scan the given list of files for whatever scaning options that have been configured + :param files: list of files to scan + :param deps_file: pre-parsed dependency file to decorate + :param file_map: mapping of obfuscated files back into originals + :return: True if successful, False otherwise + """ + success = True + if not files: + raise Exception("ERROR: Please specify a list of files to scan") + if not self.is_file_or_snippet_scan(): + raise Exception( + f"ERROR: file or snippet scan options have to be set to scan files: {files}" + ) + if self.is_dependency_scan() or deps_file: + raise Exception( + "ERROR: The dependency scan option is currently not supported when scanning a list of files" + ) + if self.scan_output: + self.print_msg(f"Writing results to {self.scan_output}...") + if self.is_file_or_snippet_scan(): + if not self.scan_files(files): + success = False + if self.threaded_scan: + if not self.__finish_scan_threaded(file_map): + success = False + return success + + def scan_contents(self, filename: str, contents: bytes) -> bool: + """ + Scan the given contents as a file + + :param filename: filename to associate with the contents + :param contents: file 
contents + :return: True if successful, False otherwise + """ + success = True + if not filename: + raise Exception("ERROR: Please specify a filename to scan") + if not contents: + raise Exception("ERROR: Please specify a file contents to scan") + + self.print_debug(f"Fingerprinting {filename}...") + wfp = self.winnowing.wfp_for_contents(filename, False, contents) + if wfp is not None and wfp != "": + if self.threaded_scan: + self.threaded_scan.queue_add(wfp) # Submit the WFP for scanning + self.print_debug(f"Scanning {filename}...") + if self.threaded_scan: + success = self.__run_scan_threaded(False, 1) + else: + success = False + if self.threaded_scan: + if not self.__finish_scan_threaded(): + success = False + return success + + def scan_wfp_file(self, file: str = None) -> bool: # noqa: PLR0912, PLR0915 + """ + Scan the contents of the specified WFP file (in the current process) + :param file: Scan the contents of the specified WFP file (in the current process) + :return: True if successful, False otherwise + """ + success = True + wfp_file = ( + file if file else self.wfp + ) # If a WFP file is specified, use it, otherwise us the default + if not os.path.exists(wfp_file) or not os.path.isfile(wfp_file): + raise Exception( + f"ERROR: Specified WFP file does not exist or is not a file: {wfp_file}" + ) + file_count = Scanner.__count_files_in_wfp_file(wfp_file) + cur_files = 0 + cur_size = 0 + batch_files = 0 + wfp = "" + max_component = {"name": "", "hits": 0} + components = {} + self.print_debug(f"Found {file_count} files to process.") + raw_output = "{\n" + file_print = "" + bar = None + if not self.quiet and self.isatty: + bar = Bar("Scanning", max=file_count) + bar.next(0) + with open(wfp_file) as f: + for line in f: + if line.startswith(WFP_FILE_START): + if file_print: + wfp += file_print # Store the WFP for the current file + cur_size = len(wfp.encode("utf-8")) + file_print = line # Start storing the next file + cur_files += 1 + batch_files += 1 + else: + 
file_print += line # Store the rest of the WFP for this file + l_size = cur_size + len(file_print.encode("utf-8")) + # Hit the max post size, so sending the current batch and continue processing + if l_size >= self.max_post_size and wfp: + self.print_debug( + f'Sending {batch_files} ({cur_files}) of' + f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' + ) + if self.debug and cur_size > self.max_post_size: + Scanner.print_stderr( + f"Warning: Post size {cur_size} greater than limit {self.max_post_size}" + ) + scan_resp = self.scanoss_api.scan( + wfp, max_component["name"] + ) # Scan current WFP and store + if bar: + bar.next(batch_files) + if scan_resp is not None: + for key, value in scan_resp.items(): + raw_output += ' "%s":%s,' % ( + key, + json.dumps(value, indent=2), + ) + for v in value: + if hasattr(v, "get"): + if v.get("id") != "none": + vcv = "%s:%s:%s" % ( + v.get("vendor"), + v.get("component"), + v.get("version"), + ) + components[vcv] = ( + components[vcv] + 1 + if vcv in components + else 1 + ) + if max_component["hits"] < components[vcv]: + max_component["name"] = v.get("component") + max_component["hits"] = components[vcv] + else: + Scanner.print_stderr(f"Warning: Unknown value: {v}") + else: + success = False + batch_files = 0 + wfp = "" + if file_print: + wfp += file_print # Store the WFP for the current file + if wfp: + self.print_debug( + f'Sending {batch_files} ({cur_files}) of' + f' {file_count} ({len(wfp.encode("utf-8"))} bytes) files to the ScanOSS API.' 
+ ) + scan_resp = self.scanoss_api.scan( + wfp, max_component["name"] + ) # Scan current WFP and store + if bar: + bar.next(batch_files) + first = True + if scan_resp is not None: + for key, value in scan_resp.items(): + if first: + raw_output += ' "%s":%s' % (key, json.dumps(value, indent=2)) + first = False + else: + raw_output += ',\n "%s":%s' % ( + key, + json.dumps(value, indent=2), + ) + else: + success = False + raw_output += "\n}" + if bar: + bar.finish() + if self.output_format == "plain": + self.__log_result(raw_output) + elif self.output_format == "cyclonedx": + cdx = CycloneDx(self.debug, self.scan_output) + cdx.produce_from_str(raw_output) + elif self.output_format == "spdxlite": + spdxlite = SpdxLite(self.debug, self.scan_output) + success = spdxlite.produce_from_str(raw_output) + elif self.output_format == "csv": + csvo = CsvOutput(self.debug, self.scan_output) + csvo.produce_from_str(raw_output) + else: + self.print_stderr(f"ERROR: Unknown output format: {self.output_format}") + success = False + + return success + + def scan_wfp_with_options( + self, wfp: str, deps_file: str, file_map: dict = None + ) -> bool: + """ + Scan the given WFP file for whatever scaning options that have been configured + :param wfp: WFP file to scan + :param deps_file: pre-parsed dependency file to decorate + :param file_map: mapping of obfuscated files back into originals + :return: True if successful, False otherwise + """ + success = True + wfp_file = ( + wfp if wfp else self.wfp + ) # If a WFP file is specified, use it, otherwise us the default + if not os.path.exists(wfp_file) or not os.path.isfile(wfp_file): + raise Exception( + f"ERROR: Specified WFP file does not exist or is not a file: {wfp_file}" + ) + + if not self.is_file_or_snippet_scan() and not self.is_dependency_scan(): + raise Exception(f"ERROR: No scan options defined to scan WFP: {wfp}") + + if self.scan_output: + self.print_msg(f"Writing results to {self.scan_output}...") + if 
self.is_dependency_scan(): + if not self.threaded_deps.run( + deps_file=deps_file, wait=False + ): # Kick off a background dependency scan + success = False + if self.is_file_or_snippet_scan(): + if not self.scan_wfp_file_threaded(wfp_file): + success = False + if self.threaded_scan: + if not self.__finish_scan_threaded(file_map): + success = False + return success + + def scan_wfp_file_threaded(self, file: str = None) -> bool: + """ + Scan the contents of the specified WFP file (threaded) + :param file: WFP file to scan (optional) + return: True if successful, False otherwise + """ + success = True + wfp_file = ( + file if file else self.wfp + ) # If a WFP file is specified, use it, otherwise us the default + if not os.path.exists(wfp_file) or not os.path.isfile(wfp_file): + raise Exception( + f"ERROR: Specified WFP file does not exist or is not a file: {wfp_file}" + ) + cur_size = 0 + queue_size = 0 + file_count = 0 # count all files fingerprinted + wfp_file_count = 0 # count number of files in each queue post + scan_started = False + wfp = "" + scan_block = "" + with open(wfp_file) as f: # Parse the WFP file + for line in f: + if line.startswith(WFP_FILE_START): + if scan_block: + wfp += scan_block # Store the WFP for the current file + cur_size = len(wfp.encode("utf-8")) + scan_block = line # Start storing the next file + file_count += 1 + wfp_file_count += 1 + else: + scan_block += line # Store the rest of the WFP for this file + l_size = cur_size + len(scan_block.encode("utf-8")) + # Hit the max post size, so sending the current batch and continue processing + if ( + wfp_file_count > self.post_file_count + or l_size >= self.max_post_size + ) and wfp: + if self.debug and cur_size > self.max_post_size: + Scanner.print_stderr( + f"Warning: Post size {cur_size} greater than limit {self.max_post_size}" + ) + self.threaded_scan.queue_add(wfp) + queue_size += 1 + wfp = "" + wfp_file_count = 0 + if ( + not scan_started and queue_size > self.nb_threads + ): # Start 
scanning if we have something to do + scan_started = True + if not self.threaded_scan.run(wait=False): + self.print_stderr( + "Warning: Some errors uncounted while scanning. Results might be incomplete." + ) + success = False + # End for loop + if scan_block: + wfp += scan_block # Store the WFP for the current file + if wfp: + self.threaded_scan.queue_add(wfp) + queue_size += 1 + + if not self.__run_scan_threaded(scan_started, file_count): + success = False + return success + + def scan_wfp(self, wfp: str) -> bool: + """ + Send the specified (single) WFP to ScanOSS for identification + Parameters + ---------- + wfp: str + Winnowing Fingerprint to scan/identify + """ + success = True + if not wfp: + raise Exception("ERROR: Please specify a WFP to scan") + raw_output = "{\n" + scan_resp = self.scanoss_api.scan(wfp) + if scan_resp is not None: + for key, value in scan_resp.items(): + raw_output += ' "%s":%s' % (key, json.dumps(value, indent=2)) + else: + success = False + raw_output += "\n}" + if self.output_format == "plain": + self.__log_result(raw_output) + elif self.output_format == "cyclonedx": + cdx = CycloneDx(self.debug, self.scan_output) + cdx.produce_from_str(raw_output) + elif self.output_format == "spdxlite": + spdxlite = SpdxLite(self.debug, self.scan_output) + success = spdxlite.produce_from_str(raw_output) + elif self.output_format == "csv": + csvo = CsvOutput(self.debug, self.scan_output) + csvo.produce_from_str(raw_output) + else: + self.print_stderr(f"ERROR: Unknown output format: {self.output_format}") + success = False + + return success + + def wfp_contents(self, filename: str, contents: bytes, wfp_file: str = None): + """ + Fingerprint the specified contents as a file + + :param filename: filename to associate with the contents + :param contents: file contents + :param wfp_file: WFP to write results to (optional) + :return: + """ + if not filename: + raise Exception("ERROR: Please specify a filename to scan") + if not contents: + raise 
Exception("ERROR: Please specify a file contents to scan") + + self.print_debug(f"Fingerprinting {filename}...") + wfp = self.winnowing.wfp_for_contents(filename, False, contents) + if wfp: + if wfp_file: + self.print_stderr(f"Writing fingerprints to {wfp_file}") + with open(wfp_file, "w") as f: + f.write(wfp) + else: + print(wfp) + else: + Scanner.print_stderr(f"Warning: No fingerprints generated for: {wfp_file}") + + def wfp_file(self, scan_file: str, wfp_file: str = None): + """ + Fingerprint the specified file + """ + if not scan_file: + raise Exception("ERROR: Please specify a file to fingerprint") + if not os.path.exists(scan_file) or not os.path.isfile(scan_file): + raise Exception( + f"ERROR: Specified file does not exist or is not a file: {scan_file}" + ) + + self.print_debug(f"Fingerprinting {scan_file}...") + wfp = self.winnowing.wfp_for_file(scan_file, scan_file) + if wfp: + if wfp_file: + self.print_stderr(f"Writing fingerprints to {wfp_file}") + with open(wfp_file, "w") as f: + f.write(wfp) + else: + print(wfp) + else: + Scanner.print_stderr(f"Warning: No fingerprints generated for: {scan_file}") + + def wfp_folder(self, scan_dir: str, wfp_file: str = None): + """ + Fingerprint the specified folder producing fingerprints + """ + if not scan_dir: + raise Exception("ERROR: Please specify a folder to fingerprint") + if not os.path.exists(scan_dir) or not os.path.isdir(scan_dir): + raise Exception( + f"ERROR: Specified folder does not exist or is not a folder: {scan_dir}" + ) + file_filters = FileFilters( + debug=self.debug, + trace=self.trace, + quiet=self.quiet, + scanoss_settings=self.scan_settings, + all_extensions=self.all_extensions, + all_folders=self.all_folders, + hidden_files_folders=self.hidden_files_folders, + skip_size=self.skip_size, + skip_folders=self.skip_folders, + skip_extensions=self.skip_extensions, + operation_type="scanning", + ) + wfps = "" + self.print_msg(f"Searching {scan_dir} for files to fingerprint...") + spinner = None + if 
not self.quiet and self.isatty: + spinner = Spinner("Fingerprinting ") + + to_fingerprint_files = file_filters.get_filtered_files_from_folder(scan_dir) + for file in to_fingerprint_files: + if spinner: + spinner.next() + abs_path = Path(scan_dir, file).resolve() + self.print_debug(f"Fingerprinting {file}...") + wfps += self.winnowing.wfp_for_file(str(abs_path), file) + if spinner: + spinner.finish() + if wfps: + if wfp_file: + self.print_stderr(f"Writing fingerprints to {wfp_file}") + with open(wfp_file, "w") as f: + f.write(wfps) + else: + print(wfps) + else: + Scanner.print_stderr( + f"Warning: No files found to fingerprint in folder: {scan_dir}" + ) + + +# +# End of ScanOSS Class +# diff --git a/scanoss.json b/scanoss.json index 62f0080..d4c8553 100755 --- a/scanoss.json +++ b/scanoss.json @@ -8,8 +8,9 @@ "bom": { "remove": [ { - "path": "copyright.c", - "purl": "pkg:github/scanoss/engine" + "path": "scanner_test.py", + "purl": "pkg:github/scanoss/scanoss.py", + "comment": "this is not scanoss.py" } ] } diff --git a/scanoss_testing_2.json b/scanoss_testing_2.json new file mode 100755 index 0000000..74776b4 --- /dev/null +++ b/scanoss_testing_2.json @@ -0,0 +1,9 @@ +{ + "settings": { + "skip": { + "patterns": {}, + "sizes": {} + } + }, + "bom": {} +}