Code: Select all
"""
icpg.py
Interesting Chess Position Generator
Read games and analyze positions with engine and save interesting positions
based on user defined criteria via score thresholds against engine bestscore1
and bestscore2 from multipv 2 analysis results.
Requirements:
python 3
python-chess v0.26.0
Analysis engine that supports multipv and movetime
Dev log:
v0.4 beta
* Added --positional flag, used to generate positions with low score
difference between bestscore1 and bestscore2
* Parse the game directly instead of saving it to memory first. This
starts the analysis earlier, especially for big pgn files.
* When flag --pin is set and there is pinned piece of the side not to move,
save it to interesting.epd only if the pinned piece is not a pawn.
Python-chess currently is not detecting all pinned pawns properly.
v0.3 beta
* Added --pin flag to save only interesting positions when a piece of
not stm is pinned.
* minscorediffcheck is no longer an option but is calculated as:
minscorediffcheck = minbest1score3 - maxbest2score3
v0.2 beta
* Added --log flag to enable logging
* Relocated engine initialization from analyze_game() to main(); this
avoids duplicating the Lc0 engine in memory, because Lc0 would not quit
on the uci quit command after analyzing a game.
* When analyzing engine is Lc0, set SmartPruningFactor to 0, this would
avoid pruning the analysis time.
* Added --skipdraw flag to skip games with draw results
v0.1 beta
* Parse moves in the game in reverse.
* Exit search early when stm score is way below minimum score threshold
* Save interesting positions depending on the engine multipv scores and
user defined score thresholds
"""
import argparse
import logging
import chess.pgn
import chess.engine
# Program version string; main() embeds it in the argparse program name.
VERSION = 'v0.4 beta'
def interesting_pos(board, bs1, bs2, mib1s1, mib1s2, mib1s3, mab2s1, mab2s2, mab2s3):
    """
    Return True when the position qualifies as "interesting".

    The highest tier whose minimum is reached by bs1 selects the cap that
    bs2 must not exceed. Lower tiers are not tried once a tier has matched.
    When the position is rejected, its FEN is printed and False is returned.

    board: board position, only used to print its FEN on rejection
    bs1: bestscore1 from multipv 1
    bs2: bestscore2 from multipv 2

    mib1s1: minimum best score1, threshold 1
    mib1s2: minimum best score1, threshold 2
    mib1s3: minimum best score1, threshold 3
    mib1s1 > mib1s2 > mib1s3

    mab2s1: maximum best score2, threshold 1
    mab2s2: maximum best score2, threshold 2
    mab2s3: maximum best score2, threshold 3
    mab2s1 > mab2s2 > mab2s3
    """
    # bs1 values at or above this are handled by the "mate score" rule below.
    mate_score_floor = 30000

    if bs1 >= mib1s1:
        # Mate score: the cap on the second-best score is doubled.
        if bs1 >= mate_score_floor and bs2 <= 2 * mab2s1:
            return True
        bs2_cap = mab2s1
    elif bs1 >= mib1s2:
        bs2_cap = mab2s2
    elif bs1 >= mib1s3:
        bs2_cap = mab2s3
    else:
        # bs1 is below every tier, nothing can qualify.
        bs2_cap = None

    if bs2_cap is not None and bs2 <= bs2_cap:
        return True

    print('Not an interesting pos: {}'.format(board.fen()))
    return False
def positional_pos(board, bs1, bs2, mib1s1, mib1s2, mib1s3, mab2s1, mab2s2, mab2s3):
    """
    Return True when the position qualifies as "positional".

    * The engine bestscore1 is not winning and bestscore2 is not losing
    * The score gap between bestscore1 and bestscore2 is small

    Three tiers are tested independently (a later tier is still tried when
    an earlier one fails). Each tier requires bs1 to lie inside a band and
    bs2 to lie within 50 cp below that tier's maximum:

      tier 1: mib1s1 <= bs1 <= mib1s1 + 50  and  mab2s1 - 50 <= bs2 <= mab2s1
      tier 2: mib1s2 <= bs1 <= mib1s1       and  mab2s2 - 50 <= bs2 <= mab2s2
      tier 3: mib1s3 <= bs1 <= mib1s2       and  mab2s3 - 50 <= bs2 <= mab2s3

    board: board position, only used to print its FEN on rejection
    bs1: bestscore1 from multipv 1
    bs2: bestscore2 from multipv 2

    When no tier matches, the FEN is printed and False is returned.
    """
    band = 50  # cp width of the accepted score window

    # (bs1 lower bound, bs1 upper bound, bs2 upper bound) for each tier
    tiers = (
        (mib1s1, mib1s1 + band, mab2s1),
        (mib1s2, mib1s1, mab2s2),
        (mib1s3, mib1s2, mab2s3),
    )

    for bs1_low, bs1_high, bs2_high in tiers:
        if bs1_low <= bs1 <= bs1_high and bs2_high - band <= bs2 <= bs2_high:
            return True

    print('Not positional: {}'.format(board.fen()))
    return False
def abs_pinned(board, color):
    """
    Return True if at least one non-pawn piece of side `color` is pinned.

    Only squares occupied by a piece of `color` are tested. Calling
    board.is_pinned() on an empty square or on an opponent's piece can
    report a "pin" that does not involve any piece of `color`, so those
    squares are skipped. Pawns are excluded on purpose (see the dev log in
    the module docstring: pinned pawns are not saved).

    board: position to inspect
    color: side whose pieces are checked; compared with ==, so the
           integer 0/1 form used by some callers also works
    """
    for sq in chess.SQUARES:
        piece = board.piece_at(sq)

        # Skip empty squares and the opponent's pieces.
        if piece is None or piece.color != color:
            continue

        # Pinned pawns do not count.
        if piece.piece_type == chess.PAWN:
            continue

        if board.is_pinned(color, sq):
            return True

    return False
def analyze_game(game, engine, enginefn, hash_val, thread_val,
                 analysis_start_move_num,
                 outepdfn, gcnt, engname, mintime=1.0, maxtime=2.0,
                 minscorediffcheck=25, minbest1score1=2000,
                 minbest1score2=1000, minbest1score3=500,
                 maxbest2score1=300, maxbest2score2=200,
                 maxbest2score3=100, weightsfile=None, skipdraw=True,
                 pin=False, positional=False):
    """
    Walk one game backwards, analyze each position at multipv 2 and append
    the positions that satisfy the score criteria to the EPD file.

    game: parsed pgn game
    engine: running engine used for the analysis
    enginefn, hash_val, thread_val, weightsfile: accepted for interface
        compatibility; not used inside this function
    analysis_start_move_num: stop going backwards once the fullmove number
        drops below this value
    outepdfn: EPD file the saved positions are appended to
    gcnt: game counter, only used in the progress output
    engname: engine name, used in the output and in the EPD c3 operation
    mintime: seconds that must have elapsed before an early exit is allowed
    maxtime: analysis time limit per position in seconds
    minscorediffcheck: early-exit threshold on bestscore1 - bestscore2
    minbest1score1..3, maxbest2score1..3: score thresholds handed to
        interesting_pos() / positional_pos()
    skipdraw: when true, games with result 1/2-1/2 are not analyzed
    pin: when true, only positions where a piece of the side not to move
        is pinned are analyzed
    positional: when true use positional_pos(), otherwise interesting_pos()

    Returns None.
    """
    limit = chess.engine.Limit(time=maxtime)

    # Copy orig game header to our epd output. A missing tag becomes '?'
    # instead of raising KeyError.
    headers = game.headers
    ev = headers.get('Event', '?')
    si = headers.get('Site', '?')
    da = headers.get('Date', '?')
    ro = headers.get('Round', '?')
    wp = headers.get('White', '?')
    bp = headers.get('Black', '?')
    res = headers.get('Result', '*')

    # If result of this game is a draw and skipdraw is true, we skip it
    if skipdraw and res == '1/2-1/2':
        return

    c0_val = wp + ' - ' + bp + ', ' + ev + ', ' + si + ', ' + da + ', R' + ro

    poscnt = 0

    # Parse move in reverse, starting from the final position of the game
    board = game.end().board()

    while True:
        if board.fullmove_number == 1 and board.turn == chess.WHITE:
            print('startpos')
            break

        if board.fullmove_number < analysis_start_move_num:
            print('move start limit is reached, exit from this game')
            break

        # Games that start from a custom position run out of moves before
        # reaching move 1; stop instead of popping from an empty stack.
        if not board.move_stack:
            print('first position of the game is reached')
            break

        g_move = board.pop()

        # Print the fen before g_move is made on the board
        poscnt += 1
        print()
        print('game {} / position {}'.format(gcnt, poscnt))
        print(board.fen())
        print(board)
        print('game move: {}\n'.format(board.san(g_move)))

        # Skip this position if --pin is set and no one of the not stm piece is pinned
        if pin and not abs_pinned(board, not board.turn):
            print('Skip this position no one of the pieces of the not stm is pinned')
            print(board.fen())
            print()
            continue

        # If side to move is in check, skip this position
        if board.is_check():
            print()
            print(board.fen())
            print('Skip this position, stm is in check\n')
            continue

        # Run engine in multipv 2
        print('{} is searching at multipv {} ...'.format(engname, 2))

        # Reset every per-position result so nothing leaks in from the
        # previously analyzed position.
        bm1, bm2, bs1, bs2 = None, None, None, None
        depth, elapsed, raw_pv = None, None, None
        bestmovechanges = 0  # Start comparing bestmove1 at depth 4
        prev_best = None     # last best move seen at depth >= 4

        with engine.analysis(board, limit, multipv=2) as analysis:
            for info in analysis:
                # Ignore info lines that lack any field used below.
                if any(key not in info
                       for key in ('multipv', 'depth', 'score', 'pv', 'time')):
                    continue

                pv = info['pv'][0:5]
                if not pv:
                    continue

                multipv = info['multipv']
                depth = info['depth']
                elapsed = info['time']

                # Score from the side to move's view; mate scores are
                # converted using 32000 as the mate value.
                s = info['score'].relative.score(mate_score=32000)

                if multipv == 1:
                    bm1 = pv[0]
                    bs1 = s
                    raw_pv = pv

                    # Exit early if score is below half of minbest1score3
                    if elapsed >= mintime and bs1 < minbest1score3/2:
                        print('Exit search early, current best score is only {} and it is still below half of minbest1score3 or {}/2={}'.format(
                            bs1, minbest1score3, minbest1score3//2))
                        break

                    # Record bestmove move changes to determine position
                    # complexity. The first best move seen is the baseline
                    # and is not counted as a change.
                    if depth >= 4 \
                            and 'lowerbound' not in info \
                            and 'upperbound' not in info:
                        if prev_best is not None and bm1 != prev_best:
                            bestmovechanges += 1
                        prev_best = bm1

                elif multipv == 2:
                    bm2 = pv[0]
                    bs2 = s

                    # Save analysis time by exiting it if score difference
                    # between bestscore1 and bestscore2 is way below the
                    # minimum score difference based from user defined
                    # score thresholds
                    if bs1 is not None and elapsed >= mintime \
                            and bs1 - bs2 < minscorediffcheck:
                        print('Exit search early, scorediff of {} is below minscorediff of {}'.format(
                            bs1 - bs2, minscorediffcheck))
                        break

        print('Search is done!!')

        # Both lines are required for every check below, e.g. a position
        # with a single legal move has no second line.
        if bm1 is None or bm2 is None:
            print('Skip this position, engine did not report two multipv lines')
            continue

        print('game move : {}'.format(g_move))
        print('complexity : {}'.format(bestmovechanges))
        print('best move 1 : {}, best score 1: {}'.format(bm1, bs1))
        print('best move 2 : {}, best score 2: {}'.format(bm2, bs2))
        print('scorediff : {}'.format(bs1 - bs2))

        # Don't save positions if score is already bad
        if bs1 < minbest1score3:
            print('Skip this position, score {} is below minbest1score3 of {}'.format(bs1, minbest1score3))
            continue

        # If complexity is 1 or less and if bestmove1 is a capture, skip this position
        if board.is_capture(bm1) and bestmovechanges <= 1:
            print('Skip this position, bm1 is a capture and position complexity is below 2')
            continue

        if bs1 - bs2 < minbest1score3 - maxbest2score3:
            print('Skip this position, actual min score diff of {} is below user defined min score diff of {}'.format(
                bs1 - bs2, minbest1score3 - maxbest2score3))
            continue

        # Save epd if criteria is satisfied
        criteria = positional_pos if positional else interesting_pos
        is_save = criteria(board, bs1, bs2, minbest1score1, minbest1score2,
                           minbest1score3, maxbest2score1, maxbest2score2,
                           maxbest2score3)

        if is_save:
            print('Save this position!!')
            ae_oper = 'Analyzing engine: ' + engname
            complexity_oper = 'Complexity: ' + str(bestmovechanges)
            bs2_oper = 'bestscore2: ' + str(bs2)
            new_epd = board.epd(
                bm=bm1,
                ce=bs1,
                sm=g_move,
                acd=depth,
                acs=int(elapsed),
                fmvn=board.fullmove_number,
                hmvc=board.halfmove_clock,
                pv=raw_pv,
                c0=c0_val,
                c1=complexity_oper,
                c2=bs2_oper,
                c3=ae_oper)
            print(new_epd)
            with open(outepdfn, 'a') as f:
                f.write('{}\n'.format(new_epd))
def _try_configure(engine, option, value):
    """Set one engine option; report instead of raising when it is rejected."""
    try:
        engine.configure({option: value})
    except chess.engine.EngineError as err:
        print('Could not set engine option {} to {}: {}'.format(option, value, err))


def main():
    """
    Command line entry point.

    Parses the arguments, selects the score thresholds (positional or
    tactical set), starts and configures the engine, then reads the input
    pgn game by game and passes each game to analyze_game(). The engine is
    always asked to quit, also when the analysis raises.
    """
    parser = argparse.ArgumentParser(
        prog='Interesting Chess Position Generator {}'.format(VERSION),
        description='Generates interesting position using engine and ' +
                    'some criteria',
        epilog='%(prog)s')
    parser.add_argument('-i', '--inpgn', help='input pgn file',
                        required=True)
    parser.add_argument('-o', '--outepd',
                        help='output epd file, default=interesting.epd',
                        default='interesting.epd', required=False)
    parser.add_argument('-e', '--engine', help='engine file or path',
                        required=True)
    parser.add_argument('-t', '--threads', help='engine threads (default=1)',
                        default=1, type=int, required=False)
    parser.add_argument('-a', '--hash', help='engine hash in MB (default=128)',
                        default=128, type=int, required=False)
    parser.add_argument('-w', '--weight', help='weight file for NN engine',
                        required=False)
    parser.add_argument('-n', '--mintime',
                        help='analysis minimum time in sec (default=2.0)',
                        default=2.0, type=float, required=False)
    parser.add_argument('-x', '--maxtime',
                        help='analysis maximum time in sec (default=10.0)',
                        default=10.0, type=float, required=False)
    parser.add_argument('--skipdraw',
                        help='a flag to skip games with draw results',
                        action='store_true')
    parser.add_argument('--log', help='a flag to save logs in a file',
                        action='store_true')
    # The original help text ran two words together ("interestingposition").
    parser.add_argument('--pin',
                        help='a flag when enabled will only save interesting ' +
                             'position if not stm piece is pinned',
                        action='store_true')
    parser.add_argument('--positional',
                        help='a flag to save positional positions',
                        action='store_true')

    args = parser.parse_args()

    pgnfn = args.inpgn
    outepdfn = args.outepd
    thread_val = args.threads
    hash_val = args.hash
    enginefn = args.engine
    weightsfile = args.weight
    mintime = args.mintime
    maxtime = args.maxtime
    skipdraw = args.skipdraw
    pin = args.pin
    positional = args.positional

    start_move = 16  # Stop the analysis when this move no. is reached

    # Adjust score thresholds to save interesting positions
    if positional:
        # (1) Positional score threshold, if flag --positional is set
        minbest1score1 = 100  # cp, threshold 1 for the top move
        minbest1score2 = 50   # cp, threshold 2 for the top move
        minbest1score3 = 0    # cp, threshold 3 for the top move
        maxbest2score1 = 50   # cp, stm 2nd top move max score threshold 1
        maxbest2score2 = 0    # cp, stm 2nd top move max score threshold 2
        maxbest2score3 = -50  # cp, stm 2nd top move max score threshold 3
    else:
        minbest1score1 = 1000  # cp, stm is winning
        minbest1score2 = 600   # cp, stm has decisive advantage
        minbest1score3 = 200   # cp, stm has moderate
        maxbest2score1 = 300   # cp, stm 2nd top move max score threshold 1
        maxbest2score2 = 200   # cp, stm 2nd top move max score threshold 2
        maxbest2score3 = 100   # cp, stm 2nd top move max score threshold 3

    # Same formula for both threshold sets
    minscorediffcheck = minbest1score3 - maxbest2score3

    print('pgn file: {}\n'.format(pgnfn))
    print('Conditions:')
    print('mininum time : {}s'.format(mintime))
    print('maximum time : {}s'.format(maxtime))
    print('mininum score diff check : {}'.format(minscorediffcheck))
    print('mininum best 1 score 1 : {}'.format(minbest1score1))
    print('mininum best 1 score 2 : {}'.format(minbest1score2))
    print('mininum best 1 score 3 : {}'.format(minbest1score3))
    print('maximum best 2 score 1 : {}'.format(maxbest2score1))
    print('maximum best 2 score 2 : {}'.format(maxbest2score2))
    print('maximum best 2 score 3 : {}'.format(maxbest2score3))
    print('stm is not in check : {}'.format('Yes'))
    print('stop analysis move number : {}'.format(start_move))

    # Define analyzing engine
    engine = chess.engine.SimpleEngine.popen_uci(enginefn)

    try:
        engname = engine.id['name']

        if args.log:
            logfn = '_'.join(engname.split()) + '_icpg_log.txt'
            logging.basicConfig(level=logging.DEBUG, filename=logfn,
                                filemode='w',
                                format='%(asctime)s [%(levelname)s] %(message)s')

        # Set Lc0 SmartPruningFactor to 0 to avoid analysis time pruning
        if 'lc0' in engname.lower():
            _try_configure(engine, 'SmartPruningFactor', 0)
        else:
            _try_configure(engine, 'Hash', hash_val)

        _try_configure(engine, 'Threads', thread_val)

        # For NN engine that uses uci option WeightsFile similar to Lc0
        if weightsfile is not None:
            _try_configure(engine, 'WeightsFile', weightsfile)

        # Read pgn file and analyze positions in the game
        gcnt = 0
        with open(pgnfn, 'r') as pgn:
            game = chess.pgn.read_game(pgn)
            while game:
                gcnt += 1
                analyze_game(game,
                             engine,
                             enginefn,
                             hash_val,
                             thread_val,
                             start_move,
                             outepdfn,
                             gcnt,
                             engname,
                             mintime=mintime,
                             maxtime=maxtime,
                             minscorediffcheck=minscorediffcheck,
                             minbest1score1=minbest1score1,
                             minbest1score2=minbest1score2,
                             minbest1score3=minbest1score3,
                             maxbest2score1=maxbest2score1,
                             maxbest2score2=maxbest2score2,
                             maxbest2score3=maxbest2score3,
                             weightsfile=weightsfile,
                             skipdraw=skipdraw,
                             pin=pin,
                             positional=positional)
                game = chess.pgn.read_game(pgn)
    finally:
        # Do not leave the engine process running when something fails.
        engine.quit()
# Run the generator only when this file is executed as a script.
if __name__ == '__main__':
    main()
This has a complexity of 4. Generally, when the gap between bestscore1 and bestscore2 is low, the engine changes its mind often. This also requires a higher maximum time; I use 30s for this case.
[d]r5k1/p4ppp/bpp1p3/2PR4/5P2/PN6/1P2PP1P/R5K1 b - - bm cxd5; ce 14; sm exd5; acd 31; acs 30; fmvn 18; hmvc 0; pv cxd5 cxb6 axb6 Nd4 Kf8; c0 "Moroni, Luca Jr - Wei, Yi, Aeroflot Open A 2019, Moscow RUS, 2019.02.20, R1.1"; c1 "Complexity: 4"; c2 "bestscore2: -67"; c3 "Analyzing engine: Stockfish 10 64 POPCNT";
With 0 complexity.
[d]3r2r1/3q1pk1/3p2pp/p1pP3n/1pBbP2P/5Q2/PP2RPPB/3R2K1 w - - bm e5; ce 76; sm Rxd4; acd 26; acs 30; fmvn 29; hmvc 11; pv e5; c0 "Zoler, Dan - Abergel, Thal, TCh-ISR 2019, Israel ISR, 2019.01.04, R1.1"; c1 "Complexity: 0"; c2 "bestscore2: -4"; c3 "Analyzing engine: Stockfish 10 64 POPCNT";
This one looks tactical. Complexity is 2.
[d]3r4/2q2pk1/1p2n1pp/p2RP3/5P2/P5P1/1P5P/3Q1BK1 w - - bm Rxd8; ce 132; sm Rxd8; acd 29; acs 30; fmvn 38; hmvc 1; pv Rxd8 Nxd8 Qd5 Ne6 Bc4; c0 "Fedoseev, Vladimir3 - Petrosyan, Manuel, Aeroflot Open A 2019, Moscow RUS, 2019.02.20, R1.2"; c1 "Complexity: 2"; c2 "bestscore2: 16"; c3 "Analyzing engine: Stockfish 10 64 POPCNT";